001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.yarn.driver; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.yarn.api.ApplicationConstants; 023import org.apache.hadoop.yarn.api.records.Container; 024import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 025import org.apache.hadoop.yarn.api.records.LocalResource; 026import org.apache.reef.proto.DriverRuntimeProtocol; 027import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler; 028import org.apache.reef.runtime.common.files.ClasspathProvider; 029import org.apache.reef.runtime.common.files.REEFFileNames; 030import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder; 031import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; 032import org.apache.reef.runtime.common.launch.LaunchCommandBuilder; 033import org.apache.reef.runtime.common.parameters.JVMHeapSlack; 034import org.apache.reef.runtime.yarn.util.YarnTypes; 035import org.apache.reef.tang.InjectionFuture; 036import org.apache.reef.tang.annotations.Parameter; 037 038import javax.inject.Inject; 039import java.util.List; 040import java.util.Map; 041import java.util.logging.Level; 042import java.util.logging.Logger; 043 044/** 045 * Resource launch handler for YARN. 046 */ 047public final class YARNResourceLaunchHandler implements ResourceLaunchHandler { 048 049 private static final Logger LOG = Logger.getLogger(YARNResourceLaunchHandler.class.getName()); 050 051 private final Containers containers; 052 private final InjectionFuture<YarnContainerManager> yarnContainerManager; 053 private final EvaluatorSetupHelper evaluatorSetupHelper; 054 private final REEFFileNames filenames; 055 private final ClasspathProvider classpath; 056 private final double jvmHeapFactor; 057 058 @Inject 059 YARNResourceLaunchHandler(final Containers containers, 060 final InjectionFuture<YarnContainerManager> yarnContainerManager, 061 final EvaluatorSetupHelper evaluatorSetupHelper, 062 final REEFFileNames filenames, 063 final ClasspathProvider classpath, 064 final @Parameter(JVMHeapSlack.class) double jvmHeapSlack) { 065 this.jvmHeapFactor = 1.0 - jvmHeapSlack; 066 LOG.log(Level.FINEST, "Instantiating 'YARNResourceLaunchHandler'"); 067 this.containers = containers; 068 this.yarnContainerManager = yarnContainerManager; 069 this.evaluatorSetupHelper = evaluatorSetupHelper; 070 this.filenames = filenames; 071 this.classpath = classpath; 072 LOG.log(Level.FINE, "Instantiated 'YARNResourceLaunchHandler'"); 073 } 074 075 @Override 076 public void onNext(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) { 077 try { 078 079 final String containerId = resourceLaunchProto.getIdentifier(); 080 LOG.log(Level.FINEST, "TIME: Start ResourceLaunchProto {0}", containerId); 081 final Container container = this.containers.get(containerId); 082 LOG.log(Level.FINEST, "Setting up container launch container for id={0}", container.getId()); 083 final Map<String, LocalResource> localResources = 084 this.evaluatorSetupHelper.getResources(resourceLaunchProto); 085 086 final LaunchCommandBuilder commandBuilder; 087 switch (resourceLaunchProto.getType()) { 088 case JVM: 089 commandBuilder = new JavaLaunchCommandBuilder() 090 .setClassPath(this.classpath.getEvaluatorClasspath()); 091 break; 092 case CLR: 093 commandBuilder = new CLRLaunchCommandBuilder(); 094 break; 095 default: 096 throw new IllegalArgumentException( 097 "Unsupported container type: " + resourceLaunchProto.getType()); 098 } 099 100 final List<String> command = commandBuilder 101 .setErrorHandlerRID(resourceLaunchProto.getRemoteId()) 102 .setLaunchID(resourceLaunchProto.getIdentifier()) 103 .setConfigurationFileName(this.filenames.getEvaluatorConfigurationPath()) 104 .setMemory((int) (this.jvmHeapFactor * container.getResource().getMemory())) 105 .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getEvaluatorStderrFileName()) 106 .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getEvaluatorStdoutFileName()) 107 .build(); 108 109 if (LOG.isLoggable(Level.FINEST)) { 110 LOG.log(Level.FINEST, 111 "TIME: Run ResourceLaunchProto {0} command: `{1}` with resources: `{2}`", 112 new Object[]{containerId, StringUtils.join(command, ' '), localResources}); 113 } 114 115 final ContainerLaunchContext ctx = YarnTypes.getContainerLaunchContext(command, localResources); 116 this.yarnContainerManager.get().submit(container, ctx); 117 118 LOG.log(Level.FINEST, "TIME: End ResourceLaunchProto {0}", containerId); 119 120 } catch (final Throwable e) { 121 LOG.log(Level.WARNING, "Error handling resource launch message: " + resourceLaunchProto, e); 122 throw new RuntimeException(e); 123 } 124 } 125}