001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.javabridge; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.reef.annotations.audience.Interop; 023import org.apache.reef.annotations.audience.Private; 024import org.apache.reef.driver.evaluator.EvaluatorRequest; 025import org.apache.reef.driver.evaluator.EvaluatorRequestor; 026import org.apache.reef.javabridge.avro.DefinedRuntimes; 027import org.apache.reef.javabridge.utils.DefinedRuntimesSerializer; 028import org.apache.reef.util.logging.LoggingScope; 029import org.apache.reef.util.logging.LoggingScopeFactory; 030 031import java.util.ArrayList; 032import java.util.List; 033import java.util.Set; 034import java.util.logging.Level; 035import java.util.logging.Logger; 036 037/** 038 * The Java-CLR bridge object for {@link org.apache.reef.driver.evaluator.EvaluatorRequestor}. 039 */ 040@Private 041@Interop( 042 CppFiles = { "Clr2JavaImpl.h", "EvaluatorRequestorClr2Java.cpp" }, 043 CsFiles = { "IEvaluatorRequestorClr2Java.cs", "EvaluatorRequestor.cs" }) 044public final class EvaluatorRequestorBridge extends NativeBridge { 045 private static final Logger LOG = Logger.getLogger(EvaluatorRequestorBridge.class.getName()); 046 private final boolean isBlocked; 047 private final EvaluatorRequestor jevaluatorRequestor; 048 private final LoggingScopeFactory loggingScopeFactory; 049 private final Set<String> definedRuntimes; 050 051 // accumulate how many evaluators have been submitted through this instance 052 // of EvaluatorRequestorBridge 053 private int clrEvaluatorsNumber; 054 055 public EvaluatorRequestorBridge(final EvaluatorRequestor evaluatorRequestor, 056 final boolean isBlocked, 057 final LoggingScopeFactory loggingScopeFactory, 058 final Set<String> definedRuntimes) { 059 this.jevaluatorRequestor = evaluatorRequestor; 060 this.clrEvaluatorsNumber = 0; 061 this.isBlocked = isBlocked; 062 this.loggingScopeFactory = loggingScopeFactory; 063 this.definedRuntimes = definedRuntimes; 064 } 065 066 public void submit(final int evaluatorsNumber, 067 final int memory, 068 final int virtualCore, 069 final String rack, 070 final String runtimeName) { 071 if (this.isBlocked) { 072 throw new RuntimeException("Cannot request additional Evaluator, this is probably because " + 073 "the Driver has crashed and restarted, and cannot ask for new container due to YARN-2433."); 074 } 075 076 if (rack != null && !rack.isEmpty()) { 077 LOG.log(Level.WARNING, "Ignoring rack preference."); 078 } 079 080 try (final LoggingScope ls = loggingScopeFactory.evaluatorRequestSubmitToJavaDriver(evaluatorsNumber)) { 081 clrEvaluatorsNumber += evaluatorsNumber; 082 083 final EvaluatorRequest request = EvaluatorRequest.newBuilder() 084 .setNumber(evaluatorsNumber) 085 .setMemory(memory) 086 .setNumberOfCores(virtualCore) 087 .setRuntimeName(runtimeName) 088 .build(); 089 090 LOG.log(Level.FINE, "submitting evaluator request {0}", request); 091 jevaluatorRequestor.submit(request); 092 } 093 } 094 095 public int getEvaluatorNumber() { 096 return clrEvaluatorsNumber; 097 } 098 099 @Override 100 public void close() { 101 } 102 103 public byte[] getDefinedRuntimes(){ 104 if(LOG.isLoggable(Level.FINE)) { 105 LOG.log(Level.FINE, "Defined Runtimes :" + StringUtils.join(this.definedRuntimes, ',')); 106 } 107 108 final DefinedRuntimes dr = new DefinedRuntimes(); 109 final List<CharSequence> runtimeNames = new ArrayList<>(); 110 for(final String name : this.definedRuntimes) { 111 runtimeNames.add(name); 112 } 113 dr.setRuntimeNames(runtimeNames); 114 final DefinedRuntimesSerializer drs = new DefinedRuntimesSerializer(); 115 final byte[] ret = drs.toBytes(dr); 116 return ret; 117 } 118}