001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.yarn.driver; 020 021import org.apache.hadoop.yarn.api.records.Priority; 022import org.apache.hadoop.yarn.api.records.Resource; 023import org.apache.hadoop.yarn.client.api.AMRMClient; 024import org.apache.hadoop.yarn.util.Records; 025import org.apache.reef.annotations.audience.DriverSide; 026import org.apache.reef.annotations.audience.Private; 027import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; 028import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler; 029 030import javax.inject.Inject; 031import java.util.logging.Level; 032import java.util.logging.Logger; 033 034/** 035 * Accepts resource requests from the REEF layer, translates them into requests for YARN and hands them to the 036 * appropriate handler for those. 037 */ 038@DriverSide 039@Private 040public final class YarnResourceRequestHandler implements ResourceRequestHandler { 041 042 private static final Logger LOG = Logger.getLogger(YarnResourceRequestHandler.class.getName()); 043 private final YarnContainerRequestHandler yarnContainerRequestHandler; 044 private final ApplicationMasterRegistration registration; 045 046 @Inject 047 YarnResourceRequestHandler(final YarnContainerRequestHandler yarnContainerRequestHandler, 048 final ApplicationMasterRegistration registration) { 049 this.yarnContainerRequestHandler = yarnContainerRequestHandler; 050 this.registration = registration; 051 } 052 053 @Override 054 public synchronized void onNext(final ResourceRequestEvent resourceRequestEvent) { 055 LOG.log(Level.FINEST, "Got ResourceRequestEvent in YarnResourceRequestHandler: memory = {0}, cores = {1}.", 056 new Object[]{resourceRequestEvent.getMemorySize(), resourceRequestEvent.getVirtualCores()}); 057 058 final String[] nodes = resourceRequestEvent.getNodeNameList().size() == 0 ? null : 059 resourceRequestEvent.getNodeNameList().toArray(new String[resourceRequestEvent.getNodeNameList().size()]); 060 final String[] racks = resourceRequestEvent.getRackNameList().size() == 0 ? null : 061 resourceRequestEvent.getRackNameList().toArray(new String[resourceRequestEvent.getRackNameList().size()]); 062 063 // set the priority for the request 064 final Priority pri = getPriority(resourceRequestEvent); 065 final Resource resource = getResource(resourceRequestEvent); 066 final boolean relaxLocality = resourceRequestEvent.getRelaxLocality().orElse(true); 067 068 final AMRMClient.ContainerRequest[] containerRequests = 069 new AMRMClient.ContainerRequest[resourceRequestEvent.getResourceCount()]; 070 071 for (int i = 0; i < resourceRequestEvent.getResourceCount(); i++) { 072 containerRequests[i] = new AMRMClient.ContainerRequest(resource, nodes, racks, pri, relaxLocality); 073 } 074 this.yarnContainerRequestHandler.onContainerRequest(containerRequests); 075 } 076 077 private synchronized Resource getResource(final ResourceRequestEvent resourceRequestEvent) { 078 final Resource result = Records.newRecord(Resource.class); 079 final int memory = getMemory(resourceRequestEvent.getMemorySize().get()); 080 final int core = resourceRequestEvent.getVirtualCores().get(); 081 LOG.log(Level.FINEST, "Resource requested: memory = {0}, virtual core count = {1}.", new Object[]{memory, core}); 082 result.setMemory(memory); 083 result.setVirtualCores(core); 084 return result; 085 } 086 087 private synchronized Priority getPriority(final ResourceRequestEvent resourceRequestEvent) { 088 final Priority pri = Records.newRecord(Priority.class); 089 pri.setPriority(resourceRequestEvent.getPriority().orElse(1)); 090 return pri; 091 } 092 093 private synchronized int getMemory(final int requestedMemory) { 094 final int result; 095 if (!this.registration.isPresent()) { 096 LOG.log(Level.WARNING, "AM doesn't seem to be registered. Proceed with fingers crossed."); 097 result = requestedMemory; 098 } else { 099 final int maxMemory = registration.getRegistration().getMaximumResourceCapability().getMemory(); 100 if (requestedMemory > maxMemory) { 101 LOG.log(Level.WARNING, "Asking for {0}MB of memory, but max on this cluster is {1}MB ", 102 new Object[]{requestedMemory, maxMemory}); 103 result = maxMemory; 104 } else { 105 result = requestedMemory; 106 } 107 } 108 return result; 109 } 110 111 112}