001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.evaluator.context; 020 021import com.google.protobuf.ByteString; 022import org.apache.reef.annotations.Provided; 023import org.apache.reef.annotations.audience.EvaluatorSide; 024import org.apache.reef.annotations.audience.Private; 025import org.apache.reef.evaluator.context.ContextMessage; 026import org.apache.reef.evaluator.context.ContextMessageSource; 027import org.apache.reef.evaluator.context.parameters.Services; 028import org.apache.reef.proto.ReefServiceProtos; 029import org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException; 030import org.apache.reef.runtime.common.evaluator.task.TaskRuntime; 031import org.apache.reef.tang.Configuration; 032import org.apache.reef.tang.Injector; 033import org.apache.reef.tang.exceptions.BindException; 034import org.apache.reef.tang.exceptions.InjectionException; 035import org.apache.reef.util.Optional; 036 037import java.util.Set; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041/** 042 * The evaluator side resourcemanager for Contexts. 043 */ 044@Provided 045@Private 046@EvaluatorSide 047public final class ContextRuntime { 048 049 private static final Logger LOG = Logger.getLogger(ContextRuntime.class.getName()); 050 051 /** 052 * Context-local injector. This contains information that will not be available in child injectors. 053 */ 054 private final Injector contextInjector; 055 056 /** 057 * Service injector. State in this injector moves to child injectors. 058 */ 059 private final Injector serviceInjector; 060 061 /** 062 * Convenience class to hold all the event handlers for the context as well as the service instances. 063 */ 064 private final ContextLifeCycle contextLifeCycle; 065 /** 066 * The parent context, if there is any. 067 */ 068 private final Optional<ContextRuntime> parentContext; // guarded by this 069 /** 070 * The child context, if there is any. 071 */ 072 private Optional<ContextRuntime> childContext = Optional.empty(); // guarded by this 073 /** 074 * The currently running task, if there is any. 075 */ 076 private Optional<TaskRuntime> task = Optional.empty(); // guarded by this 077 078 private Thread taskRuntimeThread = null; 079 080 // TODO[JIRA REEF-835]: Which lock guards this? 081 private ReefServiceProtos.ContextStatusProto.State contextState = 082 ReefServiceProtos.ContextStatusProto.State.READY; 083 084 /** 085 * Create a new ContextRuntime. 086 * 087 * @param serviceInjector the serviceInjector to be used. 088 * @param contextConfiguration the Configuration for this context. 089 * @throws ContextClientCodeException if the context cannot be instantiated. 090 */ 091 ContextRuntime(final Injector serviceInjector, final Configuration contextConfiguration, 092 final Optional<ContextRuntime> parentContext) throws ContextClientCodeException { 093 094 this.serviceInjector = serviceInjector; 095 this.parentContext = parentContext; 096 097 // Trigger the instantiation of the services 098 try { 099 100 final Set<Object> services = serviceInjector.getNamedInstance(Services.class); 101 this.contextInjector = serviceInjector.forkInjector(contextConfiguration); 102 103 this.contextLifeCycle = this.contextInjector.getInstance(ContextLifeCycle.class); 104 105 } catch (BindException | InjectionException e) { 106 107 final Optional<String> parentID = this.getParentContext().isPresent() ? 108 Optional.of(this.getParentContext().get().getIdentifier()) : 109 Optional.<String>empty(); 110 111 throw new ContextClientCodeException( 112 ContextClientCodeException.getIdentifier(contextConfiguration), 113 parentID, "Unable to spawn context", e); 114 } 115 116 // Trigger the context start events on contextInjector. 117 this.contextLifeCycle.start(); 118 } 119 120 /** 121 * Create a new ContextRuntime for the root context. 122 * 123 * @param serviceInjector the serviceInjector to be used. 124 * @param contextConfiguration the Configuration for this context. 125 * @throws ContextClientCodeException if the context cannot be instantiated. 126 */ 127 ContextRuntime(final Injector serviceInjector, 128 final Configuration contextConfiguration) throws ContextClientCodeException { 129 this(serviceInjector, contextConfiguration, Optional.<ContextRuntime>empty()); 130 LOG.log(Level.FINEST, "Instantiating root context"); 131 } 132 133 134 /** 135 * Spawns a new context. 136 * <p> 137 * The new context will have a serviceInjector that is created by forking the one in this object with the given 138 * serviceConfiguration. The contextConfiguration is used to fork the contextInjector from that new serviceInjector. 139 * 140 * @param contextConfiguration the new context's context (local) Configuration. 141 * @param serviceConfiguration the new context's service Configuration. 142 * @return a child context. 143 * @throws ContextClientCodeException If the context can't be instantiate due to user code / configuration issues 144 * @throws IllegalStateException If this method is called when there is either a task or child context already 145 * present. 146 */ 147 ContextRuntime spawnChildContext( 148 final Configuration contextConfiguration, 149 final Configuration serviceConfiguration) throws ContextClientCodeException { 150 151 synchronized (this.contextLifeCycle) { 152 153 if (this.task.isPresent()) { 154 throw new IllegalStateException( 155 "Attempting to spawn a child context when a Task with id '" + 156 this.task.get().getId() + "' is running."); 157 } 158 159 if (this.childContext.isPresent()) { 160 throw new IllegalStateException( 161 "Attempting to instantiate a child context on a context that is not the topmost active context"); 162 } 163 164 try { 165 166 final Injector childServiceInjector = 167 this.serviceInjector.forkInjector(serviceConfiguration); 168 169 final ContextRuntime newChildContext = 170 new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this)); 171 172 this.childContext = Optional.of(newChildContext); 173 return newChildContext; 174 175 } catch (final BindException e) { 176 177 final Optional<String> parentID = this.getParentContext().isPresent() ? 178 Optional.of(this.getParentContext().get().getIdentifier()) : 179 Optional.<String>empty(); 180 181 throw new ContextClientCodeException( 182 ContextClientCodeException.getIdentifier(contextConfiguration), 183 parentID, "Unable to spawn context", e); 184 } 185 } 186 } 187 188 /** 189 * Spawns a new context without services of its own. 190 * <p> 191 * The new context will have a serviceInjector that is created by forking the one in this object. The 192 * contextConfiguration is used to fork the contextInjector from that new serviceInjector. 193 * 194 * @param contextConfiguration the new context's context (local) Configuration. 195 * @return a child context. 196 * @throws ContextClientCodeException If the context can't be instantiate due to user code / configuration issues. 197 * @throws IllegalStateException If this method is called when there is either a task or child context already 198 * present. 199 */ 200 ContextRuntime spawnChildContext( 201 final Configuration contextConfiguration) throws ContextClientCodeException { 202 203 synchronized (this.contextLifeCycle) { 204 205 if (this.task.isPresent()) { 206 throw new IllegalStateException( 207 "Attempting to to spawn a child context while a Task with id '" + 208 this.task.get().getId() + "' is running."); 209 } 210 211 if (this.childContext.isPresent()) { 212 throw new IllegalStateException( 213 "Attempting to spawn a child context on a context that is not the topmost active context"); 214 } 215 216 final Injector childServiceInjector = this.serviceInjector.forkInjector(); 217 final ContextRuntime newChildContext = 218 new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this)); 219 220 this.childContext = Optional.of(newChildContext); 221 return newChildContext; 222 } 223 } 224 225 /** 226 * Launches a Task on this context. 227 * 228 * @param taskConfig the configuration to be used for the task. 229 * @throws org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException If the Task cannot be instantiated 230 * due to user code / configuration issues. 231 * @throws IllegalStateException If this method is called when 232 * there is either a task or child context already present. 233 */ 234 @SuppressWarnings("checkstyle:illegalcatch") 235 void startTask(final Configuration taskConfig) throws TaskClientCodeException { 236 237 synchronized (this.contextLifeCycle) { 238 239 if (this.task.isPresent() && this.task.get().hasEnded()) { 240 // clean up state 241 this.task = Optional.empty(); 242 } 243 244 if (this.task.isPresent()) { 245 throw new IllegalStateException("Attempting to start a Task when a Task with id '" + 246 this.task.get().getId() + "' is running."); 247 } 248 249 if (this.childContext.isPresent()) { 250 throw new IllegalStateException( 251 "Attempting to start a Task on a context that is not the topmost active context"); 252 } 253 254 try { 255 final Injector taskInjector = this.contextInjector.forkInjector(taskConfig); 256 final TaskRuntime taskRuntime = taskInjector.getInstance(TaskRuntime.class); 257 taskRuntime.initialize(); 258 this.taskRuntimeThread = new Thread(taskRuntime, taskRuntime.getId()); 259 this.taskRuntimeThread.start(); 260 this.task = Optional.of(taskRuntime); 261 LOG.log(Level.FINEST, "Started task: {0}", taskRuntime.getTaskId()); 262 } catch (final BindException | InjectionException e) { 263 throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig), 264 this.getIdentifier(), 265 "Unable to instantiate the new task", e); 266 } catch (final Throwable t) { 267 throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig), 268 this.getIdentifier(), 269 "Unable to start the new task", t); 270 } 271 } 272 } 273 274 /** 275 * Close this context. If there is a child context, this recursively closes it before closing this context. If 276 * there is a Task currently running, that will be closed. 277 */ 278 void close() { 279 280 synchronized (this.contextLifeCycle) { 281 282 this.contextState = ReefServiceProtos.ContextStatusProto.State.DONE; 283 284 if (this.task.isPresent()) { 285 LOG.log(Level.WARNING, "Shutting down a task because the underlying context is being closed."); 286 this.task.get().close(null); 287 } 288 289 if (this.childContext.isPresent()) { 290 LOG.log(Level.WARNING, "Closing a context because its parent context is being closed."); 291 this.childContext.get().close(); 292 } 293 294 this.contextLifeCycle.close(); 295 296 if (this.parentContext.isPresent()) { 297 this.parentContext.get().resetChildContext(); 298 } 299 } 300 } 301 302 /** 303 * @return the parent context, if there is one. 304 */ 305 Optional<ContextRuntime> getParentContext() { 306 return this.parentContext; 307 } 308 309 /** 310 * Deliver the given message to the Task. 311 * <p> 312 * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING 313 * in the log. 314 * 315 * @param message the suspend message to deliver or null if there is none. 316 */ 317 void suspendTask(final byte[] message) { 318 synchronized (this.contextLifeCycle) { 319 if (!this.task.isPresent()) { 320 LOG.log(Level.WARNING, "Received a suspend task while there was no task running. Ignoring."); 321 } else { 322 this.task.get().suspend(message); 323 } 324 } 325 } 326 327 /** 328 * Issue a close call to the Task 329 * <p> 330 * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING 331 * in the log. 332 * 333 * @param message the close message to deliver or null if there is none. 334 */ 335 void closeTask(final byte[] message) { 336 synchronized (this.contextLifeCycle) { 337 if (!this.task.isPresent()) { 338 LOG.log(Level.WARNING, "Received a close task while there was no task running. Ignoring."); 339 } else { 340 this.task.get().close(message); 341 } 342 } 343 } 344 345 /** 346 * Deliver a message to the Task 347 * <p> 348 * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING 349 * in the log. 350 * 351 * @param message the close message to deliver or null if there is none. 352 */ 353 void deliverTaskMessage(final byte[] message) { 354 synchronized (this.contextLifeCycle) { 355 if (!this.task.isPresent()) { 356 LOG.log(Level.WARNING, "Received a task message while there was no task running. Ignoring."); 357 } else { 358 this.task.get().deliver(message); 359 } 360 } 361 } 362 363 /** 364 * @return the identifier of this context. 365 */ 366 String getIdentifier() { 367 return this.contextLifeCycle.getIdentifier(); 368 } 369 370 /** 371 * Handle the context message. 372 * 373 * @param message sent by the driver 374 */ 375 void handleContextMessage(final byte[] message) { 376 this.contextLifeCycle.handleContextMessage(message); 377 } 378 379 /** 380 * @return the state of the running Task, if one is running. 381 */ 382 Optional<ReefServiceProtos.TaskStatusProto> getTaskStatus() { 383 synchronized (this.contextLifeCycle) { 384 if (this.task.isPresent()) { 385 if (this.task.get().hasEnded()) { 386 this.task = Optional.empty(); 387 return Optional.empty(); 388 } else { 389 return Optional.of(this.task.get().getStatusProto()); 390 } 391 } else { 392 return Optional.empty(); 393 } 394 } 395 } 396 397 /** 398 * Called by the child context when it has been closed. 399 */ 400 private void resetChildContext() { 401 synchronized (this.contextLifeCycle) { 402 if (this.childContext.isPresent()) { 403 this.childContext = Optional.empty(); 404 } else { 405 throw new IllegalStateException("no child context set"); 406 } 407 } 408 } 409 410 /** 411 * @return this context's status in protocol buffer form. 412 */ 413 ReefServiceProtos.ContextStatusProto getContextStatus() { 414 415 synchronized (this.contextLifeCycle) { 416 417 final ReefServiceProtos.ContextStatusProto.Builder builder = 418 ReefServiceProtos.ContextStatusProto.newBuilder() 419 .setContextId(this.getIdentifier()) 420 .setContextState(this.contextState); 421 422 if (this.parentContext.isPresent()) { 423 builder.setParentId(this.parentContext.get().getIdentifier()); 424 } 425 426 for (final ContextMessageSource contextMessageSource : this.contextLifeCycle.getContextMessageSources()) { 427 final Optional<ContextMessage> contextMessageOptional = contextMessageSource.getMessage(); 428 if (contextMessageOptional.isPresent()) { 429 builder.addContextMessage(ReefServiceProtos.ContextStatusProto.ContextMessageProto.newBuilder() 430 .setSourceId(contextMessageOptional.get().getMessageSourceID()) 431 .setMessage(ByteString.copyFrom(contextMessageOptional.get().get())) 432 .build()); 433 } 434 } 435 436 return builder.build(); 437 } 438 } 439}