This project has retired. For details please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.examples.data.output;
020
021import org.apache.reef.annotations.audience.ClientSide;
022import org.apache.reef.client.DriverConfiguration;
023import org.apache.reef.client.DriverLauncher;
024import org.apache.reef.client.LauncherStatus;
025import org.apache.reef.io.data.output.*;
026import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
027import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
028import org.apache.reef.tang.Configuration;
029import org.apache.reef.tang.Injector;
030import org.apache.reef.tang.JavaConfigurationBuilder;
031import org.apache.reef.tang.Tang;
032import org.apache.reef.tang.annotations.Name;
033import org.apache.reef.tang.annotations.NamedParameter;
034import org.apache.reef.tang.exceptions.BindException;
035import org.apache.reef.tang.exceptions.InjectionException;
036import org.apache.reef.tang.formats.CommandLine;
037import org.apache.reef.util.EnvironmentUtils;
038
039import java.io.File;
040import java.io.IOException;
041import java.util.logging.Level;
042import java.util.logging.Logger;
043
044/**
045 * Client for the output service demo app.
046 */
047@ClientSide
048public final class OutputServiceREEF {
049  private static final Logger LOG = Logger.getLogger(OutputServiceREEF.class.getName());
050
051  public static void main(final String[] args)
052      throws InjectionException, BindException, IOException {
053
054    final Tang tang = Tang.Factory.getTang();
055    final JavaConfigurationBuilder cb = tang.newConfigurationBuilder();
056    new CommandLine(cb)
057        .registerShortNameOfClass(Local.class)
058        .registerShortNameOfClass(TimeOut.class)
059        .registerShortNameOfClass(OutputDir.class)
060        .processCommandLine(args);
061
062    final Injector injector = tang.newInjector(cb.build());
063    final boolean isLocal = injector.getNamedInstance(Local.class);
064    final String outputDir = injector.getNamedInstance(OutputDir.class);
065    final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000;
066
067    final Configuration driverConf = getDriverConf();
068    final Configuration outputServiceConf = getOutputServiceConf(isLocal, outputDir);
069    final Configuration submittedConfiguration = Tang.Factory.getTang()
070        .newConfigurationBuilder(driverConf, outputServiceConf)
071        .build();
072    final LauncherStatus state = DriverLauncher.getLauncher(getRuntimeConf(isLocal))
073        .run(submittedConfiguration, jobTimeout);
074
075    LOG.log(Level.INFO, "REEF job completed: {0}", state);
076  }
077
078  /**
079   * @param isLocal true for local runtime, or false for YARN runtime.
080   * @return The runtime configuration
081   */
082  private static Configuration getRuntimeConf(final boolean isLocal) {
083    final Configuration runtimeConf;
084    if (isLocal) {
085      LOG.log(Level.INFO, "Running the output service demo on the local runtime");
086      runtimeConf = LocalRuntimeConfiguration.CONF
087          .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, 3)
088          .build();
089    } else {
090      LOG.log(Level.INFO, "Running the output service demo on YARN");
091      runtimeConf = YarnClientConfiguration.CONF.build();
092    }
093    return runtimeConf;
094  }
095
096  /**
097   * @return The Driver configuration.
098   */
099  private static Configuration getDriverConf() {
100    final Configuration driverConf = DriverConfiguration.CONF
101        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(OutputServiceDriver.class))
102        .set(DriverConfiguration.DRIVER_IDENTIFIER, "OutputServiceREEF")
103        .set(DriverConfiguration.ON_DRIVER_STARTED, OutputServiceDriver.StartHandler.class)
104        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, OutputServiceDriver.EvaluatorAllocatedHandler.class)
105        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, OutputServiceDriver.ActiveContextHandler.class)
106        .build();
107
108    return driverConf;
109  }
110
111  /**
112   * @param isLocal true for local runtime, or false for YARN runtime.
113   * @param outputDir path of the output directory.
114   * @return The configuration to use OutputService
115   */
116  private static Configuration getOutputServiceConf(final boolean isLocal, final String outputDir) {
117    final Configuration outputServiceConf;
118    if (isLocal) {
119      outputServiceConf = TaskOutputServiceBuilder.CONF
120          .set(TaskOutputServiceBuilder.TASK_OUTPUT_STREAM_PROVIDER, TaskOutputStreamProviderLocal.class)
121          .set(TaskOutputServiceBuilder.OUTPUT_PATH, getAbsolutePath(outputDir))
122          .build();
123    } else {
124      outputServiceConf = TaskOutputServiceBuilder.CONF
125          .set(TaskOutputServiceBuilder.TASK_OUTPUT_STREAM_PROVIDER, TaskOutputStreamProviderHDFS.class)
126          .set(TaskOutputServiceBuilder.OUTPUT_PATH, outputDir)
127          .build();
128    }
129    return outputServiceConf;
130  }
131
132  /**
133   * transform the given relative path into the absolute path based on the current directory where a user runs the demo.
134   * @param relativePath relative path
135   * @return absolute path
136   */
137  private static String getAbsolutePath(final String relativePath) {
138    final File outputFile = new File(relativePath);
139    return outputFile.getAbsolutePath();
140  }
141
142  /**
143   * Command line parameter = true to run locally, or false to run on YARN.
144   */
145  @NamedParameter(doc = "Whether or not to run on the local runtime",
146      short_name = "local", default_value = "true")
147  public static final class Local implements Name<Boolean> {
148  }
149
150  /**
151   * Command line parameter = number of minutes before timeout.
152   */
153  @NamedParameter(doc = "Number of minutes before timeout",
154      short_name = "timeout", default_value = "2")
155  public static final class TimeOut implements Name<Integer> {
156  }
157
158  /**
159   * Command line parameter = path of the output directory.
160   */
161  @NamedParameter(doc = "Path of the output directory",
162      short_name = "output")
163  public static final class OutputDir implements Name<String> {
164  }
165
166  /**
167   * Empty private constructor to prohibit instantiation of utility class.
168   */
169  private OutputServiceREEF() {
170  }
171}