001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.yarn.driver; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.yarn.api.ApplicationConstants; 023import org.apache.hadoop.yarn.api.records.Container; 024import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 025import org.apache.hadoop.yarn.api.records.LocalResource; 026import org.apache.reef.driver.evaluator.EvaluatorProcess; 027import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; 028import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler; 029import org.apache.reef.runtime.common.files.REEFFileNames; 030import org.apache.reef.runtime.common.parameters.JVMHeapSlack; 031import org.apache.reef.runtime.yarn.client.SecurityTokenProvider; 032import org.apache.reef.runtime.yarn.util.YarnTypes; 033import org.apache.reef.runtime.yarn.util.YarnUtilities; 034import org.apache.reef.tang.InjectionFuture; 035import org.apache.reef.tang.annotations.Parameter; 036 037import javax.inject.Inject; 038import java.io.IOException; 039import java.util.List; 040import java.util.Map; 041import java.util.logging.Level; 042import java.util.logging.Logger; 043 044/** 045 * Resource launch handler for YARN. 046 */ 047public final class YARNResourceLaunchHandler implements ResourceLaunchHandler { 048 049 private static final Logger LOG = Logger.getLogger(YARNResourceLaunchHandler.class.getName()); 050 051 private final Containers containers; 052 private final InjectionFuture<YarnContainerManager> yarnContainerManager; 053 private final EvaluatorSetupHelper evaluatorSetupHelper; 054 private final REEFFileNames filenames; 055 private final double jvmHeapFactor; 056 private final SecurityTokenProvider tokenProvider; 057 058 @Inject 059 YARNResourceLaunchHandler(final Containers containers, 060 final InjectionFuture<YarnContainerManager> yarnContainerManager, 061 final EvaluatorSetupHelper evaluatorSetupHelper, 062 final REEFFileNames filenames, 063 @Parameter(JVMHeapSlack.class) final double jvmHeapSlack, 064 final SecurityTokenProvider tokenProvider) { 065 this.jvmHeapFactor = 1.0 - jvmHeapSlack; 066 LOG.log(Level.FINEST, "Instantiating 'YARNResourceLaunchHandler'"); 067 this.containers = containers; 068 this.yarnContainerManager = yarnContainerManager; 069 this.evaluatorSetupHelper = evaluatorSetupHelper; 070 this.filenames = filenames; 071 this.tokenProvider = tokenProvider; 072 LOG.log(Level.FINE, "Instantiated 'YARNResourceLaunchHandler'"); 073 } 074 075 @Override 076 public void onNext(final ResourceLaunchEvent resourceLaunchEvent) { 077 try { 078 079 final String containerId = resourceLaunchEvent.getIdentifier(); 080 LOG.log(Level.FINEST, "TIME: Start ResourceLaunch {0}", containerId); 081 final Container container = this.containers.get(containerId); 082 LOG.log(Level.FINEST, "Setting up container launch container for id={0}", container.getId()); 083 final Map<String, LocalResource> localResources = 084 this.evaluatorSetupHelper.getResources(resourceLaunchEvent); 085 086 final List<String> command = getLaunchCommand(resourceLaunchEvent, container.getResource().getMemory()); 087 if (LOG.isLoggable(Level.FINEST)) { 088 LOG.log(Level.FINEST, 089 "TIME: Run ResourceLaunchProto {0} command: `{1}` with resources: `{2}`", 090 new Object[]{containerId, StringUtils.join(command, ' '), localResources}); 091 } 092 093 final byte[] securityTokensBuffer = this.tokenProvider.getTokens(); 094 final ContainerLaunchContext ctx = YarnTypes.getContainerLaunchContext( 095 command, localResources, securityTokensBuffer, YarnUtilities.getApplicationId()); 096 this.yarnContainerManager.get().submit(container, ctx); 097 098 LOG.log(Level.FINEST, "TIME: End ResourceLaunch {0}", containerId); 099 100 } catch (final IOException e) { 101 LOG.log(Level.WARNING, "Error handling resource launch message: " + resourceLaunchEvent, e); 102 throw new RuntimeException(e); 103 } 104 } 105 106 private List<String> getLaunchCommand(final ResourceLaunchEvent resourceLaunchEvent, 107 final int containerMemory) { 108 final EvaluatorProcess process = resourceLaunchEvent.getProcess() 109 .setConfigurationFileName(this.filenames.getEvaluatorConfigurationPath()) 110 .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + 111 this.filenames.getEvaluatorStderrFileName()) 112 .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + 113 this.filenames.getEvaluatorStdoutFileName()); 114 115 if (process.isOptionSet()) { 116 return process.getCommandLine(); 117 } else { 118 return process 119 .setMemory((int) (this.jvmHeapFactor * containerMemory)) 120 .getCommandLine(); 121 } 122 } 123}