blob: 34238ae2b8806077e6f020f40e6352cf4abd43b4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.nativetask.handlers;
import java.io.Closeable;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.nativetask.Command;
import org.apache.hadoop.mapred.nativetask.CommandDispatcher;
import org.apache.hadoop.mapred.nativetask.DataChannel;
import org.apache.hadoop.mapred.nativetask.ICombineHandler;
import org.apache.hadoop.mapred.nativetask.INativeHandler;
import org.apache.hadoop.mapred.nativetask.NativeBatchProcessor;
import org.apache.hadoop.mapred.nativetask.TaskContext;
import org.apache.hadoop.mapred.nativetask.util.NativeTaskOutput;
import org.apache.hadoop.mapred.nativetask.util.OutputUtil;
import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
/**
* Java Record Reader + Java Mapper + Native Collector
*/
@SuppressWarnings("unchecked")
@InterfaceAudience.Private
public class NativeCollectorOnlyHandler<K, V> implements CommandDispatcher, Closeable {
public static final String NAME = "NativeTask.MCollectorOutputHandler";
private static Log LOG = LogFactory.getLog(NativeCollectorOnlyHandler.class);
public static final Command GET_OUTPUT_PATH =
new Command(100, "GET_OUTPUT_PATH");
public static final Command GET_OUTPUT_INDEX_PATH =
new Command(101, "GET_OUTPUT_INDEX_PATH");
public static final Command GET_SPILL_PATH =
new Command(102, "GET_SPILL_PATH");
public static final Command GET_COMBINE_HANDLER =
new Command(103, "GET_COMBINE_HANDLER");
private NativeTaskOutput output;
private int spillNumber = 0;
private ICombineHandler combinerHandler = null;
private final BufferPusher<K, V> kvPusher;
private final INativeHandler nativeHandler;
private boolean closed = false;
public static <K, V> NativeCollectorOnlyHandler<K, V> create(TaskContext context)
throws IOException {
ICombineHandler combinerHandler = null;
try {
final TaskContext combineContext = context.copyOf();
combineContext.setInputKeyClass(context.getOutputKeyClass());
combineContext.setInputValueClass(context.getOutputValueClass());
combinerHandler = CombinerHandler.create(combineContext);
} catch (final ClassNotFoundException e) {
throw new IOException(e);
}
if (null != combinerHandler) {
LOG.info("[NativeCollectorOnlyHandler] combiner is not null");
}
final INativeHandler nativeHandler = NativeBatchProcessor.create(
NAME, context.getConf(), DataChannel.OUT);
final BufferPusher<K, V> kvPusher = new BufferPusher<K, V>(
(Class<K>)context.getOutputKeyClass(),
(Class<V>)context.getOutputValueClass(),
nativeHandler);
return new NativeCollectorOnlyHandler<K, V>(context, nativeHandler, kvPusher, combinerHandler);
}
protected NativeCollectorOnlyHandler(TaskContext context, INativeHandler nativeHandler,
BufferPusher<K, V> kvPusher, ICombineHandler combiner) throws IOException {
Configuration conf = context.getConf();
TaskAttemptID id = context.getTaskAttemptId();
if (null == id) {
this.output = OutputUtil.createNativeTaskOutput(conf, "");
} else {
this.output = OutputUtil.createNativeTaskOutput(context.getConf(), context.getTaskAttemptId()
.toString());
}
this.combinerHandler = combiner;
this.kvPusher = kvPusher;
this.nativeHandler = nativeHandler;
nativeHandler.setCommandDispatcher(this);
}
public void collect(K key, V value, int partition) throws IOException {
kvPusher.collect(key, value, partition);
};
public void flush() throws IOException {
}
@Override
public void close() throws IOException {
if (closed) {
return;
}
if (null != kvPusher) {
kvPusher.close();
}
if (null != combinerHandler) {
combinerHandler.close();
}
if (null != nativeHandler) {
nativeHandler.close();
}
closed = true;
}
@Override
public ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException {
Path p = null;
if (null == command) {
return null;
}
if (command.equals(GET_OUTPUT_PATH)) {
p = output.getOutputFileForWrite(-1);
} else if (command.equals(GET_OUTPUT_INDEX_PATH)) {
p = output.getOutputIndexFileForWrite(-1);
} else if (command.equals(GET_SPILL_PATH)) {
p = output.getSpillFileForWrite(spillNumber++, -1);
} else if (command.equals(GET_COMBINE_HANDLER)) {
if (null == combinerHandler) {
return null;
}
final ReadWriteBuffer result = new ReadWriteBuffer(8);
result.writeLong(combinerHandler.getId());
return result;
} else {
throw new IOException("Illegal command: " + command.toString());
}
if (p != null) {
final ReadWriteBuffer result = new ReadWriteBuffer();
result.writeString(p.toUri().getPath());
return result;
} else {
throw new IOException("MapOutputFile can't allocate spill/output file");
}
}
}