001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.context;
020
021import net.jcip.annotations.GuardedBy;
022import net.jcip.annotations.ThreadSafe;
023import org.apache.reef.annotations.audience.DriverSide;
024import org.apache.reef.annotations.audience.Private;
025import org.apache.reef.driver.context.FailedContext;
026import org.apache.reef.driver.restart.DriverRestartManager;
027import org.apache.reef.driver.restart.EvaluatorRestartState;
028import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
029import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextMessagePOJO;
030import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextState;
031import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
032import org.apache.reef.util.Optional;
033
034import javax.inject.Inject;
035import java.util.*;
036import java.util.logging.Level;
037import java.util.logging.Logger;
038
039/**
040 * Driver-Side representation of all contexts on an Evaluator.
041 */
042@ThreadSafe
043@DriverSide
044@Private
045public final class ContextRepresenters {
046  private static final Logger LOG = Logger.getLogger(ContextRepresenters.class.getName());
047
048  private final EvaluatorMessageDispatcher messageDispatcher;
049  private final ContextFactory contextFactory;
050
051  // Mutable fields
052  @GuardedBy("this")
053  private final List<EvaluatorContext> contextStack = new ArrayList<>();
054  @GuardedBy("this")
055  private final Set<String> contextIds = new HashSet<>();
056
057  private final DriverRestartManager driverRestartManager;
058
059  @Inject
060  private ContextRepresenters(final EvaluatorMessageDispatcher messageDispatcher,
061                              final ContextFactory contextFactory,
062                              final DriverRestartManager driverRestartManager) {
063    this.messageDispatcher = messageDispatcher;
064    this.contextFactory = contextFactory;
065    this.driverRestartManager = driverRestartManager;
066  }
067
068  /**
069   * Fetch the context with the given ID.
070   *
071   * @param contextId
072   * @return
073   */
074  public synchronized EvaluatorContext getContext(final String contextId) {
075    for (final EvaluatorContext context : this.contextStack) {
076      if (context.getId().equals(contextId)) {
077        return context;
078      }
079    }
080    throw new RuntimeException("Unknown evaluator context " + contextId);
081  }
082
083  /**
084   * Create the failed contexts for a FailedEvaluator event.
085   *
086   * @return
087   */
088  public synchronized List<FailedContext> getFailedContextsForEvaluatorFailure() {
089    final List<FailedContext> failedContextList = new ArrayList<>();
090    final List<EvaluatorContext> activeContexts = new ArrayList<>(this.contextStack);
091    Collections.reverse(activeContexts);
092
093    for (final EvaluatorContext context : activeContexts) {
094      failedContextList.add(context.getFailedContextForEvaluatorFailure());
095    }
096    return failedContextList;
097  }
098
099  /**
100   * Process heartbeats from the contexts on an Evaluator.
101   *
102   * @param contextStatusPOJOs
103   * @param notifyClientOnNewActiveContext
104   */
105  public synchronized void onContextStatusMessages(final Iterable<ContextStatusPOJO>
106                                                       contextStatusPOJOs,
107                                                   final boolean notifyClientOnNewActiveContext) {
108    for (final ContextStatusPOJO contextStatus : contextStatusPOJOs) {
109      this.onContextStatusMessage(contextStatus, notifyClientOnNewActiveContext);
110    }
111  }
112
113
114  /**
115   * Process a heartbeat from a context.
116   *
117   * @param contextStatus
118   * @param notifyClientOnNewActiveContext
119   */
120  private synchronized void onContextStatusMessage(final ContextStatusPOJO contextStatus,
121                                                   final boolean notifyClientOnNewActiveContext) {
122
123    LOG.log(Level.FINER, "Processing context status message for context {0}", contextStatus.getContextId());
124    switch (contextStatus.getContextState()) {
125    case READY:
126      this.onContextReady(contextStatus, notifyClientOnNewActiveContext);
127      break;
128    case FAIL:
129      this.onContextFailed(contextStatus);
130      break;
131    case DONE:
132      this.onContextDone(contextStatus);
133      break;
134    default:
135      this.onUnknownContextStatus(contextStatus);
136      break;
137    }
138    LOG.log(Level.FINER, "Done processing context status message for context {0}", contextStatus.getContextId());
139
140  }
141
142
143  private synchronized void onUnknownContextStatus(final ContextStatusPOJO contextStatus) {
144    LOG.log(Level.WARNING, "Received unexpected context status: {0}", contextStatus);
145    throw new RuntimeException("Received unexpected context status: " + contextStatus.getContextState());
146  }
147
148  private synchronized void onContextFailed(final ContextStatusPOJO contextStatus) {
149    assert ContextState.FAIL == contextStatus.getContextState();
150    final String contextID = contextStatus.getContextId();
151    LOG.log(Level.FINE, "Context {0} failed", contextID);
152    // It could have failed right away.
153    if (this.isUnknownContextId(contextID)) {
154      this.onNewContext(contextStatus, false);
155    }
156    final EvaluatorContext context = getContext(contextID);
157    this.removeContext(context);
158    this.messageDispatcher.onContextFailed(context.getFailedContext(contextStatus));
159  }
160
161  private synchronized void onContextDone(final ContextStatusPOJO contextStatus) {
162    assert ContextState.DONE == contextStatus.getContextState();
163    final String contextID = contextStatus.getContextId();
164    if (isUnknownContextId(contextID)) {
165      throw new RuntimeException("Received DONE for context " + contextID + " which is unknown.");
166    } else {
167      LOG.log(Level.FINE, "Context {0} is DONE.", contextID);
168      final EvaluatorContext context = getContext(contextID);
169      removeContext(context);
170
171      if (context.isRootContext()) {
172        LOG.log(Level.FINE, "Root context {0} closed. Evaluator closed will trigger final shutdown.", contextID);
173      } else {
174        final EvaluatorContext parentContext = this.getContext(context.getParentId().get());
175        this.messageDispatcher.onContextClose(context.getClosedContext(parentContext));
176      }
177    }
178  }
179
180  /**
181   * Process a message with status READY from a context.
182   *
183   * @param contextStatus
184   * @param notifyClientOnNewActiveContext whether or not to inform the application when this in fact refers to a new
185   *                                       context.
186   */
187  private synchronized void onContextReady(final ContextStatusPOJO contextStatus,
188                                           final boolean notifyClientOnNewActiveContext) {
189    assert ContextState.READY == contextStatus.getContextState();
190    final String contextID = contextStatus.getContextId();
191    // This could be the first message we get from that context
192    if (this.isUnknownContextId(contextID)) {
193      this.onNewContext(contextStatus, notifyClientOnNewActiveContext);
194    }
195
196    // Dispatch the messages to the application, if there are any.
197    for (final ContextMessagePOJO
198             contextMessage : contextStatus.getContextMessageList()) {
199      final byte[] theMessage = contextMessage.getMessage();
200      final String sourceID = contextMessage.getSourceId();
201      final long sequenceNumber = contextMessage.getSequenceNumber();
202      this.messageDispatcher.onContextMessage(new ContextMessageImpl(theMessage, contextID, sourceID, sequenceNumber));
203    }
204
205  }
206
207  /**
208   * Create and add a new context representer.
209   *
210   * @param contextStatus             the message to create the context from
211   * @param notifyClientOnNewActiveContext whether or not to fire an event to the user.
212   */
213  private synchronized void onNewContext(final ContextStatusPOJO contextStatus,
214                                         final boolean notifyClientOnNewActiveContext) {
215    final String contextID = contextStatus.getContextId();
216    LOG.log(Level.FINE, "Adding new context {0}.", contextID);
217
218    final Optional<String> parentID = contextStatus.hasParentId() ?
219        Optional.of(contextStatus.getParentId()) : Optional.<String>empty();
220    final EvaluatorContext context = contextFactory.newContext(contextID, parentID);
221    this.addContext(context);
222    if (notifyClientOnNewActiveContext) {
223      if (driverRestartManager.getEvaluatorRestartState(context.getEvaluatorId())
224          == EvaluatorRestartState.REREGISTERED) {
225        // if restart, advance restart state and all the restart context active handlers.
226        driverRestartManager.setEvaluatorProcessed(context.getEvaluatorId());
227        this.messageDispatcher.onDriverRestartContextActive(context);
228      }
229
230      this.messageDispatcher.onContextActive(context);
231    }
232  }
233
234  /**
235   * Add the given context to the data structures.
236   *
237   * @param context
238   */
239  private synchronized void addContext(final EvaluatorContext context) {
240    this.contextStack.add(context);
241    this.contextIds.add(context.getId());
242  }
243
244  /**
245   * Remove the given context from the data structures.
246   *
247   * @param context
248   */
249  private synchronized void removeContext(final EvaluatorContext context) {
250    this.contextStack.remove(context);
251    this.contextIds.remove(context.getId());
252  }
253
254  /**
255   * @param contextId
256   * @return true if the given context id is unknown so far.
257   */
258  private synchronized boolean isUnknownContextId(final String contextId) {
259    return !this.contextIds.contains(contextId);
260  }
261
262}