blob: 34368fbdfd857f5734200103a77dc77d1d40994d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mrunit.mock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mrunit.types.Pair;
import org.apache.hadoop.util.ReflectionUtils;
/**
 * OutputCollector to use in the test framework for Mapper and Reducer
 * classes. Accepts a set of output (k, v) pairs and returns them to the
 * framework for validation.
 *
 * <p>Every collected key and value is stored as a copy produced by a
 * serialize/deserialize round trip through a {@link SerializationFactory},
 * not as the reference that was handed to {@code collect}.
 *
 * <p>The scratch buffers used for copying are shared between calls without
 * any synchronization, so a single instance should not be used from several
 * threads at once.
 */
public class MockOutputCollector<K, V> implements OutputCollector<K, V> {

  /** Copies of every (key, value) pair collected so far, in arrival order. */
  private final List<Pair<K, V>> collectedOutputs;

  /** Source of the serializer/deserializer used to copy keys and values. */
  private final SerializationFactory serializationFactory;

  /** Scratch buffer that receives the serialized form; reused across calls. */
  private final DataOutputBuffer outBuffer;

  /** Scratch buffer the copy is read back from; reused across calls. */
  private final DataInputBuffer inBuffer;

  /** Configuration backing the serialization factory and new instances. */
  private final Configuration conf;

  /**
   * Creates an empty collector backed by a freshly constructed
   * {@link Configuration}.
   */
  public MockOutputCollector() {
    collectedOutputs = new ArrayList<Pair<K, V>>();
    outBuffer = new DataOutputBuffer();
    inBuffer = new DataInputBuffer();
    conf = new Configuration();
    serializationFactory = new SerializationFactory(conf);
  }

  /**
   * Instantiates {@code klazz} through {@link ReflectionUtils}, passing this
   * collector's configuration.
   *
   * @param klazz the class to instantiate
   * @return the new instance
   */
  private Object getInstance(Class<?> klazz) {
    return ReflectionUtils.newInstance(klazz, conf);
  }

  /**
   * Copies {@code obj} by serializing it into the scratch output buffer and
   * deserializing the bytes into a new instance of the same runtime class.
   *
   * @param obj the object to copy; may be {@code null}
   * @return the copy, or {@code null} if {@code obj} is {@code null}
   * @throws IOException if serialization or deserialization fails
   * @throws IllegalStateException if the factory hands back no serializer or
   *         deserializer for the object's class
   */
  private <T> T deepCopy(T obj) throws IOException {
    if (null == obj) {
      return null;
    }

    // Safe: obj is by definition an instance of its own runtime class.
    @SuppressWarnings("unchecked")
    Class<T> klazz = (Class<T>) obj.getClass();

    // Safe: the instance was created from klazz, which is Class<T>.
    @SuppressWarnings("unchecked")
    T blank = (T) getInstance(klazz); // the output object to return.

    Serializer<T> serializer = serializationFactory.getSerializer(klazz);
    Deserializer<T> deserializer = serializationFactory.getDeserializer(klazz);
    if (serializer == null || deserializer == null) {
      throw new IllegalStateException(
          "No serializer/deserializer available for class " + klazz.getName());
    }

    // Track what was actually opened so the cleanup below never closes a
    // stream that was not opened (which could mask the real failure).
    boolean serializerOpened = false;
    boolean deserializerOpened = false;
    try {
      // Clear the shared buffer BEFORE opening the serializer so that
      // anything it writes while opening is kept rather than discarded.
      outBuffer.reset();
      serializer.open(outBuffer);
      serializerOpened = true;
      serializer.serialize(obj);

      // Load the input buffer BEFORE opening the deserializer so that it
      // sees the serialized bytes from the moment it is opened.
      inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
      deserializer.open(inBuffer);
      deserializerOpened = true;
      return deserializer.deserialize(blank);
    } finally {
      if (serializerOpened) {
        try {
          serializer.close();
        } catch (IOException ignored) {
          // ignore this; we're closing.
        }
      }
      if (deserializerOpened) {
        try {
          deserializer.close();
        } catch (IOException ignored) {
          // ignore this; we're closing.
        }
      }
    }
  }

  /**
   * Accepts another (key, value) pair as an output of this mapper/reducer.
   * Copies of the key and value are recorded, not the arguments themselves.
   *
   * @param key the output key; may be {@code null}
   * @param value the output value; may be {@code null}
   * @throws IOException if the key or value cannot be copied
   */
  public void collect(K key, V value) throws IOException {
    K keyCopy = deepCopy(key);
    V valueCopy = deepCopy(value);
    collectedOutputs.add(new Pair<K, V>(keyCopy, valueCopy));
  }

  /**
   * @return The outputs generated by the mapper/reducer being tested. This
   *         is the collector's own list, not a snapshot: later calls to
   *         {@code collect} are visible through it.
   */
  public List<Pair<K, V>> getOutputs() {
    return collectedOutputs;
  }
}