001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.runtime.yarn.driver; 020 021import org.apache.hadoop.yarn.api.records.Priority; 022import org.apache.hadoop.yarn.api.records.Resource; 023import org.apache.hadoop.yarn.client.api.AMRMClient; 024import org.apache.hadoop.yarn.util.Records; 025import org.apache.reef.annotations.audience.DriverSide; 026import org.apache.reef.annotations.audience.Private; 027import org.apache.reef.proto.DriverRuntimeProtocol; 028import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler; 029 030import javax.inject.Inject; 031import java.util.logging.Level; 032import java.util.logging.Logger; 033 034/** 035 * Accepts resource requests from the REEF layer, translates them into requests for YARN and hands them to the 036 * appropriate handler for those. 037 */ 038@DriverSide 039@Private 040public final class YarnResourceRequestHandler implements ResourceRequestHandler { 041 042 private static final Logger LOG = Logger.getLogger(YarnResourceRequestHandler.class.getName()); 043 private final YarnContainerRequestHandler yarnContainerRequestHandler; 044 private final ApplicationMasterRegistration registration; 045 046 @Inject 047 YarnResourceRequestHandler(final YarnContainerRequestHandler yarnContainerRequestHandler, 048 final ApplicationMasterRegistration registration) { 049 this.yarnContainerRequestHandler = yarnContainerRequestHandler; 050 this.registration = registration; 051 } 052 053 @Override 054 public synchronized void onNext(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) { 055 LOG.log(Level.FINEST, "Got ResourceRequestProto in YarnResourceRequestHandler: memory = {0}, cores = {1}.", new Object[]{resourceRequestProto.getMemorySize(), resourceRequestProto.getVirtualCores()}); 056 057 final String[] nodes = resourceRequestProto.getNodeNameCount() == 0 ? null : 058 resourceRequestProto.getNodeNameList().toArray(new String[resourceRequestProto.getNodeNameCount()]); 059 final String[] racks = resourceRequestProto.getRackNameCount() == 0 ? null : 060 resourceRequestProto.getRackNameList().toArray(new String[resourceRequestProto.getRackNameCount()]); 061 062 // set the priority for the request 063 final Priority pri = getPriority(resourceRequestProto); 064 final Resource resource = getResource(resourceRequestProto); 065 final boolean relax_locality = !resourceRequestProto.hasRelaxLocality() || resourceRequestProto.getRelaxLocality(); 066 067 final AMRMClient.ContainerRequest[] containerRequests = 068 new AMRMClient.ContainerRequest[resourceRequestProto.getResourceCount()]; 069 070 for (int i = 0; i < resourceRequestProto.getResourceCount(); i++) { 071 containerRequests[i] = new AMRMClient.ContainerRequest(resource, nodes, racks, pri, relax_locality); 072 } 073 this.yarnContainerRequestHandler.onContainerRequest(containerRequests); 074 } 075 076 private synchronized Resource getResource(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) { 077 final Resource result = Records.newRecord(Resource.class); 078 final int memory = getMemory(resourceRequestProto.getMemorySize()); 079 final int core = resourceRequestProto.getVirtualCores(); 080 LOG.log(Level.FINEST, "Resource requested: memory = {0}, virtual core count = {1}.", new Object[]{memory, core}); 081 result.setMemory(memory); 082 result.setVirtualCores(core); 083 return result; 084 } 085 086 private synchronized Priority getPriority(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) { 087 final Priority pri = Records.newRecord(Priority.class); 088 pri.setPriority(resourceRequestProto.hasPriority() ? resourceRequestProto.getPriority() : 1); 089 return pri; 090 } 091 092 private synchronized int getMemory(final int requestedMemory) { 093 final int result; 094 if (!this.registration.isPresent()) { 095 LOG.log(Level.WARNING, "AM doesn't seem to be registered. Proceed with fingers crossed."); 096 result = requestedMemory; 097 } else { 098 final int maxMemory = registration.getRegistration().getMaximumResourceCapability().getMemory(); 099 if (requestedMemory > maxMemory) { 100 LOG.log(Level.WARNING, "Asking for {0}MB of memory, but max on this cluster is {1}MB ", 101 new Object[]{requestedMemory, maxMemory}); 102 result = maxMemory; 103 } else { 104 result = requestedMemory; 105 } 106 } 107 return result; 108 } 109 110 111}