This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.local.process;
020
021import net.jcip.annotations.ThreadSafe;
022import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
023import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
024import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
025import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
026import org.apache.reef.runtime.local.driver.ResourceManager;
027import org.apache.reef.tang.InjectionFuture;
028import org.apache.reef.tang.annotations.Parameter;
029import org.apache.reef.wake.EventHandler;
030
031import javax.inject.Inject;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035/**
036 * a RunnableProcessObserver that relies events to REEF's ResourceStatusHandler.
037 */
038@ThreadSafe
039public final class ReefRunnableProcessObserver implements RunnableProcessObserver {
040  private static final Logger LOG = Logger.getLogger(ReefRunnableProcessObserver.class.getName());
041
042  private final EventHandler<ResourceStatusEvent> resourceStatusHandler;
043  private final InjectionFuture<ResourceManager> resourceManager;
044
045  /**
046   * @param resourceStatusHandler the event handler to inform of resource changes.
047   */
048  @Inject
049  public ReefRunnableProcessObserver(@Parameter(RuntimeParameters.ResourceStatusHandler.class) final 
050                                     EventHandler<ResourceStatusEvent> resourceStatusHandler,
051                                     final InjectionFuture<ResourceManager> resourceManager) {
052    this.resourceStatusHandler = resourceStatusHandler;
053    this.resourceManager = resourceManager;
054  }
055
056  @Override
057  public void onProcessStarted(final String processId) {
058    this.onResourceStatus(
059        ResourceStatusEventImpl.newBuilder()
060            .setIdentifier(processId)
061            .setState(State.RUNNING)
062            .build()
063    );
064  }
065
066  @Override
067  public void onProcessExit(final String processId, final int exitCode) {
068    // Note that the order here matters: We need to first inform the Driver's event handlers about the process exit
069    // and then release the resources. Otherwise, the Driver might be shutdown because of an idle condition before the
070    // message about the evaluator exit could have been sent and processed.
071    switch (exitCode) {
072    case 0:
073      this.onCleanExit(processId);
074      break;
075    default:
076      this.onUncleanExit(processId, exitCode);
077    }
078    this.resourceManager.get().onEvaluatorExit(processId);
079  }
080
081  /**
082   * Inform REEF of a cleanly exited process.
083   *
084   * @param processId
085   */
086  private void onCleanExit(final String processId) {
087    this.onResourceStatus(
088        ResourceStatusEventImpl.newBuilder()
089            .setIdentifier(processId)
090            .setState(State.DONE)
091            .setExitCode(0)
092            .build()
093    );
094  }
095
096  /**
097   * Inform REEF of an unclean process exit.
098   *
099   * @param processId
100   * @param exitCode
101   */
102  private void onUncleanExit(final String processId, final int exitCode) {
103    this.onResourceStatus(
104        ResourceStatusEventImpl.newBuilder()
105            .setIdentifier(processId)
106            .setState(State.FAILED)
107            .setExitCode(exitCode)
108            .build()
109    );
110  }
111
112  private void onResourceStatus(final ResourceStatusEvent resourceStatus) {
113    LOG.log(Level.INFO, "Sending resource status: {0} ", resourceStatus);
114
115    // Here, we introduce an arbitrary wait. This is to make sure that at the exit of an Evaluator, the last
116    // heartbeat from that Evaluator arrives before this message. This makes sure that the local runtime behaves like
117    // a resource manager with regard to that timing.
118    try {
119      Thread.sleep(100);
120    } catch (final InterruptedException e) {
121      LOG.log(Level.FINEST, "Sleep interrupted. Event will be fired earlier than usual.");
122    }
123    this.resourceStatusHandler.onNext(resourceStatus);
124  }
125
126}