001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.context; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.annotations.audience.DriverSide; 023import org.apache.reef.annotations.audience.Private; 024import org.apache.reef.driver.context.ActiveContext; 025import org.apache.reef.driver.context.ClosedContext; 026import org.apache.reef.driver.context.FailedContext; 027import org.apache.reef.driver.evaluator.EvaluatorDescriptor; 028import org.apache.reef.proto.EvaluatorRuntimeProtocol; 029import org.apache.reef.proto.ReefServiceProtos; 030import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher; 031import org.apache.reef.runtime.common.utils.ExceptionCodec; 032import org.apache.reef.tang.Configuration; 033import org.apache.reef.tang.formats.ConfigurationSerializer; 034import org.apache.reef.util.Optional; 035 036import java.util.logging.Level; 037import java.util.logging.Logger; 038 039/** 040 * Driver-side representation of a Context on an Evaluator. 041 */ 042@DriverSide 043@Private 044public final class EvaluatorContext implements ActiveContext { 045 046 private final static Logger LOG = Logger.getLogger(EvaluatorContext.class.getName()); 047 048 private final String contextIdentifier; 049 private final String evaluatorIdentifier; 050 private final EvaluatorDescriptor evaluatorDescriptor; 051 052 private final Optional<String> parentID; 053 private final ConfigurationSerializer configurationSerializer; 054 private final ContextControlHandler contextControlHandler; 055 private final ExceptionCodec exceptionCodec; 056 private final ContextRepresenters contextRepresenters; 057 058 private boolean isClosed = false; 059 060 public EvaluatorContext(final String contextIdentifier, 061 final String evaluatorIdentifier, 062 final EvaluatorDescriptor evaluatorDescriptor, 063 final Optional<String> parentID, 064 final ConfigurationSerializer configurationSerializer, 065 final ContextControlHandler contextControlHandler, 066 final EvaluatorMessageDispatcher messageDispatcher, 067 final ExceptionCodec exceptionCodec, 068 final ContextRepresenters contextRepresenters) { 069 070 this.contextIdentifier = contextIdentifier; 071 this.evaluatorIdentifier = evaluatorIdentifier; 072 this.evaluatorDescriptor = evaluatorDescriptor; 073 this.parentID = parentID; 074 this.configurationSerializer = configurationSerializer; 075 this.contextControlHandler = contextControlHandler; 076 this.exceptionCodec = exceptionCodec; 077 this.contextRepresenters = contextRepresenters; 078 079 LOG.log(Level.FINE, "Instantiated 'EvaluatorContext'"); 080 } 081 082 @Override 083 public synchronized void close() { 084 085 if (this.isClosed) { 086 throw new RuntimeException("Active context already closed"); 087 } 088 089 LOG.log(Level.FINEST, "Submit close context: RunningEvaluator id[{0}] for context id[{1}]", 090 new Object[]{getEvaluatorId(), getId()}); 091 092 final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto = 093 EvaluatorRuntimeProtocol.ContextControlProto.newBuilder() 094 .setRemoveContext( 095 EvaluatorRuntimeProtocol.RemoveContextProto.newBuilder() 096 .setContextId(getId()) 097 .build()) 098 .build(); 099 100 this.contextControlHandler.send(contextControlProto); 101 this.isClosed = true; 102 } 103 104 @Override 105 public synchronized void sendMessage(final byte[] message) { 106 107 if (this.isClosed) { 108 throw new RuntimeException("Active context already closed"); 109 } 110 111 LOG.log(Level.FINEST, "Send message: RunningEvaluator id[{0}] for context id[{1}]", 112 new Object[]{getEvaluatorId(), getId()}); 113 114 final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto = 115 EvaluatorRuntimeProtocol.ContextControlProto.newBuilder() 116 .setContextMessage(EvaluatorRuntimeProtocol.ContextMessageProto.newBuilder() 117 .setContextId(this.contextIdentifier) 118 .setMessage(ByteString.copyFrom(message)) 119 .build()) 120 .build(); 121 122 this.contextControlHandler.send(contextControlProto); 123 } 124 125 @Override 126 public synchronized void submitTask(final Configuration taskConf) { 127 128 if (this.isClosed) { 129 throw new RuntimeException("Active context already closed"); 130 } 131 132 LOG.log(Level.FINEST, "Submit task: RunningEvaluator id[{0}] for context id[{1}]", 133 new Object[]{getEvaluatorId(), getId()}); 134 135 final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto = 136 EvaluatorRuntimeProtocol.ContextControlProto.newBuilder() 137 .setStartTask( 138 EvaluatorRuntimeProtocol.StartTaskProto.newBuilder() 139 .setContextId(this.contextIdentifier) 140 .setConfiguration(this.configurationSerializer.toString(taskConf)) 141 .build()) 142 .build(); 143 144 this.contextControlHandler.send(contextControlProto); 145 } 146 147 @Override 148 public synchronized void submitContext(final Configuration contextConfiguration) { 149 150 if (this.isClosed) { 151 throw new RuntimeException("Active context already closed"); 152 } 153 154 LOG.log(Level.FINEST, "Submit new context: RunningEvaluator id[{0}] for context id[{1}]", 155 new Object[]{getEvaluatorId(), getId()}); 156 157 final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto = 158 EvaluatorRuntimeProtocol.ContextControlProto.newBuilder() 159 .setAddContext( 160 EvaluatorRuntimeProtocol.AddContextProto.newBuilder() 161 .setParentContextId(getId()) 162 .setContextConfiguration(this.configurationSerializer.toString(contextConfiguration)) 163 .build()) 164 .build(); 165 166 this.contextControlHandler.send(contextControlProto); 167 } 168 169 @Override 170 public synchronized void submitContextAndService( 171 final Configuration contextConfiguration, final Configuration serviceConfiguration) { 172 173 if (this.isClosed) { 174 throw new RuntimeException("Active context already closed"); 175 } 176 177 LOG.log(Level.FINEST, "Submit new context: RunningEvaluator id[{0}] for context id[{1}]", 178 new Object[]{getEvaluatorId(), getId()}); 179 180 final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto = 181 EvaluatorRuntimeProtocol.ContextControlProto.newBuilder() 182 .setAddContext( 183 EvaluatorRuntimeProtocol.AddContextProto.newBuilder() 184 .setParentContextId(getId()) 185 .setContextConfiguration(this.configurationSerializer.toString(contextConfiguration)) 186 .setServiceConfiguration(this.configurationSerializer.toString(serviceConfiguration)) 187 .build()) 188 .build(); 189 190 this.contextControlHandler.send(contextControlProto); 191 } 192 193 @Override 194 public String getEvaluatorId() { 195 return this.evaluatorIdentifier; 196 } 197 198 @Override 199 public Optional<String> getParentId() { 200 return this.parentID; 201 } 202 203 @Override 204 public EvaluatorDescriptor getEvaluatorDescriptor() { 205 return this.evaluatorDescriptor; 206 } 207 208 @Override 209 public String getId() { 210 return this.contextIdentifier; 211 } 212 213 @Override 214 public String toString() { 215 return "EvaluatorContext{" + 216 "contextIdentifier='" + this.contextIdentifier + '\'' + 217 ", evaluatorIdentifier='" + this.evaluatorIdentifier + '\'' + 218 ", parentID=" + this.parentID + '}'; 219 } 220 221 public synchronized final ClosedContext getClosedContext(final ActiveContext parentContext) { 222 return new ClosedContextImpl( 223 parentContext, this.getId(), this.getEvaluatorId(), this.getEvaluatorDescriptor()); 224 } 225 226 /** 227 * @return a FailedContext for the case of an EvaluatorFailure. 228 */ 229 public synchronized FailedContext getFailedContextForEvaluatorFailure() { 230 231 final String id = this.getId(); 232 final Optional<String> description = Optional.empty(); 233 final Optional<byte[]> data = Optional.empty(); 234 final Optional<Throwable> cause = Optional.empty(); 235 final String message = "Evaluator Failure"; 236 237 final Optional<ActiveContext> parentContext = getParentId().isPresent() ? 238 Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get())) : 239 Optional.<ActiveContext>empty(); 240 241 final EvaluatorDescriptor evaluatorDescriptor = getEvaluatorDescriptor(); 242 final String evaluatorID = getEvaluatorId(); 243 244 return new FailedContextImpl( 245 id, message, description, cause, data, parentContext, evaluatorDescriptor, evaluatorID); 246 } 247 248 public synchronized FailedContext getFailedContext( 249 final ReefServiceProtos.ContextStatusProto contextStatusProto) { 250 251 assert (ReefServiceProtos.ContextStatusProto.State.FAIL == contextStatusProto.getContextState()); 252 253 final String id = this.getId(); 254 final Optional<String> description = Optional.empty(); 255 256 final Optional<byte[]> data = contextStatusProto.hasError() ? 257 Optional.of(contextStatusProto.getError().toByteArray()) : 258 Optional.<byte[]>empty(); 259 260 final Optional<Throwable> cause = data.isPresent() ? 261 this.exceptionCodec.fromBytes(data) : 262 Optional.<Throwable>empty(); 263 264 final String message = cause.isPresent() ? cause.get().getMessage() : "No message given"; 265 266 final Optional<ActiveContext> parentContext = getParentId().isPresent() ? 267 Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get())) : 268 Optional.<ActiveContext>empty(); 269 270 final EvaluatorDescriptor evaluatorDescriptor = getEvaluatorDescriptor(); 271 final String evaluatorID = getEvaluatorId(); 272 273 return new FailedContextImpl( 274 id, message, description, cause, data, parentContext, evaluatorDescriptor, evaluatorID); 275 } 276 277 public synchronized boolean isRootContext() { 278 return !this.parentID.isPresent(); 279 } 280}