001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.standalone.driver; 020 021import com.jcraft.jsch.*; 022import org.apache.reef.driver.evaluator.EvaluatorProcess; 023import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; 024import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; 025import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; 026import org.apache.reef.runtime.common.driver.evaluator.pojos.State; 027import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; 028import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; 029import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl; 030import org.apache.reef.runtime.common.files.FileResource; 031import org.apache.reef.runtime.common.files.REEFFileNames; 032import org.apache.reef.runtime.common.parameters.JVMHeapSlack; 033import org.apache.reef.runtime.common.utils.RemoteManager; 034import org.apache.reef.runtime.standalone.client.parameters.RootFolder; 035import org.apache.reef.runtime.local.process.ReefRunnableProcessObserver; 036import org.apache.reef.runtime.standalone.client.parameters.SshPortNum; 037import org.apache.reef.runtime.yarn.driver.REEFEventHandlers; 038import org.apache.reef.runtime.standalone.client.parameters.NodeFolder; 039import org.apache.reef.runtime.standalone.client.parameters.NodeInfoSet; 040import org.apache.reef.tang.annotations.Parameter; 041import org.apache.reef.tang.exceptions.BindException; 042import org.apache.reef.tang.formats.ConfigurationSerializer; 043import org.apache.reef.util.CollectionUtils; 044import org.apache.reef.util.Optional; 045 046import javax.inject.Inject; 047import java.io.File; 048import java.io.IOException; 049import java.util.*; 050import java.util.logging.Level; 051import java.util.logging.Logger; 052 053/** 054 * Management module for remote nodes in standalone runtime. 055 */ 056public final class RemoteNodeManager { 057 058 private static final Logger LOG = Logger.getLogger(RemoteNodeManager.class.getName()); 059 060 private final ThreadGroup containerThreads = new ThreadGroup("SshContainerManagerThreadGroup"); 061 062 /** 063 * Map from containerID -> SshProcessContainer. 064 */ 065 private final Map<String, SshProcessContainer> containers = new HashMap<>(); 066 067 private final ConfigurationSerializer configurationSerializer; 068 private final REEFFileNames fileNames; 069 private final double jvmHeapFactor; 070 private final REEFEventHandlers reefEventHandlers; 071 private final String errorHandlerRID; 072 private final Set<String> nodeInfoSet; 073 private Iterator<String> nodeSetIterator; 074 private final ReefRunnableProcessObserver processObserver; 075 private final String rootFolder; 076 private final String nodeFolder; 077 private final int sshPortNum; 078 079 @Inject 080 RemoteNodeManager(final ConfigurationSerializer configurationSerializer, 081 final REEFFileNames fileNames, 082 final RemoteManager remoteManager, 083 final REEFEventHandlers reefEventHandlers, 084 final ReefRunnableProcessObserver processObserver, 085 @Parameter(JVMHeapSlack.class) final double jvmHeapSlack, 086 @Parameter(NodeInfoSet.class) final Set<String> nodeInfoSet, 087 @Parameter(RootFolder.class) final String rootFolder, 088 @Parameter(NodeFolder.class) final String nodeFolder, 089 @Parameter(SshPortNum.class) final int sshPortNum) { 090 this.configurationSerializer = configurationSerializer; 091 this.fileNames = fileNames; 092 this.processObserver = processObserver; 093 this.errorHandlerRID = remoteManager.getMyIdentifier(); 094 this.reefEventHandlers = reefEventHandlers; 095 this.jvmHeapFactor = 1.0 - jvmHeapSlack; 096 this.nodeInfoSet = nodeInfoSet; 097 this.rootFolder = rootFolder; 098 this.nodeFolder = nodeFolder; 099 this.sshPortNum = sshPortNum; 100 101 this.nodeSetIterator = this.nodeInfoSet.iterator(); 102 103 LOG.log(Level.FINEST, "Initialized RemoteNodeManager."); 104 } 105 106 private void release(final String containerID) { 107 synchronized (this.containers) { 108 final SshProcessContainer sshProcessContainer = this.containers.get(containerID); 109 if (null != sshProcessContainer) { 110 LOG.log(Level.INFO, "Releasing Container with containerId [{0}]", sshProcessContainer); 111 if (sshProcessContainer.isRunning()) { 112 sshProcessContainer.close(); 113 } 114 this.containers.remove(containerID); 115 } else { 116 LOG.log(Level.INFO, "Ignoring release request for unknown containerID [{0}]", containerID); 117 } 118 } 119 } 120 121 void onResourceLaunchRequest(final ResourceLaunchEvent resourceLaunchEvent) { 122 LOG.log(Level.INFO, "RemoteNodeManager:onResourceLaunchRequest"); 123 124 // connect to the remote node. 125 final String remoteNode; 126 try { 127 synchronized (this.nodeSetIterator) { 128 remoteNode = this.getNode(); 129 } 130 } catch (Exception e) { 131 throw new RuntimeException("Unable to get remote node", e); 132 } 133 final String username; 134 final String hostname; 135 if (remoteNode.indexOf('@') < 0) { 136 username = System.getProperty("user.name"); 137 hostname = remoteNode; 138 } else { 139 username = remoteNode.substring(0, remoteNode.indexOf('@')); 140 hostname = remoteNode.substring(remoteNode.indexOf('@') + 1, remoteNode.length()); 141 } 142 final String userHomeDir = System.getProperty("user.home"); 143 final String privatekey = userHomeDir + "/.ssh/id_dsa"; 144 145 synchronized (this.containers) { 146 try { 147 final JSch remoteConnection = new JSch(); 148 remoteConnection.addIdentity(privatekey); 149 final Session sshSession = remoteConnection.getSession(username, hostname, sshPortNum); 150 151 final Properties jschConfig = new Properties(); 152 jschConfig.put("StrictHostKeyChecking", "no"); 153 sshSession.setConfig(jschConfig); 154 155 try { 156 sshSession.connect(); 157 } catch (JSchException ex) { 158 throw new RuntimeException("Unable to connect to " + remoteNode + ". " + 159 "Check your authorized_keys settings. It should contain the public key of " + privatekey, ex); 160 } 161 162 LOG.log(Level.FINEST, "Established connection with {0}", hostname); 163 164 final SshProcessContainer sshProcessContainer = this.containers.get(resourceLaunchEvent.getIdentifier()) 165 .withRemoteConnection(sshSession, remoteNode); 166 167 // Add the global files and libraries. 168 sshProcessContainer.addGlobalFiles(this.fileNames.getGlobalFolder()); 169 sshProcessContainer.addLocalFiles(getLocalFiles(resourceLaunchEvent)); 170 171 // Make the configuration file of the evaluator. 172 final File evaluatorConfigurationFile = 173 new File(sshProcessContainer.getFolder(), fileNames.getEvaluatorConfigurationPath()); 174 175 try { 176 this.configurationSerializer.toFile(resourceLaunchEvent.getEvaluatorConf(), evaluatorConfigurationFile); 177 } catch (final IOException | BindException e) { 178 throw new RuntimeException("Unable to write configuration.", e); 179 } 180 181 // Copy files to remote node 182 final Channel channel = sshSession.openChannel("exec"); 183 final String mkdirCommand = "mkdir " + nodeFolder; 184 ((ChannelExec) channel).setCommand(mkdirCommand); 185 channel.connect(); 186 187 final List<String> copyCommand = 188 new ArrayList<>(Arrays.asList("scp", "-r", 189 sshProcessContainer.getFolder().toString(), 190 remoteNode + ":~/" + nodeFolder + "/" + sshProcessContainer.getContainerID())); 191 LOG.log(Level.INFO, "Copying files: {0}", copyCommand); 192 final Process copyProcess = new ProcessBuilder(copyCommand).start(); 193 try { 194 copyProcess.waitFor(); 195 } catch (final InterruptedException ex) { 196 throw new RuntimeException("Copying Interrupted: ", ex); 197 } 198 199 final List<String> command = getLaunchCommand(resourceLaunchEvent, sshProcessContainer.getMemory()); 200 LOG.log(Level.FINEST, "Launching container: {0}", sshProcessContainer); 201 sshProcessContainer.run(command); 202 } catch (final JSchException | IOException ex) { 203 LOG.log(Level.WARNING, "Failed to establish connection with {0}@{1}:\n Exception:{2}", 204 new Object[]{username, hostname, ex}); 205 } 206 } 207 } 208 209 private String getNode() { 210 if (!nodeSetIterator.hasNext()) { 211 nodeSetIterator = this.nodeInfoSet.iterator(); 212 } 213 return nodeSetIterator.next().trim(); 214 } 215 216 private static List<File> getLocalFiles(final ResourceLaunchEvent launchRequest) { 217 final List<File> files = new ArrayList<>(); // Libraries local to this evaluator 218 for (final FileResource frp : launchRequest.getFileSet()) { 219 files.add(new File(frp.getPath()).getAbsoluteFile()); 220 } 221 return files; 222 } 223 224 private List<String> getLaunchCommand(final ResourceLaunchEvent launchRequest, 225 final int containerMemory) { 226 final EvaluatorProcess process = launchRequest.getProcess() 227 .setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath()); 228 229 if (process.isOptionSet()) { 230 return process.getCommandLine(); 231 } else { 232 return process 233 .setMemory((int) (this.jvmHeapFactor * containerMemory)) 234 .getCommandLine(); 235 } 236 } 237 238 void onResourceRequest(final ResourceRequestEvent resourceRequestEvent) { 239 final Optional<String> node = selectNode(resourceRequestEvent); 240 final String nodeId; 241 242 if (node.isPresent()) { 243 nodeId = node.get(); 244 } else { 245 // Allocate new container 246 nodeId = this.getNode() + ":" + String.valueOf(sshPortNum); 247 } 248 249 final String processID = nodeId + "-" + String.valueOf(System.currentTimeMillis()); 250 final File processFolder = new File(this.rootFolder, processID); 251 252 final SshProcessContainer sshProcessContainer = new SshProcessContainer(errorHandlerRID, nodeId, processID, 253 processFolder, resourceRequestEvent.getMemorySize().get(), resourceRequestEvent.getVirtualCores().get(), 254 null, this.fileNames, this.nodeFolder, this.processObserver, this.containerThreads); 255 256 this.containers.put(processID, sshProcessContainer); 257 258 final ResourceAllocationEvent alloc = ResourceEventImpl.newAllocationBuilder() 259 .setIdentifier(processID) 260 .setNodeId(nodeId) 261 .setResourceMemory(resourceRequestEvent.getMemorySize().get()) 262 .setVirtualCores(resourceRequestEvent.getVirtualCores().get()) 263 .setRuntimeName("STANDALONE") 264 .build(); 265 reefEventHandlers.onResourceAllocation(alloc); 266 267 // set the status as RUNNING. 268 updateRuntimeStatus(); 269 } 270 271 void onResourceReleaseRequest(final ResourceReleaseEvent releaseRequest) { 272 synchronized (this.containers) { 273 LOG.log(Level.FINEST, "Release container: {0}", releaseRequest.getIdentifier()); 274 this.release(releaseRequest.getIdentifier()); 275 } 276 } 277 278 public synchronized void close() { 279 synchronized (this.containers) { 280 if (this.containers.isEmpty()) { 281 LOG.log(Level.FINEST, "Clean shutdown with no outstanding containers."); 282 } else { 283 LOG.log(Level.WARNING, "Dirty shutdown with outstanding containers."); 284 for (final SshProcessContainer c : this.containers.values()) { 285 LOG.log(Level.WARNING, "Force shutdown of: {0}", c); 286 c.close(); 287 } 288 } 289 } 290 } 291 292 private Optional<String> selectNode(final ResourceRequestEvent resourceRequestEvent) { 293 if (CollectionUtils.isNotEmpty(resourceRequestEvent.getNodeNameList())) { 294 for (final String nodeName : resourceRequestEvent.getNodeNameList()) { 295 return Optional.of(nodeName); 296 } 297 } 298 if (CollectionUtils.isNotEmpty(resourceRequestEvent.getRackNameList())) { 299 for (final String nodeName : resourceRequestEvent.getRackNameList()) { 300 return Optional.of(nodeName); 301 } 302 } 303 return Optional.empty(); 304 } 305 306 private synchronized void updateRuntimeStatus() { 307 final RuntimeStatusEventImpl.Builder builder = RuntimeStatusEventImpl.newBuilder() 308 .setName("STANDALONE") 309 .setState(State.RUNNING); 310 311 this.reefEventHandlers.onRuntimeStatus(builder.build()); 312 } 313}