This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.local.process;
020
021import net.jcip.annotations.ThreadSafe;
022import org.apache.reef.proto.DriverRuntimeProtocol;
023import org.apache.reef.proto.ReefServiceProtos;
024import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
025import org.apache.reef.runtime.local.driver.ResourceManager;
026import org.apache.reef.tang.InjectionFuture;
027import org.apache.reef.tang.annotations.Parameter;
028import org.apache.reef.wake.EventHandler;
029
030import javax.inject.Inject;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034/**
035 * a RunnableProcessObserver that relies events to REEF's ResourceStatusHandler
036 */
037@ThreadSafe
038public final class ReefRunnableProcessObserver implements RunnableProcessObserver {
039  private static final Logger LOG = Logger.getLogger(ReefRunnableProcessObserver.class.getName());
040
041  private final EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler;
042  private final InjectionFuture<ResourceManager> resourceManager;
043
044  /**
045   * @param resourceStatusHandler the event handler to inform of resource changes.
046   */
047  @Inject
048  public ReefRunnableProcessObserver(final @Parameter(RuntimeParameters.ResourceStatusHandler.class)
049                                     EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler,
050                                     final InjectionFuture<ResourceManager> resourceManager) {
051    this.resourceStatusHandler = resourceStatusHandler;
052    this.resourceManager = resourceManager;
053  }
054
055  @Override
056  public void onProcessStarted(final String processId) {
057    this.onResourceStatus(
058        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
059            .setIdentifier(processId)
060            .setState(ReefServiceProtos.State.RUNNING)
061            .build()
062    );
063  }
064
065  @Override
066  public void onProcessExit(final String processId, final int exitCode) {
067    // Note that the order here matters: We need to first inform the Driver's event handlers about the process exit
068    // and then release the resources. Otherwise, the Driver might be shutdown because of an idle condition before the
069    // message about the evaluator exit could have been sent and processed.
070    switch (exitCode) {
071      case 0:
072        this.onCleanExit(processId);
073        break;
074      default:
075        this.onUncleanExit(processId, exitCode);
076    }
077    this.resourceManager.get().onEvaluatorExit(processId);
078  }
079
080  /**
081   * Inform REEF of a cleanly exited process.
082   *
083   * @param processId
084   */
085  private void onCleanExit(final String processId) {
086    this.onResourceStatus(
087        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
088            .setIdentifier(processId)
089            .setState(ReefServiceProtos.State.DONE)
090            .setExitCode(0)
091            .build()
092    );
093  }
094
095  /**
096   * Inform REEF of an unclean process exit
097   *
098   * @param processId
099   * @param exitCode
100   */
101  private void onUncleanExit(final String processId, final int exitCode) {
102    this.onResourceStatus(
103        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
104            .setIdentifier(processId)
105            .setState(ReefServiceProtos.State.FAILED)
106            .setExitCode(exitCode)
107            .build()
108    );
109  }
110
111  private void onResourceStatus(final DriverRuntimeProtocol.ResourceStatusProto resourceStatus) {
112    LOG.log(Level.INFO, "Sending resource status: {0} ", resourceStatus);
113
114    // Here, we introduce an arbitrary wait. This is to make sure that at the exit of an Evaluator, the last
115    // heartbeat from that Evaluator arrives before this message. This makes sure that the local runtime behaves like
116    // a resource manager with regard to that timing.
117    try {
118      Thread.sleep(100);
119    } catch (final InterruptedException e) {
120      LOG.log(Level.FINEST, "Sleep interrupted. Event will be fired earlier than usual.");
121    }
122    this.resourceStatusHandler.onNext(resourceStatus);
123  }
124
125}