This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.context;
020
021import net.jcip.annotations.GuardedBy;
022import net.jcip.annotations.ThreadSafe;
023import org.apache.reef.annotations.audience.DriverSide;
024import org.apache.reef.annotations.audience.Private;
025import org.apache.reef.driver.context.FailedContext;
026import org.apache.reef.proto.ReefServiceProtos;
027import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
028import org.apache.reef.util.Optional;
029
030import javax.inject.Inject;
031import java.util.*;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035/**
036 * Driver-Side representation of all contexts on an Evaluator.
037 */
038@ThreadSafe
039@DriverSide
040@Private
041public final class ContextRepresenters {
042  private static final Logger LOG = Logger.getLogger(ContextRepresenters.class.getName());
043
044  private final EvaluatorMessageDispatcher messageDispatcher;
045  private final ContextFactory contextFactory;
046
047  // Mutable fields
048  @GuardedBy("this")
049  private final List<EvaluatorContext> contextStack = new ArrayList<>();
050  @GuardedBy("this")
051  private final Set<String> contextIds = new HashSet<>();
052
053  @Inject
054  private ContextRepresenters(final EvaluatorMessageDispatcher messageDispatcher,
055                              final ContextFactory contextFactory) {
056    this.messageDispatcher = messageDispatcher;
057    this.contextFactory = contextFactory;
058  }
059
060  /**
061   * Fetch the context with the given ID.
062   *
063   * @param contextId
064   * @return
065   */
066  public synchronized EvaluatorContext getContext(final String contextId) {
067    for (final EvaluatorContext context : this.contextStack) {
068      if (context.getId().equals(contextId)) return context;
069    }
070    throw new RuntimeException("Unknown evaluator context " + contextId);
071  }
072
073  /**
074   * Create the failed contexts for a FailedEvaluator event.
075   *
076   * @return
077   */
078  public synchronized List<FailedContext> getFailedContextsForEvaluatorFailure() {
079    final List<FailedContext> failedContextList = new ArrayList<>();
080    final List<EvaluatorContext> activeContexts = new ArrayList<>(this.contextStack);
081    Collections.reverse(activeContexts);
082
083    for (final EvaluatorContext context : activeContexts) {
084      failedContextList.add(context.getFailedContextForEvaluatorFailure());
085    }
086    return failedContextList;
087  }
088
089  /**
090   * Process heartbeats from the contexts on an Evaluator.
091   *
092   * @param contextStatusProto
093   * @param notifyClientOnNewActiveContext
094   */
095  public synchronized void onContextStatusMessages(final Iterable<ReefServiceProtos.ContextStatusProto> contextStatusProtos,
096                                                   final boolean notifyClientOnNewActiveContext) {
097    for (final ReefServiceProtos.ContextStatusProto contextStatusProto : contextStatusProtos) {
098      this.onContextStatusMessage(contextStatusProto, notifyClientOnNewActiveContext);
099    }
100  }
101
102
103  /**
104   * Process a heartbeat from a context
105   *
106   * @param contextStatusProto
107   * @param notifyClientOnNewActiveContext
108   */
109  private synchronized void onContextStatusMessage(final ReefServiceProtos.ContextStatusProto contextStatusProto,
110                                                   final boolean notifyClientOnNewActiveContext) {
111
112    LOG.log(Level.FINER, "Processing context status message for context {0}", contextStatusProto.getContextId());
113    switch (contextStatusProto.getContextState()) {
114      case READY:
115        this.onContextReady(contextStatusProto, notifyClientOnNewActiveContext);
116        break;
117      case FAIL:
118        this.onContextFailed(contextStatusProto);
119        break;
120      case DONE:
121        this.onContextDone(contextStatusProto);
122        break;
123      default:
124        this.onUnknownContextStatus(contextStatusProto);
125        break;
126    }
127    LOG.log(Level.FINER, "Done processing context status message for context {0}", contextStatusProto.getContextId());
128
129  }
130
131
132  private synchronized void onUnknownContextStatus(final ReefServiceProtos.ContextStatusProto contextStatusProto) {
133    LOG.log(Level.WARNING, "Received unexpected context status: {0}", contextStatusProto);
134    throw new RuntimeException("Received unexpected context status: " + contextStatusProto.getContextState());
135  }
136
137  private synchronized void onContextFailed(final ReefServiceProtos.ContextStatusProto contextStatusProto) {
138    assert (ReefServiceProtos.ContextStatusProto.State.FAIL == contextStatusProto.getContextState());
139    final String contextID = contextStatusProto.getContextId();
140    LOG.log(Level.FINE, "Context {0} failed", contextID);
141    // It could have failed right away.
142    if (this.isUnknownContextId(contextID)) {
143      this.onNewContext(contextStatusProto, false);
144    }
145    final EvaluatorContext context = getContext(contextID);
146    this.removeContext(context);
147    this.messageDispatcher.onContextFailed(context.getFailedContext(contextStatusProto));
148  }
149
150  private synchronized void onContextDone(final ReefServiceProtos.ContextStatusProto contextStatusProto) {
151    assert (ReefServiceProtos.ContextStatusProto.State.DONE == contextStatusProto.getContextState());
152    final String contextID = contextStatusProto.getContextId();
153    if (isUnknownContextId(contextID)) {
154      throw new RuntimeException("Received DONE for context " + contextID + " which is unknown.");
155    } else {
156      LOG.log(Level.FINE, "Context {0} is DONE.", contextID);
157      final EvaluatorContext context = getContext(contextID);
158      removeContext(context);
159
160      if (context.isRootContext()) {
161        LOG.log(Level.FINE, "Root context {0} closed. Evaluator closed will trigger final shutdown.", contextID);
162      } else {
163        final EvaluatorContext parentContext = this.getContext(context.getParentId().get());
164        this.messageDispatcher.onContextClose(context.getClosedContext(parentContext));
165      }
166    }
167  }
168
169  /**
170   * Process a message with status READY from a context.
171   *
172   * @param contextStatusProto
173   * @param notifyClientOnNewActiveContext whether or not to inform the application when this in fact refers to a new
174   *                                       context.
175   */
176  private synchronized void onContextReady(final ReefServiceProtos.ContextStatusProto contextStatusProto,
177                                           final boolean notifyClientOnNewActiveContext) {
178    assert (ReefServiceProtos.ContextStatusProto.State.READY == contextStatusProto.getContextState());
179    final String contextID = contextStatusProto.getContextId();
180    // This could be the first message we get from that context
181    if (this.isUnknownContextId(contextID)) {
182      this.onNewContext(contextStatusProto, notifyClientOnNewActiveContext);
183    }
184
185    // Dispatch the messages to the application, if there are any.
186    for (final ReefServiceProtos.ContextStatusProto.ContextMessageProto contextMessageProto : contextStatusProto.getContextMessageList()) {
187      final byte[] theMessage = contextMessageProto.getMessage().toByteArray();
188      final String sourceID = contextMessageProto.getSourceId();
189      this.messageDispatcher.onContextMessage(new ContextMessageImpl(theMessage, contextID, sourceID));
190    }
191
192  }
193
194  /**
195   * Create and add a new context representer.
196   *
197   * @param contextStatusProto             the message to create the context from
198   * @param notifyClientOnNewActiveContext whether or not to fire an event to the user.
199   */
200  private synchronized void onNewContext(final ReefServiceProtos.ContextStatusProto contextStatusProto,
201                                         final boolean notifyClientOnNewActiveContext) {
202    final String contextID = contextStatusProto.getContextId();
203    LOG.log(Level.FINE, "Adding new context {0}.", contextID);
204
205    final Optional<String> parentID = contextStatusProto.hasParentId() ?
206        Optional.of(contextStatusProto.getParentId()) : Optional.<String>empty();
207    final EvaluatorContext context = contextFactory.newContext(contextID, parentID);
208    this.addContext(context);
209    if (contextStatusProto.getRecovery()) {
210      // when we get a recovered active context, always notify application
211      this.messageDispatcher.OnDriverRestartContextActive(context);
212    } else {
213      if (notifyClientOnNewActiveContext) {
214        this.messageDispatcher.onContextActive(context);
215      }
216    }
217  }
218
219  /**
220   * Add the given context to the data structures.
221   *
222   * @param context
223   */
224  private synchronized void addContext(final EvaluatorContext context) {
225    this.contextStack.add(context);
226    this.contextIds.add(context.getId());
227  }
228
229  /**
230   * Remove the given context from the data structures.
231   *
232   * @param context
233   */
234  private synchronized void removeContext(final EvaluatorContext context) {
235    this.contextStack.remove(context);
236    this.contextIds.remove(context.getId());
237  }
238
239  /**
240   * @param contextId
241   * @return true if the given context id is unknown so far.
242   */
243  private synchronized boolean isUnknownContextId(final String contextId) {
244    return !this.contextIds.contains(contextId);
245  }
246
247}