001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.resourcemanager; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.runtime.common.driver.DriverStatusManager; 024import org.apache.reef.runtime.common.driver.evaluator.pojos.State; 025import org.apache.reef.runtime.common.driver.idle.DriverIdleManager; 026import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource; 027import org.apache.reef.runtime.common.driver.idle.IdleMessage; 028import org.apache.reef.tang.InjectionFuture; 029import org.apache.reef.wake.EventHandler; 030 031import javax.inject.Inject; 032import java.util.logging.Level; 033import java.util.logging.Logger; 034 035/** 036 * Manages the status of the Resource Manager. 037 */ 038@DriverSide 039@Private 040public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEvent>, 041 DriverIdlenessSource { 042 private static final Logger LOG = Logger.getLogger(ResourceManagerStatus.class.getName()); 043 044 private static final String COMPONENT_NAME = "ResourceManager"; 045 private static final IdleMessage IDLE_MESSAGE = 046 new IdleMessage(COMPONENT_NAME, "No outstanding requests or allocations", true); 047 048 private final ResourceManagerErrorHandler resourceManagerErrorHandler; 049 private final DriverStatusManager driverStatusManager; 050 private final InjectionFuture<DriverIdleManager> driverIdleManager; 051 052 // Mutable state. 053 private State state = State.INIT; 054 private int outstandingContainerRequests = 0; 055 private int containerAllocationCount = 0; 056 057 @Inject 058 ResourceManagerStatus(final ResourceManagerErrorHandler resourceManagerErrorHandler, 059 final DriverStatusManager driverStatusManager, 060 final InjectionFuture<DriverIdleManager> driverIdleManager) { 061 this.resourceManagerErrorHandler = resourceManagerErrorHandler; 062 this.driverStatusManager = driverStatusManager; 063 this.driverIdleManager = driverIdleManager; 064 } 065 066 @Override 067 public synchronized void onNext(final RuntimeStatusEvent runtimeStatusEvent) { 068 final State newState = runtimeStatusEvent.getState(); 069 LOG.log(Level.FINEST, "Runtime status " + runtimeStatusEvent); 070 this.outstandingContainerRequests = runtimeStatusEvent.getOutstandingContainerRequests().orElse(0); 071 this.containerAllocationCount = runtimeStatusEvent.getContainerAllocationList().size(); 072 this.setState(runtimeStatusEvent.getState()); 073 074 switch (newState) { 075 case FAILED: 076 this.onRMFailure(runtimeStatusEvent); 077 break; 078 case DONE: 079 this.onRMDone(runtimeStatusEvent); 080 break; 081 case RUNNING: 082 this.onRMRunning(runtimeStatusEvent); 083 break; 084 case INIT: 085 case SUSPEND: 086 case KILLED: 087 break; 088 default: 089 throw new RuntimeException("Unknown state: " + newState); 090 } 091 } 092 093 /** 094 * Change the state of the Resource Manager to be RUNNING. 095 */ 096 public synchronized void setRunning() { 097 this.setState(State.RUNNING); 098 } 099 100 /** 101 * @return idle, if there are no outstanding requests or allocations. Not idle else. 102 */ 103 @Override 104 public synchronized IdleMessage getIdleStatus() { 105 if (this.isIdle()) { 106 return IDLE_MESSAGE; 107 } else { 108 final String message = new StringBuilder("There are ") 109 .append(this.outstandingContainerRequests) 110 .append(" outstanding container requests and ") 111 .append(this.containerAllocationCount) 112 .append(" allocated containers") 113 .toString(); 114 return new IdleMessage(COMPONENT_NAME, message, false); 115 } 116 } 117 118 119 private synchronized void onRMFailure(final RuntimeStatusEvent runtimeStatusEvent) { 120 assert runtimeStatusEvent.getState() == State.FAILED; 121 this.resourceManagerErrorHandler.onNext(runtimeStatusEvent.getError().get()); 122 } 123 124 private synchronized void onRMDone(final RuntimeStatusEvent runtimeStatusEvent) { 125 assert runtimeStatusEvent.getState() == State.DONE; 126 LOG.log(Level.INFO, "Resource Manager shutdown happened. Triggering Driver shutdown."); 127 this.driverStatusManager.onComplete(); 128 } 129 130 private synchronized void onRMRunning(final RuntimeStatusEvent runtimeStatusEvent) { 131 assert runtimeStatusEvent.getState() == State.RUNNING; 132 if (this.isIdle()) { 133 this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE); 134 } 135 } 136 137 138 private synchronized boolean isIdle() { 139 return this.hasNoOutstandingRequests() 140 && this.hasNoContainersAllocated(); 141 } 142 143 private synchronized boolean isRunning() { 144 return State.RUNNING.equals(this.state); 145 } 146 147 /** 148 * 149 * Checks if the ResourceManager can switch from the current state to the target state. 150 * See REEF-826 for the state transition matrix. 151 * 152 * @param from current state 153 * @param to state to switch to 154 * 155 * @return true if the transition is legal; false otherwise 156 * 157 */ 158 private synchronized boolean isLegalStateTransition(final State from, 159 final State to) { 160 161 // handle diagonal elements of the transition matrix 162 if (from.equals(to)){ 163 LOG.finest("Transition from " + from + " state to the same state."); 164 return true; 165 } 166 167 // handle non-diagonal elements 168 switch (from) { 169 170 case INIT: 171 switch (to) { 172 case RUNNING: 173 case SUSPEND: 174 case DONE: 175 case FAILED: 176 case KILLED: 177 return true; 178 default: 179 return false; 180 } 181 182 case RUNNING: 183 switch (to) { 184 case SUSPEND: 185 case DONE: 186 case FAILED: 187 case KILLED: 188 return true; 189 default: 190 return false; 191 } 192 193 case SUSPEND: 194 switch (to) { 195 case RUNNING: 196 case FAILED: 197 case KILLED: 198 return true; 199 default: 200 return false; 201 } 202 203 case DONE: 204 case FAILED: 205 case KILLED: 206 return false; 207 208 default: 209 return false; 210 211 } 212 213 } 214 215 private synchronized void setState(final State newState) { 216 217 if (isLegalStateTransition(this.state, newState)) { 218 this.state = newState; 219 } else { 220 throw new IllegalStateException("Resource manager attempts illegal state transition from " 221 + this.state + " to " 222 + newState); 223 } 224 225 } 226 227 228 private synchronized boolean hasNoOutstandingRequests() { 229 return this.outstandingContainerRequests == 0; 230 } 231 232 private synchronized boolean hasNoContainersAllocated() { 233 return this.containerAllocationCount == 0; 234 } 235 236}