001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.context; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.annotations.audience.DriverSide; 023import org.apache.reef.annotations.audience.Private; 024import org.apache.reef.driver.context.ActiveContext; 025import org.apache.reef.driver.context.ClosedContext; 026import org.apache.reef.driver.context.FailedContext; 027import org.apache.reef.driver.evaluator.EvaluatorDescriptor; 028import org.apache.reef.proto.EvaluatorRuntimeProtocol; 029import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher; 030import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextState; 031import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO; 032import org.apache.reef.runtime.common.utils.ExceptionCodec; 033import org.apache.reef.tang.Configuration; 034import org.apache.reef.tang.formats.ConfigurationSerializer; 035import org.apache.reef.util.Optional; 036 037import java.util.logging.Level; 038import java.util.logging.Logger; 039 040/** 041 * Driver-side representation of a Context on an Evaluator. 042 */ 043@DriverSide 044@Private 045public final class EvaluatorContext implements ActiveContext { 046 047 private static final Logger LOG = Logger.getLogger(EvaluatorContext.class.getName()); 048 049 private final String contextIdentifier; 050 private final String evaluatorIdentifier; 051 private final EvaluatorDescriptor evaluatorDescriptor; 052 053 private final Optional<String> parentID; 054 private final ConfigurationSerializer configurationSerializer; 055 private final ContextControlHandler contextControlHandler; 056 private final ExceptionCodec exceptionCodec; 057 private final ContextRepresenters contextRepresenters; 058 059 private boolean isClosed = false; 060 061 public EvaluatorContext(final String contextIdentifier, 062 final String evaluatorIdentifier, 063 final EvaluatorDescriptor evaluatorDescriptor, 064 final Optional<String> parentID, 065 final ConfigurationSerializer configurationSerializer, 066 final ContextControlHandler contextControlHandler, 067 final EvaluatorMessageDispatcher messageDispatcher, 068 final ExceptionCodec exceptionCodec, 069 final ContextRepresenters contextRepresenters) { 070 071 this.contextIdentifier = contextIdentifier; 072 this.evaluatorIdentifier = evaluatorIdentifier; 073 this.evaluatorDescriptor = evaluatorDescriptor; 074 this.parentID = parentID; 075 this.configurationSerializer = configurationSerializer; 076 this.contextControlHandler = contextControlHandler; 077 this.exceptionCodec = exceptionCodec; 078 this.contextRepresenters = contextRepresenters; 079 080 LOG.log(Level.FINE, "Instantiated 'EvaluatorContext'"); 081 } 082 083 @Override 084 public synchronized void close() { 085 086 if (this.isClosed) { 087 throw new RuntimeException("Active context already closed"); 088 } 089 090 LOG.log(Level.FINEST, "Submit close context: RunningEvaluator id[{0}] for context id[{1}]", 091 new Object[]{getEvaluatorId(), getId()}); 092 093 final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto = 094 EvaluatorRuntimeProtocol.ContextControlProto.newBuilder() 095 .setRemoveContext( 096 EvaluatorRuntimeProtocol.RemoveContextProto.newBuilder() 097 .setContextId(getId()) 098 .build()) 099 .build(); 100 101 this.contextControlHandler.send(contextControlProto); 102 this.isClosed = true; 103 } 104 105 @Override 106 public synchronized void sendMessage(final byte[] message) { 107 108 if (this.isClosed) { 109 throw new RuntimeException("Active context already closed"); 110 } 111 112 LOG.log(Level.FINEST, "Send message: RunningEvaluator id[{0}] for context id[{1}]", 113 new Object[]{getEvaluatorId(), getId()}); 114 115 final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto = 116 EvaluatorRuntimeProtocol.ContextControlProto.newBuilder() 117 .setContextMessage(EvaluatorRuntimeProtocol.ContextMessageProto.newBuilder() 118 .setContextId(this.contextIdentifier) 119 .setMessage(ByteString.copyFrom(message)) 120 .build()) 121 .build(); 122 123 this.contextControlHandler.send(contextControlProto); 124 } 125 126 @Override 127 public synchronized void submitTask(final Configuration taskConf) { 128 submitTask(this.configurationSerializer.toString(taskConf)); 129 } 130 131 public synchronized void submitTask(final String taskConf) { 132 if (this.isClosed) { 133 throw new RuntimeException("Active context already closed"); 134 } 135 136 LOG.log(Level.FINEST, "Submit task: RunningEvaluator id[{0}] for context id[{1}]", 137 new Object[]{getEvaluatorId(), getId()}); 138 139 final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto = 140 EvaluatorRuntimeProtocol.ContextControlProto.newBuilder() 141 .setStartTask( 142 EvaluatorRuntimeProtocol.StartTaskProto.newBuilder() 143 .setContextId(this.contextIdentifier) 144 .setConfiguration(taskConf) 145 .build()) 146 .build(); 147 148 this.contextControlHandler.send(contextControlProto); 149 } 150 151 @Override 152 public synchronized void submitContext(final Configuration contextConfiguration) { 153 submitContext(this.configurationSerializer.toString(contextConfiguration)); 154 } 155 156 public synchronized void submitContext(final String contextConf) { 157 submitContextAndService(contextConf, Optional.<String>empty()); 158 } 159 160 @Override 161 public synchronized void submitContextAndService( 162 final Configuration contextConfiguration, final Configuration serviceConfiguration) { 163 submitContextAndService( 164 this.configurationSerializer.toString(contextConfiguration), 165 this.configurationSerializer.toString(serviceConfiguration)); 166 } 167 168 public synchronized void submitContextAndService(final String contextConf, final String serviceConf) { 169 submitContextAndService(contextConf, Optional.ofNullable(serviceConf)); 170 } 171 172 public synchronized void submitContextAndService(final String contextConf, final Optional<String> serviceConf) { 173 if (this.isClosed) { 174 throw new RuntimeException("Active context already closed"); 175 } 176 177 EvaluatorRuntimeProtocol.AddContextProto.Builder contextBuilder = 178 EvaluatorRuntimeProtocol.AddContextProto.newBuilder() 179 .setParentContextId(getId()).setContextConfiguration(contextConf); 180 181 if (serviceConf.isPresent()) { 182 contextBuilder = contextBuilder.setServiceConfiguration(serviceConf.get()); 183 } 184 185 final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto = 186 EvaluatorRuntimeProtocol.ContextControlProto.newBuilder() 187 .setAddContext(contextBuilder.build()) 188 .build(); 189 190 this.contextControlHandler.send(contextControlProto); 191 } 192 193 @Override 194 public String getEvaluatorId() { 195 return this.evaluatorIdentifier; 196 } 197 198 @Override 199 public Optional<String> getParentId() { 200 return this.parentID; 201 } 202 203 @Override 204 public EvaluatorDescriptor getEvaluatorDescriptor() { 205 return this.evaluatorDescriptor; 206 } 207 208 @Override 209 public String getId() { 210 return this.contextIdentifier; 211 } 212 213 @Override 214 public String toString() { 215 return "EvaluatorContext{" + 216 "contextIdentifier='" + this.contextIdentifier + '\'' + 217 ", evaluatorIdentifier='" + this.evaluatorIdentifier + '\'' + 218 ", parentID=" + this.parentID + '}'; 219 } 220 221 public synchronized ClosedContext getClosedContext(final ActiveContext parentContext) { 222 return new ClosedContextImpl( 223 parentContext, this.getId(), this.getEvaluatorId(), this.getEvaluatorDescriptor()); 224 } 225 226 /** 227 * @return a FailedContext for the case of an EvaluatorFailure. 228 */ 229 public synchronized FailedContext getFailedContextForEvaluatorFailure() { 230 231 final String id = this.getId(); 232 final Optional<String> description = Optional.empty(); 233 final Optional<byte[]> data = Optional.empty(); 234 final Optional<Throwable> cause = Optional.empty(); 235 final String message = "Evaluator Failure"; 236 237 final Optional<ActiveContext> parentContext = getParentId().isPresent() ? 238 Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get())) : 239 Optional.<ActiveContext>empty(); 240 241 final String evaluatorID = getEvaluatorId(); 242 243 return new FailedContextImpl( 244 id, message, description, cause, data, parentContext, this.evaluatorDescriptor, evaluatorID); 245 } 246 247 public synchronized FailedContext getFailedContext( 248 final ContextStatusPOJO contextStatus) { 249 250 assert ContextState.FAIL == contextStatus.getContextState(); 251 252 final String id = this.getId(); 253 final Optional<String> description = Optional.empty(); 254 255 final Optional<byte[]> data = contextStatus.hasError() ? 256 Optional.of(contextStatus.getError()) : 257 Optional.<byte[]>empty(); 258 259 final Optional<Throwable> cause = data.isPresent() ? 260 this.exceptionCodec.fromBytes(data) : 261 Optional.<Throwable>empty(); 262 263 final String message = cause.isPresent() ? cause.get().getMessage() : "No message given"; 264 265 final Optional<ActiveContext> parentContext = getParentId().isPresent() ? 266 Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get())) : 267 Optional.<ActiveContext>empty(); 268 269 final String evaluatorID = getEvaluatorId(); 270 271 return new FailedContextImpl( 272 id, message, description, cause, data, parentContext, this.evaluatorDescriptor, evaluatorID); 273 } 274 275 public synchronized boolean isRootContext() { 276 return !this.parentID.isPresent(); 277 } 278}