001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.suspend; 020 021import org.apache.reef.tang.Configuration; 022import org.apache.reef.tang.Injector; 023import org.apache.reef.tang.JavaConfigurationBuilder; 024import org.apache.reef.tang.Tang; 025import org.apache.reef.tang.annotations.Name; 026import org.apache.reef.tang.annotations.NamedParameter; 027import org.apache.reef.tang.annotations.Parameter; 028import org.apache.reef.tang.exceptions.BindException; 029import org.apache.reef.tang.formats.CommandLine; 030import org.apache.reef.wake.EStage; 031import org.apache.reef.wake.EventHandler; 032import org.apache.reef.wake.impl.LoggingEventHandler; 033import org.apache.reef.wake.impl.ThreadPoolStage; 034import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; 035import org.apache.reef.wake.remote.impl.TransportEvent; 036import org.apache.reef.wake.remote.transport.Link; 037import org.apache.reef.wake.remote.transport.Transport; 038import org.apache.reef.wake.remote.transport.TransportFactory; 039 040import javax.inject.Inject; 041import java.io.IOException; 042import java.net.InetSocketAddress; 043import java.util.logging.Level; 044import java.util.logging.Logger; 045 046/** 047 * Control process which sends suspend/resume commands. 048 */ 049public final class Control { 050 051 private static final Logger LOG = Logger.getLogger(Control.class.getName()); 052 private final transient String command; 053 private final transient String taskId; 054 private final transient int port; 055 private final TransportFactory tpFactory; 056 057 @Inject 058 public Control(@Parameter(SuspendClientControl.Port.class) final int port, 059 @Parameter(TaskId.class) final String taskId, 060 @Parameter(Command.class) final String command, 061 final TransportFactory tpFactory) { 062 this.command = command.trim().toLowerCase(); 063 this.taskId = taskId; 064 this.port = port; 065 this.tpFactory = tpFactory; 066 } 067 068 private static Configuration getConfig(final String[] args) throws IOException, BindException { 069 final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); 070 new CommandLine(cb).processCommandLine(args, SuspendClientControl.Port.class, TaskId.class, Command.class); 071 return cb.build(); 072 } 073 074 public static void main(final String[] args) throws Exception { 075 final Configuration config = getConfig(args); 076 final Injector injector = Tang.Factory.getTang().newInjector(config); 077 final Control control = injector.getInstance(Control.class); 078 control.run(); 079 } 080 081 public void run() throws Exception { 082 083 LOG.log(Level.INFO, "command: {0} task: {1} port: {2}", 084 new Object[]{this.command, this.taskId, this.port}); 085 086 final ObjectSerializableCodec<String> codec = new ObjectSerializableCodec<>(); 087 088 final EStage<TransportEvent> stage = new ThreadPoolStage<>("suspend-control-client", 089 new LoggingEventHandler<TransportEvent>(), 1, new EventHandler<Throwable>() { 090 @Override 091 public void onNext(final Throwable throwable) { 092 throw new RuntimeException(throwable); 093 } 094 }); 095 096 try (final Transport transport = tpFactory.newInstance("localhost", 0, stage, stage, 1, 10000)) { 097 final Link link = transport.open(new InetSocketAddress("localhost", this.port), codec, null); 098 link.write(this.command + " " + this.taskId); 099 } 100 } 101 102 /** 103 * Task id. 104 */ 105 @NamedParameter(doc = "Task id", short_name = "task") 106 public static final class TaskId implements Name<String> { 107 } 108 109 /** 110 * Command: 'suspend' or 'resume'. 111 */ 112 @NamedParameter(doc = "Command: 'suspend' or 'resume'", short_name = "cmd") 113 public static final class Command implements Name<String> { 114 } 115}