This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.evaluator.context;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.annotations.Provided;
023import org.apache.reef.annotations.audience.EvaluatorSide;
024import org.apache.reef.annotations.audience.Private;
025import org.apache.reef.evaluator.context.ContextMessage;
026import org.apache.reef.evaluator.context.ContextMessageSource;
027import org.apache.reef.evaluator.context.parameters.Services;
028import org.apache.reef.proto.ReefServiceProtos;
029import org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException;
030import org.apache.reef.runtime.common.evaluator.task.TaskRuntime;
031import org.apache.reef.tang.Configuration;
032import org.apache.reef.tang.Injector;
033import org.apache.reef.tang.exceptions.BindException;
034import org.apache.reef.tang.exceptions.InjectionException;
035import org.apache.reef.util.Optional;
036
037import java.util.Set;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041/**
042 * The evaluator side resourcemanager for Contexts.
043 */
044@Provided
045@Private
046@EvaluatorSide
047public final class ContextRuntime {
048
049  private static final Logger LOG = Logger.getLogger(ContextRuntime.class.getName());
050
051  /**
052   * Context-local injector. This contains information that will not be available in child injectors.
053   */
054  private final Injector contextInjector;
055
056  /**
057   * Service injector. State in this injector moves to child injectors.
058   */
059  private final Injector serviceInjector;
060
061  /**
062   * Convenience class to hold all the event handlers for the context as well as the service instances.
063   */
064  private final ContextLifeCycle contextLifeCycle;
065  /**
066   * The parent context, if there is any.
067   */
068  private final Optional<ContextRuntime> parentContext; // guarded by this
069  /**
070   * The child context, if there is any.
071   */
072  private Optional<ContextRuntime> childContext = Optional.empty(); // guarded by this
073  /**
074   * The currently running task, if there is any.
075   */
076  private Optional<TaskRuntime> task = Optional.empty(); // guarded by this
077
078  private Thread taskRuntimeThread = null;
079
080  // TODO: Which lock guards this?
081  private ReefServiceProtos.ContextStatusProto.State contextState =
082      ReefServiceProtos.ContextStatusProto.State.READY;
083
084  /**
085   * Create a new ContextRuntime.
086   *
087   * @param serviceInjector      the serviceInjector to be used.
088   * @param contextConfiguration the Configuration for this context.
089   * @throws ContextClientCodeException if the context cannot be instantiated.
090   */
091  ContextRuntime(final Injector serviceInjector, final Configuration contextConfiguration,
092                 final Optional<ContextRuntime> parentContext) throws ContextClientCodeException {
093
094    this.serviceInjector = serviceInjector;
095    this.parentContext = parentContext;
096
097    // Trigger the instantiation of the services
098    try {
099
100      final Set<Object> services = serviceInjector.getNamedInstance(Services.class);
101      this.contextInjector = serviceInjector.forkInjector(contextConfiguration);
102
103      this.contextLifeCycle = this.contextInjector.getInstance(ContextLifeCycle.class);
104
105    } catch (BindException | InjectionException e) {
106
107      final Optional<String> parentID = this.getParentContext().isPresent() ?
108          Optional.of(this.getParentContext().get().getIdentifier()) :
109          Optional.<String>empty();
110
111      throw new ContextClientCodeException(
112          ContextClientCodeException.getIdentifier(contextConfiguration),
113          parentID, "Unable to spawn context", e);
114    }
115
116    // Trigger the context start events on contextInjector.
117    this.contextLifeCycle.start();
118  }
119
120  /**
121   * Create a new ContextRuntime for the root context.
122   *
123   * @param serviceInjector      the serviceInjector to be used.
124   * @param contextConfiguration the Configuration for this context.
125   * @throws ContextClientCodeException if the context cannot be instantiated.
126   */
127  ContextRuntime(final Injector serviceInjector,
128                 final Configuration contextConfiguration) throws ContextClientCodeException {
129    this(serviceInjector, contextConfiguration, Optional.<ContextRuntime>empty());
130    LOG.log(Level.FINEST, "Instantiating root context");
131  }
132
133
134  /**
135   * Spawns a new context.
136   * <p/>
137   * The new context will have a serviceInjector that is created by forking the one in this object with the given
138   * serviceConfiguration. The contextConfiguration is used to fork the contextInjector from that new serviceInjector.
139   *
140   * @param contextConfiguration the new context's context (local) Configuration.
141   * @param serviceConfiguration the new context's service Configuration.
142   * @return a child context.
143   * @throws ContextClientCodeException If the context can't be instantiate due to user code / configuration issues
144   * @throws IllegalStateException      If this method is called when there is either a task or child context already
145   *                                    present.
146   */
147  ContextRuntime spawnChildContext(
148      final Configuration contextConfiguration,
149      final Configuration serviceConfiguration) throws ContextClientCodeException {
150
151    synchronized (this.contextLifeCycle) {
152
153      if (this.task.isPresent()) {
154        throw new IllegalStateException(
155            "Attempting to spawn a child context when a Task with id '" +
156                this.task.get().getId() + "' is running.");
157      }
158
159      if (this.childContext.isPresent()) {
160        throw new IllegalStateException(
161            "Attempting to instantiate a child context on a context that is not the topmost active context");
162      }
163
164      try {
165
166        final Injector childServiceInjector =
167            this.serviceInjector.forkInjector(serviceConfiguration);
168
169        final ContextRuntime childContext =
170            new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this));
171
172        this.childContext = Optional.of(childContext);
173        return childContext;
174
175      } catch (final BindException e) {
176
177        final Optional<String> parentID = this.getParentContext().isPresent() ?
178            Optional.of(this.getParentContext().get().getIdentifier()) :
179            Optional.<String>empty();
180
181        throw new ContextClientCodeException(
182            ContextClientCodeException.getIdentifier(contextConfiguration),
183            parentID, "Unable to spawn context", e);
184      }
185    }
186  }
187
188  /**
189   * Spawns a new context without services of its own.
190   * <p/>
191   * The new context will have a serviceInjector that is created by forking the one in this object. The
192   * contextConfiguration is used to fork the contextInjector from that new serviceInjector.
193   *
194   * @param contextConfiguration the new context's context (local) Configuration.
195   * @return a child context.
196   * @throws ContextClientCodeException If the context can't be instantiate due to user code / configuration issues.
197   * @throws IllegalStateException      If this method is called when there is either a task or child context already
198   *                                    present.
199   */
200  ContextRuntime spawnChildContext(
201      final Configuration contextConfiguration) throws ContextClientCodeException {
202
203    synchronized (this.contextLifeCycle) {
204
205      if (this.task.isPresent()) {
206        throw new IllegalStateException(
207            "Attempting to to spawn a child context while a Task with id '" +
208                this.task.get().getId() + "' is running.");
209      }
210
211      if (this.childContext.isPresent()) {
212        throw new IllegalStateException(
213            "Attempting to spawn a child context on a context that is not the topmost active context");
214      }
215
216      final Injector childServiceInjector = this.serviceInjector.forkInjector();
217      final ContextRuntime childContext =
218          new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this));
219
220      this.childContext = Optional.of(childContext);
221      return childContext;
222    }
223  }
224
225  /**
226   * Launches a Task on this context.
227   *
228   * @param taskConfig the configuration to be used for the task.
229   * @throws org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException If the Task cannot be instantiated due to user code / configuration issues.
230   * @throws IllegalStateException                                                 If this method is called when there is either a task or child context already present.
231   */
232  void startTask(final Configuration taskConfig) throws TaskClientCodeException {
233
234    synchronized (this.contextLifeCycle) {
235
236      if (this.task.isPresent() && this.task.get().hasEnded()) {
237        // clean up state
238        this.task = Optional.empty();
239      }
240
241      if (this.task.isPresent()) {
242        throw new IllegalStateException("Attempting to start a Task when a Task with id '" +
243            this.task.get().getId() + "' is running.");
244      }
245
246      if (this.childContext.isPresent()) {
247        throw new IllegalStateException(
248            "Attempting to start a Task on a context that is not the topmost active context");
249      }
250
251      try {
252        final Injector taskInjector = this.contextInjector.forkInjector(taskConfig);
253        final TaskRuntime taskRuntime = taskInjector.getInstance(TaskRuntime.class);
254        taskRuntime.initialize();
255        this.taskRuntimeThread = new Thread(taskRuntime, taskRuntime.getId());
256        this.taskRuntimeThread.start();
257        this.task = Optional.of(taskRuntime);
258        LOG.log(Level.FINEST, "Started task: {0}", taskRuntime.getTaskId());
259      } catch (final BindException | InjectionException e) {
260        throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig),
261            this.getIdentifier(),
262            "Unable to instantiate the new task", e);
263      } catch (final Throwable t) {
264        throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig),
265            this.getIdentifier(),
266            "Unable to start the new task", t);
267      }
268    }
269  }
270
271  /**
272   * Close this context. If there is a child context, this recursively closes it before closing this context. If
273   * there is a Task currently running, that will be closed.
274   */
275  final void close() {
276
277    synchronized (this.contextLifeCycle) {
278
279      this.contextState = ReefServiceProtos.ContextStatusProto.State.DONE;
280
281      if (this.task.isPresent()) {
282        LOG.log(Level.WARNING, "Shutting down a task because the underlying context is being closed.");
283        this.task.get().close(null);
284      }
285
286      if (this.childContext.isPresent()) {
287        LOG.log(Level.WARNING, "Closing a context because its parent context is being closed.");
288        this.childContext.get().close();
289      }
290
291      this.contextLifeCycle.close();
292
293      if (this.parentContext.isPresent()) {
294        this.parentContext.get().resetChildContext();
295      }
296    }
297  }
298
299  /**
300   * @return the parent context, if there is one.
301   */
302  Optional<ContextRuntime> getParentContext() {
303    return this.parentContext;
304  }
305
306  /**
307   * Deliver the given message to the Task.
308   * <p/>
309   * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
310   * in the log.
311   *
312   * @param message the suspend message to deliver or null if there is none.
313   */
314  void suspendTask(final byte[] message) {
315    synchronized (this.contextLifeCycle) {
316      if (!this.task.isPresent()) {
317        LOG.log(Level.WARNING, "Received a suspend task while there was no task running. Ignoring.");
318      } else {
319        this.task.get().suspend(message);
320      }
321    }
322  }
323
324  /**
325   * Issue a close call to the Task
326   * <p/>
327   * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
328   * in the log.
329   *
330   * @param message the close  message to deliver or null if there is none.
331   */
332  void closeTask(final byte[] message) {
333    synchronized (this.contextLifeCycle) {
334      if (!this.task.isPresent()) {
335        LOG.log(Level.WARNING, "Received a close task while there was no task running. Ignoring.");
336      } else {
337        this.task.get().close(message);
338      }
339    }
340  }
341
342  /**
343   * Deliver a message to the Task
344   * <p/>
345   * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
346   * in the log.
347   *
348   * @param message the close  message to deliver or null if there is none.
349   */
350  void deliverTaskMessage(final byte[] message) {
351    synchronized (this.contextLifeCycle) {
352      if (!this.task.isPresent()) {
353        LOG.log(Level.WARNING, "Received a task message while there was no task running. Ignoring.");
354      } else {
355        this.task.get().deliver(message);
356      }
357    }
358  }
359
360  /**
361   * @return the identifier of this context.
362   */
363  String getIdentifier() {
364    return this.contextLifeCycle.getIdentifier();
365  }
366
367  /**
368   * Handle the context message.
369   *
370   * @param message sent by the driver
371   */
372  final void handleContextMessage(final byte[] message) {
373    this.contextLifeCycle.handleContextMessage(message);
374  }
375
376  /**
377   * @return the state of the running Task, if one is running.
378   */
379  Optional<ReefServiceProtos.TaskStatusProto> getTaskStatus() {
380    synchronized (this.contextLifeCycle) {
381      if (this.task.isPresent()) {
382        if (this.task.get().hasEnded()) {
383          this.task = Optional.empty();
384          return Optional.empty();
385        } else {
386          return Optional.of(this.task.get().getStatusProto());
387        }
388      } else {
389        return Optional.empty();
390      }
391    }
392  }
393
394  /**
395   * Called by the child context when it has been closed.
396   */
397  private void resetChildContext() {
398    synchronized (this.contextLifeCycle) {
399      if (this.childContext.isPresent()) {
400        this.childContext = Optional.empty();
401      } else {
402        throw new IllegalStateException("no child context set");
403      }
404    }
405  }
406
407  /**
408   * @return this context's status in protocol buffer form.
409   */
410  ReefServiceProtos.ContextStatusProto getContextStatus() {
411
412    synchronized (this.contextLifeCycle) {
413
414      final ReefServiceProtos.ContextStatusProto.Builder builder =
415          ReefServiceProtos.ContextStatusProto.newBuilder()
416              .setContextId(this.getIdentifier())
417              .setContextState(this.contextState);
418
419      if (this.parentContext.isPresent()) {
420        builder.setParentId(this.parentContext.get().getIdentifier());
421      }
422
423      for (final ContextMessageSource contextMessageSource : this.contextLifeCycle.getContextMessageSources()) {
424        final Optional<ContextMessage> contextMessageOptional = contextMessageSource.getMessage();
425        if (contextMessageOptional.isPresent()) {
426          builder.addContextMessage(ReefServiceProtos.ContextStatusProto.ContextMessageProto.newBuilder()
427              .setSourceId(contextMessageOptional.get().getMessageSourceID())
428              .setMessage(ByteString.copyFrom(contextMessageOptional.get().get()))
429              .build());
430        }
431      }
432
433      return builder.build();
434    }
435  }
436}