001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.resourcemanager; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.runtime.common.driver.DriverStatusManager; 024import org.apache.reef.runtime.common.driver.evaluator.pojos.State; 025import org.apache.reef.runtime.common.driver.idle.DriverIdleManager; 026import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource; 027import org.apache.reef.runtime.common.driver.idle.IdleMessage; 028import org.apache.reef.tang.InjectionFuture; 029import org.apache.reef.wake.EventHandler; 030 031import javax.inject.Inject; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035/** 036 * Manages the status of the Resource Manager and tracks whether it is idle. 037 */ 038@DriverSide 039@Private 040public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEvent>, DriverIdlenessSource { 041 042 private static final Logger LOG = Logger.getLogger(ResourceManagerStatus.class.getName()); 043 044 private static final String COMPONENT_NAME = "ResourceManager"; 045 046 private static final IdleMessage IDLE_MESSAGE = 047 new IdleMessage(COMPONENT_NAME, "No outstanding requests or allocations", true); 048 049 private final ResourceManagerErrorHandler resourceManagerErrorHandler; 050 private final DriverStatusManager driverStatusManager; 051 private final InjectionFuture<DriverIdleManager> driverIdleManager; 052 053 /** Mutable RM state. */ 054 private State state = State.INIT; 055 056 /** Number of container requests outstanding with the RM, as per latest RuntimeStatusEvent message. */ 057 private int outstandingContainerRequests = 0; 058 059 /** Number of containers currently allocated, as per latest RuntimeStatusEvent message. */ 060 private int containerAllocationCount = 0; 061 062 @Inject 063 private ResourceManagerStatus( 064 final ResourceManagerErrorHandler resourceManagerErrorHandler, 065 final DriverStatusManager driverStatusManager, 066 final InjectionFuture<DriverIdleManager> driverIdleManager) { 067 068 this.resourceManagerErrorHandler = resourceManagerErrorHandler; 069 this.driverStatusManager = driverStatusManager; 070 this.driverIdleManager = driverIdleManager; 071 } 072 073 @Override 074 public void onNext(final RuntimeStatusEvent runtimeStatusEvent) { 075 076 final State newState = runtimeStatusEvent.getState(); 077 078 LOG.log(Level.FINEST, "Runtime status: {0}", runtimeStatusEvent); 079 080 synchronized(this) { 081 this.outstandingContainerRequests = runtimeStatusEvent.getOutstandingContainerRequests().orElse(0); 082 this.containerAllocationCount = runtimeStatusEvent.getContainerAllocationList().size(); 083 084 this.setState(newState); 085 } 086 087 switch (newState) { 088 case FAILED: 089 this.onRMFailure(runtimeStatusEvent); 090 break; 091 case DONE: 092 this.onRMDone(runtimeStatusEvent); 093 break; 094 case RUNNING: 095 this.onRMRunning(runtimeStatusEvent); 096 break; 097 case INIT: 098 case SUSPEND: 099 case KILLED: 100 break; 101 default: 102 throw new RuntimeException("Unknown state: " + newState); 103 } 104 } 105 106 /** 107 * Change the state of the Resource Manager to be RUNNING. 108 */ 109 public synchronized void setRunning() { 110 this.setState(State.RUNNING); 111 } 112 113 /** 114 * Driver is idle if, regardless of status, it has no evaluators allocated and no pending container requests. 115 * @return true if the driver can be considered idle, false otherwise. 116 */ 117 private synchronized boolean isIdle() { 118 return this.outstandingContainerRequests == 0 && this.containerAllocationCount == 0; 119 } 120 121 /** 122 * Driver is idle if, regardless of status, it has no evaluators allocated 123 * and no pending container requests. This method is used in the DriverIdleManager. 124 * If all DriverIdlenessSource components are idle, DriverIdleManager will initiate Driver shutdown. 125 * @return idle, if there are no outstanding requests or allocations. Not idle otherwise. 126 */ 127 @Override 128 public synchronized IdleMessage getIdleStatus() { 129 130 if (this.isIdle()) { 131 return IDLE_MESSAGE; 132 } 133 134 final String message = String.format( 135 "There are %d outstanding container requests and %d allocated containers", 136 this.outstandingContainerRequests, this.containerAllocationCount); 137 138 return new IdleMessage(COMPONENT_NAME, message, false); 139 } 140 141 private synchronized void onRMFailure(final RuntimeStatusEvent runtimeStatusEvent) { 142 assert runtimeStatusEvent.getState() == State.FAILED; 143 this.resourceManagerErrorHandler.onNext(runtimeStatusEvent.getError().get()); 144 } 145 146 private synchronized void onRMDone(final RuntimeStatusEvent runtimeStatusEvent) { 147 assert runtimeStatusEvent.getState() == State.DONE; 148 LOG.log(Level.INFO, "Resource Manager shutdown happened. Triggering Driver shutdown."); 149 this.driverStatusManager.onComplete(); 150 } 151 152 private void onRMRunning(final RuntimeStatusEvent runtimeStatusEvent) { 153 assert runtimeStatusEvent.getState() == State.RUNNING; 154 if (this.isIdle()) { 155 this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE); 156 } 157 } 158 159 private synchronized void setState(final State toState) { 160 if (this.state == toState) { 161 LOG.log(Level.FINE, "Transition from {0} state to the same state.", this.state); 162 } else if (this.state.isLegalTransition(toState)) { 163 LOG.log(Level.FINEST, "State transition: {0} -> {1}", new State[] {this.state, toState}); 164 this.state = toState; 165 } else { 166 throw new IllegalStateException( 167 "Resource manager attempts illegal state transition from " + this.state + " to " + toState); 168 } 169 } 170}