/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.local.process;

import net.jcip.annotations.ThreadSafe;
import org.apache.reef.proto.DriverRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
import org.apache.reef.runtime.local.driver.ResourceManager;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A RunnableProcessObserver that relays process events to REEF's ResourceStatusHandler.
 */
@ThreadSafe
public final class ReefRunnableProcessObserver implements RunnableProcessObserver {
  private static final Logger LOG = Logger.getLogger(ReefRunnableProcessObserver.class.getName());

  /**
   * Delay, in milliseconds, applied before every resource status message is handed to the handler.
   */
  private static final long STATUS_DELAY_MS = 100;

  private final EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler;
  private final InjectionFuture<ResourceManager> resourceManager;

  /**
   * @param resourceStatusHandler the event handler to inform of resource changes.
   * @param resourceManager       future of the ResourceManager that is told about evaluator exits;
   *                              it is only resolved (via get()) when a process exit is observed.
   */
  @Inject
  public ReefRunnableProcessObserver(final @Parameter(RuntimeParameters.ResourceStatusHandler.class)
                                     EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler,
                                     final InjectionFuture<ResourceManager> resourceManager) {
    this.resourceStatusHandler = resourceStatusHandler;
    this.resourceManager = resourceManager;
  }

  /**
   * Reports the given process as RUNNING to the resource status handler.
   *
   * @param processId the identifier of the process that was started.
   */
  @Override
  public void onProcessStarted(final String processId) {
    this.onResourceStatus(
        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
            .setIdentifier(processId)
            .setState(ReefServiceProtos.State.RUNNING)
            .build()
    );
  }

  /**
   * Reports the exit of the given process and then notifies the ResourceManager of the evaluator exit.
   * An exit code of 0 is reported as a clean exit, any other value as a failure.
   *
   * @param processId the identifier of the process that exited.
   * @param exitCode  the exit code of the process.
   */
  @Override
  public void onProcessExit(final String processId, final int exitCode) {
    // Note that the order here matters: We need to first inform the Driver's event handlers about the process exit
    // and then release the resources. Otherwise, the Driver might be shutdown because of an idle condition before the
    // message about the evaluator exit could have been sent and processed.
    if (exitCode == 0) {
      this.onCleanExit(processId);
    } else {
      this.onUncleanExit(processId, exitCode);
    }
    this.resourceManager.get().onEvaluatorExit(processId);
  }

  /**
   * Inform REEF of a cleanly exited process: state DONE with exit code 0.
   *
   * @param processId the identifier of the process that exited.
   */
  private void onCleanExit(final String processId) {
    this.onResourceStatus(
        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
            .setIdentifier(processId)
            .setState(ReefServiceProtos.State.DONE)
            .setExitCode(0)
            .build()
    );
  }

  /**
   * Inform REEF of an unclean process exit: state FAILED with the given exit code.
   *
   * @param processId the identifier of the process that exited.
   * @param exitCode  the (non-zero) exit code of the process.
   */
  private void onUncleanExit(final String processId, final int exitCode) {
    this.onResourceStatus(
        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
            .setIdentifier(processId)
            .setState(ReefServiceProtos.State.FAILED)
            .setExitCode(exitCode)
            .build()
    );
  }

  /**
   * Logs the given resource status, waits for {@link #STATUS_DELAY_MS} milliseconds and then passes the
   * status to the resource status handler. If the wait is interrupted, the status is passed on immediately
   * and the calling thread's interrupt status is restored afterwards.
   *
   * @param resourceStatus the status message to send.
   */
  private void onResourceStatus(final DriverRuntimeProtocol.ResourceStatusProto resourceStatus) {
    LOG.log(Level.INFO, "Sending resource status: {0} ", resourceStatus);

    // Here, we introduce an arbitrary wait. This is to make sure that at the exit of an Evaluator, the last
    // heartbeat from that Evaluator arrives before this message. This makes sure that the local runtime behaves like
    // a resource manager with regard to that timing.
    boolean interrupted = false;
    try {
      Thread.sleep(STATUS_DELAY_MS);
    } catch (final InterruptedException e) {
      interrupted = true;
      LOG.log(Level.FINEST, "Sleep interrupted. Event will be fired earlier than usual.");
    }

    try {
      this.resourceStatusHandler.onNext(resourceStatus);
    } finally {
      // Re-assert the interrupt only after the event has been handed over, so that the interruption is
      // not lost for code further up the call stack while the status message itself is still delivered.
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

}