This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.helloCLR;
020
021import org.apache.reef.driver.context.ContextConfiguration;
022import org.apache.reef.driver.evaluator.AllocatedEvaluator;
023import org.apache.reef.driver.evaluator.EvaluatorRequest;
024import org.apache.reef.driver.evaluator.EvaluatorRequestor;
025import org.apache.reef.driver.evaluator.EvaluatorType;
026import org.apache.reef.driver.task.TaskConfiguration;
027import org.apache.reef.examples.hello.HelloTask;
028import org.apache.reef.tang.ClassHierarchy;
029import org.apache.reef.tang.Configuration;
030import org.apache.reef.tang.ConfigurationBuilder;
031import org.apache.reef.tang.Tang;
032import org.apache.reef.tang.annotations.Unit;
033import org.apache.reef.tang.exceptions.BindException;
034import org.apache.reef.tang.implementation.protobuf.ProtocolBufferClassHierarchy;
035import org.apache.reef.tang.proto.ClassHierarchyProto;
036import org.apache.reef.wake.EventHandler;
037import org.apache.reef.wake.time.event.StartTime;
038
039import javax.inject.Inject;
040import java.io.FileInputStream;
041import java.io.IOException;
042import java.io.InputStream;
043import java.util.logging.Level;
044import java.util.logging.Logger;
045
046/**
047 * The Driver code for the Hello REEF Application
048 */
049@Unit
050public final class HelloDriver {
051
052  private static final Logger LOG = Logger.getLogger(HelloDriver.class.getName());
053
054  private final EvaluatorRequestor requestor;
055
056  private int nJVMTasks = 1;  // guarded by this
057  private int nCLRTasks = 1;  // guarded by this
058
059
060  /**
061   * Job driver constructor - instantiated via TANG.
062   *
063   * @param requestor evaluator requestor object used to create new evaluator containers.
064   */
065  @Inject
066  public HelloDriver(final EvaluatorRequestor requestor) {
067    this.requestor = requestor;
068  }
069
070  /**
071   * Makes a task configuration for the CLR Task.
072   *
073   * @param taskId
074   * @return task configuration for the CLR Task.
075   * @throws BindException
076   */
077  private static final Configuration getCLRTaskConfiguration(final String taskId) throws BindException {
078    final ConfigurationBuilder taskConfigurationBuilder = Tang.Factory.getTang()
079        .newConfigurationBuilder(loadClassHierarchy());
080    taskConfigurationBuilder.bind("Microsoft.Reef.Tasks.TaskConfigurationOptions+Identifier, Microsoft.Reef.Tasks.ITask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca", taskId);
081    taskConfigurationBuilder.bind("Microsoft.Reef.Tasks.ITask, Microsoft.Reef.Tasks.ITask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=69c3241e6f0468ca", "Microsoft.Reef.Tasks.HelloTask, Microsoft.Reef.Tasks.HelloTask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null");
082
083    return taskConfigurationBuilder.build();
084  }
085
086  /**
087   * Loads the class hierarchy.
088   *
089   * @return
090   */
091  private static ClassHierarchy loadClassHierarchy() {
092    try (final InputStream chin = new FileInputStream(HelloCLR.CLASS_HIERARCHY_FILENAME)) {
093      final ClassHierarchyProto.Node root = ClassHierarchyProto.Node.parseFrom(chin); // A
094      final ClassHierarchy ch = new ProtocolBufferClassHierarchy(root);
095      return ch;
096    } catch (final IOException e) {
097      final String message = "Unable to load class hierarchy.";
098      LOG.log(Level.SEVERE, message, e);
099      throw new RuntimeException(message, e);
100    }
101  }
102
103  /**
104   * Uses the AllocatedEvaluator to launch a CLR task.
105   *
106   * @param allocatedEvaluator
107   */
108  final void onNextCLR(final AllocatedEvaluator allocatedEvaluator) {
109    try {
110      allocatedEvaluator.setType(EvaluatorType.CLR);
111      final Configuration contextConfiguration = ContextConfiguration.CONF
112          .set(ContextConfiguration.IDENTIFIER, "HelloREEFContext")
113          .build();
114
115      final Configuration taskConfiguration = getCLRTaskConfiguration("Hello_From_CLR");
116
117      allocatedEvaluator.submitContextAndTask(contextConfiguration, taskConfiguration);
118    } catch (final BindException ex) {
119      final String message = "Unable to setup Task or Context configuration.";
120      LOG.log(Level.SEVERE, message, ex);
121      throw new RuntimeException(message, ex);
122    }
123  }
124
125  /**
126   * Uses the AllocatedEvaluator to launch a JVM task.
127   *
128   * @param allocatedEvaluator
129   */
130  final void onNextJVM(final AllocatedEvaluator allocatedEvaluator) {
131    try {
132      allocatedEvaluator.setType(EvaluatorType.JVM);
133      final Configuration contextConfiguration = ContextConfiguration.CONF
134          .set(ContextConfiguration.IDENTIFIER, "HelloREEFContext")
135          .build();
136
137      final Configuration taskConfiguration = TaskConfiguration.CONF
138          .set(TaskConfiguration.IDENTIFIER, "HelloREEFTask")
139          .set(TaskConfiguration.TASK, HelloTask.class)
140          .build();
141
142      allocatedEvaluator.submitContextAndTask(contextConfiguration, taskConfiguration);
143    } catch (final BindException ex) {
144      final String message = "Unable to setup Task or Context configuration.";
145      LOG.log(Level.SEVERE, message, ex);
146      throw new RuntimeException(message, ex);
147    }
148  }
149
150  /**
151   * Handles the StartTime event: Request as single Evaluator.
152   */
153  final class StartHandler implements EventHandler<StartTime> {
154    @Override
155    public void onNext(final StartTime startTime) {
156      LOG.log(Level.INFO, "StartTime: ", startTime);
157      HelloDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
158          .setNumber(nCLRTasks + nJVMTasks)
159          .setMemory(128)
160          .setNumberOfCores(1)
161          .build());
162    }
163  }
164
165  /**
166   * Handles AllocatedEvaluator: Submit an empty context and the HelloTask
167   */
168  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
169    @Override
170    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
171      synchronized (HelloDriver.this) {
172        if (HelloDriver.this.nJVMTasks > 0) {
173          HelloDriver.this.onNextJVM(allocatedEvaluator);
174          HelloDriver.this.nJVMTasks -= 1;
175        } else if (HelloDriver.this.nCLRTasks > 0) {
176          HelloDriver.this.onNextCLR(allocatedEvaluator);
177          HelloDriver.this.nCLRTasks -= 1;
178        }
179      }
180    }
181  }
182}
183