This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.standalone.driver;
020
021import com.jcraft.jsch.*;
022import org.apache.reef.driver.evaluator.EvaluatorProcess;
023import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
024import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
025import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
026import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
027import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
028import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
029import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
030import org.apache.reef.runtime.common.files.FileResource;
031import org.apache.reef.runtime.common.files.REEFFileNames;
032import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
033import org.apache.reef.runtime.common.utils.RemoteManager;
034import org.apache.reef.runtime.standalone.client.parameters.RootFolder;
035import org.apache.reef.runtime.local.process.ReefRunnableProcessObserver;
036import org.apache.reef.runtime.standalone.client.parameters.SshPortNum;
037import org.apache.reef.runtime.yarn.driver.REEFEventHandlers;
038import org.apache.reef.runtime.standalone.client.parameters.NodeFolder;
039import org.apache.reef.runtime.standalone.client.parameters.NodeInfoSet;
040import org.apache.reef.tang.annotations.Parameter;
041import org.apache.reef.tang.exceptions.BindException;
042import org.apache.reef.tang.formats.ConfigurationSerializer;
043import org.apache.reef.util.CollectionUtils;
044import org.apache.reef.util.Optional;
045
046import javax.inject.Inject;
047import java.io.File;
048import java.io.IOException;
049import java.util.*;
050import java.util.logging.Level;
051import java.util.logging.Logger;
052
053/**
054 * Management module for remote nodes in standalone runtime.
055 */
056public final class RemoteNodeManager {
057
058  private static final Logger LOG = Logger.getLogger(RemoteNodeManager.class.getName());
059
060  private final ThreadGroup containerThreads = new ThreadGroup("SshContainerManagerThreadGroup");
061
062  /**
063   * Map from containerID -> SshProcessContainer.
064   */
065  private final Map<String, SshProcessContainer> containers = new HashMap<>();
066
067  private final ConfigurationSerializer configurationSerializer;
068  private final REEFFileNames fileNames;
069  private final double jvmHeapFactor;
070  private final REEFEventHandlers reefEventHandlers;
071  private final String errorHandlerRID;
072  private final Set<String> nodeInfoSet;
073  private Iterator<String> nodeSetIterator;
074  private final ReefRunnableProcessObserver processObserver;
075  private final String rootFolder;
076  private final String nodeFolder;
077  private final int sshPortNum;
078
079  @Inject
080  RemoteNodeManager(final ConfigurationSerializer configurationSerializer,
081                    final REEFFileNames fileNames,
082                    final RemoteManager remoteManager,
083                    final REEFEventHandlers reefEventHandlers,
084                    final ReefRunnableProcessObserver processObserver,
085                    @Parameter(JVMHeapSlack.class) final double jvmHeapSlack,
086                    @Parameter(NodeInfoSet.class) final Set<String> nodeInfoSet,
087                    @Parameter(RootFolder.class) final String rootFolder,
088                    @Parameter(NodeFolder.class) final String nodeFolder,
089                    @Parameter(SshPortNum.class) final int sshPortNum) {
090    this.configurationSerializer = configurationSerializer;
091    this.fileNames = fileNames;
092    this.processObserver = processObserver;
093    this.errorHandlerRID = remoteManager.getMyIdentifier();
094    this.reefEventHandlers = reefEventHandlers;
095    this.jvmHeapFactor = 1.0 - jvmHeapSlack;
096    this.nodeInfoSet = nodeInfoSet;
097    this.rootFolder = rootFolder;
098    this.nodeFolder = nodeFolder;
099    this.sshPortNum = sshPortNum;
100
101    this.nodeSetIterator = this.nodeInfoSet.iterator();
102
103    LOG.log(Level.FINEST, "Initialized RemoteNodeManager.");
104  }
105
106  private void release(final String containerID) {
107    synchronized (this.containers) {
108      final SshProcessContainer sshProcessContainer = this.containers.get(containerID);
109      if (null != sshProcessContainer) {
110        LOG.log(Level.INFO, "Releasing Container with containerId [{0}]", sshProcessContainer);
111        if (sshProcessContainer.isRunning()) {
112          sshProcessContainer.close();
113        }
114        this.containers.remove(containerID);
115      } else {
116        LOG.log(Level.INFO, "Ignoring release request for unknown containerID [{0}]", containerID);
117      }
118    }
119  }
120
121  void onResourceLaunchRequest(final ResourceLaunchEvent resourceLaunchEvent) {
122    LOG.log(Level.INFO, "RemoteNodeManager:onResourceLaunchRequest");
123
124    // connect to the remote node.
125    final String remoteNode;
126    try {
127      synchronized (this.nodeSetIterator) {
128        remoteNode = this.getNode();
129      }
130    } catch (Exception e) {
131      throw new RuntimeException("Unable to get remote node", e);
132    }
133    final String username;
134    final String hostname;
135    if (remoteNode.indexOf('@') < 0) {
136      username = System.getProperty("user.name");
137      hostname = remoteNode;
138    } else {
139      username = remoteNode.substring(0, remoteNode.indexOf('@'));
140      hostname = remoteNode.substring(remoteNode.indexOf('@') + 1, remoteNode.length());
141    }
142    final String userHomeDir = System.getProperty("user.home");
143    final String privatekey = userHomeDir + "/.ssh/id_dsa";
144
145    synchronized (this.containers) {
146      try {
147        final JSch remoteConnection = new JSch();
148        remoteConnection.addIdentity(privatekey);
149        final Session sshSession = remoteConnection.getSession(username, hostname, sshPortNum);
150
151        final Properties jschConfig = new Properties();
152        jschConfig.put("StrictHostKeyChecking", "no");
153        sshSession.setConfig(jschConfig);
154
155        try {
156          sshSession.connect();
157        } catch (JSchException ex) {
158          throw new RuntimeException("Unable to connect to " + remoteNode + ". " +
159              "Check your authorized_keys settings. It should contain the public key of " + privatekey, ex);
160        }
161
162        LOG.log(Level.FINEST, "Established connection with {0}", hostname);
163
164        final SshProcessContainer sshProcessContainer = this.containers.get(resourceLaunchEvent.getIdentifier())
165            .withRemoteConnection(sshSession, remoteNode);
166
167        // Add the global files and libraries.
168        sshProcessContainer.addGlobalFiles(this.fileNames.getGlobalFolder());
169        sshProcessContainer.addLocalFiles(getLocalFiles(resourceLaunchEvent));
170
171        // Make the configuration file of the evaluator.
172        final File evaluatorConfigurationFile =
173            new File(sshProcessContainer.getFolder(), fileNames.getEvaluatorConfigurationPath());
174
175        try {
176          this.configurationSerializer.toFile(resourceLaunchEvent.getEvaluatorConf(), evaluatorConfigurationFile);
177        } catch (final IOException | BindException e) {
178          throw new RuntimeException("Unable to write configuration.", e);
179        }
180
181        // Copy files to remote node
182        final Channel channel = sshSession.openChannel("exec");
183        final String mkdirCommand = "mkdir " + nodeFolder;
184        ((ChannelExec) channel).setCommand(mkdirCommand);
185        channel.connect();
186
187        final List<String> copyCommand =
188            new ArrayList<>(Arrays.asList("scp", "-r",
189                sshProcessContainer.getFolder().toString(),
190                remoteNode + ":~/" + nodeFolder + "/" + sshProcessContainer.getContainerID()));
191        LOG.log(Level.INFO, "Copying files: {0}", copyCommand);
192        final Process copyProcess = new ProcessBuilder(copyCommand).start();
193        try {
194          copyProcess.waitFor();
195        } catch (final InterruptedException ex) {
196          throw new RuntimeException("Copying Interrupted: ", ex);
197        }
198
199        final List<String> command = getLaunchCommand(resourceLaunchEvent, sshProcessContainer.getMemory());
200        LOG.log(Level.FINEST, "Launching container: {0}", sshProcessContainer);
201        sshProcessContainer.run(command);
202      } catch (final JSchException | IOException ex) {
203        LOG.log(Level.WARNING, "Failed to establish connection with {0}@{1}:\n Exception:{2}",
204            new Object[]{username, hostname, ex});
205      }
206    }
207  }
208
209  private String getNode() {
210    if (!nodeSetIterator.hasNext()) {
211      nodeSetIterator = this.nodeInfoSet.iterator();
212    }
213    return nodeSetIterator.next().trim();
214  }
215
216  private static List<File> getLocalFiles(final ResourceLaunchEvent launchRequest) {
217    final List<File> files = new ArrayList<>();  // Libraries local to this evaluator
218    for (final FileResource frp : launchRequest.getFileSet()) {
219      files.add(new File(frp.getPath()).getAbsoluteFile());
220    }
221    return files;
222  }
223
224  private List<String> getLaunchCommand(final ResourceLaunchEvent launchRequest,
225                                        final int containerMemory) {
226    final EvaluatorProcess process = launchRequest.getProcess()
227        .setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath());
228
229    if (process.isOptionSet()) {
230      return process.getCommandLine();
231    } else {
232      return process
233          .setMemory((int) (this.jvmHeapFactor * containerMemory))
234          .getCommandLine();
235    }
236  }
237
238  void onResourceRequest(final ResourceRequestEvent resourceRequestEvent) {
239    final Optional<String> node = selectNode(resourceRequestEvent);
240    final String nodeId;
241
242    if (node.isPresent()) {
243      nodeId = node.get();
244    } else {
245      // Allocate new container
246      nodeId = this.getNode() + ":" + String.valueOf(sshPortNum);
247    }
248
249    final String processID = nodeId + "-" + String.valueOf(System.currentTimeMillis());
250    final File processFolder = new File(this.rootFolder, processID);
251
252    final SshProcessContainer sshProcessContainer = new SshProcessContainer(errorHandlerRID, nodeId, processID,
253        processFolder, resourceRequestEvent.getMemorySize().get(), resourceRequestEvent.getVirtualCores().get(),
254        null, this.fileNames, this.nodeFolder, this.processObserver, this.containerThreads);
255
256    this.containers.put(processID, sshProcessContainer);
257
258    final ResourceAllocationEvent alloc = ResourceEventImpl.newAllocationBuilder()
259        .setIdentifier(processID)
260        .setNodeId(nodeId)
261        .setResourceMemory(resourceRequestEvent.getMemorySize().get())
262        .setVirtualCores(resourceRequestEvent.getVirtualCores().get())
263        .setRuntimeName("STANDALONE")
264        .build();
265    reefEventHandlers.onResourceAllocation(alloc);
266
267    // set the status as RUNNING.
268    updateRuntimeStatus();
269  }
270
271  void onResourceReleaseRequest(final ResourceReleaseEvent releaseRequest) {
272    synchronized (this.containers) {
273      LOG.log(Level.FINEST, "Release container: {0}", releaseRequest.getIdentifier());
274      this.release(releaseRequest.getIdentifier());
275    }
276  }
277
278  public synchronized void close() {
279    synchronized (this.containers) {
280      if (this.containers.isEmpty()) {
281        LOG.log(Level.FINEST, "Clean shutdown with no outstanding containers.");
282      } else {
283        LOG.log(Level.WARNING, "Dirty shutdown with outstanding containers.");
284        for (final SshProcessContainer c : this.containers.values()) {
285          LOG.log(Level.WARNING, "Force shutdown of: {0}", c);
286          c.close();
287        }
288      }
289    }
290  }
291
292  private Optional<String> selectNode(final ResourceRequestEvent resourceRequestEvent) {
293    if (CollectionUtils.isNotEmpty(resourceRequestEvent.getNodeNameList())) {
294      for (final String nodeName : resourceRequestEvent.getNodeNameList()) {
295        return Optional.of(nodeName);
296      }
297    }
298    if (CollectionUtils.isNotEmpty(resourceRequestEvent.getRackNameList())) {
299      for (final String nodeName : resourceRequestEvent.getRackNameList()) {
300        return Optional.of(nodeName);
301      }
302    }
303    return Optional.empty();
304  }
305
306  private synchronized void updateRuntimeStatus() {
307    final RuntimeStatusEventImpl.Builder builder = RuntimeStatusEventImpl.newBuilder()
308        .setName("STANDALONE")
309        .setState(State.RUNNING);
310
311    this.reefEventHandlers.onRuntimeStatus(builder.build());
312  }
313}