This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.resourcemanager;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.runtime.common.driver.DriverStatusManager;
024import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
025import org.apache.reef.runtime.common.driver.idle.DriverIdleManager;
026import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
027import org.apache.reef.runtime.common.driver.idle.IdleMessage;
028import org.apache.reef.tang.InjectionFuture;
029import org.apache.reef.wake.EventHandler;
030
031import javax.inject.Inject;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035/**
036 * Manages the status of the Resource Manager and tracks whether it is idle.
037 */
038@DriverSide
039@Private
040public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEvent>, DriverIdlenessSource {
041
042  private static final Logger LOG = Logger.getLogger(ResourceManagerStatus.class.getName());
043
044  private static final String COMPONENT_NAME = "ResourceManager";
045
046  private static final IdleMessage IDLE_MESSAGE =
047      new IdleMessage(COMPONENT_NAME, "No outstanding requests or allocations", true);
048
049  private final ResourceManagerErrorHandler resourceManagerErrorHandler;
050  private final DriverStatusManager driverStatusManager;
051  private final InjectionFuture<DriverIdleManager> driverIdleManager;
052
053  /** Mutable RM state. */
054  private State state = State.INIT;
055
056  /** Number of container requests outstanding with the RM, as per latest RuntimeStatusEvent message. */
057  private int outstandingContainerRequests = 0;
058
059  /** Number of containers currently allocated, as per latest RuntimeStatusEvent message. */
060  private int containerAllocationCount = 0;
061
062  @Inject
063  private ResourceManagerStatus(
064      final ResourceManagerErrorHandler resourceManagerErrorHandler,
065      final DriverStatusManager driverStatusManager,
066      final InjectionFuture<DriverIdleManager> driverIdleManager) {
067
068    this.resourceManagerErrorHandler = resourceManagerErrorHandler;
069    this.driverStatusManager = driverStatusManager;
070    this.driverIdleManager = driverIdleManager;
071  }
072
073  @Override
074  public void onNext(final RuntimeStatusEvent runtimeStatusEvent) {
075
076    final State newState = runtimeStatusEvent.getState();
077
078    LOG.log(Level.FINEST, "Runtime status: {0}", runtimeStatusEvent);
079
080    synchronized(this) {
081      this.outstandingContainerRequests = runtimeStatusEvent.getOutstandingContainerRequests().orElse(0);
082      this.containerAllocationCount = runtimeStatusEvent.getContainerAllocationList().size();
083
084      this.setState(newState);
085    }
086
087    switch (newState) {
088    case FAILED:
089      this.onRMFailure(runtimeStatusEvent);
090      break;
091    case DONE:
092      this.onRMDone(runtimeStatusEvent);
093      break;
094    case RUNNING:
095      this.onRMRunning(runtimeStatusEvent);
096      break;
097    case INIT:
098    case SUSPEND:
099    case KILLED:
100      break;
101    default:
102      throw new RuntimeException("Unknown state: " + newState);
103    }
104  }
105
106  /**
107   * Change the state of the Resource Manager to be RUNNING.
108   */
109  public synchronized void setRunning() {
110    this.setState(State.RUNNING);
111  }
112
113  /**
114   * Driver is idle if, regardless of status, it has no evaluators allocated and no pending container requests.
115   * @return true if the driver can be considered idle, false otherwise.
116   */
117  private synchronized boolean isIdle() {
118    return this.outstandingContainerRequests == 0 && this.containerAllocationCount == 0;
119  }
120
121  /**
122   * Driver is idle if, regardless of status, it has no evaluators allocated
123   * and no pending container requests. This method is used in the DriverIdleManager.
124   * If all DriverIdlenessSource components are idle, DriverIdleManager will initiate Driver shutdown.
125   * @return idle, if there are no outstanding requests or allocations. Not idle otherwise.
126   */
127  @Override
128  public synchronized IdleMessage getIdleStatus() {
129
130    if (this.isIdle()) {
131      return IDLE_MESSAGE;
132    }
133
134    final String message = String.format(
135        "There are %d outstanding container requests and %d allocated containers",
136        this.outstandingContainerRequests, this.containerAllocationCount);
137
138    return new IdleMessage(COMPONENT_NAME, message, false);
139  }
140
141  private synchronized void onRMFailure(final RuntimeStatusEvent runtimeStatusEvent) {
142    assert runtimeStatusEvent.getState() == State.FAILED;
143    this.resourceManagerErrorHandler.onNext(runtimeStatusEvent.getError().get());
144  }
145
146  private synchronized void onRMDone(final RuntimeStatusEvent runtimeStatusEvent) {
147    assert runtimeStatusEvent.getState() == State.DONE;
148    LOG.log(Level.INFO, "Resource Manager shutdown happened. Triggering Driver shutdown.");
149    this.driverStatusManager.onComplete();
150  }
151
152  private void onRMRunning(final RuntimeStatusEvent runtimeStatusEvent) {
153    assert runtimeStatusEvent.getState() == State.RUNNING;
154    if (this.isIdle()) {
155      this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE);
156    }
157  }
158
159  private synchronized void setState(final State toState) {
160    if (this.state == toState) {
161      LOG.log(Level.FINE, "Transition from {0} state to the same state.", this.state);
162    } else if (this.state.isLegalTransition(toState)) {
163      LOG.log(Level.FINEST, "State transition: {0} -> {1}", new State[] {this.state, toState});
164      this.state = toState;
165    } else {
166      throw new IllegalStateException(
167          "Resource manager attempts illegal state transition from " + this.state + " to " + toState);
168    }
169  }
170}