This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.webserver;
020
021import org.apache.reef.driver.catalog.NodeDescriptor;
022import org.apache.reef.driver.context.ActiveContext;
023import org.apache.reef.driver.evaluator.AllocatedEvaluator;
024import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
025import org.apache.reef.driver.restart.DriverRestarted;
026import org.apache.reef.driver.task.RunningTask;
027import org.apache.reef.runtime.common.driver.DriverStatusManager;
028import org.apache.reef.runtime.common.utils.RemoteManager;
029import org.apache.reef.tang.annotations.Unit;
030import org.apache.reef.wake.EventHandler;
031import org.apache.reef.wake.time.event.StartTime;
032import org.apache.reef.wake.time.event.StopTime;
033
034import javax.inject.Inject;
035import java.text.Format;
036import java.text.SimpleDateFormat;
037import java.util.*;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041/**
042 * Reef Event Manager that manages Reef states.
043 */
044@Unit
045public final class ReefEventStateManager {
046  /**
047   * Standard Java logger.
048   */
049  private static final Logger LOG = Logger.getLogger(ReefEventStateManager.class.getName());
050
051  /**
052   * Date format.
053   */
054  private static final Format FORMAT = new SimpleDateFormat("yyyy MM dd HH:mm:ss");
055
056  /**
057   * Map of evaluators.
058   */
059  private final Map<String, EvaluatorDescriptor> evaluators = new HashMap<>();
060
061  /**
062   * Map from context ID to running evaluator context.
063   */
064  private final Map<String, ActiveContext> contexts = new HashMap<>();
065
066  private final List<AvroReefServiceInfo> serviceInfoList = new ArrayList<>();
067
068  /**
069   * Remote manager in driver the carries information such as driver endpoint identifier.
070   */
071  private final RemoteManager remoteManager;
072
073  /**
074   * Driver Status Manager that controls the driver status.
075   */
076  private final DriverStatusManager driverStatusManager;
077
078  /**
079   * Evaluator start time.
080   */
081  private StartTime startTime;
082
083  /**
084   * Evaluator stop time.
085   */
086  private StopTime stopTime;
087
088  /**
089   * ReefEventStateManager that keeps the states of Reef components.
090   */
091  @Inject
092  public ReefEventStateManager(final RemoteManager remoteManager, final DriverStatusManager driverStatusManager) {
093    this.remoteManager = remoteManager;
094    this.driverStatusManager = driverStatusManager;
095  }
096
097  /**
098   * get start time.
099   *
100   * @return
101   */
102  public String getStartTime() {
103    if (startTime != null) {
104      return convertTime(startTime.getTimestamp());
105    }
106    return null;
107  }
108
109  /**
110   * get stop time.
111   *
112   * @return
113   */
114  public String getStopTime() {
115    if (stopTime != null) {
116      return convertTime(stopTime.getTimestamp());
117    }
118    return null;
119  }
120
121  /**
122   * convert time from long to formatted string.
123   *
124   * @param time
125   * @return
126   */
127  private String convertTime(final long time) {
128    final Date date = new Date(time);
129    return FORMAT.format(date);
130  }
131
132  /**
133   * get evaluator map.
134   *
135   * @return
136   */
137  public Map<String, EvaluatorDescriptor> getEvaluators() {
138    return evaluators;
139  }
140
141  /**
142   * get driver endpoint identifier.
143   */
144  public String getDriverEndpointIdentifier() {
145    return remoteManager.getMyIdentifier();
146  }
147
148  public List<AvroReefServiceInfo> getServicesInfo() {
149    return this.serviceInfoList;
150  }
151
152  public void registerServiceInfo(final AvroReefServiceInfo serviceInfo) {
153    synchronized (this.serviceInfoList) {
154      serviceInfoList.add(serviceInfo);
155      LOG.log(Level.INFO, "Registered Service [{0}] with Info [{1}]",
156          new Object[]{serviceInfo.getServiceName(), serviceInfo.getServiceInfo()});
157    }
158  }
159
160  /**
161   * get a map of contexts.
162   *
163   * @return
164   */
165  public Map<String, ActiveContext> getContexts() {
166    return contexts;
167  }
168
169  /**
170   * pus a entry to evaluators.
171   *
172   * @param key
173   * @param value
174   */
175  public void put(final String key, final EvaluatorDescriptor value) {
176    evaluators.put(key, value);
177  }
178
179  /**
180   * get a value from evaluators by key.
181   *
182   * @param key
183   * @return
184   */
185  public EvaluatorDescriptor get(final String key) {
186    return evaluators.get(key);
187  }
188
189  /**
190   * getEvaluatorDescriptor.
191   *
192   * @param evaluatorId
193   * @return
194   */
195  public EvaluatorDescriptor getEvaluatorDescriptor(final String evaluatorId) {
196    return evaluators.get(evaluatorId);
197  }
198
199  /**
200   * get Evaluator NodeDescriptor.
201   *
202   * @param evaluatorId
203   * @return
204   */
205  public NodeDescriptor getEvaluatorNodeDescriptor(final String evaluatorId) {
206    return evaluators.get(evaluatorId).getNodeDescriptor();
207  }
208
209  /**
210   * Kill driver by calling onComplete() . This method is called when client wants to kill the driver and evaluators.
211   */
212  public void onClientKill() {
213    driverStatusManager.onComplete();
214  }
215
216  /**
217   * Job Driver is ready and the clock is set up.
218   */
219  public final class StartStateHandler implements EventHandler<StartTime> {
220    @Override
221    @SuppressWarnings("checkstyle:hiddenfield")
222    public void onNext(final StartTime startTime) {
223      LOG.log(Level.INFO,
224          "StartStateHandler: Driver started with endpoint identifier [{0}]  and StartTime [{1}]",
225          new Object[]{ReefEventStateManager.this.remoteManager.getMyIdentifier(), startTime});
226      ReefEventStateManager.this.startTime = startTime;
227    }
228  }
229
230  /**
231   * Job Driver has been restarted.
232   */
233  public final class DriverRestartHandler implements EventHandler<DriverRestarted> {
234    @Override
235    @SuppressWarnings("checkstyle:hiddenfield")
236    public void onNext(final DriverRestarted restartTime) {
237      LOG.log(Level.INFO, "DriverRestartHandler called. StartTime: {0}", restartTime);
238    }
239  }
240
241  /**
242   * Job driver stopped, log the stop time.
243   */
244  public final class StopStateHandler implements EventHandler<StopTime> {
245    @Override
246    @SuppressWarnings("checkstyle:hiddenfield")
247    public void onNext(final StopTime stopTime) {
248      LOG.log(Level.INFO, "StopStateHandler called. StopTime: {0}", stopTime);
249      ReefEventStateManager.this.stopTime = stopTime;
250    }
251  }
252
253  /**
254   * Receive notification that an Evaluator had been allocated.
255   */
256  public final class AllocatedEvaluatorStateHandler implements EventHandler<AllocatedEvaluator> {
257    @Override
258    public void onNext(final AllocatedEvaluator eval) {
259      synchronized (ReefEventStateManager.this) {
260        ReefEventStateManager.this.put(eval.getId(), eval.getEvaluatorDescriptor());
261      }
262    }
263  }
264
265  /**
266   * Receive event when task is running.
267   */
268  public final class TaskRunningStateHandler implements EventHandler<RunningTask> {
269    @Override
270    public void onNext(final RunningTask runningTask) {
271      LOG.log(Level.INFO, "Running task {0} received.", runningTask.getId());
272    }
273  }
274
275  /**
276   * Receive event during driver restart that a task is running in previous evaluator.
277   */
278  public final class DriverRestartTaskRunningStateHandler implements EventHandler<RunningTask> {
279    @Override
280    public void onNext(final RunningTask runningTask) {
281      LOG.log(Level.INFO, "Running task {0} received during driver restart.", runningTask.getId());
282    }
283  }
284
285  /**
286   * Receive notification that a new Context is available.
287   */
288  public final class ActiveContextStateHandler implements EventHandler<ActiveContext> {
289    @Override
290    public void onNext(final ActiveContext context) {
291      synchronized (ReefEventStateManager.this) {
292        LOG.log(Level.INFO, "Active Context {0} received and handled in state handler", context);
293        contexts.put(context.getId(), context);
294      }
295    }
296  }
297
298  /**
299   * Receive notification that a new Context is available.
300   */
301  public final class DriverRestartActiveContextStateHandler implements EventHandler<ActiveContext> {
302    @Override
303    public void onNext(final ActiveContext context) {
304      synchronized (ReefEventStateManager.this) {
305        LOG.log(Level.INFO, "Active Context {0} received and handled in state handler during driver restart.", context);
306        evaluators.put(context.getEvaluatorId(), context.getEvaluatorDescriptor());
307        contexts.put(context.getId(), context);
308      }
309    }
310  }
311
312  /**
313   * Receive notification from the client.
314   */
315  public final class ClientMessageStateHandler implements EventHandler<byte[]> {
316    @Override
317    public void onNext(final byte[] message) {
318      synchronized (ReefEventStateManager.this) {
319        LOG.log(Level.INFO, "ClientMessageStateHandler OnNext called");
320      }
321    }
322  }
323}