This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver.resourcemanager;
020
021import org.apache.reef.annotations.audience.DriverSide;
022import org.apache.reef.annotations.audience.Private;
023import org.apache.reef.proto.DriverRuntimeProtocol;
024import org.apache.reef.proto.ReefServiceProtos;
025import org.apache.reef.runtime.common.driver.DriverStatusManager;
026import org.apache.reef.runtime.common.driver.idle.DriverIdleManager;
027import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
028import org.apache.reef.runtime.common.driver.idle.IdleMessage;
029import org.apache.reef.tang.InjectionFuture;
030import org.apache.reef.wake.EventHandler;
031
032import javax.inject.Inject;
033import java.util.logging.Level;
034import java.util.logging.Logger;
035
036/**
037 * Manages the status of the Resource Manager.
038 */
039@DriverSide
040@Private
041public final class ResourceManagerStatus implements EventHandler<DriverRuntimeProtocol.RuntimeStatusProto>,
042    DriverIdlenessSource {
043  private static final Logger LOG = Logger.getLogger(ResourceManagerStatus.class.getName());
044
045  private static final String COMPONENT_NAME = "ResourceManager";
046  private static final IdleMessage IDLE_MESSAGE = new IdleMessage(COMPONENT_NAME, "No outstanding requests or allocations", true);
047
048  private final ResourceManagerErrorHandler resourceManagerErrorHandler;
049  private final DriverStatusManager driverStatusManager;
050  private final InjectionFuture<DriverIdleManager> driverIdleManager;
051
052  // Mutable state.
053  private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
054  private int outstandingContainerRequests = 0;
055  private int containerAllocationCount = 0;
056
057  @Inject
058  ResourceManagerStatus(final ResourceManagerErrorHandler resourceManagerErrorHandler,
059                        final DriverStatusManager driverStatusManager,
060                        final InjectionFuture<DriverIdleManager> driverIdleManager) {
061    this.resourceManagerErrorHandler = resourceManagerErrorHandler;
062    this.driverStatusManager = driverStatusManager;
063    this.driverIdleManager = driverIdleManager;
064  }
065
066  @Override
067  public synchronized void onNext(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
068    final ReefServiceProtos.State newState = runtimeStatusProto.getState();
069    LOG.log(Level.FINEST, "Runtime status " + runtimeStatusProto);
070    this.outstandingContainerRequests = runtimeStatusProto.getOutstandingContainerRequests();
071    this.containerAllocationCount = runtimeStatusProto.getContainerAllocationCount();
072    this.setState(runtimeStatusProto.getState());
073
074    switch (newState) {
075      case FAILED:
076        this.onRMFailure(runtimeStatusProto);
077        break;
078      case DONE:
079        this.onRMDone(runtimeStatusProto);
080        break;
081      case RUNNING:
082        this.onRMRunning(runtimeStatusProto);
083        break;
084    }
085  }
086
087  /**
088   * Change the state of the Resource Manager to be RUNNING.
089   */
090  public synchronized void setRunning() {
091    this.setState(ReefServiceProtos.State.RUNNING);
092  }
093
094  /**
095   * @return idle, if there are no outstanding requests or allocations. Not idle else.
096   */
097  @Override
098  public synchronized IdleMessage getIdleStatus() {
099    if (this.isIdle()) {
100      return IDLE_MESSAGE;
101    } else {
102      final String message = new StringBuilder("There are ")
103          .append(this.outstandingContainerRequests)
104          .append(" outstanding container requests and ")
105          .append(this.containerAllocationCount)
106          .append(" allocated containers")
107          .toString();
108      return new IdleMessage(COMPONENT_NAME, message, false);
109    }
110  }
111
112
113  private synchronized void onRMFailure(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
114    assert (runtimeStatusProto.getState() == ReefServiceProtos.State.FAILED);
115    this.resourceManagerErrorHandler.onNext(runtimeStatusProto.getError());
116  }
117
118  private synchronized void onRMDone(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
119    assert (runtimeStatusProto.getState() == ReefServiceProtos.State.DONE);
120    LOG.log(Level.INFO, "Resource Manager shutdown happened. Triggering Driver shutdown.");
121    this.driverStatusManager.onComplete();
122  }
123
124  private synchronized void onRMRunning(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
125    assert (runtimeStatusProto.getState() == ReefServiceProtos.State.RUNNING);
126    if (this.isIdle()) {
127      this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE);
128    }
129  }
130
131
132  private synchronized boolean isIdle() {
133    return this.hasNoOutstandingRequests()
134        && this.hasNoContainersAllocated();
135  }
136
137  private synchronized boolean isRunning() {
138    return ReefServiceProtos.State.RUNNING.equals(this.state);
139  }
140
141
142  private synchronized void setState(ReefServiceProtos.State state) {
143    // TODO: Add state transition check
144    this.state = state;
145  }
146
147
148  private synchronized boolean hasNoOutstandingRequests() {
149    return this.outstandingContainerRequests == 0;
150  }
151
152  private synchronized boolean hasNoContainersAllocated() {
153    return this.containerAllocationCount == 0;
154  }
155
156}