/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.common.driver.context;

import com.google.protobuf.ByteString;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.proto.EvaluatorRuntimeProtocol;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextState;
import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Driver-side representation of a Context on an Evaluator.
 * <p>
 * Each operation builds a {@code ContextControlProto} and hands it to the
 * {@link ContextControlHandler} given at construction time. Once {@link #close()} has been
 * called on an instance, every further operation (including a second {@code close()}) throws
 * a {@link RuntimeException}.
 * <p>
 * All methods that read or write the closed flag are {@code synchronized} on this instance.
 */
@DriverSide
@Private
public final class EvaluatorContext implements ActiveContext {

  private static final Logger LOG = Logger.getLogger(EvaluatorContext.class.getName());

  /** Message of the exception thrown when an operation is attempted after {@link #close()}. */
  private static final String ALREADY_CLOSED_MESSAGE = "Active context already closed";

  /** Message reported by a FailedContext when no message can be obtained from the error. */
  private static final String NO_MESSAGE_GIVEN = "No message given";

  private final String contextIdentifier;
  private final String evaluatorIdentifier;
  private final EvaluatorDescriptor evaluatorDescriptor;

  private final Optional<String> parentID;
  private final ConfigurationSerializer configurationSerializer;
  private final ContextControlHandler contextControlHandler;
  private final ExceptionCodec exceptionCodec;
  private final ContextRepresenters contextRepresenters;

  /** Set to true by {@link #close()}; guarded by the monitor of this instance. */
  private boolean isClosed = false;

  /**
   * @param contextIdentifier the ID of this context, as returned by {@link #getId()}.
   * @param evaluatorIdentifier the ID of the Evaluator, as returned by {@link #getEvaluatorId()}.
   * @param evaluatorDescriptor the descriptor returned by {@link #getEvaluatorDescriptor()}.
   * @param parentID the ID of the parent context; empty for a root context.
   * @param configurationSerializer used to turn Configurations into Strings before sending.
   * @param contextControlHandler receives every control message built by this class.
   * @param messageDispatcher accepted for signature compatibility; not used by this class.
   * @param exceptionCodec used to decode the error bytes of a failed context status.
   * @param contextRepresenters used to look up the parent context by ID.
   */
  public EvaluatorContext(final String contextIdentifier,
                          final String evaluatorIdentifier,
                          final EvaluatorDescriptor evaluatorDescriptor,
                          final Optional<String> parentID,
                          final ConfigurationSerializer configurationSerializer,
                          final ContextControlHandler contextControlHandler,
                          final EvaluatorMessageDispatcher messageDispatcher,
                          final ExceptionCodec exceptionCodec,
                          final ContextRepresenters contextRepresenters) {

    this.contextIdentifier = contextIdentifier;
    this.evaluatorIdentifier = evaluatorIdentifier;
    this.evaluatorDescriptor = evaluatorDescriptor;
    this.parentID = parentID;
    this.configurationSerializer = configurationSerializer;
    this.contextControlHandler = contextControlHandler;
    this.exceptionCodec = exceptionCodec;
    this.contextRepresenters = contextRepresenters;

    LOG.log(Level.FINE, "Instantiated 'EvaluatorContext'");
  }

  /**
   * Sends a remove-context request for this context and marks this instance as closed.
   *
   * @throws RuntimeException if this context has already been closed.
   */
  @Override
  public synchronized void close() {

    ensureOpen();

    LOG.log(Level.FINEST, "Submit close context: RunningEvaluator id[{0}] for context id[{1}]",
        new Object[]{getEvaluatorId(), getId()});

    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
            .setRemoveContext(
                EvaluatorRuntimeProtocol.RemoveContextProto.newBuilder()
                    .setContextId(getId())
                    .build())
            .build();

    this.contextControlHandler.send(contextControlProto);
    // Only flip the flag once the request was handed over without an exception.
    this.isClosed = true;
  }

  /**
   * Sends a message addressed to this context.
   *
   * @param message the payload; it is copied into the outgoing protocol buffer.
   * @throws RuntimeException if this context has already been closed.
   */
  @Override
  public synchronized void sendMessage(final byte[] message) {

    ensureOpen();

    LOG.log(Level.FINEST, "Send message: RunningEvaluator id[{0}] for context id[{1}]",
        new Object[]{getEvaluatorId(), getId()});

    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
            .setContextMessage(EvaluatorRuntimeProtocol.ContextMessageProto.newBuilder()
                .setContextId(this.contextIdentifier)
                .setMessage(ByteString.copyFrom(message))
                .build())
            .build();

    this.contextControlHandler.send(contextControlProto);
  }

  /**
   * Serializes the given task configuration and submits it via {@link #submitTask(String)}.
   *
   * @param taskConf the task configuration.
   * @throws RuntimeException if this context has already been closed.
   */
  @Override
  public synchronized void submitTask(final Configuration taskConf) {
    submitTask(this.configurationSerializer.toString(taskConf));
  }

  /**
   * Sends a start-task request for this context.
   *
   * @param taskConf the task configuration, already in serialized (String) form.
   * @throws RuntimeException if this context has already been closed.
   */
  public synchronized void submitTask(final String taskConf) {

    ensureOpen();

    LOG.log(Level.FINEST, "Submit task: RunningEvaluator id[{0}] for context id[{1}]",
        new Object[]{getEvaluatorId(), getId()});

    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
            .setStartTask(
                EvaluatorRuntimeProtocol.StartTaskProto.newBuilder()
                    .setContextId(this.contextIdentifier)
                    .setConfiguration(taskConf)
                    .build())
            .build();

    this.contextControlHandler.send(contextControlProto);
  }

  /**
   * Serializes the given context configuration and submits it via
   * {@link #submitContext(String)}.
   *
   * @param contextConfiguration the configuration of the new context.
   * @throws RuntimeException if this context has already been closed.
   */
  @Override
  public synchronized void submitContext(final Configuration contextConfiguration) {
    submitContext(this.configurationSerializer.toString(contextConfiguration));
  }

  /**
   * Sends an add-context request that names this context as the parent.
   *
   * @param contextConf the configuration of the new context, in serialized (String) form.
   * @throws RuntimeException if this context has already been closed.
   */
  public synchronized void submitContext(final String contextConf) {

    ensureOpen();

    LOG.log(Level.FINEST, "Submit context: RunningEvaluator id[{0}] for context id[{1}]",
        new Object[]{getEvaluatorId(), getId()});

    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
            .setAddContext(
                EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
                    .setParentContextId(getId())
                    .setContextConfiguration(contextConf)
                    .build())
            .build();

    this.contextControlHandler.send(contextControlProto);
  }

  /**
   * Sends an add-context request that names this context as the parent and carries both a
   * context and a service configuration.
   *
   * @param contextConfiguration the configuration of the new context.
   * @param serviceConfiguration the service configuration sent along with it.
   * @throws RuntimeException if this context has already been closed.
   */
  @Override
  public synchronized void submitContextAndService(
      final Configuration contextConfiguration, final Configuration serviceConfiguration) {

    ensureOpen();

    LOG.log(Level.FINEST, "Submit new context: RunningEvaluator id[{0}] for context id[{1}]",
        new Object[]{getEvaluatorId(), getId()});

    final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
        EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
            .setAddContext(
                EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
                    .setParentContextId(getId())
                    .setContextConfiguration(this.configurationSerializer.toString(contextConfiguration))
                    .setServiceConfiguration(this.configurationSerializer.toString(serviceConfiguration))
                    .build())
            .build();

    this.contextControlHandler.send(contextControlProto);
  }

  /**
   * @return the Evaluator identifier given at construction time.
   */
  @Override
  public String getEvaluatorId() {
    return this.evaluatorIdentifier;
  }

  /**
   * @return the parent context ID given at construction time; empty for a root context.
   */
  @Override
  public Optional<String> getParentId() {
    return this.parentID;
  }

  /**
   * @return the Evaluator descriptor given at construction time.
   */
  @Override
  public EvaluatorDescriptor getEvaluatorDescriptor() {
    return this.evaluatorDescriptor;
  }

  /**
   * @return the context identifier given at construction time.
   */
  @Override
  public String getId() {
    return this.contextIdentifier;
  }

  @Override
  public String toString() {
    return "EvaluatorContext{" +
        "contextIdentifier='" + this.contextIdentifier + '\'' +
        ", evaluatorIdentifier='" + this.evaluatorIdentifier + '\'' +
        ", parentID=" + this.parentID + '}';
  }

  /**
   * @param parentContext the context to record as the parent of the closed context.
   * @return a ClosedContext carrying this context's ID, Evaluator ID and Evaluator descriptor.
   */
  public synchronized ClosedContext getClosedContext(final ActiveContext parentContext) {
    return new ClosedContextImpl(
        parentContext, this.getId(), this.getEvaluatorId(), this.getEvaluatorDescriptor());
  }

  /**
   * @return a FailedContext for the case of an EvaluatorFailure.
   */
  public synchronized FailedContext getFailedContextForEvaluatorFailure() {

    final String id = this.getId();
    final Optional<String> description = Optional.empty();
    final Optional<byte[]> data = Optional.empty();
    final Optional<Throwable> cause = Optional.empty();
    final String message = "Evaluator Failure";
    final Optional<ActiveContext> parentContext = getParentContext();
    final String evaluatorID = getEvaluatorId();

    return new FailedContextImpl(
        id, message, description, cause, data, parentContext, this.evaluatorDescriptor, evaluatorID);
  }

  /**
   * Builds a FailedContext from a context status that reports a failure.
   * <p>
   * If the status carries error bytes, they are passed on as the failure data and decoded
   * with the ExceptionCodec to obtain the cause. The failure message is the message of the
   * cause; when there is no cause, or the cause has no message, it is "No message given".
   *
   * @param contextStatus the status of the failed context; its state is asserted to be FAIL.
   * @return the FailedContext describing the failure.
   */
  public synchronized FailedContext getFailedContext(
      final ContextStatusPOJO contextStatus) {

    assert ContextState.FAIL == contextStatus.getContextState();

    final String id = this.getId();
    final Optional<String> description = Optional.empty();

    final Optional<byte[]> data = contextStatus.hasError() ?
        Optional.of(contextStatus.getError()) :
        Optional.<byte[]>empty();

    final Optional<Throwable> cause = data.isPresent() ?
        this.exceptionCodec.fromBytes(data) :
        Optional.<Throwable>empty();

    // Throwable.getMessage() may return null; never hand a null message to the FailedContext.
    final String causeMessage = cause.isPresent() ? cause.get().getMessage() : null;
    final String message = causeMessage != null ? causeMessage : NO_MESSAGE_GIVEN;

    final Optional<ActiveContext> parentContext = getParentContext();
    final String evaluatorID = getEvaluatorId();

    return new FailedContextImpl(
        id, message, description, cause, data, parentContext, this.evaluatorDescriptor, evaluatorID);
  }

  /**
   * @return true if this context has no parent context ID.
   */
  public synchronized boolean isRootContext() {
    return !this.parentID.isPresent();
  }

  /**
   * Guards every operation that must not run after {@link #close()}.
   * Callers hold the monitor of this instance.
   *
   * @throws RuntimeException if this context has already been closed.
   */
  private void ensureOpen() {
    if (this.isClosed) {
      throw new RuntimeException(ALREADY_CLOSED_MESSAGE);
    }
  }

  /**
   * Looks up the parent context through the ContextRepresenters.
   *
   * @return the parent context, or empty if this context has no parent ID.
   */
  private Optional<ActiveContext> getParentContext() {
    return this.parentID.isPresent() ?
        Optional.<ActiveContext>of(this.contextRepresenters.getContext(this.parentID.get())) :
        Optional.<ActiveContext>empty();
  }
}