This project has retired. For details please refer to its Attic page.
Source code
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
019package org.apache.reef.webserver;
020
021import org.apache.reef.driver.catalog.NodeDescriptor;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
025import org.apache.reef.driver.task.RunningTask;
026import org.apache.reef.runtime.common.driver.DriverStatusManager;
027import org.apache.reef.runtime.common.utils.RemoteManager;
028import org.apache.reef.tang.annotations.Unit;
029import org.apache.reef.wake.EventHandler;
030import org.apache.reef.wake.time.event.StartTime;
031import org.apache.reef.wake.time.event.StopTime;
032
033import javax.inject.Inject;
034import java.text.Format;
035import java.text.SimpleDateFormat;
036import java.util.*;
037import java.util.logging.Level;
038import java.util.logging.Logger;
039
040/**
041 * Reef Event Manager that manages Reef states
042 */
043@Unit
044public final class ReefEventStateManager {
045  /**
046   * Standard Java logger.
047   */
048  private static final Logger LOG = Logger.getLogger(ReefEventStateManager.class.getName());
049
050  /**
051   * date format
052   */
053  private static final Format format = new SimpleDateFormat("yyyy MM dd HH:mm:ss");
054
055  /**
056   * Map of evaluators
057   */
058  private final Map<String, EvaluatorDescriptor> evaluators = new HashMap<>();
059
060  /**
061   * Map from context ID to running evaluator context.
062   */
063  private final Map<String, ActiveContext> contexts = new HashMap<>();
064
065  private final List<AvroReefServiceInfo> serviceInfoList = new ArrayList<AvroReefServiceInfo>();
066
067  /**
068   * Remote manager in driver the carries information such as driver endpoint identifier
069   */
070  private final RemoteManager remoteManager;
071
072  /**
073   * Driver Status Manager that controls the driver status
074   */
075  private final DriverStatusManager driverStatusManager;
076
077  /**
078   * Evaluator start time
079   */
080  private StartTime startTime;
081
082  /**
083   * Evaluator stop time
084   */
085  private StopTime stopTime;
086
087  /**
088   * ReefEventStateManager that keeps the states of Reef components
089   */
090  @Inject
091  public ReefEventStateManager(final RemoteManager remoteManager, final DriverStatusManager driverStatusManager) {
092    this.remoteManager = remoteManager;
093    this.driverStatusManager = driverStatusManager;
094  }
095
096  /**
097   * get start time
098   *
099   * @return
100   */
101  public String getStartTime() {
102    if (startTime != null) {
103      return convertTime(startTime.getTimeStamp());
104    }
105    return null;
106  }
107
108  /**
109   * get stop time
110   *
111   * @return
112   */
113  public String getStopTime() {
114    if (stopTime != null) {
115      return convertTime(stopTime.getTimeStamp());
116    }
117    return null;
118  }
119
120  /**
121   * convert time from long to formatted string
122   *
123   * @param time
124   * @return
125   */
126  private String convertTime(final long time) {
127    final Date date = new Date(time);
128    return format.format(date).toString();
129  }
130
131  /**
132   * get evaluator map
133   *
134   * @return
135   */
136  public Map<String, EvaluatorDescriptor> getEvaluators() {
137    return evaluators;
138  }
139
140  /**
141   * get driver endpoint identifier
142   */
143  public String getDriverEndpointIdentifier() {
144    return remoteManager.getMyIdentifier();
145  }
146
147  public List<AvroReefServiceInfo> getServicesInfo() {
148    return this.serviceInfoList;
149  }
150
151  public void registerServiceInfo(AvroReefServiceInfo serviceInfo) {
152    synchronized (this.serviceInfoList) {
153      serviceInfoList.add(serviceInfo);
154      LOG.log(Level.INFO, "Registered Service [{0}] with Info [{1}]", new Object[]{serviceInfo.getServiceName(), serviceInfo.getServiceInfo()});
155    }
156  }
157
158  /**
159   * get a map of contexts
160   *
161   * @return
162   */
163  public Map<String, ActiveContext> getContexts() {
164    return contexts;
165  }
166
167  /**
168   * pus a entry to evaluators
169   *
170   * @param key
171   * @param value
172   */
173  public void put(final String key, final EvaluatorDescriptor value) {
174    evaluators.put(key, value);
175  }
176
177  /**
178   * get a value from evaluators by key
179   *
180   * @param key
181   * @return
182   */
183  public EvaluatorDescriptor get(final String key) {
184    return evaluators.get(key);
185  }
186
187  /**
188   * getEvaluatorDescriptor
189   *
190   * @param evaluatorId
191   * @return
192   */
193  public EvaluatorDescriptor getEvaluatorDescriptor(final String evaluatorId) {
194    return evaluators.get(evaluatorId);
195  }
196
197  /**
198   * get Evaluator NodeDescriptor
199   *
200   * @param evaluatorId
201   * @return
202   */
203  public NodeDescriptor getEvaluatorNodeDescriptor(final String evaluatorId) {
204    return evaluators.get(evaluatorId).getNodeDescriptor();
205  }
206
207  /**
208   * Kill driver by calling onComplete() . This method is called when client wants to kill the driver and evaluators.
209   */
210  public void OnClientKill() {
211    driverStatusManager.onComplete();
212  }
213
214  /**
215   * Job Driver is ready and the clock is set up
216   */
217  public final class StartStateHandler implements EventHandler<StartTime> {
218    @Override
219    public void onNext(final StartTime startTime) {
220      LOG.log(Level.INFO,
221          "StartStateHandler: Driver started with endpoint identifier [{0}]  and StartTime [{1}]",
222          new Object[]{ReefEventStateManager.this.remoteManager.getMyIdentifier(), startTime});
223      ReefEventStateManager.this.startTime = startTime;
224    }
225  }
226
227  /**
228   * Job driver stopped, log the stop time.
229   */
230  public final class StopStateHandler implements EventHandler<StopTime> {
231    @Override
232    public void onNext(final StopTime stopTime) {
233      LOG.log(Level.INFO, "StopStateHandler called. StopTime: {0}", stopTime);
234      ReefEventStateManager.this.stopTime = stopTime;
235    }
236  }
237
238  /**
239   * Receive notification that an Evaluator had been allocated
240   */
241  public final class AllocatedEvaluatorStateHandler implements EventHandler<AllocatedEvaluator> {
242    @Override
243    public void onNext(final AllocatedEvaluator eval) {
244      synchronized (ReefEventStateManager.this) {
245        ReefEventStateManager.this.put(eval.getId(), eval.getEvaluatorDescriptor());
246      }
247    }
248  }
249
250  /**
251   * Receive event when task is running
252   */
253  public final class TaskRunningStateHandler implements EventHandler<RunningTask> {
254    @Override
255    public void onNext(final RunningTask runningTask) {
256      LOG.log(Level.INFO, "Running task {0} received.", runningTask.getId());
257    }
258  }
259
260  /**
261   * Receive event during driver restart that a task is running in previous evaluator
262   */
263  public final class DriverRestartTaskRunningStateHandler implements EventHandler<RunningTask> {
264    @Override
265    public void onNext(final RunningTask runningTask) {
266      LOG.log(Level.INFO, "Running task {0} received during driver restart.", runningTask.getId());
267    }
268  }
269
270  /**
271   * Receive notification that a new Context is available.
272   */
273  public final class ActiveContextStateHandler implements EventHandler<ActiveContext> {
274    @Override
275    public void onNext(final ActiveContext context) {
276      synchronized (ReefEventStateManager.this) {
277        LOG.log(Level.INFO, "Active Context {0} received and handled in state handler", context);
278        contexts.put(context.getId(), context);
279      }
280    }
281  }
282
283  /**
284   * Receive notification that a new Context is available.
285   */
286  public final class DrivrRestartActiveContextStateHandler implements EventHandler<ActiveContext> {
287    @Override
288    public void onNext(final ActiveContext context) {
289      synchronized (ReefEventStateManager.this) {
290        LOG.log(Level.INFO, "Active Context {0} received and handled in state handler during driver restart.", context);
291        evaluators.put(context.getEvaluatorId(), context.getEvaluatorDescriptor());
292        contexts.put(context.getId(), context);
293      }
294    }
295  }
296
297  /**
298   * Receive notification from the client.
299   */
300  public final class ClientMessageStateHandler implements EventHandler<byte[]> {
301    @Override
302    public void onNext(final byte[] message) {
303      synchronized (ReefEventStateManager.this) {
304        LOG.log(Level.INFO, "ClientMessageStateHandler OnNext called");
305      }
306    }
307  }
308}