This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.suspend;
020
021import org.apache.reef.client.RunningJob;
022import org.apache.reef.tang.annotations.Name;
023import org.apache.reef.tang.annotations.NamedParameter;
024import org.apache.reef.tang.annotations.Parameter;
025import org.apache.reef.wake.EStage;
026import org.apache.reef.wake.EventHandler;
027import org.apache.reef.wake.impl.ThreadPoolStage;
028import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
029import org.apache.reef.wake.remote.impl.TransportEvent;
030import org.apache.reef.wake.remote.transport.Transport;
031import org.apache.reef.wake.remote.transport.TransportFactory;
032
033import javax.inject.Inject;
034import java.io.IOException;
035import java.util.logging.Level;
036import java.util.logging.Logger;
037
038/**
039 * (Wake) listener to get suspend/resume commands from Control process.
040 */
041public class SuspendClientControl implements AutoCloseable {
042
043  private static final Logger LOG = Logger.getLogger(Control.class.getName());
044  private static final ObjectSerializableCodec<byte[]> CODEC = new ObjectSerializableCodec<>();
045  private final transient Transport transport;
046  private transient RunningJob runningJob;
047
048
049  @Inject
050  public SuspendClientControl(
051      @Parameter(SuspendClientControl.Port.class) final int port,
052      final TransportFactory tpFactory) throws IOException {
053
054    LOG.log(Level.INFO, "Listen to control port {0}", port);
055
056    final EStage<TransportEvent> stage = new ThreadPoolStage<>(
057        "suspend-control-server", new ControlMessageHandler(), 1, new EventHandler<Throwable>() {
058          @Override
059          public void onNext(final Throwable throwable) {
060            throw new RuntimeException(throwable);
061          }
062        });
063
064    this.transport = tpFactory.newInstance("localhost", port, stage, stage, 1, 10000);
065  }
066
067  public synchronized void setRunningJob(final RunningJob job) {
068    this.runningJob = job;
069  }
070
071  @Override
072  public void close() throws Exception {
073    this.transport.close();
074  }
075
076  /**
077   * Port for suspend/resume control commands.
078   */
079  @NamedParameter(doc = "Port for suspend/resume control commands",
080      short_name = "port", default_value = "7008")
081  public static final class Port implements Name<Integer> {
082  }
083
084  /**
085   * Forward remote message to the job driver.
086   */
087  private class ControlMessageHandler implements EventHandler<TransportEvent> {
088    @Override
089    public synchronized void onNext(final TransportEvent msg) {
090      LOG.log(Level.INFO, "Control message: {0} destination: {1}",
091          new Object[]{CODEC.decode(msg.getData()), runningJob});
092      if (runningJob != null) {
093        runningJob.send(msg.getData());
094      }
095    }
096  }
097}