blob: ebfb18453778b789833e058f622351c1bfc13c37 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.pipes;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
/**
* This protocol is a binary implementation of the Pipes protocol.
*/
class BinaryProtocol<K1 extends WritableComparable, V1 extends Writable,
K2 extends WritableComparable, V2 extends Writable>
implements DownwardProtocol<K1, V1> {
public static final int CURRENT_PROTOCOL_VERSION = 0;
/**
* The buffer size for the command socket
*/
private static final int BUFFER_SIZE = 128*1024;
private DataOutputStream stream;
private DataOutputBuffer buffer = new DataOutputBuffer();
private static final Log LOG =
LogFactory.getLog(BinaryProtocol.class.getName());
private UplinkReaderThread uplink;
/**
* The integer codes to represent the different messages. These must match
* the C++ codes or massive confusion will result.
*/
private static enum MessageType { START(0),
SET_JOB_CONF(1),
SET_INPUT_TYPES(2),
RUN_MAP(3),
MAP_ITEM(4),
RUN_REDUCE(5),
REDUCE_KEY(6),
REDUCE_VALUE(7),
CLOSE(8),
ABORT(9),
AUTHENTICATION_REQ(10),
OUTPUT(50),
PARTITIONED_OUTPUT(51),
STATUS(52),
PROGRESS(53),
DONE(54),
REGISTER_COUNTER(55),
INCREMENT_COUNTER(56),
AUTHENTICATION_RESP(57);
final int code;
MessageType(int code) {
this.code = code;
}
}
private static class UplinkReaderThread<K2 extends WritableComparable,
V2 extends Writable>
extends Thread {
private DataInputStream inStream;
private UpwardProtocol<K2, V2> handler;
private K2 key;
private V2 value;
private boolean authPending = true;
public UplinkReaderThread(InputStream stream,
UpwardProtocol<K2, V2> handler,
K2 key, V2 value) throws IOException{
inStream = new DataInputStream(new BufferedInputStream(stream,
BUFFER_SIZE));
this.handler = handler;
this.key = key;
this.value = value;
}
public void closeConnection() throws IOException {
inStream.close();
}
public void run() {
while (true) {
try {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
int cmd = WritableUtils.readVInt(inStream);
LOG.debug("Handling uplink command " + cmd);
if (cmd == MessageType.AUTHENTICATION_RESP.code) {
String digest = Text.readString(inStream);
authPending = !handler.authenticate(digest);
} else if (authPending) {
LOG.warn("Message " + cmd + " received before authentication is "
+ "complete. Ignoring");
continue;
} else if (cmd == MessageType.OUTPUT.code) {
readObject(key);
readObject(value);
handler.output(key, value);
} else if (cmd == MessageType.PARTITIONED_OUTPUT.code) {
int part = WritableUtils.readVInt(inStream);
readObject(key);
readObject(value);
handler.partitionedOutput(part, key, value);
} else if (cmd == MessageType.STATUS.code) {
handler.status(Text.readString(inStream));
} else if (cmd == MessageType.PROGRESS.code) {
handler.progress(inStream.readFloat());
} else if (cmd == MessageType.REGISTER_COUNTER.code) {
int id = WritableUtils.readVInt(inStream);
String group = Text.readString(inStream);
String name = Text.readString(inStream);
handler.registerCounter(id, group, name);
} else if (cmd == MessageType.INCREMENT_COUNTER.code) {
int id = WritableUtils.readVInt(inStream);
long amount = WritableUtils.readVLong(inStream);
handler.incrementCounter(id, amount);
} else if (cmd == MessageType.DONE.code) {
LOG.debug("Pipe child done");
handler.done();
return;
} else {
throw new IOException("Bad command code: " + cmd);
}
} catch (InterruptedException e) {
return;
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
handler.failed(e);
return;
}
}
}
private void readObject(Writable obj) throws IOException {
int numBytes = WritableUtils.readVInt(inStream);
byte[] buffer;
// For BytesWritable and Text, use the specified length to set the length
// this causes the "obvious" translations to work. So that if you emit
// a string "abc" from C++, it shows up as "abc".
if (obj instanceof BytesWritable) {
buffer = new byte[numBytes];
inStream.readFully(buffer);
((BytesWritable) obj).set(buffer, 0, numBytes);
} else if (obj instanceof Text) {
buffer = new byte[numBytes];
inStream.readFully(buffer);
((Text) obj).set(buffer);
} else {
obj.readFields(inStream);
}
}
}
/**
* An output stream that will save a copy of the data into a file.
*/
private static class TeeOutputStream extends FilterOutputStream {
private OutputStream file;
TeeOutputStream(String filename, OutputStream base) throws IOException {
super(base);
file = new FileOutputStream(filename);
}
public void write(byte b[], int off, int len) throws IOException {
file.write(b,off,len);
out.write(b,off,len);
}
public void write(int b) throws IOException {
file.write(b);
out.write(b);
}
public void flush() throws IOException {
file.flush();
out.flush();
}
public void close() throws IOException {
flush();
file.close();
out.close();
}
}
/**
* Create a proxy object that will speak the binary protocol on a socket.
* Upward messages are passed on the specified handler and downward
* downward messages are public methods on this object.
* @param sock The socket to communicate on.
* @param handler The handler for the received messages.
* @param key The object to read keys into.
* @param value The object to read values into.
* @param config The job's configuration
* @throws IOException
*/
public BinaryProtocol(Socket sock,
UpwardProtocol<K2, V2> handler,
K2 key,
V2 value,
JobConf config) throws IOException {
OutputStream raw = sock.getOutputStream();
// If we are debugging, save a copy of the downlink commands to a file
if (Submitter.getKeepCommandFile(config)) {
raw = new TeeOutputStream("downlink.data", raw);
}
stream = new DataOutputStream(new BufferedOutputStream(raw,
BUFFER_SIZE)) ;
uplink = new UplinkReaderThread<K2, V2>(sock.getInputStream(),
handler, key, value);
uplink.setName("pipe-uplink-handler");
uplink.start();
}
/**
* Close the connection and shutdown the handler thread.
* @throws IOException
* @throws InterruptedException
*/
public void close() throws IOException, InterruptedException {
LOG.debug("closing connection");
stream.close();
uplink.closeConnection();
uplink.interrupt();
uplink.join();
}
public void authenticate(String digest, String challenge)
throws IOException {
LOG.debug("Sending AUTHENTICATION_REQ, digest=" + digest + ", challenge="
+ challenge);
WritableUtils.writeVInt(stream, MessageType.AUTHENTICATION_REQ.code);
Text.writeString(stream, digest);
Text.writeString(stream, challenge);
}
public void start() throws IOException {
LOG.debug("starting downlink");
WritableUtils.writeVInt(stream, MessageType.START.code);
WritableUtils.writeVInt(stream, CURRENT_PROTOCOL_VERSION);
}
public void setJobConf(JobConf job) throws IOException {
WritableUtils.writeVInt(stream, MessageType.SET_JOB_CONF.code);
List<String> list = new ArrayList<String>();
for(Map.Entry<String, String> itm: job) {
list.add(itm.getKey());
list.add(itm.getValue());
}
WritableUtils.writeVInt(stream, list.size());
for(String entry: list){
Text.writeString(stream, entry);
}
}
public void setInputTypes(String keyType,
String valueType) throws IOException {
WritableUtils.writeVInt(stream, MessageType.SET_INPUT_TYPES.code);
Text.writeString(stream, keyType);
Text.writeString(stream, valueType);
}
public void runMap(InputSplit split, int numReduces,
boolean pipedInput) throws IOException {
WritableUtils.writeVInt(stream, MessageType.RUN_MAP.code);
writeObject(split);
WritableUtils.writeVInt(stream, numReduces);
WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
}
public void mapItem(WritableComparable key,
Writable value) throws IOException {
WritableUtils.writeVInt(stream, MessageType.MAP_ITEM.code);
writeObject(key);
writeObject(value);
}
public void runReduce(int reduce, boolean pipedOutput) throws IOException {
WritableUtils.writeVInt(stream, MessageType.RUN_REDUCE.code);
WritableUtils.writeVInt(stream, reduce);
WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0);
}
public void reduceKey(WritableComparable key) throws IOException {
WritableUtils.writeVInt(stream, MessageType.REDUCE_KEY.code);
writeObject(key);
}
public void reduceValue(Writable value) throws IOException {
WritableUtils.writeVInt(stream, MessageType.REDUCE_VALUE.code);
writeObject(value);
}
public void endOfInput() throws IOException {
WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
LOG.debug("Sent close command");
}
public void abort() throws IOException {
WritableUtils.writeVInt(stream, MessageType.ABORT.code);
LOG.debug("Sent abort command");
}
public void flush() throws IOException {
stream.flush();
}
/**
* Write the given object to the stream. If it is a Text or BytesWritable,
* write it directly. Otherwise, write it to a buffer and then write the
* length and data to the stream.
* @param obj the object to write
* @throws IOException
*/
private void writeObject(Writable obj) throws IOException {
// For Text and BytesWritable, encode them directly, so that they end up
// in C++ as the natural translations.
if (obj instanceof Text) {
Text t = (Text) obj;
int len = t.getLength();
WritableUtils.writeVInt(stream, len);
stream.write(t.getBytes(), 0, len);
} else if (obj instanceof BytesWritable) {
BytesWritable b = (BytesWritable) obj;
int len = b.getLength();
WritableUtils.writeVInt(stream, len);
stream.write(b.getBytes(), 0, len);
} else {
buffer.reset();
obj.write(buffer);
int length = buffer.getLength();
WritableUtils.writeVInt(stream, length);
stream.write(buffer.getData(), 0, length);
}
}
}