001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.evaluator.context; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.annotations.Provided; 023import org.apache.reef.annotations.audience.EvaluatorSide; 024import org.apache.reef.annotations.audience.Private; 025import org.apache.reef.proto.EvaluatorRuntimeProtocol; 026import org.apache.reef.proto.ReefServiceProtos; 027import org.apache.reef.runtime.common.evaluator.HeartBeatManager; 028import org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException; 029import org.apache.reef.runtime.common.utils.ExceptionCodec; 030import org.apache.reef.tang.Configuration; 031import org.apache.reef.tang.InjectionFuture; 032import org.apache.reef.tang.exceptions.BindException; 033import org.apache.reef.tang.formats.ConfigurationSerializer; 034import org.apache.reef.util.Optional; 035 036import javax.inject.Inject; 037import java.io.IOException; 038import java.util.ArrayList; 039import java.util.Collection; 040import java.util.List; 041import java.util.Stack; 042import java.util.logging.Level; 043import java.util.logging.Logger; 044 045/** 046 * Manages the stack of context in the Evaluator. 047 */ 048@Private 049@EvaluatorSide 050@Provided 051public final class ContextManager implements AutoCloseable { 052 053 private static final Logger LOG = Logger.getLogger(ContextManager.class.getName()); 054 055 /** 056 * The stack of context. 057 */ 058 private final Stack<ContextRuntime> contextStack = new Stack<>(); 059 060 /** 061 * Used to instantiate the root context. 062 */ 063 private final InjectionFuture<RootContextLauncher> launchContext; 064 065 /** 066 * Used for status reporting to the Driver. 067 */ 068 private final HeartBeatManager heartBeatManager; 069 070 /** 071 * To serialize Configurations. 072 */ 073 private final ConfigurationSerializer configurationSerializer; 074 075 private final ExceptionCodec exceptionCodec; 076 077 /** 078 * @param launchContext to instantiate the root context. 079 * @param heartBeatManager for status reporting to the Driver. 080 * @param configurationSerializer 081 * @param exceptionCodec 082 */ 083 @Inject 084 ContextManager(final InjectionFuture<RootContextLauncher> launchContext, 085 final HeartBeatManager heartBeatManager, 086 final ConfigurationSerializer configurationSerializer, 087 final ExceptionCodec exceptionCodec) { 088 this.launchContext = launchContext; 089 this.heartBeatManager = heartBeatManager; 090 this.configurationSerializer = configurationSerializer; 091 this.exceptionCodec = exceptionCodec; 092 } 093 094 /** 095 * Start the context manager. This initiates the root context. 096 * 097 * @throws ContextClientCodeException if the root context can't be instantiated. 098 */ 099 public void start() throws ContextClientCodeException { 100 synchronized (this.contextStack) { 101 LOG.log(Level.FINEST, "Instantiating root context."); 102 this.contextStack.push(this.launchContext.get().getRootContext()); 103 104 if (this.launchContext.get().getInitialTaskConfiguration().isPresent()) { 105 LOG.log(Level.FINEST, "Launching the initial Task"); 106 try { 107 this.contextStack.peek().startTask(this.launchContext.get().getInitialTaskConfiguration().get()); 108 } catch (final TaskClientCodeException e) { 109 this.handleTaskException(e); 110 } 111 } 112 } 113 } 114 115 /** 116 * Shuts down. This forecefully kills the Task if there is one and then shuts down all Contexts on the stack, 117 * starting at the top. 118 */ 119 @Override 120 public void close() { 121 synchronized (this.contextStack) { 122 if (!this.contextStackIsEmpty()) { 123 this.contextStack.lastElement().close(); 124 } 125 } 126 } 127 128 /** 129 * @return true if there is no context. 130 */ 131 public boolean contextStackIsEmpty() { 132 synchronized (this.contextStack) { 133 return this.contextStack.isEmpty(); 134 } 135 } 136 137 /** 138 * Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts. 139 * <p> 140 * This also triggers the HeartBeatManager to send a heartbeat with the result of this operation. 141 * 142 * @param controlMessage the message to process 143 */ 144 public void handleContextControlProtocol( 145 final EvaluatorRuntimeProtocol.ContextControlProto controlMessage) { 146 147 synchronized (this.heartBeatManager) { 148 try { 149 if (controlMessage.hasAddContext() && controlMessage.hasRemoveContext()) { 150 throw new IllegalArgumentException( 151 "Received a message with both add and remove context. This is unsupported."); 152 } 153 154 final byte[] message = controlMessage.hasTaskMessage() ? 155 controlMessage.getTaskMessage().toByteArray() : null; 156 157 if (controlMessage.hasAddContext()) { 158 this.addContext(controlMessage.getAddContext()); 159 if (controlMessage.hasStartTask()) { 160 // We support submitContextAndTask() 161 this.startTask(controlMessage.getStartTask()); 162 } else { 163 // We need to trigger a heartbeat here. 164 // In other cases, the heartbeat will be triggered by the TaskRuntime 165 // Therefore this call can not go into addContext. 166 this.heartBeatManager.sendHeartbeat(); 167 } 168 } else if (controlMessage.hasRemoveContext()) { 169 this.removeContext(controlMessage.getRemoveContext().getContextId()); 170 } else if (controlMessage.hasStartTask()) { 171 this.startTask(controlMessage.getStartTask()); 172 } else if (controlMessage.hasStopTask()) { 173 this.contextStack.peek().closeTask(message); 174 } else if (controlMessage.hasSuspendTask()) { 175 this.contextStack.peek().suspendTask(message); 176 } else if (controlMessage.hasTaskMessage()) { 177 this.contextStack.peek().deliverTaskMessage(message); 178 } else if (controlMessage.hasContextMessage()) { 179 final EvaluatorRuntimeProtocol.ContextMessageProto contextMessageProto = controlMessage.getContextMessage(); 180 boolean deliveredMessage = false; 181 for (final ContextRuntime context : this.contextStack) { 182 if (context.getIdentifier().equals(contextMessageProto.getContextId())) { 183 context.handleContextMessage(contextMessageProto.getMessage().toByteArray()); 184 deliveredMessage = true; 185 break; 186 } 187 } 188 if (!deliveredMessage) { 189 throw new IllegalStateException( 190 "Sent message to unknown context " + contextMessageProto.getContextId()); 191 } 192 } else { 193 throw new RuntimeException("Unknown task control message: " + controlMessage); 194 } 195 } catch (final TaskClientCodeException e) { 196 this.handleTaskException(e); 197 } catch (final ContextClientCodeException e) { 198 this.handleContextException(e); 199 } 200 } 201 202 } 203 204 /** 205 * @return the TaskStatusProto of the currently running task, if there is any 206 */ 207 public Optional<ReefServiceProtos.TaskStatusProto> getTaskStatus() { 208 synchronized (this.contextStack) { 209 if (this.contextStack.isEmpty()) { 210 throw new RuntimeException( 211 "Asked for a Task status while there isn't even a context running."); 212 } 213 return this.contextStack.peek().getTaskStatus(); 214 } 215 } 216 217 /** 218 * @return the status of all context in the stack. 219 */ 220 public Collection<ReefServiceProtos.ContextStatusProto> getContextStatusCollection() { 221 synchronized (this.contextStack) { 222 final List<ReefServiceProtos.ContextStatusProto> result = new ArrayList<>(this.contextStack.size()); 223 for (final ContextRuntime contextRuntime : this.contextStack) { 224 final ReefServiceProtos.ContextStatusProto contextStatusProto = contextRuntime.getContextStatus(); 225 LOG.log(Level.FINEST, "Add context status: {0}", contextStatusProto); 226 result.add(contextStatusProto); 227 } 228 return result; 229 } 230 } 231 232 /** 233 * Add a context to the stack. 234 * 235 * @param addContextProto 236 * @throws ContextClientCodeException if there is a client code related issue. 237 */ 238 private void addContext( 239 final EvaluatorRuntimeProtocol.AddContextProto addContextProto) 240 throws ContextClientCodeException { 241 242 synchronized (this.contextStack) { 243 try { 244 245 final ContextRuntime currentTopContext = this.contextStack.peek(); 246 247 if (!currentTopContext.getIdentifier().equals(addContextProto.getParentContextId())) { 248 throw new IllegalStateException("Trying to instantiate a child context on context with id `" + 249 addContextProto.getParentContextId() + "` while the current top context id is `" + 250 currentTopContext.getIdentifier() + "`"); 251 } 252 253 final Configuration contextConfiguration = 254 this.configurationSerializer.fromString(addContextProto.getContextConfiguration()); 255 256 final ContextRuntime newTopContext; 257 if (addContextProto.hasServiceConfiguration()) { 258 newTopContext = currentTopContext.spawnChildContext(contextConfiguration, 259 this.configurationSerializer.fromString(addContextProto.getServiceConfiguration())); 260 } else { 261 newTopContext = currentTopContext.spawnChildContext(contextConfiguration); 262 } 263 264 this.contextStack.push(newTopContext); 265 266 } catch (final IOException | BindException e) { 267 throw new RuntimeException("Unable to read configuration.", e); 268 } 269 270 } 271 } 272 273 /** 274 * Remove the context with the given ID from the stack. 275 * 276 * @throws IllegalStateException if the given ID does not refer to the top of stack. 277 */ 278 private void removeContext(final String contextID) { 279 280 synchronized (this.contextStack) { 281 282 if (!contextID.equals(this.contextStack.peek().getIdentifier())) { 283 throw new IllegalStateException("Trying to close context with id `" + contextID + 284 "`. But the top context has id `" + 285 this.contextStack.peek().getIdentifier() + "`"); 286 } 287 288 this.contextStack.peek().close(); 289 if (this.contextStack.size() > 1) { 290 /* We did not close the root context. Therefore, we need to inform the 291 * driver explicitly that this context is closed. The root context notification 292 * is implicit in the Evaluator close/done notification. 293 */ 294 this.heartBeatManager.sendHeartbeat(); // Ensure Driver gets notified of context DONE state 295 } 296 this.contextStack.pop(); 297 /* 298 * At this moment, the Evaluator is actually idle and has some time till the Driver sends it additional work. 299 * Also, a potentially large object graph just became orphaned: all the objects instantiated by the context 300 * and service injectors can now be garbage collected. So GC call is justified. 301 * */ 302 System.gc(); 303 } 304 } 305 306 /** 307 * Launch a Task. 308 */ 309 private void startTask( 310 final EvaluatorRuntimeProtocol.StartTaskProto startTaskProto) throws TaskClientCodeException { 311 312 synchronized (this.contextStack) { 313 314 final ContextRuntime currentActiveContext = this.contextStack.peek(); 315 316 final String expectedContextId = startTaskProto.getContextId(); 317 if (!expectedContextId.equals(currentActiveContext.getIdentifier())) { 318 throw new IllegalStateException("Task expected context `" + expectedContextId + 319 "` but the active context has ID `" + currentActiveContext.getIdentifier() + "`"); 320 } 321 322 try { 323 final Configuration taskConfig = 324 this.configurationSerializer.fromString(startTaskProto.getConfiguration()); 325 currentActiveContext.startTask(taskConfig); 326 } catch (IOException | BindException e) { 327 throw new RuntimeException("Unable to read configuration.", e); 328 } 329 } 330 } 331 332 /** 333 * THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager. 334 */ 335 private void handleTaskException(final TaskClientCodeException e) { 336 337 LOG.log(Level.SEVERE, "TaskClientCodeException", e); 338 339 final ByteString exception = ByteString.copyFrom(this.exceptionCodec.toBytes(e.getCause())); 340 341 final ReefServiceProtos.TaskStatusProto taskStatus = 342 ReefServiceProtos.TaskStatusProto.newBuilder() 343 .setContextId(e.getContextId()) 344 .setTaskId(e.getTaskId()) 345 .setResult(exception) 346 .setState(ReefServiceProtos.State.FAILED) 347 .build(); 348 349 LOG.log(Level.SEVERE, "Sending heartbeat: {0}", taskStatus); 350 351 this.heartBeatManager.sendTaskStatus(taskStatus); 352 } 353 354 /** 355 * THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager. 356 */ 357 private void handleContextException(final ContextClientCodeException e) { 358 359 LOG.log(Level.SEVERE, "ContextClientCodeException", e); 360 361 final ByteString exception = ByteString.copyFrom(this.exceptionCodec.toBytes(e.getCause())); 362 363 final ReefServiceProtos.ContextStatusProto.Builder contextStatusBuilder = 364 ReefServiceProtos.ContextStatusProto.newBuilder() 365 .setContextId(e.getContextID()) 366 .setContextState(ReefServiceProtos.ContextStatusProto.State.FAIL) 367 .setError(exception); 368 369 if (e.getParentID().isPresent()) { 370 contextStatusBuilder.setParentId(e.getParentID().get()); 371 } 372 373 final ReefServiceProtos.ContextStatusProto contextStatus = contextStatusBuilder.build(); 374 375 LOG.log(Level.SEVERE, "Sending heartbeat: {0}", contextStatus); 376 377 this.heartBeatManager.sendContextStatus(contextStatus); 378 } 379}