blob: de69182525f8d89bd094b92e67040ff115c19321 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.pig;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
public class MockLoader extends LoadFunc {
private static final class MockRecordReader extends RecordReader<Object, Object> {
@Override
public void close() throws IOException {
}
@Override
public Object getCurrentKey() throws IOException, InterruptedException {
return "mockKey";
}
@Override
public Object getCurrentValue() throws IOException, InterruptedException {
return "mockValue";
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0.5f;
}
@Override
public void initialize(InputSplit split, TaskAttemptContext arg1) throws IOException,
InterruptedException {
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return true;
}
}
private static final class MockInputSplit extends InputSplit implements Writable {
private String location;
public MockInputSplit() {
}
public MockInputSplit(String location) {
this.location = location;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
return new String[]{location};
}
@Override
public long getLength() throws IOException, InterruptedException {
return 10000000;
}
@Override
public boolean equals(Object arg0) {
return arg0 == this;
}
@Override
public int hashCode() {
return location.hashCode();
}
@Override
public void readFields(DataInput arg0) throws IOException {
location = arg0.readUTF();
}
@Override
public void write(DataOutput arg0) throws IOException {
arg0.writeUTF(location);
}
}
private static final class MockInputFormat extends InputFormat {
private final String location;
public MockInputFormat(String location) {
this.location = location;
}
@Override
public RecordReader createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
throws IOException, InterruptedException {
return new MockRecordReader();
}
@Override
public List getSplits(JobContext arg0) throws IOException, InterruptedException {
return Arrays.asList(new MockInputSplit(location));
}
}
private static final Map<String, Iterable<Tuple>> locationToData = new HashMap<String, Iterable<Tuple>>();
public static void setData(String location, Iterable<Tuple> data) {
locationToData.put(location, data);
}
private String location;
private Iterator<Tuple> data;
@Override
public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
return location;
}
@Override
public void setLocation(String location, Job job) throws IOException {
this.location = location;
if (location == null) {
throw new IOException("null location passed to MockLoader");
}
this.data = locationToData.get(location).iterator();
if (this.data == null) {
throw new IOException("No data configured for location: " + location);
}
}
@Override
public Tuple getNext() throws IOException {
if (data == null) {
throw new IOException("data was not correctly initialized in MockLoader");
}
return data.hasNext() ? data.next() : null;
}
@Override
public InputFormat getInputFormat() throws IOException {
return new MockInputFormat(location);
}
@Override
public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
}
}