001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.data.loading.impl;
020
021import org.apache.avro.io.*;
022import org.apache.avro.specific.SpecificDatumReader;
023import org.apache.avro.specific.SpecificDatumWriter;
024import org.apache.reef.annotations.audience.Private;
025import org.apache.reef.driver.evaluator.EvaluatorRequest;
026import org.apache.reef.io.data.loading.avro.AvroEvaluatorRequest;
027import org.apache.reef.webserver.AvroHttpSerializer;
028
029import java.io.ByteArrayOutputStream;
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.List;
033
034/**
035 * Serialize and deserialize EvaluatorRequest objects using Avro.
036 */
037@Private
038public final class AvroEvaluatorRequestSerializer {
039  private static AvroEvaluatorRequest toAvro(final EvaluatorRequest request) {
040    final List<CharSequence> nodeNames = new ArrayList<>();
041    for (final String nodeName : request.getNodeNames()) {
042      nodeNames.add(nodeName);
043    }
044
045    final List<CharSequence> rackNames = new ArrayList<>();
046    for (final String rackName : request.getRackNames()) {
047      rackNames.add(rackName);
048    }
049
050    return AvroEvaluatorRequest.newBuilder()
051        .setCores(request.getNumberOfCores())
052        .setMegaBytes(request.getMegaBytes())
053        .setNumber(request.getNumber())
054        .setNodeNames(nodeNames)
055        .setRackNames(rackNames)
056        .build();
057  }
058
059  private static EvaluatorRequest fromAvro(final AvroEvaluatorRequest avroRequest) {
060    final EvaluatorRequest.Builder builder = EvaluatorRequest.newBuilder()
061        .setNumberOfCores(avroRequest.getCores())
062        .setMemory(avroRequest.getMegaBytes())
063        .setNumber(avroRequest.getNumber());
064    for (final CharSequence nodeName : avroRequest.getNodeNames()) {
065      builder.addNodeName(nodeName.toString());
066    }
067    for (final CharSequence rackName : avroRequest.getRackNames()) {
068      builder.addRackName(rackName.toString());
069    }
070    return builder.build();
071  }
072
073  /**
074   * Serialize EvaluatorRequest.
075   */
076  public static String toString(final EvaluatorRequest request) {
077    AvroEvaluatorRequest avroRequest = toAvro(request);
078    final DatumWriter<AvroEvaluatorRequest> datumWriter = new SpecificDatumWriter<>(AvroEvaluatorRequest.class);
079    try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
080      final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(avroRequest.getSchema(), out);
081      datumWriter.write(avroRequest, encoder);
082      encoder.flush();
083      out.close();
084      return out.toString(AvroHttpSerializer.JSON_CHARSET);
085    } catch (final IOException ex) {
086      throw new RuntimeException("Unable to serialize compute request", ex);
087    }
088  }
089
090  /**
091   * Deserialize EvaluatorRequest.
092   */
093  public static EvaluatorRequest fromString(final String serializedRequest) {
094    try {
095      final Decoder decoder =
096          DecoderFactory.get().jsonDecoder(AvroEvaluatorRequest.getClassSchema(), serializedRequest);
097      final SpecificDatumReader<AvroEvaluatorRequest> reader = new SpecificDatumReader<>(AvroEvaluatorRequest.class);
098      return fromAvro(reader.read(null, decoder));
099    } catch (final IOException ex) {
100      throw new RuntimeException("Unable to deserialize compute request", ex);
101    }
102  }
103
104  /**
105   * Empty private constructor to prohibit instantiation of utility class.
106   */
107  private AvroEvaluatorRequestSerializer() {
108  }
109}