This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.yarn.driver;
020
021import org.apache.hadoop.yarn.api.records.Priority;
022import org.apache.hadoop.yarn.api.records.Resource;
023import org.apache.hadoop.yarn.client.api.AMRMClient;
024import org.apache.hadoop.yarn.util.Records;
025import org.apache.reef.annotations.audience.DriverSide;
026import org.apache.reef.annotations.audience.Private;
027import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
028import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
029
030import javax.inject.Inject;
031import java.util.logging.Level;
032import java.util.logging.Logger;
033
034/**
035 * Accepts resource requests from the REEF layer, translates them into requests for YARN and hands them to the
036 * appropriate handler for those.
037 */
038@DriverSide
039@Private
040public final class YarnResourceRequestHandler implements ResourceRequestHandler {
041
042  private static final Logger LOG = Logger.getLogger(YarnResourceRequestHandler.class.getName());
043  private final YarnContainerRequestHandler yarnContainerRequestHandler;
044  private final ApplicationMasterRegistration registration;
045
046  @Inject
047  YarnResourceRequestHandler(final YarnContainerRequestHandler yarnContainerRequestHandler,
048                             final ApplicationMasterRegistration registration) {
049    this.yarnContainerRequestHandler = yarnContainerRequestHandler;
050    this.registration = registration;
051  }
052
053  @Override
054  public synchronized void onNext(final ResourceRequestEvent resourceRequestEvent) {
055    LOG.log(Level.FINEST, "Got ResourceRequestEvent in YarnResourceRequestHandler: memory = {0}, cores = {1}.",
056        new Object[]{resourceRequestEvent.getMemorySize(), resourceRequestEvent.getVirtualCores()});
057
058    final String[] nodes = resourceRequestEvent.getNodeNameList().size() == 0 ? null :
059        resourceRequestEvent.getNodeNameList().toArray(new String[resourceRequestEvent.getNodeNameList().size()]);
060    final String[] racks = resourceRequestEvent.getRackNameList().size() == 0 ? null :
061        resourceRequestEvent.getRackNameList().toArray(new String[resourceRequestEvent.getRackNameList().size()]);
062
063    // set the priority for the request
064    final Priority pri = getPriority(resourceRequestEvent);
065    final Resource resource = getResource(resourceRequestEvent);
066    final boolean relaxLocality = resourceRequestEvent.getRelaxLocality().orElse(true);
067
068    final AMRMClient.ContainerRequest[] containerRequests =
069        new AMRMClient.ContainerRequest[resourceRequestEvent.getResourceCount()];
070
071    for (int i = 0; i < resourceRequestEvent.getResourceCount(); i++) {
072      containerRequests[i] = new AMRMClient.ContainerRequest(resource, nodes, racks, pri, relaxLocality);
073    }
074    this.yarnContainerRequestHandler.onContainerRequest(containerRequests);
075  }
076
077  private synchronized Resource getResource(final ResourceRequestEvent resourceRequestEvent) {
078    final Resource result = Records.newRecord(Resource.class);
079    final int memory = getMemory(resourceRequestEvent.getMemorySize().get());
080    final int core = resourceRequestEvent.getVirtualCores().get();
081    LOG.log(Level.FINEST, "Resource requested: memory = {0}, virtual core count = {1}.", new Object[]{memory, core});
082    result.setMemory(memory);
083    result.setVirtualCores(core);
084    return result;
085  }
086
087  private synchronized Priority getPriority(final ResourceRequestEvent resourceRequestEvent) {
088    final Priority pri = Records.newRecord(Priority.class);
089    pri.setPriority(resourceRequestEvent.getPriority().orElse(1));
090    return pri;
091  }
092
093  private synchronized int getMemory(final int requestedMemory) {
094    final int result;
095    if (!this.registration.isPresent()) {
096      LOG.log(Level.WARNING, "AM doesn't seem to be registered. Proceed with fingers crossed.");
097      result = requestedMemory;
098    } else {
099      final int maxMemory = registration.getRegistration().getMaximumResourceCapability().getMemory();
100      if (requestedMemory > maxMemory) {
101        LOG.log(Level.WARNING, "Asking for {0}MB of memory, but max on this cluster is {1}MB ",
102            new Object[]{requestedMemory, maxMemory});
103        result = maxMemory;
104      } else {
105        result = requestedMemory;
106      }
107    }
108    return result;
109  }
110
111
112}