/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.io.data.loading.impl;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.reef.tang.ExternalConstructor;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;

import javax.inject.Inject;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * {@link ExternalConstructor} that builds a Hadoop {@link JobConf} from an
 * input format class name and an input path, both supplied as named parameters.
 */
public class JobConfExternalConstructor implements ExternalConstructor<JobConf> {

  private static final Logger LOG = Logger.getLogger(JobConfExternalConstructor.class.getName());

  private final String inputFormatClassName;
  private final String inputPath;

  /**
   * @param inputFormatClassName fully qualified name of the class to set as the
   *     input format of the created {@link JobConf}.
   * @param inputPath path handed to the input format's {@code addInputPath} method.
   */
  @Inject
  public JobConfExternalConstructor(
      @Parameter(InputFormatClass.class) final String inputFormatClassName,
      @Parameter(InputPath.class) final String inputPath) {
    this.inputFormatClassName = inputFormatClassName;
    this.inputPath = inputPath;
  }

  /**
   * Creates a fresh {@link JobConf}, sets its input format to the configured class and,
   * if that class exposes a public {@code addInputPath(JobConf, Path)} method,
   * calls it with the configured input path.
   *
   * <p>When no such method exists the event is logged at INFO level and the
   * {@link JobConf} is returned without an input path.
   *
   * @return the newly created job configuration.
   * @throws RuntimeException if the input format class cannot be loaded, is not an
   *     {@link InputFormat}, or its {@code addInputPath} method cannot be called.
   */
  @SuppressWarnings("rawtypes")
  @Override
  public JobConf newInstance() {

    final JobConf jobConf = new JobConf();
    final Class<? extends InputFormat> inputFormatClass = loadInputFormatClass();

    jobConf.setInputFormat(inputFormatClass);

    try {

      final Method addInputPath =
          inputFormatClass.getMethod("addInputPath", JobConf.class, Path.class);

      // The receiver argument is ignored by Method.invoke() when the target method is static.
      // NOTE(review): presumably addInputPath is always static here - confirm; an instance
      // method would be rejected by invoke() with an IllegalArgumentException.
      addInputPath.invoke(inputFormatClass, jobConf, new Path(this.inputPath));

    } catch (final InvocationTargetException | IllegalAccessException ex) {
      throw new RuntimeException("InputFormat: " + this.inputFormatClassName
          + ".addInputPath() method exists, but cannot be called.", ex);
    } catch (final NoSuchMethodException ex) {
      // Deliberate best effort: not every input format takes an input path.
      LOG.log(Level.INFO, "{0}.addInputPath() method does not exist", this.inputFormatClassName);
    }

    return jobConf;
  }

  /**
   * Loads the configured class and verifies that it is an {@link InputFormat}.
   *
   * <p>The check is done eagerly so that a wrong class name is reported here,
   * with the offending name, rather than surfacing later when the
   * {@link JobConf} is used.
   *
   * @return the configured class, typed as an input format.
   * @throws RuntimeException if the class cannot be found or is not an {@link InputFormat}.
   */
  @SuppressWarnings("rawtypes")
  private Class<? extends InputFormat> loadInputFormatClass() {
    try {
      return Class.forName(this.inputFormatClassName).asSubclass(InputFormat.class);
    } catch (final ClassNotFoundException ex) {
      throw new RuntimeException("InputFormat: " + this.inputFormatClassName
          + " ClassNotFoundException while creating newInstance of JobConf", ex);
    } catch (final ClassCastException ex) {
      throw new RuntimeException("InputFormat: " + this.inputFormatClassName
          + " does not implement " + InputFormat.class.getName(), ex);
    }
  }

  /**
   * Named parameter: class name of the input format to configure. Has no default value.
   */
  @NamedParameter()
  public static final class InputFormatClass implements Name<String> {
  }

  /**
   * Named parameter: input path to add to the job configuration.
   *
   * <p>NOTE(review): the default value {@code "NULL"} is not treated specially by
   * {@link JobConfExternalConstructor#newInstance()}; it is wrapped in a
   * {@link Path} like any other value. Confirm that callers always bind this parameter.
   */
  @NamedParameter(default_value = "NULL")
  public static final class InputPath implements Name<String> {
  }
}