001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.io.data.loading.impl; 020 021import org.apache.avro.io.*; 022import org.apache.avro.specific.SpecificDatumReader; 023import org.apache.avro.specific.SpecificDatumWriter; 024import org.apache.reef.annotations.audience.Private; 025import org.apache.reef.driver.evaluator.EvaluatorRequest; 026import org.apache.reef.io.data.loading.avro.AvroEvaluatorRequest; 027import org.apache.reef.webserver.AvroHttpSerializer; 028 029import java.io.ByteArrayOutputStream; 030import java.io.IOException; 031import java.util.ArrayList; 032import java.util.List; 033 034/** 035 * Serialize and deserialize EvaluatorRequest objects using Avro. 036 */ 037@Private 038public final class AvroEvaluatorRequestSerializer { 039 private static AvroEvaluatorRequest toAvro(final EvaluatorRequest request) { 040 final List<CharSequence> nodeNames = new ArrayList<>(); 041 for (final String nodeName : request.getNodeNames()) { 042 nodeNames.add(nodeName); 043 } 044 045 final List<CharSequence> rackNames = new ArrayList<>(); 046 for (final String rackName : request.getRackNames()) { 047 rackNames.add(rackName); 048 } 049 050 return AvroEvaluatorRequest.newBuilder() 051 .setCores(request.getNumberOfCores()) 052 .setMegaBytes(request.getMegaBytes()) 053 .setNumber(request.getNumber()) 054 .setNodeNames(nodeNames) 055 .setRackNames(rackNames) 056 .build(); 057 } 058 059 private static EvaluatorRequest fromAvro(final AvroEvaluatorRequest avroRequest) { 060 final EvaluatorRequest.Builder builder = EvaluatorRequest.newBuilder() 061 .setNumberOfCores(avroRequest.getCores()) 062 .setMemory(avroRequest.getMegaBytes()) 063 .setNumber(avroRequest.getNumber()); 064 for (final CharSequence nodeName : avroRequest.getNodeNames()) { 065 builder.addNodeName(nodeName.toString()); 066 } 067 for (final CharSequence rackName : avroRequest.getRackNames()) { 068 builder.addRackName(rackName.toString()); 069 } 070 return builder.build(); 071 } 072 073 /** 074 * Serialize EvaluatorRequest. 075 */ 076 public static String toString(final EvaluatorRequest request) { 077 AvroEvaluatorRequest avroRequest = toAvro(request); 078 final DatumWriter<AvroEvaluatorRequest> datumWriter = new SpecificDatumWriter<>(AvroEvaluatorRequest.class); 079 try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { 080 final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(avroRequest.getSchema(), out); 081 datumWriter.write(avroRequest, encoder); 082 encoder.flush(); 083 out.close(); 084 return out.toString(AvroHttpSerializer.JSON_CHARSET); 085 } catch (final IOException ex) { 086 throw new RuntimeException("Unable to serialize compute request", ex); 087 } 088 } 089 090 /** 091 * Deserialize EvaluatorRequest. 092 */ 093 public static EvaluatorRequest fromString(final String serializedRequest) { 094 try { 095 final Decoder decoder = 096 DecoderFactory.get().jsonDecoder(AvroEvaluatorRequest.getClassSchema(), serializedRequest); 097 final SpecificDatumReader<AvroEvaluatorRequest> reader = new SpecificDatumReader<>(AvroEvaluatorRequest.class); 098 return fromAvro(reader.read(null, decoder)); 099 } catch (final IOException ex) { 100 throw new RuntimeException("Unable to deserialize compute request", ex); 101 } 102 } 103 104 /** 105 * Empty private constructor to prohibit instantiation of utility class. 106 */ 107 private AvroEvaluatorRequestSerializer() { 108 } 109}