This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.yarn.driver;
020
021import org.apache.hadoop.yarn.api.records.Priority;
022import org.apache.hadoop.yarn.api.records.Resource;
023import org.apache.hadoop.yarn.client.api.AMRMClient;
024import org.apache.hadoop.yarn.util.Records;
025import org.apache.reef.annotations.audience.DriverSide;
026import org.apache.reef.annotations.audience.Private;
027import org.apache.reef.proto.DriverRuntimeProtocol;
028import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
029
030import javax.inject.Inject;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034/**
035 * Accepts resource requests from the REEF layer, translates them into requests for YARN and hands them to the
036 * appropriate handler for those.
037 */
038@DriverSide
039@Private
040public final class YarnResourceRequestHandler implements ResourceRequestHandler {
041
042  private static final Logger LOG = Logger.getLogger(YarnResourceRequestHandler.class.getName());
043  private final YarnContainerRequestHandler yarnContainerRequestHandler;
044  private final ApplicationMasterRegistration registration;
045
046  @Inject
047  YarnResourceRequestHandler(final YarnContainerRequestHandler yarnContainerRequestHandler,
048                             final ApplicationMasterRegistration registration) {
049    this.yarnContainerRequestHandler = yarnContainerRequestHandler;
050    this.registration = registration;
051  }
052
053  @Override
054  public synchronized void onNext(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
055    LOG.log(Level.FINEST, "Got ResourceRequestProto in YarnResourceRequestHandler: memory = {0}, cores = {1}.", new Object[]{resourceRequestProto.getMemorySize(), resourceRequestProto.getVirtualCores()});
056
057    final String[] nodes = resourceRequestProto.getNodeNameCount() == 0 ? null :
058        resourceRequestProto.getNodeNameList().toArray(new String[resourceRequestProto.getNodeNameCount()]);
059    final String[] racks = resourceRequestProto.getRackNameCount() == 0 ? null :
060        resourceRequestProto.getRackNameList().toArray(new String[resourceRequestProto.getRackNameCount()]);
061
062    // set the priority for the request
063    final Priority pri = getPriority(resourceRequestProto);
064    final Resource resource = getResource(resourceRequestProto);
065    final boolean relax_locality = !resourceRequestProto.hasRelaxLocality() || resourceRequestProto.getRelaxLocality();
066
067    final AMRMClient.ContainerRequest[] containerRequests =
068        new AMRMClient.ContainerRequest[resourceRequestProto.getResourceCount()];
069
070    for (int i = 0; i < resourceRequestProto.getResourceCount(); i++) {
071      containerRequests[i] = new AMRMClient.ContainerRequest(resource, nodes, racks, pri, relax_locality);
072    }
073    this.yarnContainerRequestHandler.onContainerRequest(containerRequests);
074  }
075
076  private synchronized Resource getResource(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
077    final Resource result = Records.newRecord(Resource.class);
078    final int memory = getMemory(resourceRequestProto.getMemorySize());
079    final int core = resourceRequestProto.getVirtualCores();
080    LOG.log(Level.FINEST, "Resource requested: memory = {0}, virtual core count = {1}.", new Object[]{memory, core});
081    result.setMemory(memory);
082    result.setVirtualCores(core);
083    return result;
084  }
085
086  private synchronized Priority getPriority(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
087    final Priority pri = Records.newRecord(Priority.class);
088    pri.setPriority(resourceRequestProto.hasPriority() ? resourceRequestProto.getPriority() : 1);
089    return pri;
090  }
091
092  private synchronized int getMemory(final int requestedMemory) {
093    final int result;
094    if (!this.registration.isPresent()) {
095      LOG.log(Level.WARNING, "AM doesn't seem to be registered. Proceed with fingers crossed.");
096      result = requestedMemory;
097    } else {
098      final int maxMemory = registration.getRegistration().getMaximumResourceCapability().getMemory();
099      if (requestedMemory > maxMemory) {
100        LOG.log(Level.WARNING, "Asking for {0}MB of memory, but max on this cluster is {1}MB ",
101            new Object[]{requestedMemory, maxMemory});
102        result = maxMemory;
103      } else {
104        result = requestedMemory;
105      }
106    }
107    return result;
108  }
109
110
111}