001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.reef.examples.helloCLR; 020 021import org.apache.reef.driver.context.ContextConfiguration; 022import org.apache.reef.driver.evaluator.AllocatedEvaluator; 023import org.apache.reef.driver.evaluator.CLRProcessFactory; 024import org.apache.reef.driver.evaluator.EvaluatorRequest; 025import org.apache.reef.driver.evaluator.EvaluatorRequestor; 026import org.apache.reef.driver.task.TaskConfiguration; 027import org.apache.reef.examples.hello.HelloTask; 028import org.apache.reef.tang.ClassHierarchy; 029import org.apache.reef.tang.Configuration; 030import org.apache.reef.tang.ConfigurationBuilder; 031import org.apache.reef.tang.Tang; 032import org.apache.reef.tang.annotations.Unit; 033import org.apache.reef.tang.exceptions.BindException; 034import org.apache.reef.tang.implementation.protobuf.ProtocolBufferClassHierarchy; 035import org.apache.reef.tang.proto.ClassHierarchyProto; 036import org.apache.reef.wake.EventHandler; 037import org.apache.reef.wake.time.event.StartTime; 038 039import javax.inject.Inject; 040import java.io.FileInputStream; 041import java.io.IOException; 042import java.io.InputStream; 043import java.util.logging.Level; 044import java.util.logging.Logger; 045 046/** 047 * The Driver code for the Hello REEF Application. 048 */ 049@Unit 050public final class HelloDriver { 051 052 private static final Logger LOG = Logger.getLogger(HelloDriver.class.getName()); 053 054 private final EvaluatorRequestor requestor; 055 private final CLRProcessFactory clrProcessFactory; 056 057 private int nJVMTasks = 1; // guarded by this 058 private int nCLRTasks = 1; // guarded by this 059 060 061 /** 062 * Job driver constructor - instantiated via TANG. 063 * 064 * @param requestor evaluator requestor object used to create new evaluator containers. 065 */ 066 @Inject 067 public HelloDriver(final EvaluatorRequestor requestor, 068 final CLRProcessFactory clrProcessFactory) { 069 this.requestor = requestor; 070 this.clrProcessFactory = clrProcessFactory; 071 } 072 073 /** 074 * Makes a task configuration for the CLR Task. 075 * 076 * @param taskId 077 * @return task configuration for the CLR Task. 078 * @throws BindException 079 */ 080 private static Configuration getCLRTaskConfiguration(final String taskId) throws BindException { 081 final ConfigurationBuilder taskConfigurationBuilder = Tang.Factory.getTang() 082 .newConfigurationBuilder(loadClassHierarchy()); 083 taskConfigurationBuilder.bind("Org.Apache.Reef.Tasks.TaskConfigurationOptions+Identifier," + 084 "Org.Apache.Reef.Tasks.ITask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca", taskId); 085 taskConfigurationBuilder.bind("Org.Apache.Reef.Tasks.ITask, Org.Apache.Reef.Tasks.ITask, Version=1.0.0.0, " + 086 "Culture=neutral, PublicKeyToken=69c3241e6f0468ca", "Org.Apache.Reef.Tasks.HelloTask, " + 087 "Org.Apache.Reef.Tasks.HelloTask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"); 088 089 return taskConfigurationBuilder.build(); 090 } 091 092 /** 093 * Loads the class hierarchy. 094 * 095 * @return 096 */ 097 private static ClassHierarchy loadClassHierarchy() { 098 // TODO[JIRA REEF-400] The file should be created by AvroClassHierarchySerializer 099 try (final InputStream chin = new FileInputStream(HelloCLR.CLASS_HIERARCHY_FILENAME)) { 100 // TODO[JIRA REEF-400] Use AvroClassHierarchySerializer instead 101 final ClassHierarchyProto.Node root = ClassHierarchyProto.Node.parseFrom(chin); // A 102 final ClassHierarchy ch = new ProtocolBufferClassHierarchy(root); 103 return ch; 104 } catch (final IOException e) { 105 final String message = "Unable to load class hierarchy."; 106 LOG.log(Level.SEVERE, message, e); 107 throw new RuntimeException(message, e); 108 } 109 } 110 111 /** 112 * Uses the AllocatedEvaluator to launch a CLR task. 113 * 114 * @param allocatedEvaluator 115 */ 116 void onNextCLR(final AllocatedEvaluator allocatedEvaluator) { 117 try { 118 allocatedEvaluator.setProcess(clrProcessFactory.newEvaluatorProcess()); 119 final Configuration contextConfiguration = ContextConfiguration.CONF 120 .set(ContextConfiguration.IDENTIFIER, "HelloREEFContext") 121 .build(); 122 123 final Configuration taskConfiguration = getCLRTaskConfiguration("Hello_From_CLR"); 124 125 allocatedEvaluator.submitContextAndTask(contextConfiguration, taskConfiguration); 126 } catch (final BindException ex) { 127 final String message = "Unable to setup Task or Context configuration."; 128 LOG.log(Level.SEVERE, message, ex); 129 throw new RuntimeException(message, ex); 130 } 131 } 132 133 /** 134 * Uses the AllocatedEvaluator to launch a JVM task. 135 * 136 * @param allocatedEvaluator 137 */ 138 void onNextJVM(final AllocatedEvaluator allocatedEvaluator) { 139 try { 140 final Configuration contextConfiguration = ContextConfiguration.CONF 141 .set(ContextConfiguration.IDENTIFIER, "HelloREEFContext") 142 .build(); 143 144 final Configuration taskConfiguration = TaskConfiguration.CONF 145 .set(TaskConfiguration.IDENTIFIER, "HelloREEFTask") 146 .set(TaskConfiguration.TASK, HelloTask.class) 147 .build(); 148 149 allocatedEvaluator.submitContextAndTask(contextConfiguration, taskConfiguration); 150 } catch (final BindException ex) { 151 final String message = "Unable to setup Task or Context configuration."; 152 LOG.log(Level.SEVERE, message, ex); 153 throw new RuntimeException(message, ex); 154 } 155 } 156 157 /** 158 * Handles the StartTime event: Request as single Evaluator. 159 */ 160 final class StartHandler implements EventHandler<StartTime> { 161 @Override 162 public void onNext(final StartTime startTime) { 163 LOG.log(Level.INFO, "StartTime: ", startTime); 164 HelloDriver.this.requestor.submit(EvaluatorRequest.newBuilder() 165 .setNumber(nCLRTasks + nJVMTasks) 166 .setMemory(128) 167 .setNumberOfCores(1) 168 .build()); 169 } 170 } 171 172 /** 173 * Handles AllocatedEvaluator: Submit an empty context and the HelloTask. 174 */ 175 final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { 176 @Override 177 public void onNext(final AllocatedEvaluator allocatedEvaluator) { 178 synchronized (HelloDriver.this) { 179 if (HelloDriver.this.nJVMTasks > 0) { 180 HelloDriver.this.onNextJVM(allocatedEvaluator); 181 HelloDriver.this.nJVMTasks -= 1; 182 } else if (HelloDriver.this.nCLRTasks > 0) { 183 HelloDriver.this.onNextCLR(allocatedEvaluator); 184 HelloDriver.this.nCLRTasks -= 1; 185 } 186 } 187 } 188 } 189} 190