001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.evaluator.context;
020
021import com.google.protobuf.ByteString;
022import org.apache.reef.annotations.Provided;
023import org.apache.reef.annotations.audience.EvaluatorSide;
024import org.apache.reef.annotations.audience.Private;
025import org.apache.reef.proto.EvaluatorRuntimeProtocol;
026import org.apache.reef.proto.ReefServiceProtos;
027import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
028import org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException;
029import org.apache.reef.runtime.common.utils.ExceptionCodec;
030import org.apache.reef.tang.Configuration;
031import org.apache.reef.tang.InjectionFuture;
032import org.apache.reef.tang.exceptions.BindException;
033import org.apache.reef.tang.formats.ConfigurationSerializer;
034import org.apache.reef.util.Optional;
035
036import javax.inject.Inject;
037import java.io.IOException;
038import java.util.ArrayList;
039import java.util.Collection;
040import java.util.List;
041import java.util.Stack;
042import java.util.logging.Level;
043import java.util.logging.Logger;
044
045/**
046 * Manages the stack of context in the Evaluator.
047 */
048@Private
049@EvaluatorSide
050@Provided
051public final class ContextManager implements AutoCloseable {
052
053  private static final Logger LOG = Logger.getLogger(ContextManager.class.getName());
054
055  /**
056   * The stack of context.
057   */
058  private final Stack<ContextRuntime> contextStack = new Stack<>();
059
060  /**
061   * Used to instantiate the root context.
062   */
063  private final InjectionFuture<RootContextLauncher> launchContext;
064
065  /**
066   * Used for status reporting to the Driver.
067   */
068  private final HeartBeatManager heartBeatManager;
069
070  /**
071   * To serialize Configurations.
072   */
073  private final ConfigurationSerializer configurationSerializer;
074
075  private final ExceptionCodec exceptionCodec;
076
077  /**
078   * @param launchContext           to instantiate the root context.
079   * @param heartBeatManager        for status reporting to the Driver.
080   * @param configurationSerializer
081   * @param exceptionCodec
082   */
083  @Inject
084  ContextManager(final InjectionFuture<RootContextLauncher> launchContext,
085                 final HeartBeatManager heartBeatManager,
086                 final ConfigurationSerializer configurationSerializer,
087                 final ExceptionCodec exceptionCodec) {
088    this.launchContext = launchContext;
089    this.heartBeatManager = heartBeatManager;
090    this.configurationSerializer = configurationSerializer;
091    this.exceptionCodec = exceptionCodec;
092  }
093
094  /**
095   * Start the context manager. This initiates the root context.
096   *
097   * @throws ContextClientCodeException if the root context can't be instantiated.
098   */
099  public void start() throws ContextClientCodeException {
100    synchronized (this.contextStack) {
101      LOG.log(Level.FINEST, "Instantiating root context.");
102      this.contextStack.push(this.launchContext.get().getRootContext());
103
104      if (this.launchContext.get().getInitialTaskConfiguration().isPresent()) {
105        LOG.log(Level.FINEST, "Launching the initial Task");
106        try {
107          this.contextStack.peek().startTask(this.launchContext.get().getInitialTaskConfiguration().get());
108        } catch (final TaskClientCodeException e) {
109          this.handleTaskException(e);
110        }
111      }
112    }
113  }
114
115  /**
116   * Shuts down. This forecefully kills the Task if there is one and then shuts down all Contexts on the stack,
117   * starting at the top.
118   */
119  @Override
120  public void close() {
121    synchronized (this.contextStack) {
122      if (!this.contextStackIsEmpty()) {
123        this.contextStack.lastElement().close();
124      }
125    }
126  }
127
128  /**
129   * @return true if there is no context.
130   */
131  public boolean contextStackIsEmpty() {
132    synchronized (this.contextStack) {
133      return this.contextStack.isEmpty();
134    }
135  }
136
137  /**
138   * Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts.
139   * <p>
140   * This also triggers the HeartBeatManager to send a heartbeat with the result of this operation.
141   *
142   * @param controlMessage the message to process
143   */
144  public void handleContextControlProtocol(
145      final EvaluatorRuntimeProtocol.ContextControlProto controlMessage) {
146
147    synchronized (this.heartBeatManager) {
148      try {
149        if (controlMessage.hasAddContext() && controlMessage.hasRemoveContext()) {
150          throw new IllegalArgumentException(
151              "Received a message with both add and remove context. This is unsupported.");
152        }
153
154        final byte[] message = controlMessage.hasTaskMessage() ?
155            controlMessage.getTaskMessage().toByteArray() : null;
156
157        if (controlMessage.hasAddContext()) {
158          this.addContext(controlMessage.getAddContext());
159          if (controlMessage.hasStartTask()) {
160            // We support submitContextAndTask()
161            this.startTask(controlMessage.getStartTask());
162          } else {
163            // We need to trigger a heartbeat here.
164            // In other cases, the heartbeat will be triggered by the TaskRuntime
165            // Therefore this call can not go into addContext.
166            this.heartBeatManager.sendHeartbeat();
167          }
168        } else if (controlMessage.hasRemoveContext()) {
169          this.removeContext(controlMessage.getRemoveContext().getContextId());
170        } else if (controlMessage.hasStartTask()) {
171          this.startTask(controlMessage.getStartTask());
172        } else if (controlMessage.hasStopTask()) {
173          this.contextStack.peek().closeTask(message);
174        } else if (controlMessage.hasSuspendTask()) {
175          this.contextStack.peek().suspendTask(message);
176        } else if (controlMessage.hasTaskMessage()) {
177          this.contextStack.peek().deliverTaskMessage(message);
178        } else if (controlMessage.hasContextMessage()) {
179          final EvaluatorRuntimeProtocol.ContextMessageProto contextMessageProto = controlMessage.getContextMessage();
180          boolean deliveredMessage = false;
181          for (final ContextRuntime context : this.contextStack) {
182            if (context.getIdentifier().equals(contextMessageProto.getContextId())) {
183              context.handleContextMessage(contextMessageProto.getMessage().toByteArray());
184              deliveredMessage = true;
185              break;
186            }
187          }
188          if (!deliveredMessage) {
189            throw new IllegalStateException(
190                "Sent message to unknown context " + contextMessageProto.getContextId());
191          }
192        } else {
193          throw new RuntimeException("Unknown task control message: " + controlMessage);
194        }
195      } catch (final TaskClientCodeException e) {
196        this.handleTaskException(e);
197      } catch (final ContextClientCodeException e) {
198        this.handleContextException(e);
199      }
200    }
201
202  }
203
204  /**
205   * @return the TaskStatusProto of the currently running task, if there is any
206   */
207  public Optional<ReefServiceProtos.TaskStatusProto> getTaskStatus() {
208    synchronized (this.contextStack) {
209      if (this.contextStack.isEmpty()) {
210        throw new RuntimeException(
211            "Asked for a Task status while there isn't even a context running.");
212      }
213      return this.contextStack.peek().getTaskStatus();
214    }
215  }
216
217  /**
218   * @return the status of all context in the stack.
219   */
220  public Collection<ReefServiceProtos.ContextStatusProto> getContextStatusCollection() {
221    synchronized (this.contextStack) {
222      final List<ReefServiceProtos.ContextStatusProto> result = new ArrayList<>(this.contextStack.size());
223      for (final ContextRuntime contextRuntime : this.contextStack) {
224        final ReefServiceProtos.ContextStatusProto contextStatusProto = contextRuntime.getContextStatus();
225        LOG.log(Level.FINEST, "Add context status: {0}", contextStatusProto);
226        result.add(contextStatusProto);
227      }
228      return result;
229    }
230  }
231
232  /**
233   * Add a context to the stack.
234   *
235   * @param addContextProto
236   * @throws ContextClientCodeException if there is a client code related issue.
237   */
238  private void addContext(
239      final EvaluatorRuntimeProtocol.AddContextProto addContextProto)
240      throws ContextClientCodeException {
241
242    synchronized (this.contextStack) {
243      try {
244
245        final ContextRuntime currentTopContext = this.contextStack.peek();
246
247        if (!currentTopContext.getIdentifier().equals(addContextProto.getParentContextId())) {
248          throw new IllegalStateException("Trying to instantiate a child context on context with id `" +
249              addContextProto.getParentContextId() + "` while the current top context id is `" +
250              currentTopContext.getIdentifier() + "`");
251        }
252
253        final Configuration contextConfiguration =
254            this.configurationSerializer.fromString(addContextProto.getContextConfiguration());
255
256        final ContextRuntime newTopContext;
257        if (addContextProto.hasServiceConfiguration()) {
258          newTopContext = currentTopContext.spawnChildContext(contextConfiguration,
259              this.configurationSerializer.fromString(addContextProto.getServiceConfiguration()));
260        } else {
261          newTopContext = currentTopContext.spawnChildContext(contextConfiguration);
262        }
263
264        this.contextStack.push(newTopContext);
265
266      } catch (final IOException | BindException e) {
267        throw new RuntimeException("Unable to read configuration.", e);
268      }
269
270    }
271  }
272
273  /**
274   * Remove the context with the given ID from the stack.
275   *
276   * @throws IllegalStateException if the given ID does not refer to the top of stack.
277   */
278  private void removeContext(final String contextID) {
279
280    synchronized (this.contextStack) {
281
282      if (!contextID.equals(this.contextStack.peek().getIdentifier())) {
283        throw new IllegalStateException("Trying to close context with id `" + contextID +
284            "`. But the top context has id `" +
285            this.contextStack.peek().getIdentifier() + "`");
286      }
287
288      this.contextStack.peek().close();
289      if (this.contextStack.size() > 1) {
290        /* We did not close the root context. Therefore, we need to inform the
291         * driver explicitly that this context is closed. The root context notification
292         * is implicit in the Evaluator close/done notification.
293         */
294        this.heartBeatManager.sendHeartbeat(); // Ensure Driver gets notified of context DONE state
295      }
296      this.contextStack.pop();
297      /*
298      * At this moment, the Evaluator is actually idle and has some time till the Driver sends it additional work.
299      * Also, a potentially large object graph just became orphaned: all the objects instantiated by the context
300      * and service injectors can now be garbage collected. So GC call is justified.
301      * */
302      System.gc();
303    }
304  }
305
306  /**
307   * Launch a Task.
308   */
309  private void startTask(
310      final EvaluatorRuntimeProtocol.StartTaskProto startTaskProto) throws TaskClientCodeException {
311
312    synchronized (this.contextStack) {
313
314      final ContextRuntime currentActiveContext = this.contextStack.peek();
315
316      final String expectedContextId = startTaskProto.getContextId();
317      if (!expectedContextId.equals(currentActiveContext.getIdentifier())) {
318        throw new IllegalStateException("Task expected context `" + expectedContextId +
319            "` but the active context has ID `" + currentActiveContext.getIdentifier() + "`");
320      }
321
322      try {
323        final Configuration taskConfig =
324            this.configurationSerializer.fromString(startTaskProto.getConfiguration());
325        currentActiveContext.startTask(taskConfig);
326      } catch (IOException | BindException e) {
327        throw new RuntimeException("Unable to read configuration.", e);
328      }
329    }
330  }
331
332  /**
333   * THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager.
334   */
335  private void handleTaskException(final TaskClientCodeException e) {
336
337    LOG.log(Level.SEVERE, "TaskClientCodeException", e);
338
339    final ByteString exception = ByteString.copyFrom(this.exceptionCodec.toBytes(e.getCause()));
340
341    final ReefServiceProtos.TaskStatusProto taskStatus =
342        ReefServiceProtos.TaskStatusProto.newBuilder()
343            .setContextId(e.getContextId())
344            .setTaskId(e.getTaskId())
345            .setResult(exception)
346            .setState(ReefServiceProtos.State.FAILED)
347            .build();
348
349    LOG.log(Level.SEVERE, "Sending heartbeat: {0}", taskStatus);
350
351    this.heartBeatManager.sendTaskStatus(taskStatus);
352  }
353
354  /**
355   * THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager.
356   */
357  private void handleContextException(final ContextClientCodeException e) {
358
359    LOG.log(Level.SEVERE, "ContextClientCodeException", e);
360
361    final ByteString exception = ByteString.copyFrom(this.exceptionCodec.toBytes(e.getCause()));
362
363    final ReefServiceProtos.ContextStatusProto.Builder contextStatusBuilder =
364        ReefServiceProtos.ContextStatusProto.newBuilder()
365            .setContextId(e.getContextID())
366            .setContextState(ReefServiceProtos.ContextStatusProto.State.FAIL)
367            .setError(exception);
368
369    if (e.getParentID().isPresent()) {
370      contextStatusBuilder.setParentId(e.getParentID().get());
371    }
372
373    final ReefServiceProtos.ContextStatusProto contextStatus = contextStatusBuilder.build();
374
375    LOG.log(Level.SEVERE, "Sending heartbeat: {0}", contextStatus);
376
377    this.heartBeatManager.sendContextStatus(contextStatus);
378  }
379}