001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.common.driver.resourcemanager; 020 021import org.apache.reef.annotations.audience.DriverSide; 022import org.apache.reef.annotations.audience.Private; 023import org.apache.reef.proto.DriverRuntimeProtocol; 024import org.apache.reef.proto.ReefServiceProtos; 025import org.apache.reef.runtime.common.driver.DriverStatusManager; 026import org.apache.reef.runtime.common.driver.idle.DriverIdleManager; 027import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource; 028import org.apache.reef.runtime.common.driver.idle.IdleMessage; 029import org.apache.reef.tang.InjectionFuture; 030import org.apache.reef.wake.EventHandler; 031 032import javax.inject.Inject; 033import java.util.logging.Level; 034import java.util.logging.Logger; 035 036/** 037 * Manages the status of the Resource Manager. 038 */ 039@DriverSide 040@Private 041public final class ResourceManagerStatus implements EventHandler<DriverRuntimeProtocol.RuntimeStatusProto>, 042 DriverIdlenessSource { 043 private static final Logger LOG = Logger.getLogger(ResourceManagerStatus.class.getName()); 044 045 private static final String COMPONENT_NAME = "ResourceManager"; 046 private static final IdleMessage IDLE_MESSAGE = new IdleMessage(COMPONENT_NAME, "No outstanding requests or allocations", true); 047 048 private final ResourceManagerErrorHandler resourceManagerErrorHandler; 049 private final DriverStatusManager driverStatusManager; 050 private final InjectionFuture<DriverIdleManager> driverIdleManager; 051 052 // Mutable state. 053 private ReefServiceProtos.State state = ReefServiceProtos.State.INIT; 054 private int outstandingContainerRequests = 0; 055 private int containerAllocationCount = 0; 056 057 @Inject 058 ResourceManagerStatus(final ResourceManagerErrorHandler resourceManagerErrorHandler, 059 final DriverStatusManager driverStatusManager, 060 final InjectionFuture<DriverIdleManager> driverIdleManager) { 061 this.resourceManagerErrorHandler = resourceManagerErrorHandler; 062 this.driverStatusManager = driverStatusManager; 063 this.driverIdleManager = driverIdleManager; 064 } 065 066 @Override 067 public synchronized void onNext(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) { 068 final ReefServiceProtos.State newState = runtimeStatusProto.getState(); 069 LOG.log(Level.FINEST, "Runtime status " + runtimeStatusProto); 070 this.outstandingContainerRequests = runtimeStatusProto.getOutstandingContainerRequests(); 071 this.containerAllocationCount = runtimeStatusProto.getContainerAllocationCount(); 072 this.setState(runtimeStatusProto.getState()); 073 074 switch (newState) { 075 case FAILED: 076 this.onRMFailure(runtimeStatusProto); 077 break; 078 case DONE: 079 this.onRMDone(runtimeStatusProto); 080 break; 081 case RUNNING: 082 this.onRMRunning(runtimeStatusProto); 083 break; 084 } 085 } 086 087 /** 088 * Change the state of the Resource Manager to be RUNNING. 089 */ 090 public synchronized void setRunning() { 091 this.setState(ReefServiceProtos.State.RUNNING); 092 } 093 094 /** 095 * @return idle, if there are no outstanding requests or allocations. Not idle else. 096 */ 097 @Override 098 public synchronized IdleMessage getIdleStatus() { 099 if (this.isIdle()) { 100 return IDLE_MESSAGE; 101 } else { 102 final String message = new StringBuilder("There are ") 103 .append(this.outstandingContainerRequests) 104 .append(" outstanding container requests and ") 105 .append(this.containerAllocationCount) 106 .append(" allocated containers") 107 .toString(); 108 return new IdleMessage(COMPONENT_NAME, message, false); 109 } 110 } 111 112 113 private synchronized void onRMFailure(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) { 114 assert (runtimeStatusProto.getState() == ReefServiceProtos.State.FAILED); 115 this.resourceManagerErrorHandler.onNext(runtimeStatusProto.getError()); 116 } 117 118 private synchronized void onRMDone(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) { 119 assert (runtimeStatusProto.getState() == ReefServiceProtos.State.DONE); 120 LOG.log(Level.INFO, "Resource Manager shutdown happened. Triggering Driver shutdown."); 121 this.driverStatusManager.onComplete(); 122 } 123 124 private synchronized void onRMRunning(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) { 125 assert (runtimeStatusProto.getState() == ReefServiceProtos.State.RUNNING); 126 if (this.isIdle()) { 127 this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE); 128 } 129 } 130 131 132 private synchronized boolean isIdle() { 133 return this.hasNoOutstandingRequests() 134 && this.hasNoContainersAllocated(); 135 } 136 137 private synchronized boolean isRunning() { 138 return ReefServiceProtos.State.RUNNING.equals(this.state); 139 } 140 141 142 private synchronized void setState(ReefServiceProtos.State state) { 143 // TODO: Add state transition check 144 this.state = state; 145 } 146 147 148 private synchronized boolean hasNoOutstandingRequests() { 149 return this.outstandingContainerRequests == 0; 150 } 151 152 private synchronized boolean hasNoContainersAllocated() { 153 return this.containerAllocationCount == 0; 154 } 155 156}