This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver;
020
021import org.apache.reef.driver.parameters.DriverRestartHandler;
022import org.apache.reef.driver.parameters.ServiceDriverRestartedHandlers;
023import org.apache.reef.driver.restart.DriverRestartManager;
024import org.apache.reef.driver.restart.DriverRestarted;
025import org.apache.reef.exception.DriverFatalRuntimeException;
026import org.apache.reef.tang.annotations.Parameter;
027import org.apache.reef.wake.EventHandler;
028import org.apache.reef.wake.time.event.StartTime;
029
030import javax.inject.Inject;
031import java.util.ArrayList;
032import java.util.List;
033import java.util.Set;
034import java.util.logging.Level;
035import java.util.logging.Logger;
036
037/**
038 * This is bound to the start event of the clock and dispatches it to the appropriate application code.
039 */
040public final class DriverStartHandler implements EventHandler<StartTime> {
041  private static final Logger LOG = Logger.getLogger(DriverStartHandler.class.getName());
042
043  private final Set<EventHandler<StartTime>> startHandlers;
044  private final Set<EventHandler<DriverRestarted>> restartHandlers;
045  private final Set<EventHandler<DriverRestarted>> serviceRestartHandlers;
046  private final DriverRestartManager driverRestartManager;
047
048  @Inject
049  DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
050                     final Set<EventHandler<StartTime>> startHandlers,
051                     @Parameter(DriverRestartHandler.class)
052                     final Set<EventHandler<DriverRestarted>> restartHandlers,
053                     @Parameter(ServiceDriverRestartedHandlers.class)
054                     final Set<EventHandler<DriverRestarted>> serviceRestartHandlers,
055                     final DriverRestartManager driverRestartManager) {
056    this.startHandlers = startHandlers;
057    this.restartHandlers = restartHandlers;
058    this.serviceRestartHandlers = serviceRestartHandlers;
059    this.driverRestartManager = driverRestartManager;
060    LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers [{0}], RestartHandlers [{1}]," +
061            "and ServiceRestartHandlers [{2}].",
062        new String[] {this.startHandlers.toString(), this.restartHandlers.toString(),
063            this.serviceRestartHandlers.toString()});
064  }
065
066  @Override
067  public void onNext(final StartTime startTime) {
068    if (this.driverRestartManager.detectRestart()) {
069      this.onRestart(startTime);
070    } else {
071      this.onStart(startTime);
072    }
073  }
074
075  private void onRestart(final StartTime startTime) {
076    if (this.restartHandlers.size() > 0) {
077      final List<EventHandler<DriverRestarted>> orderedRestartHandlers =
078          new ArrayList<>(this.serviceRestartHandlers.size() + this.restartHandlers.size());
079
080      orderedRestartHandlers.addAll(this.serviceRestartHandlers);
081      orderedRestartHandlers.addAll(this.restartHandlers);
082
083      // This can only be called after calling client restart handlers because REEF.NET
084      // JobDriver requires making this call to set up the InterOp handlers.
085      this.driverRestartManager.onRestart(startTime, orderedRestartHandlers);
086    } else {
087      throw new DriverFatalRuntimeException("Driver restart happened, but no ON_DRIVER_RESTART handler is bound.");
088    }
089  }
090
091  private void onStart(final StartTime startTime) {
092    for (final EventHandler<StartTime> startHandler : this.startHandlers) {
093      startHandler.onNext(startTime);
094    }
095  }
096}