001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.evaluator.context;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.annotations.Provided;
023import org.apache.reef.annotations.audience.EvaluatorSide;
024import org.apache.reef.annotations.audience.Private;
025import org.apache.reef.evaluator.context.ContextMessage;
026import org.apache.reef.evaluator.context.ContextMessageSource;
027import org.apache.reef.evaluator.context.parameters.Services;
028import org.apache.reef.proto.ReefServiceProtos;
029import org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException;
030import org.apache.reef.runtime.common.evaluator.task.TaskRuntime;
031import org.apache.reef.tang.Configuration;
032import org.apache.reef.tang.Injector;
033import org.apache.reef.tang.exceptions.BindException;
034import org.apache.reef.tang.exceptions.InjectionException;
035import org.apache.reef.util.Optional;
036
037import java.util.Set;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041/**
042 * The evaluator side resourcemanager for Contexts.
043 */
044@Provided
045@Private
046@EvaluatorSide
047public final class ContextRuntime {
048
049  private static final Logger LOG = Logger.getLogger(ContextRuntime.class.getName());
050
051  /**
052   * Context-local injector. This contains information that will not be available in child injectors.
053   */
054  private final Injector contextInjector;
055
056  /**
057   * Service injector. State in this injector moves to child injectors.
058   */
059  private final Injector serviceInjector;
060
061  /**
062   * Convenience class to hold all the event handlers for the context as well as the service instances.
063   */
064  private final ContextLifeCycle contextLifeCycle;
065  /**
066   * The parent context, if there is any.
067   */
068  private final Optional<ContextRuntime> parentContext; // guarded by this
069  /**
070   * The child context, if there is any.
071   */
072  private Optional<ContextRuntime> childContext = Optional.empty(); // guarded by this
073  /**
074   * The currently running task, if there is any.
075   */
076  private Optional<TaskRuntime> task = Optional.empty(); // guarded by this
077
078  private Thread taskRuntimeThread = null;
079
080  // TODO[JIRA REEF-835]: Which lock guards this?
081  private ReefServiceProtos.ContextStatusProto.State contextState =
082      ReefServiceProtos.ContextStatusProto.State.READY;
083
084  /**
085   * Create a new ContextRuntime.
086   *
087   * @param serviceInjector      the serviceInjector to be used.
088   * @param contextConfiguration the Configuration for this context.
089   * @throws ContextClientCodeException if the context cannot be instantiated.
090   */
091  ContextRuntime(final Injector serviceInjector, final Configuration contextConfiguration,
092                 final Optional<ContextRuntime> parentContext) throws ContextClientCodeException {
093
094    this.serviceInjector = serviceInjector;
095    this.parentContext = parentContext;
096
097    // Trigger the instantiation of the services
098    try {
099
100      final Set<Object> services = serviceInjector.getNamedInstance(Services.class);
101      this.contextInjector = serviceInjector.forkInjector(contextConfiguration);
102
103      this.contextLifeCycle = this.contextInjector.getInstance(ContextLifeCycle.class);
104
105    } catch (BindException | InjectionException e) {
106
107      final Optional<String> parentID = this.getParentContext().isPresent() ?
108          Optional.of(this.getParentContext().get().getIdentifier()) :
109          Optional.<String>empty();
110
111      throw new ContextClientCodeException(
112          ContextClientCodeException.getIdentifier(contextConfiguration),
113          parentID, "Unable to spawn context", e);
114    }
115
116    // Trigger the context start events on contextInjector.
117    this.contextLifeCycle.start();
118  }
119
120  /**
121   * Create a new ContextRuntime for the root context.
122   *
123   * @param serviceInjector      the serviceInjector to be used.
124   * @param contextConfiguration the Configuration for this context.
125   * @throws ContextClientCodeException if the context cannot be instantiated.
126   */
127  ContextRuntime(final Injector serviceInjector,
128                 final Configuration contextConfiguration) throws ContextClientCodeException {
129    this(serviceInjector, contextConfiguration, Optional.<ContextRuntime>empty());
130    LOG.log(Level.FINEST, "Instantiating root context");
131  }
132
133
134  /**
135   * Spawns a new context.
136   * <p>
137   * The new context will have a serviceInjector that is created by forking the one in this object with the given
138   * serviceConfiguration. The contextConfiguration is used to fork the contextInjector from that new serviceInjector.
139   *
140   * @param contextConfiguration the new context's context (local) Configuration.
141   * @param serviceConfiguration the new context's service Configuration.
142   * @return a child context.
143   * @throws ContextClientCodeException If the context can't be instantiate due to user code / configuration issues
144   * @throws IllegalStateException      If this method is called when there is either a task or child context already
145   *                                    present.
146   */
147  ContextRuntime spawnChildContext(
148      final Configuration contextConfiguration,
149      final Configuration serviceConfiguration) throws ContextClientCodeException {
150
151    synchronized (this.contextLifeCycle) {
152
153      if (this.task.isPresent()) {
154        throw new IllegalStateException(
155            "Attempting to spawn a child context when a Task with id '" +
156                this.task.get().getId() + "' is running.");
157      }
158
159      if (this.childContext.isPresent()) {
160        throw new IllegalStateException(
161            "Attempting to instantiate a child context on a context that is not the topmost active context");
162      }
163
164      try {
165
166        final Injector childServiceInjector =
167            this.serviceInjector.forkInjector(serviceConfiguration);
168
169        final ContextRuntime newChildContext =
170            new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this));
171
172        this.childContext = Optional.of(newChildContext);
173        return newChildContext;
174
175      } catch (final BindException e) {
176
177        final Optional<String> parentID = this.getParentContext().isPresent() ?
178            Optional.of(this.getParentContext().get().getIdentifier()) :
179            Optional.<String>empty();
180
181        throw new ContextClientCodeException(
182            ContextClientCodeException.getIdentifier(contextConfiguration),
183            parentID, "Unable to spawn context", e);
184      }
185    }
186  }
187
188  /**
189   * Spawns a new context without services of its own.
190   * <p>
191   * The new context will have a serviceInjector that is created by forking the one in this object. The
192   * contextConfiguration is used to fork the contextInjector from that new serviceInjector.
193   *
194   * @param contextConfiguration the new context's context (local) Configuration.
195   * @return a child context.
196   * @throws ContextClientCodeException If the context can't be instantiate due to user code / configuration issues.
197   * @throws IllegalStateException      If this method is called when there is either a task or child context already
198   *                                    present.
199   */
200  ContextRuntime spawnChildContext(
201      final Configuration contextConfiguration) throws ContextClientCodeException {
202
203    synchronized (this.contextLifeCycle) {
204
205      if (this.task.isPresent()) {
206        throw new IllegalStateException(
207            "Attempting to to spawn a child context while a Task with id '" +
208                this.task.get().getId() + "' is running.");
209      }
210
211      if (this.childContext.isPresent()) {
212        throw new IllegalStateException(
213            "Attempting to spawn a child context on a context that is not the topmost active context");
214      }
215
216      final Injector childServiceInjector = this.serviceInjector.forkInjector();
217      final ContextRuntime newChildContext =
218          new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this));
219
220      this.childContext = Optional.of(newChildContext);
221      return newChildContext;
222    }
223  }
224
225  /**
226   * Launches a Task on this context.
227   *
228   * @param taskConfig the configuration to be used for the task.
229   * @throws org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException If the Task cannot be instantiated
230   * due to user code / configuration issues.
231   * @throws IllegalStateException                                                 If this method is called when
232   * there is either a task or child context already present.
233   */
234  @SuppressWarnings("checkstyle:illegalcatch")
235  void startTask(final Configuration taskConfig) throws TaskClientCodeException {
236
237    synchronized (this.contextLifeCycle) {
238
239      if (this.task.isPresent() && this.task.get().hasEnded()) {
240        // clean up state
241        this.task = Optional.empty();
242      }
243
244      if (this.task.isPresent()) {
245        throw new IllegalStateException("Attempting to start a Task when a Task with id '" +
246            this.task.get().getId() + "' is running.");
247      }
248
249      if (this.childContext.isPresent()) {
250        throw new IllegalStateException(
251            "Attempting to start a Task on a context that is not the topmost active context");
252      }
253
254      try {
255        final Injector taskInjector = this.contextInjector.forkInjector(taskConfig);
256        final TaskRuntime taskRuntime = taskInjector.getInstance(TaskRuntime.class);
257        taskRuntime.initialize();
258        this.taskRuntimeThread = new Thread(taskRuntime, taskRuntime.getId());
259        this.taskRuntimeThread.start();
260        this.task = Optional.of(taskRuntime);
261        LOG.log(Level.FINEST, "Started task: {0}", taskRuntime.getTaskId());
262      } catch (final BindException | InjectionException e) {
263        throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig),
264            this.getIdentifier(),
265            "Unable to instantiate the new task", e);
266      } catch (final Throwable t) {
267        throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig),
268            this.getIdentifier(),
269            "Unable to start the new task", t);
270      }
271    }
272  }
273
274  /**
275   * Close this context. If there is a child context, this recursively closes it before closing this context. If
276   * there is a Task currently running, that will be closed.
277   */
278  void close() {
279
280    synchronized (this.contextLifeCycle) {
281
282      this.contextState = ReefServiceProtos.ContextStatusProto.State.DONE;
283
284      if (this.task.isPresent()) {
285        LOG.log(Level.WARNING, "Shutting down a task because the underlying context is being closed.");
286        this.task.get().close(null);
287      }
288
289      if (this.childContext.isPresent()) {
290        LOG.log(Level.WARNING, "Closing a context because its parent context is being closed.");
291        this.childContext.get().close();
292      }
293
294      this.contextLifeCycle.close();
295
296      if (this.parentContext.isPresent()) {
297        this.parentContext.get().resetChildContext();
298      }
299    }
300  }
301
302  /**
303   * @return the parent context, if there is one.
304   */
305  Optional<ContextRuntime> getParentContext() {
306    return this.parentContext;
307  }
308
309  /**
310   * Deliver the given message to the Task.
311   * <p>
312   * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
313   * in the log.
314   *
315   * @param message the suspend message to deliver or null if there is none.
316   */
317  void suspendTask(final byte[] message) {
318    synchronized (this.contextLifeCycle) {
319      if (!this.task.isPresent()) {
320        LOG.log(Level.WARNING, "Received a suspend task while there was no task running. Ignoring.");
321      } else {
322        this.task.get().suspend(message);
323      }
324    }
325  }
326
327  /**
328   * Issue a close call to the Task
329   * <p>
330   * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
331   * in the log.
332   *
333   * @param message the close  message to deliver or null if there is none.
334   */
335  void closeTask(final byte[] message) {
336    synchronized (this.contextLifeCycle) {
337      if (!this.task.isPresent()) {
338        LOG.log(Level.WARNING, "Received a close task while there was no task running. Ignoring.");
339      } else {
340        this.task.get().close(message);
341      }
342    }
343  }
344
345  /**
346   * Deliver a message to the Task
347   * <p>
348   * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
349   * in the log.
350   *
351   * @param message the close  message to deliver or null if there is none.
352   */
353  void deliverTaskMessage(final byte[] message) {
354    synchronized (this.contextLifeCycle) {
355      if (!this.task.isPresent()) {
356        LOG.log(Level.WARNING, "Received a task message while there was no task running. Ignoring.");
357      } else {
358        this.task.get().deliver(message);
359      }
360    }
361  }
362
363  /**
364   * @return the identifier of this context.
365   */
366  String getIdentifier() {
367    return this.contextLifeCycle.getIdentifier();
368  }
369
370  /**
371   * Handle the context message.
372   *
373   * @param message sent by the driver
374   */
375  void handleContextMessage(final byte[] message) {
376    this.contextLifeCycle.handleContextMessage(message);
377  }
378
379  /**
380   * @return the state of the running Task, if one is running.
381   */
382  Optional<ReefServiceProtos.TaskStatusProto> getTaskStatus() {
383    synchronized (this.contextLifeCycle) {
384      if (this.task.isPresent()) {
385        if (this.task.get().hasEnded()) {
386          this.task = Optional.empty();
387          return Optional.empty();
388        } else {
389          return Optional.of(this.task.get().getStatusProto());
390        }
391      } else {
392        return Optional.empty();
393      }
394    }
395  }
396
397  /**
398   * Called by the child context when it has been closed.
399   */
400  private void resetChildContext() {
401    synchronized (this.contextLifeCycle) {
402      if (this.childContext.isPresent()) {
403        this.childContext = Optional.empty();
404      } else {
405        throw new IllegalStateException("no child context set");
406      }
407    }
408  }
409
410  /**
411   * @return this context's status in protocol buffer form.
412   */
413  ReefServiceProtos.ContextStatusProto getContextStatus() {
414
415    synchronized (this.contextLifeCycle) {
416
417      final ReefServiceProtos.ContextStatusProto.Builder builder =
418          ReefServiceProtos.ContextStatusProto.newBuilder()
419              .setContextId(this.getIdentifier())
420              .setContextState(this.contextState);
421
422      if (this.parentContext.isPresent()) {
423        builder.setParentId(this.parentContext.get().getIdentifier());
424      }
425
426      for (final ContextMessageSource contextMessageSource : this.contextLifeCycle.getContextMessageSources()) {
427        final Optional<ContextMessage> contextMessageOptional = contextMessageSource.getMessage();
428        if (contextMessageOptional.isPresent()) {
429          builder.addContextMessage(ReefServiceProtos.ContextStatusProto.ContextMessageProto.newBuilder()
430              .setSourceId(contextMessageOptional.get().getMessageSourceID())
431              .setMessage(ByteString.copyFrom(contextMessageOptional.get().get()))
432              .build());
433        }
434      }
435
436      return builder.build();
437    }
438  }
439}