001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.local.process; 020 021import net.jcip.annotations.ThreadSafe; 022import org.apache.reef.runtime.common.driver.api.RuntimeParameters; 023import org.apache.reef.runtime.common.driver.evaluator.pojos.State; 024import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; 025import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl; 026import org.apache.reef.runtime.local.driver.ResourceManager; 027import org.apache.reef.tang.InjectionFuture; 028import org.apache.reef.tang.annotations.Parameter; 029import org.apache.reef.wake.EventHandler; 030 031import javax.inject.Inject; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035/** 036 * a RunnableProcessObserver that relies events to REEF's ResourceStatusHandler. 037 */ 038@ThreadSafe 039public final class ReefRunnableProcessObserver implements RunnableProcessObserver { 040 private static final Logger LOG = Logger.getLogger(ReefRunnableProcessObserver.class.getName()); 041 042 private final EventHandler<ResourceStatusEvent> resourceStatusHandler; 043 private final InjectionFuture<ResourceManager> resourceManager; 044 045 /** 046 * @param resourceStatusHandler the event handler to inform of resource changes. 047 */ 048 @Inject 049 public ReefRunnableProcessObserver(@Parameter(RuntimeParameters.ResourceStatusHandler.class) final 050 EventHandler<ResourceStatusEvent> resourceStatusHandler, 051 final InjectionFuture<ResourceManager> resourceManager) { 052 this.resourceStatusHandler = resourceStatusHandler; 053 this.resourceManager = resourceManager; 054 } 055 056 @Override 057 public void onProcessStarted(final String processId) { 058 this.onResourceStatus( 059 ResourceStatusEventImpl.newBuilder() 060 .setIdentifier(processId) 061 .setState(State.RUNNING) 062 .build() 063 ); 064 } 065 066 @Override 067 public void onProcessExit(final String processId, final int exitCode) { 068 // Note that the order here matters: We need to first inform the Driver's event handlers about the process exit 069 // and then release the resources. Otherwise, the Driver might be shutdown because of an idle condition before the 070 // message about the evaluator exit could have been sent and processed. 071 switch (exitCode) { 072 case 0: 073 this.onCleanExit(processId); 074 break; 075 default: 076 this.onUncleanExit(processId, exitCode); 077 } 078 this.resourceManager.get().onEvaluatorExit(processId); 079 } 080 081 /** 082 * Inform REEF of a cleanly exited process. 083 * 084 * @param processId 085 */ 086 private void onCleanExit(final String processId) { 087 this.onResourceStatus( 088 ResourceStatusEventImpl.newBuilder() 089 .setIdentifier(processId) 090 .setState(State.DONE) 091 .setExitCode(0) 092 .build() 093 ); 094 } 095 096 /** 097 * Inform REEF of an unclean process exit. 098 * 099 * @param processId 100 * @param exitCode 101 */ 102 private void onUncleanExit(final String processId, final int exitCode) { 103 this.onResourceStatus( 104 ResourceStatusEventImpl.newBuilder() 105 .setIdentifier(processId) 106 .setState(State.FAILED) 107 .setExitCode(exitCode) 108 .build() 109 ); 110 } 111 112 private void onResourceStatus(final ResourceStatusEvent resourceStatus) { 113 LOG.log(Level.INFO, "Sending resource status: {0} ", resourceStatus); 114 115 // Here, we introduce an arbitrary wait. This is to make sure that at the exit of an Evaluator, the last 116 // heartbeat from that Evaluator arrives before this message. This makes sure that the local runtime behaves like 117 // a resource manager with regard to that timing. 118 try { 119 Thread.sleep(100); 120 } catch (final InterruptedException e) { 121 LOG.log(Level.FINEST, "Sleep interrupted. Event will be fired earlier than usual."); 122 } 123 this.resourceStatusHandler.onNext(resourceStatus); 124 } 125 126}