This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.suspend;
020
021import org.apache.reef.client.RunningJob;
022import org.apache.reef.tang.annotations.Name;
023import org.apache.reef.tang.annotations.NamedParameter;
024import org.apache.reef.tang.annotations.Parameter;
025import org.apache.reef.wake.EStage;
026import org.apache.reef.wake.EventHandler;
027import org.apache.reef.wake.impl.ThreadPoolStage;
028import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
029import org.apache.reef.wake.remote.impl.TransportEvent;
030import org.apache.reef.wake.remote.transport.Transport;
031import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
032
033import javax.inject.Inject;
034import java.io.IOException;
035import java.util.logging.Level;
036import java.util.logging.Logger;
037
038/**
039 * (Wake) listener to get suspend/resume commands from Control process.
040 */
041public class SuspendClientControl implements AutoCloseable {
042
043  private static final Logger LOG = Logger.getLogger(Control.class.getName());
044  private static final ObjectSerializableCodec<byte[]> CODEC = new ObjectSerializableCodec<>();
045  private final transient Transport transport;
046  private transient RunningJob runningJob;
047
048  @Inject
049  public SuspendClientControl(
050      final @Parameter(SuspendClientControl.Port.class) int port) throws IOException {
051
052    LOG.log(Level.INFO, "Listen to control port {0}", port);
053
054    final EStage<TransportEvent> stage = new ThreadPoolStage<>(
055        "suspend-control-server", new ControlMessageHandler(), 1, new EventHandler<Throwable>() {
056      @Override
057      public void onNext(final Throwable throwable) {
058        throw new RuntimeException(throwable);
059      }
060    });
061
062    this.transport = new NettyMessagingTransport("localhost", port, stage, stage, 1, 10000);
063  }
064
065  public synchronized void setRunningJob(final RunningJob job) {
066    this.runningJob = job;
067  }
068
069  @Override
070  public void close() throws Exception {
071    this.transport.close();
072  }
073
074  @NamedParameter(doc = "Port for suspend/resume control commands",
075      short_name = "port", default_value = "7008")
076  public static final class Port implements Name<Integer> {
077  }
078
079  /**
080   * Forward remote message to the job driver.
081   */
082  private class ControlMessageHandler implements EventHandler<TransportEvent> {
083    @Override
084    public synchronized void onNext(final TransportEvent msg) {
085      LOG.log(Level.INFO, "Control message: {0} destination: {1}",
086          new Object[]{CODEC.decode(msg.getData()), runningJob});
087      if (runningJob != null) {
088        runningJob.send(msg.getData());
089      }
090    }
091  }
092}