001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.helloCLR; 020 021import org.apache.reef.driver.context.ContextConfiguration; 022import org.apache.reef.driver.evaluator.AllocatedEvaluator; 023import org.apache.reef.driver.evaluator.EvaluatorRequest; 024import org.apache.reef.driver.evaluator.EvaluatorRequestor; 025import org.apache.reef.driver.evaluator.EvaluatorType; 026import org.apache.reef.driver.task.TaskConfiguration; 027import org.apache.reef.examples.hello.HelloTask; 028import org.apache.reef.tang.ClassHierarchy; 029import org.apache.reef.tang.Configuration; 030import org.apache.reef.tang.ConfigurationBuilder; 031import org.apache.reef.tang.Tang; 032import org.apache.reef.tang.annotations.Unit; 033import org.apache.reef.tang.exceptions.BindException; 034import org.apache.reef.tang.implementation.protobuf.ProtocolBufferClassHierarchy; 035import org.apache.reef.tang.proto.ClassHierarchyProto; 036import org.apache.reef.wake.EventHandler; 037import org.apache.reef.wake.time.event.StartTime; 038 039import javax.inject.Inject; 040import java.io.FileInputStream; 041import java.io.IOException; 042import java.io.InputStream; 043import java.util.logging.Level; 044import java.util.logging.Logger; 045 046/** 047 * The Driver code for the Hello REEF Application 048 */ 049@Unit 050public final class HelloDriver { 051 052 private static final Logger LOG = Logger.getLogger(HelloDriver.class.getName()); 053 054 private final EvaluatorRequestor requestor; 055 056 private int nJVMTasks = 1; // guarded by this 057 private int nCLRTasks = 1; // guarded by this 058 059 060 /** 061 * Job driver constructor - instantiated via TANG. 062 * 063 * @param requestor evaluator requestor object used to create new evaluator containers. 064 */ 065 @Inject 066 public HelloDriver(final EvaluatorRequestor requestor) { 067 this.requestor = requestor; 068 } 069 070 /** 071 * Makes a task configuration for the CLR Task. 072 * 073 * @param taskId 074 * @return task configuration for the CLR Task. 075 * @throws BindException 076 */ 077 private static final Configuration getCLRTaskConfiguration(final String taskId) throws BindException { 078 final ConfigurationBuilder taskConfigurationBuilder = Tang.Factory.getTang() 079 .newConfigurationBuilder(loadClassHierarchy()); 080 taskConfigurationBuilder.bind("Microsoft.Reef.Tasks.TaskConfigurationOptions+Identifier, Microsoft.Reef.Tasks.ITask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca", taskId); 081 taskConfigurationBuilder.bind("Microsoft.Reef.Tasks.ITask, Microsoft.Reef.Tasks.ITask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca", "Microsoft.Reef.Tasks.HelloTask, Microsoft.Reef.Tasks.HelloTask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"); 082 083 return taskConfigurationBuilder.build(); 084 } 085 086 /** 087 * Loads the class hierarchy. 088 * 089 * @return 090 */ 091 private static ClassHierarchy loadClassHierarchy() { 092 try (final InputStream chin = new FileInputStream(HelloCLR.CLASS_HIERARCHY_FILENAME)) { 093 final ClassHierarchyProto.Node root = ClassHierarchyProto.Node.parseFrom(chin); // A 094 final ClassHierarchy ch = new ProtocolBufferClassHierarchy(root); 095 return ch; 096 } catch (final IOException e) { 097 final String message = "Unable to load class hierarchy."; 098 LOG.log(Level.SEVERE, message, e); 099 throw new RuntimeException(message, e); 100 } 101 } 102 103 /** 104 * Uses the AllocatedEvaluator to launch a CLR task. 105 * 106 * @param allocatedEvaluator 107 */ 108 final void onNextCLR(final AllocatedEvaluator allocatedEvaluator) { 109 try { 110 allocatedEvaluator.setType(EvaluatorType.CLR); 111 final Configuration contextConfiguration = ContextConfiguration.CONF 112 .set(ContextConfiguration.IDENTIFIER, "HelloREEFContext") 113 .build(); 114 115 final Configuration taskConfiguration = getCLRTaskConfiguration("Hello_From_CLR"); 116 117 allocatedEvaluator.submitContextAndTask(contextConfiguration, taskConfiguration); 118 } catch (final BindException ex) { 119 final String message = "Unable to setup Task or Context configuration."; 120 LOG.log(Level.SEVERE, message, ex); 121 throw new RuntimeException(message, ex); 122 } 123 } 124 125 /** 126 * Uses the AllocatedEvaluator to launch a JVM task. 127 * 128 * @param allocatedEvaluator 129 */ 130 final void onNextJVM(final AllocatedEvaluator allocatedEvaluator) { 131 try { 132 allocatedEvaluator.setType(EvaluatorType.JVM); 133 final Configuration contextConfiguration = ContextConfiguration.CONF 134 .set(ContextConfiguration.IDENTIFIER, "HelloREEFContext") 135 .build(); 136 137 final Configuration taskConfiguration = TaskConfiguration.CONF 138 .set(TaskConfiguration.IDENTIFIER, "HelloREEFTask") 139 .set(TaskConfiguration.TASK, HelloTask.class) 140 .build(); 141 142 allocatedEvaluator.submitContextAndTask(contextConfiguration, taskConfiguration); 143 } catch (final BindException ex) { 144 final String message = "Unable to setup Task or Context configuration."; 145 LOG.log(Level.SEVERE, message, ex); 146 throw new RuntimeException(message, ex); 147 } 148 } 149 150 /** 151 * Handles the StartTime event: Request as single Evaluator. 152 */ 153 final class StartHandler implements EventHandler<StartTime> { 154 @Override 155 public void onNext(final StartTime startTime) { 156 LOG.log(Level.INFO, "StartTime: ", startTime); 157 HelloDriver.this.requestor.submit(EvaluatorRequest.newBuilder() 158 .setNumber(nCLRTasks + nJVMTasks) 159 .setMemory(128) 160 .setNumberOfCores(1) 161 .build()); 162 } 163 } 164 165 /** 166 * Handles AllocatedEvaluator: Submit an empty context and the HelloTask 167 */ 168 final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { 169 @Override 170 public void onNext(final AllocatedEvaluator allocatedEvaluator) { 171 synchronized (HelloDriver.this) { 172 if (HelloDriver.this.nJVMTasks > 0) { 173 HelloDriver.this.onNextJVM(allocatedEvaluator); 174 HelloDriver.this.nJVMTasks -= 1; 175 } else if (HelloDriver.this.nCLRTasks > 0) { 176 HelloDriver.this.onNextCLR(allocatedEvaluator); 177 HelloDriver.this.nCLRTasks -= 1; 178 } 179 } 180 } 181 } 182} 183