This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.resourcemanager;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.runtime.common.driver.DriverStatusManager;
024import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
025import org.apache.reef.runtime.common.driver.idle.DriverIdleManager;
026import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
027import org.apache.reef.runtime.common.driver.idle.IdleMessage;
028import org.apache.reef.tang.InjectionFuture;
029import org.apache.reef.wake.EventHandler;
030
031import javax.inject.Inject;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035/**
036 * Manages the status of the Resource Manager.
037 */
038@DriverSide
039@Private
040public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEvent>,
041    DriverIdlenessSource {
042  private static final Logger LOG = Logger.getLogger(ResourceManagerStatus.class.getName());
043
044  private static final String COMPONENT_NAME = "ResourceManager";
045  private static final IdleMessage IDLE_MESSAGE =
046      new IdleMessage(COMPONENT_NAME, "No outstanding requests or allocations", true);
047
048  private final ResourceManagerErrorHandler resourceManagerErrorHandler;
049  private final DriverStatusManager driverStatusManager;
050  private final InjectionFuture<DriverIdleManager> driverIdleManager;
051
052  // Mutable state.
053  private State state =  State.INIT;
054  private int outstandingContainerRequests = 0;
055  private int containerAllocationCount = 0;
056
057  @Inject
058  ResourceManagerStatus(final ResourceManagerErrorHandler resourceManagerErrorHandler,
059                        final DriverStatusManager driverStatusManager,
060                        final InjectionFuture<DriverIdleManager> driverIdleManager) {
061    this.resourceManagerErrorHandler = resourceManagerErrorHandler;
062    this.driverStatusManager = driverStatusManager;
063    this.driverIdleManager = driverIdleManager;
064  }
065
066  @Override
067  public synchronized void onNext(final RuntimeStatusEvent runtimeStatusEvent) {
068    final State newState = runtimeStatusEvent.getState();
069    LOG.log(Level.FINEST, "Runtime status " + runtimeStatusEvent);
070    this.outstandingContainerRequests = runtimeStatusEvent.getOutstandingContainerRequests().orElse(0);
071    this.containerAllocationCount = runtimeStatusEvent.getContainerAllocationList().size();
072    this.setState(runtimeStatusEvent.getState());
073
074    switch (newState) {
075    case FAILED:
076      this.onRMFailure(runtimeStatusEvent);
077      break;
078    case DONE:
079      this.onRMDone(runtimeStatusEvent);
080      break;
081    case RUNNING:
082      this.onRMRunning(runtimeStatusEvent);
083      break;
084    case INIT:
085    case SUSPEND:
086    case KILLED:
087      break;
088    default:
089      throw new RuntimeException("Unknown state: " + newState);
090    }
091  }
092
093  /**
094   * Change the state of the Resource Manager to be RUNNING.
095   */
096  public synchronized void setRunning() {
097    this.setState(State.RUNNING);
098  }
099
100  /**
101   * @return idle, if there are no outstanding requests or allocations. Not idle else.
102   */
103  @Override
104  public synchronized IdleMessage getIdleStatus() {
105    if (this.isIdle()) {
106      return IDLE_MESSAGE;
107    } else {
108      final String message = new StringBuilder("There are ")
109          .append(this.outstandingContainerRequests)
110          .append(" outstanding container requests and ")
111          .append(this.containerAllocationCount)
112          .append(" allocated containers")
113          .toString();
114      return new IdleMessage(COMPONENT_NAME, message, false);
115    }
116  }
117
118
119  private synchronized void onRMFailure(final RuntimeStatusEvent runtimeStatusEvent) {
120    assert runtimeStatusEvent.getState() == State.FAILED;
121    this.resourceManagerErrorHandler.onNext(runtimeStatusEvent.getError().get());
122  }
123
124  private synchronized void onRMDone(final RuntimeStatusEvent runtimeStatusEvent) {
125    assert runtimeStatusEvent.getState() == State.DONE;
126    LOG.log(Level.INFO, "Resource Manager shutdown happened. Triggering Driver shutdown.");
127    this.driverStatusManager.onComplete();
128  }
129
130  private synchronized void onRMRunning(final RuntimeStatusEvent runtimeStatusEvent) {
131    assert runtimeStatusEvent.getState() == State.RUNNING;
132    if (this.isIdle()) {
133      this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE);
134    }
135  }
136
137
138  private synchronized boolean isIdle() {
139    return this.hasNoOutstandingRequests()
140        && this.hasNoContainersAllocated();
141  }
142
143  private synchronized boolean isRunning() {
144    return State.RUNNING.equals(this.state);
145  }
146
147  /**
148  *
149  * Checks if the ResourceManager can switch from the current state to the target state.
150  * See REEF-826 for the state transition matrix.
151  *
152  * @param from current state
153  * @param to   state to switch to
154  *
155  * @return true if the transition is legal; false otherwise
156  *
157  */
158  private synchronized boolean isLegalStateTransition(final State from,
159                                                      final State to) {
160
161    // handle diagonal elements of the transition matrix
162    if (from.equals(to)){
163      LOG.finest("Transition from " + from + " state to the same state.");
164      return true;
165    }
166
167    // handle non-diagonal elements
168    switch (from) {
169
170    case INIT:
171      switch (to) {
172      case RUNNING:
173      case SUSPEND:
174      case DONE:
175      case FAILED:
176      case KILLED:
177        return true;
178      default:
179        return false;
180      }
181
182    case RUNNING:
183      switch (to) {
184      case SUSPEND:
185      case DONE:
186      case FAILED:
187      case KILLED:
188        return true;
189      default:
190        return false;
191      }
192
193    case SUSPEND:
194      switch (to) {
195      case RUNNING:
196      case FAILED:
197      case KILLED:
198        return true;
199      default:
200        return false;
201      }
202
203    case DONE:
204    case FAILED:
205    case KILLED:
206      return false;
207
208    default:
209      return false;
210
211    }
212
213  }
214
215  private synchronized void setState(final State newState) {
216
217    if (isLegalStateTransition(this.state, newState)) {
218      this.state = newState;
219    } else {
220      throw new IllegalStateException("Resource manager attempts illegal state transition from "
221                + this.state + " to "
222                + newState);
223    }
224
225  }
226
227
228  private synchronized boolean hasNoOutstandingRequests() {
229    return this.outstandingContainerRequests == 0;
230  }
231
232  private synchronized boolean hasNoContainersAllocated() {
233    return this.containerAllocationCount == 0;
234  }
235
236}