This project has retired. For details, please refer to its Attic page.
Source code
001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.reef.io.data.output;
020
021import org.apache.hadoop.fs.FileSystem;
022import org.apache.hadoop.fs.Path;
023import org.apache.hadoop.mapred.JobConf;
024import org.apache.reef.annotations.audience.TaskSide;
025import org.apache.reef.tang.annotations.Parameter;
026
027import javax.inject.Inject;
028import java.io.DataOutputStream;
029import java.io.IOException;
030
031/**
032 * Implementation of {@link TaskOutputStreamProvider}.
033 * It provides FileOutputStreams on HDFS.
034 */
035@TaskSide
036public final class TaskOutputStreamProviderHDFS extends TaskOutputStreamProvider {
037
038  /**
039   * Path of the output directory on HDFS to write outputs.
040   */
041  private final String outputPath;
042
043  /**
044   * HDFS File system.
045   */
046  private FileSystem fs;
047
048  /**
049   * Constructor - instantiated via TANG.
050   *
051   * @param outputPath path of the output directory on HDFS to write outputs.
052   */
053  @Inject
054  private TaskOutputStreamProviderHDFS(
055      @Parameter(TaskOutputService.OutputPath.class) final String outputPath) throws IOException {
056    this.outputPath = outputPath;
057    final JobConf jobConf = new JobConf();
058    fs = FileSystem.get(jobConf);
059  }
060
061  /**
062   * create a file output stream using the given name.
063   * The path of the file on HDFS is 'outputPath/name/taskId'.
064   *
065   * @param name name of the created output stream
066   *             It is used as the name of the directory if the created output stream is a file output stream
067   * @return OutputStream to a file on HDFS. The path of the file is 'outputPath/name/taskId'
068   * @throws java.io.IOException
069   */
070  @Override
071  public DataOutputStream create(final String name) throws IOException {
072    final String directoryPath = outputPath + Path.SEPARATOR + name;
073    if (!fs.exists(new Path(directoryPath))) {
074      fs.mkdirs(new Path(directoryPath));
075    }
076    return fs.create(new Path(directoryPath + Path.SEPARATOR + getTaskId()));
077  }
078
079  @Override
080  public void close() throws IOException {
081    fs.close();
082  }
083}