| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mrunit.mock; |
| |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.DataInputBuffer; |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.io.serializer.Deserializer; |
| import org.apache.hadoop.io.serializer.SerializationFactory; |
| import org.apache.hadoop.io.serializer.Serializer; |
| import org.apache.hadoop.mapred.OutputCollector; |
| import org.apache.hadoop.mrunit.types.Pair; |
| |
| |
| /** |
| * OutputCollector to use in the test framework for Mapper and Reducer |
| * classes. Accepts a set of output (k, v) pairs and returns them to the |
| * framework for validation. |
| */ |
| public class MockOutputCollector<K, V> implements OutputCollector<K, V> { |
| |
| private ArrayList<Pair<K, V>> collectedOutputs; |
| private SerializationFactory serializationFactory; |
| private DataOutputBuffer outBuffer; |
| private DataInputBuffer inBuffer; |
| private Configuration conf; |
| |
| |
| public MockOutputCollector(Configuration config) { |
| collectedOutputs = new ArrayList<Pair<K, V>>(); |
| |
| outBuffer = new DataOutputBuffer(); |
| inBuffer = new DataInputBuffer(); |
| |
| conf = config; |
| serializationFactory = new SerializationFactory(conf); |
| } |
| |
| |
| @SuppressWarnings({ "rawtypes", "unchecked" }) |
| private Object deepCopy(Object obj) throws IOException { |
| |
| if (null == obj) { |
| return null; |
| } |
| |
| Class klazz = obj.getClass(); |
| Object out = null; |
| Serializer s = serializationFactory.getSerializer(klazz); |
| Deserializer ds = serializationFactory.getDeserializer(klazz); |
| |
| try { |
| s.open(outBuffer); |
| ds.open(inBuffer); |
| |
| outBuffer.reset(); |
| s.serialize(obj); |
| |
| byte [] data = outBuffer.getData(); |
| int len = outBuffer.getLength(); |
| inBuffer.reset(data, len); |
| |
| out = ds.deserialize(out); |
| |
| return out; |
| } finally { |
| try { |
| s.close(); |
| } catch (IOException ioe) { |
| // ignore this; we're closing. |
| } |
| |
| try { |
| ds.close(); |
| } catch (IOException ioe) { |
| // ignore this; we're closing. |
| } |
| } |
| } |
| |
| /** |
| * Accepts another (key, value) pair as an output of this mapper/reducer. |
| */ |
| @SuppressWarnings("unchecked") |
| public void collect(K key, V value) throws IOException { |
| collectedOutputs.add(new Pair<K, V>((K) deepCopy(key), (V) deepCopy(value))); |
| } |
| |
| /** |
| * @return The outputs generated by the mapper/reducer being tested |
| */ |
| public List<Pair<K, V>> getOutputs() { |
| return collectedOutputs; |
| } |
| } |
| |