001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.launch; 020 021import com.google.protobuf.GeneratedMessage; 022import com.google.protobuf.InvalidProtocolBufferException; 023import org.apache.reef.annotations.Provided; 024import org.apache.reef.annotations.audience.ClientSide; 025import org.apache.reef.annotations.audience.DriverSide; 026import org.apache.reef.annotations.audience.EvaluatorSide; 027import org.apache.reef.annotations.audience.Private; 028import org.apache.reef.proto.ClientRuntimeProtocol; 029import org.apache.reef.proto.EvaluatorRuntimeProtocol; 030import org.apache.reef.proto.REEFProtocol; 031import org.apache.reef.proto.ReefServiceProtos; 032import org.apache.reef.wake.remote.Codec; 033 034import javax.inject.Inject; 035 036/** 037 * Codec for REEF's control flow messages. 038 */ 039@Private 040@Provided 041@ClientSide 042@DriverSide 043@EvaluatorSide 044public final class REEFMessageCodec implements Codec<GeneratedMessage> { 045 046 047 @Inject 048 private REEFMessageCodec() { 049 } 050 051 @Override 052 public GeneratedMessage decode(final byte[] bytes) { 053 try { 054 final REEFProtocol.REEFMessage message = REEFProtocol.REEFMessage.parseFrom(bytes); 055 if (message.hasJobControl()) { 056 return message.getJobControl(); 057 } else if (message.hasRuntimeError()) { 058 return message.getRuntimeError(); 059 } else if (message.hasJobStatus()) { 060 return message.getJobStatus(); 061 } else if (message.hasEvaluatorControl()) { 062 return message.getEvaluatorControl(); 063 } else if (message.hasEvaluatorHeartBeat()) { 064 return message.getEvaluatorHeartBeat(); 065 } 066 throw new RuntimeException("Unable to decode a message: " + message.toString()); 067 } catch (final InvalidProtocolBufferException e) { 068 throw new RuntimeException("Unable to decode a message", e); 069 } 070 } 071 072 @Override 073 public byte[] encode(final GeneratedMessage msg) { 074 final REEFProtocol.REEFMessage.Builder message = REEFProtocol.REEFMessage.newBuilder(); 075 076 if (msg instanceof ClientRuntimeProtocol.JobControlProto) { 077 message.setJobControl((ClientRuntimeProtocol.JobControlProto) msg); 078 } else if (msg instanceof ReefServiceProtos.RuntimeErrorProto) { 079 message.setRuntimeError((ReefServiceProtos.RuntimeErrorProto) msg); 080 } else if (msg instanceof ReefServiceProtos.JobStatusProto) { 081 message.setJobStatus((ReefServiceProtos.JobStatusProto) msg); 082 } else if (msg instanceof EvaluatorRuntimeProtocol.EvaluatorControlProto) { 083 message.setEvaluatorControl((EvaluatorRuntimeProtocol.EvaluatorControlProto) msg); 084 } else if (msg instanceof EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto) { 085 message.setEvaluatorHeartBeat((EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto) msg); 086 } else { 087 throw new RuntimeException("Unable to serialize: " + msg); 088 } 089 090 return message.build().toByteArray(); 091 } 092}