001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.javabridge;
020
021import org.apache.commons.lang.StringUtils;
022import org.apache.reef.annotations.audience.Interop;
023import org.apache.reef.annotations.audience.Private;
024import org.apache.reef.driver.evaluator.EvaluatorRequest;
025import org.apache.reef.driver.evaluator.EvaluatorRequestor;
026import org.apache.reef.javabridge.avro.DefinedRuntimes;
027import org.apache.reef.javabridge.utils.DefinedRuntimesSerializer;
028import org.apache.reef.util.logging.LoggingScope;
029import org.apache.reef.util.logging.LoggingScopeFactory;
030
031import java.util.ArrayList;
032import java.util.List;
033import java.util.Set;
034import java.util.logging.Level;
035import java.util.logging.Logger;
036
037/**
038 * The Java-CLR bridge object for {@link org.apache.reef.driver.evaluator.EvaluatorRequestor}.
039 */
040@Private
041@Interop(
042    CppFiles = { "Clr2JavaImpl.h", "EvaluatorRequestorClr2Java.cpp" },
043    CsFiles = { "IEvaluatorRequestorClr2Java.cs", "EvaluatorRequestor.cs" })
044public final class EvaluatorRequestorBridge extends NativeBridge {
045  private static final Logger LOG = Logger.getLogger(EvaluatorRequestorBridge.class.getName());
046  private final boolean isBlocked;
047  private final EvaluatorRequestor jevaluatorRequestor;
048  private final LoggingScopeFactory loggingScopeFactory;
049  private final Set<String> definedRuntimes;
050
051  // accumulate how many evaluators have been submitted through this instance
052  // of EvaluatorRequestorBridge
053  private int clrEvaluatorsNumber;
054
055  public EvaluatorRequestorBridge(final EvaluatorRequestor evaluatorRequestor,
056                                  final boolean isBlocked,
057                                  final LoggingScopeFactory loggingScopeFactory,
058                                  final Set<String> definedRuntimes) {
059    this.jevaluatorRequestor = evaluatorRequestor;
060    this.clrEvaluatorsNumber = 0;
061    this.isBlocked = isBlocked;
062    this.loggingScopeFactory = loggingScopeFactory;
063    this.definedRuntimes = definedRuntimes;
064  }
065
066  public void submit(final int evaluatorsNumber,
067                     final int memory,
068                     final int virtualCore,
069                     final String rack,
070                     final String runtimeName) {
071    if (this.isBlocked) {
072      throw new RuntimeException("Cannot request additional Evaluator, this is probably because " +
073          "the Driver has crashed and restarted, and cannot ask for new container due to YARN-2433.");
074    }
075
076    if (rack != null && !rack.isEmpty()) {
077      LOG.log(Level.WARNING, "Ignoring rack preference.");
078    }
079
080    try (final LoggingScope ls = loggingScopeFactory.evaluatorRequestSubmitToJavaDriver(evaluatorsNumber)) {
081      clrEvaluatorsNumber += evaluatorsNumber;
082
083      final EvaluatorRequest request = EvaluatorRequest.newBuilder()
084          .setNumber(evaluatorsNumber)
085          .setMemory(memory)
086          .setNumberOfCores(virtualCore)
087          .setRuntimeName(runtimeName)
088          .build();
089
090      LOG.log(Level.FINE, "submitting evaluator request {0}", request);
091      jevaluatorRequestor.submit(request);
092    }
093  }
094
095  public int getEvaluatorNumber() {
096    return clrEvaluatorsNumber;
097  }
098
099  @Override
100  public void close() {
101  }
102
103  public byte[] getDefinedRuntimes(){
104    if(LOG.isLoggable(Level.FINE)) {
105      LOG.log(Level.FINE, "Defined Runtimes :" + StringUtils.join(this.definedRuntimes, ','));
106    }
107
108    final DefinedRuntimes dr = new DefinedRuntimes();
109    final List<CharSequence> runtimeNames = new ArrayList<>();
110    for(final String name : this.definedRuntimes) {
111      runtimeNames.add(name);
112    }
113    dr.setRuntimeNames(runtimeNames);
114    final DefinedRuntimesSerializer drs = new DefinedRuntimesSerializer();
115    final byte[] ret = drs.toBytes(dr);
116    return ret;
117  }
118}