001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.evaluator.context; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.annotations.Provided; 023import org.apache.reef.annotations.audience.EvaluatorSide; 024import org.apache.reef.annotations.audience.Private; 025import org.apache.reef.evaluator.context.ContextMessage; 026import org.apache.reef.evaluator.context.ContextMessageSource; 027import org.apache.reef.evaluator.context.parameters.Services; 028import org.apache.reef.proto.ReefServiceProtos; 029import org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException; 030import org.apache.reef.runtime.common.evaluator.task.TaskRuntime; 031import org.apache.reef.tang.Configuration; 032import org.apache.reef.tang.Injector; 033import org.apache.reef.tang.exceptions.BindException; 034import org.apache.reef.tang.exceptions.InjectionException; 035import org.apache.reef.util.Optional; 036 037import java.util.Set; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041/** 042 * The evaluator side resourcemanager for Contexts. 043 */ 044@Provided 045@Private 046@EvaluatorSide 047public final class ContextRuntime { 048 049 private static final Logger LOG = Logger.getLogger(ContextRuntime.class.getName()); 050 051 /** 052 * Context-local injector. This contains information that will not be available in child injectors. 053 */ 054 private final Injector contextInjector; 055 056 /** 057 * Service injector. State in this injector moves to child injectors. 058 */ 059 private final Injector serviceInjector; 060 061 /** 062 * Convenience class to hold all the event handlers for the context as well as the service instances. 063 */ 064 private final ContextLifeCycle contextLifeCycle; 065 /** 066 * The parent context, if there is any. 067 */ 068 private final Optional<ContextRuntime> parentContext; // guarded by this 069 /** 070 * The child context, if there is any. 071 */ 072 private Optional<ContextRuntime> childContext = Optional.empty(); // guarded by this 073 /** 074 * The currently running task, if there is any. 075 */ 076 private Optional<TaskRuntime> task = Optional.empty(); // guarded by this 077 078 private Thread taskRuntimeThread = null; 079 080 // TODO: Which lock guards this? 081 private ReefServiceProtos.ContextStatusProto.State contextState = 082 ReefServiceProtos.ContextStatusProto.State.READY; 083 084 /** 085 * Create a new ContextRuntime. 086 * 087 * @param serviceInjector the serviceInjector to be used. 088 * @param contextConfiguration the Configuration for this context. 089 * @throws ContextClientCodeException if the context cannot be instantiated. 090 */ 091 ContextRuntime(final Injector serviceInjector, final Configuration contextConfiguration, 092 final Optional<ContextRuntime> parentContext) throws ContextClientCodeException { 093 094 this.serviceInjector = serviceInjector; 095 this.parentContext = parentContext; 096 097 // Trigger the instantiation of the services 098 try { 099 100 final Set<Object> services = serviceInjector.getNamedInstance(Services.class); 101 this.contextInjector = serviceInjector.forkInjector(contextConfiguration); 102 103 this.contextLifeCycle = this.contextInjector.getInstance(ContextLifeCycle.class); 104 105 } catch (BindException | InjectionException e) { 106 107 final Optional<String> parentID = this.getParentContext().isPresent() ? 108 Optional.of(this.getParentContext().get().getIdentifier()) : 109 Optional.<String>empty(); 110 111 throw new ContextClientCodeException( 112 ContextClientCodeException.getIdentifier(contextConfiguration), 113 parentID, "Unable to spawn context", e); 114 } 115 116 // Trigger the context start events on contextInjector. 117 this.contextLifeCycle.start(); 118 } 119 120 /** 121 * Create a new ContextRuntime for the root context. 122 * 123 * @param serviceInjector the serviceInjector to be used. 124 * @param contextConfiguration the Configuration for this context. 125 * @throws ContextClientCodeException if the context cannot be instantiated. 126 */ 127 ContextRuntime(final Injector serviceInjector, 128 final Configuration contextConfiguration) throws ContextClientCodeException { 129 this(serviceInjector, contextConfiguration, Optional.<ContextRuntime>empty()); 130 LOG.log(Level.FINEST, "Instantiating root context"); 131 } 132 133 134 /** 135 * Spawns a new context. 136 * <p/> 137 * The new context will have a serviceInjector that is created by forking the one in this object with the given 138 * serviceConfiguration. The contextConfiguration is used to fork the contextInjector from that new serviceInjector. 139 * 140 * @param contextConfiguration the new context's context (local) Configuration. 141 * @param serviceConfiguration the new context's service Configuration. 142 * @return a child context. 143 * @throws ContextClientCodeException If the context can't be instantiate due to user code / configuration issues 144 * @throws IllegalStateException If this method is called when there is either a task or child context already 145 * present. 146 */ 147 ContextRuntime spawnChildContext( 148 final Configuration contextConfiguration, 149 final Configuration serviceConfiguration) throws ContextClientCodeException { 150 151 synchronized (this.contextLifeCycle) { 152 153 if (this.task.isPresent()) { 154 throw new IllegalStateException( 155 "Attempting to spawn a child context when a Task with id '" + 156 this.task.get().getId() + "' is running."); 157 } 158 159 if (this.childContext.isPresent()) { 160 throw new IllegalStateException( 161 "Attempting to instantiate a child context on a context that is not the topmost active context"); 162 } 163 164 try { 165 166 final Injector childServiceInjector = 167 this.serviceInjector.forkInjector(serviceConfiguration); 168 169 final ContextRuntime childContext = 170 new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this)); 171 172 this.childContext = Optional.of(childContext); 173 return childContext; 174 175 } catch (final BindException e) { 176 177 final Optional<String> parentID = this.getParentContext().isPresent() ? 178 Optional.of(this.getParentContext().get().getIdentifier()) : 179 Optional.<String>empty(); 180 181 throw new ContextClientCodeException( 182 ContextClientCodeException.getIdentifier(contextConfiguration), 183 parentID, "Unable to spawn context", e); 184 } 185 } 186 } 187 188 /** 189 * Spawns a new context without services of its own. 190 * <p/> 191 * The new context will have a serviceInjector that is created by forking the one in this object. The 192 * contextConfiguration is used to fork the contextInjector from that new serviceInjector. 193 * 194 * @param contextConfiguration the new context's context (local) Configuration. 195 * @return a child context. 196 * @throws ContextClientCodeException If the context can't be instantiate due to user code / configuration issues. 197 * @throws IllegalStateException If this method is called when there is either a task or child context already 198 * present. 199 */ 200 ContextRuntime spawnChildContext( 201 final Configuration contextConfiguration) throws ContextClientCodeException { 202 203 synchronized (this.contextLifeCycle) { 204 205 if (this.task.isPresent()) { 206 throw new IllegalStateException( 207 "Attempting to to spawn a child context while a Task with id '" + 208 this.task.get().getId() + "' is running."); 209 } 210 211 if (this.childContext.isPresent()) { 212 throw new IllegalStateException( 213 "Attempting to spawn a child context on a context that is not the topmost active context"); 214 } 215 216 final Injector childServiceInjector = this.serviceInjector.forkInjector(); 217 final ContextRuntime childContext = 218 new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this)); 219 220 this.childContext = Optional.of(childContext); 221 return childContext; 222 } 223 } 224 225 /** 226 * Launches a Task on this context. 227 * 228 * @param taskConfig the configuration to be used for the task. 229 * @throws org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException If the Task cannot be instantiated due to user code / configuration issues. 230 * @throws IllegalStateException If this method is called when there is either a task or child context already present. 231 */ 232 void startTask(final Configuration taskConfig) throws TaskClientCodeException { 233 234 synchronized (this.contextLifeCycle) { 235 236 if (this.task.isPresent() && this.task.get().hasEnded()) { 237 // clean up state 238 this.task = Optional.empty(); 239 } 240 241 if (this.task.isPresent()) { 242 throw new IllegalStateException("Attempting to start a Task when a Task with id '" + 243 this.task.get().getId() + "' is running."); 244 } 245 246 if (this.childContext.isPresent()) { 247 throw new IllegalStateException( 248 "Attempting to start a Task on a context that is not the topmost active context"); 249 } 250 251 try { 252 final Injector taskInjector = this.contextInjector.forkInjector(taskConfig); 253 final TaskRuntime taskRuntime = taskInjector.getInstance(TaskRuntime.class); 254 taskRuntime.initialize(); 255 this.taskRuntimeThread = new Thread(taskRuntime, taskRuntime.getId()); 256 this.taskRuntimeThread.start(); 257 this.task = Optional.of(taskRuntime); 258 LOG.log(Level.FINEST, "Started task: {0}", taskRuntime.getTaskId()); 259 } catch (final BindException | InjectionException e) { 260 throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig), 261 this.getIdentifier(), 262 "Unable to instantiate the new task", e); 263 } catch (final Throwable t) { 264 throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig), 265 this.getIdentifier(), 266 "Unable to start the new task", t); 267 } 268 } 269 } 270 271 /** 272 * Close this context. If there is a child context, this recursively closes it before closing this context. If 273 * there is a Task currently running, that will be closed. 274 */ 275 final void close() { 276 277 synchronized (this.contextLifeCycle) { 278 279 this.contextState = ReefServiceProtos.ContextStatusProto.State.DONE; 280 281 if (this.task.isPresent()) { 282 LOG.log(Level.WARNING, "Shutting down a task because the underlying context is being closed."); 283 this.task.get().close(null); 284 } 285 286 if (this.childContext.isPresent()) { 287 LOG.log(Level.WARNING, "Closing a context because its parent context is being closed."); 288 this.childContext.get().close(); 289 } 290 291 this.contextLifeCycle.close(); 292 293 if (this.parentContext.isPresent()) { 294 this.parentContext.get().resetChildContext(); 295 } 296 } 297 } 298 299 /** 300 * @return the parent context, if there is one. 301 */ 302 Optional<ContextRuntime> getParentContext() { 303 return this.parentContext; 304 } 305 306 /** 307 * Deliver the given message to the Task. 308 * <p/> 309 * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING 310 * in the log. 311 * 312 * @param message the suspend message to deliver or null if there is none. 313 */ 314 void suspendTask(final byte[] message) { 315 synchronized (this.contextLifeCycle) { 316 if (!this.task.isPresent()) { 317 LOG.log(Level.WARNING, "Received a suspend task while there was no task running. Ignoring."); 318 } else { 319 this.task.get().suspend(message); 320 } 321 } 322 } 323 324 /** 325 * Issue a close call to the Task 326 * <p/> 327 * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING 328 * in the log. 329 * 330 * @param message the close message to deliver or null if there is none. 331 */ 332 void closeTask(final byte[] message) { 333 synchronized (this.contextLifeCycle) { 334 if (!this.task.isPresent()) { 335 LOG.log(Level.WARNING, "Received a close task while there was no task running. Ignoring."); 336 } else { 337 this.task.get().close(message); 338 } 339 } 340 } 341 342 /** 343 * Deliver a message to the Task 344 * <p/> 345 * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING 346 * in the log. 347 * 348 * @param message the close message to deliver or null if there is none. 349 */ 350 void deliverTaskMessage(final byte[] message) { 351 synchronized (this.contextLifeCycle) { 352 if (!this.task.isPresent()) { 353 LOG.log(Level.WARNING, "Received a task message while there was no task running. Ignoring."); 354 } else { 355 this.task.get().deliver(message); 356 } 357 } 358 } 359 360 /** 361 * @return the identifier of this context. 362 */ 363 String getIdentifier() { 364 return this.contextLifeCycle.getIdentifier(); 365 } 366 367 /** 368 * Handle the context message. 369 * 370 * @param message sent by the driver 371 */ 372 final void handleContextMessage(final byte[] message) { 373 this.contextLifeCycle.handleContextMessage(message); 374 } 375 376 /** 377 * @return the state of the running Task, if one is running. 378 */ 379 Optional<ReefServiceProtos.TaskStatusProto> getTaskStatus() { 380 synchronized (this.contextLifeCycle) { 381 if (this.task.isPresent()) { 382 if (this.task.get().hasEnded()) { 383 this.task = Optional.empty(); 384 return Optional.empty(); 385 } else { 386 return Optional.of(this.task.get().getStatusProto()); 387 } 388 } else { 389 return Optional.empty(); 390 } 391 } 392 } 393 394 /** 395 * Called by the child context when it has been closed. 396 */ 397 private void resetChildContext() { 398 synchronized (this.contextLifeCycle) { 399 if (this.childContext.isPresent()) { 400 this.childContext = Optional.empty(); 401 } else { 402 throw new IllegalStateException("no child context set"); 403 } 404 } 405 } 406 407 /** 408 * @return this context's status in protocol buffer form. 409 */ 410 ReefServiceProtos.ContextStatusProto getContextStatus() { 411 412 synchronized (this.contextLifeCycle) { 413 414 final ReefServiceProtos.ContextStatusProto.Builder builder = 415 ReefServiceProtos.ContextStatusProto.newBuilder() 416 .setContextId(this.getIdentifier()) 417 .setContextState(this.contextState); 418 419 if (this.parentContext.isPresent()) { 420 builder.setParentId(this.parentContext.get().getIdentifier()); 421 } 422 423 for (final ContextMessageSource contextMessageSource : this.contextLifeCycle.getContextMessageSources()) { 424 final Optional<ContextMessage> contextMessageOptional = contextMessageSource.getMessage(); 425 if (contextMessageOptional.isPresent()) { 426 builder.addContextMessage(ReefServiceProtos.ContextStatusProto.ContextMessageProto.newBuilder() 427 .setSourceId(contextMessageOptional.get().getMessageSourceID()) 428 .setMessage(ByteString.copyFrom(contextMessageOptional.get().get())) 429 .build()); 430 } 431 } 432 433 return builder.build(); 434 } 435 } 436}