001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.runtime.common.driver;
020
021import org.apache.reef.driver.catalog.ResourceCatalog;
022import org.apache.reef.driver.evaluator.EvaluatorRequest;
023import org.apache.reef.driver.evaluator.EvaluatorRequestor;
024import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
025import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl;
026import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
027import org.apache.reef.runtime.common.utils.Constants;
028import org.apache.reef.util.logging.LoggingScope;
029import org.apache.reef.util.logging.LoggingScopeFactory;
030
031import javax.inject.Inject;
032import java.util.logging.Level;
033import java.util.logging.Logger;
034
035
036/**
037 * Implementation of the EvaluatorRequestor that translates the request and hands it down to the underlying RM.
038 */
039public final class EvaluatorRequestorImpl implements EvaluatorRequestor {
040
041  private static final Logger LOG = Logger.getLogger(EvaluatorRequestorImpl.class.getName());
042
043  private final ResourceCatalog resourceCatalog;
044  private final ResourceRequestHandler resourceRequestHandler;
045  private final LoggingScopeFactory loggingScopeFactory;
046
047  /**
048   * @param resourceCatalog
049   * @param resourceRequestHandler
050   * @param loggingScopeFactory
051   */
052  @Inject
053  public EvaluatorRequestorImpl(final ResourceCatalog resourceCatalog,
054                                final ResourceRequestHandler resourceRequestHandler,
055                                final LoggingScopeFactory loggingScopeFactory) {
056    this.resourceCatalog = resourceCatalog;
057    this.resourceRequestHandler = resourceRequestHandler;
058    this.loggingScopeFactory = loggingScopeFactory;
059  }
060
061  @Override
062  public synchronized void submit(final EvaluatorRequest req) {
063    LOG.log(Level.FINEST, "Got an EvaluatorRequest: number: {0}, memory = {1}, cores = {2}.",
064        new Object[] {req.getNumber(), req.getMegaBytes(), req.getNumberOfCores()});
065
066    if (req.getMegaBytes() <= 0) {
067      throw new IllegalArgumentException("Given an unsupported memory size: " + req.getMegaBytes());
068    }
069    if (req.getNumberOfCores() <= 0) {
070      throw new IllegalArgumentException("Given an unsupported core number: " + req.getNumberOfCores());
071    }
072    if (req.getNumber() <= 0) {
073      throw new IllegalArgumentException("Given an unsupported number of evaluators: " + req.getNumber());
074    }
075    if (req.getNodeNames() == null) {
076      throw new IllegalArgumentException("Node names cannot be null");
077    }
078    if (req.getRackNames() == null) {
079      throw new IllegalArgumentException("Rack names cannot be null");
080    }
081    if(req.getRuntimeName() == null) {
082      throw new IllegalArgumentException("Runtime name cannot be null");
083    }
084    // for backwards compatibility, we will always set the relax locality flag
085    // to true unless the user configured racks, in which case we will check for
086    // the ANY modifier (*), if not there, then we won't relax the locality
087    boolean relaxLocality = true;
088    if (!req.getRackNames().isEmpty()) {
089      for (final String rackName : req.getRackNames()) {
090        if (Constants.ANY_RACK.equals(rackName)) {
091          relaxLocality = true;
092          break;
093        }
094        relaxLocality = false;
095      }
096    }
097    // if the user specified any node, then we assume they do not want to relax locality
098    if (!req.getNodeNames().isEmpty()) {
099      relaxLocality = false;
100    }
101
102    try (LoggingScope ls = this.loggingScopeFactory.evaluatorSubmit(req.getNumber())) {
103      final ResourceRequestEvent request = ResourceRequestEventImpl
104          .newBuilder()
105          .setResourceCount(req.getNumber())
106          .setVirtualCores(req.getNumberOfCores())
107          .setMemorySize(req.getMegaBytes())
108          .addNodeNames(req.getNodeNames())
109          .addRackNames(req.getRackNames())
110          .setRelaxLocality(relaxLocality)
111          .setRuntimeName(req.getRuntimeName())
112          .build();
113      this.resourceRequestHandler.onNext(request);
114    }
115  }
116
117  /**
118   * Get a new builder.
119   *
120   * @return a new EvaluatorRequest Builder extended with the new submit method.
121   */
122  @Override
123  public Builder newRequest() {
124    return new Builder();
125  }
126
127  /**
128   * {@link EvaluatorRequest.Builder} extended with a new submit method.
129   * {@link EvaluatorRequest}s are built using this builder.
130   */
131  public final class Builder extends EvaluatorRequest.Builder<Builder> {
132    public synchronized void submit() {
133      EvaluatorRequestorImpl.this.submit(this.build());
134    }
135  }
136}