| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.metron.dataloads.extractor.inputformat; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.NullWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapreduce.*; |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
| import org.apache.hadoop.mapreduce.lib.input.FileSplit; |
| |
| import java.io.IOException; |
| import java.util.Map; |
| |
| public class WholeFileFormat implements InputFormatHandler { |
| |
| public static class WholeFileRecordReader extends RecordReader<NullWritable, Text> { |
| private FileSplit fileSplit; |
| private Configuration conf; |
| private Text value = new Text(); |
| private boolean processed = false; |
| |
| @Override |
| public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { |
| this.fileSplit = (FileSplit) split; |
| this.conf = context.getConfiguration(); |
| } |
| |
| @Override |
| public boolean nextKeyValue() throws IOException, InterruptedException { |
| if (!processed) { |
| byte[] contents = new byte[(int) fileSplit.getLength()]; |
| Path file = fileSplit.getPath(); |
| FileSystem fs = file.getFileSystem(conf); |
| FSDataInputStream in = null; |
| try { |
| in = fs.open(file); |
| IOUtils.readFully(in, contents, 0, contents.length); |
| value.set(contents, 0, contents.length); |
| } finally { |
| IOUtils.closeStream(in); |
| } |
| processed = true; |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public NullWritable getCurrentKey() throws IOException, InterruptedException { |
| return NullWritable.get(); |
| } |
| @Override |
| public Text getCurrentValue() throws IOException, InterruptedException{ |
| return value; |
| } |
| |
| @Override |
| public float getProgress() throws IOException { |
| return processed ? 1.0f : 0.0f; |
| } |
| |
| @Override |
| public void close() throws IOException{ |
| //do nothing :) |
| } |
| } |
| |
| public static class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> { |
| |
| @Override |
| protected boolean isSplitable(JobContext context, Path file) { |
| return false; |
| } |
| |
| @Override |
| public RecordReader<NullWritable, Text> createRecordReader( |
| InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { |
| WholeFileRecordReader reader = new WholeFileRecordReader(); |
| reader.initialize(split, context); |
| return reader; |
| } |
| } |
| @Override |
| public void set(Job job, Path input, Map<String, Object> config) throws IOException { |
| WholeFileInputFormat.setInputPaths(job, input); |
| job.setInputFormatClass(WholeFileInputFormat.class); |
| } |
| } |