blob: a27ffe33178692b5b9abb06927a7150e3dc271a3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.pipes.protocol;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.util.KeyValuePair;
public class UplinkReader<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M extends Writable>
extends Thread {
/** Class-wide logger. */
private static final Log LOG = LogFactory.getLog(UplinkReader.class);
/**
 * Protocol object this reader reports to. It supplies the output stream,
 * writeObject/flush, and the hasTaskLock/resultLock monitors notified below.
 */
private BinaryProtocol<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> binProtocol;
/**
 * BSP peer used by the peer-dependent commands. Remains null when the reader
 * is created through the Configuration-only constructor.
 */
private BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> peer = null;
/** Configuration used to resolve classes and to open SequenceFiles. */
private Configuration conf;
/** File system obtained from conf; used for SequenceFile readers/writers. */
private FileSystem fs;
/** Buffered stream from which commands and their payloads are read. */
protected DataInputStream inStream;
/** Stream responses are written to; taken from binProtocol.getOutputStream(). */
protected DataOutputStream outStream;
/**
 * Open SequenceFile readers keyed by file ID (the reader's hashCode()), each
 * paired with the key/value Writable instances reused for every read.
 */
private Map<Integer, Entry<SequenceFile.Reader, Entry<Writable, Writable>>> sequenceFileReaders;
/**
 * Open SequenceFile writers keyed by file ID (the writer's hashCode()), each
 * paired with the key/value Writable instances reused for every append.
 */
private Map<Integer, Entry<SequenceFile.Writer, Entry<Writable, Writable>>> sequenceFileWriters;
/**
 * Paths that have been opened for writing; seqFileOpen consults this set to
 * reject a second writer on the same path.
 */
private Set<String> sequenceFileWriterPaths;
public UplinkReader(
BinaryProtocol<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> binaryProtocol,
Configuration conf, InputStream stream) throws IOException {
this.binProtocol = binaryProtocol;
this.conf = conf;
this.fs = FileSystem.get(conf);
this.inStream = new DataInputStream(new BufferedInputStream(stream,
BinaryProtocol.BUFFER_SIZE));
this.outStream = binProtocol.getOutputStream();
this.sequenceFileReaders = new HashMap<Integer, Entry<SequenceFile.Reader, Entry<Writable, Writable>>>();
this.sequenceFileWriters = new HashMap<Integer, Entry<SequenceFile.Writer, Entry<Writable, Writable>>>();
this.sequenceFileWriterPaths = new HashSet<String>();
}
/**
 * Creates a reader bound to a BSP peer. The configuration is taken from
 * {@code peer.getConfiguration()}, so {@code peer} must not be null.
 *
 * @param binaryProtocol protocol that owns the output stream and the locks
 *          this reader notifies
 * @param peer BSP peer that serves the peer-dependent commands
 * @param stream raw input stream coming from the child process
 * @throws IOException if the delegated constructor fails
 */
public UplinkReader(
BinaryProtocol<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> binaryProtocol,
BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> peer, InputStream stream)
throws IOException {
this(binaryProtocol, peer.getConfiguration(), stream);
this.peer = peer;
}
/**
 * @return true when this reader was constructed with a BSP peer
 */
private boolean isPeerAvailable() {
  return peer != null;
}
/**
 * Reads command codes from the child process and dispatches them until
 * {@code DONE} is received, the thread is found interrupted, or a handler
 * fails.
 * <p>
 * On interruption the error is reported through {@link #onError(Throwable)}
 * and the method returns. Any other failure is reported the same way and
 * then rethrown wrapped in a {@link RuntimeException}.
 */
@Override
public void run() {
  boolean keepReading = true;
  while (keepReading) {
    try {
      if (Thread.currentThread().isInterrupted()) {
        throw new InterruptedException();
      }
      final int command = readCommand();
      LOG.debug("Handling uplink command: " + command);
      keepReading = handleCommand(command);
    } catch (InterruptedException e) {
      onError(e);
      return;
    } catch (Throwable e) {
      onError(e);
      throw new RuntimeException(e);
    }
  }
}

/**
 * Dispatches a single command code to its handler.
 * <p>
 * Codes are matched one at a time rather than by indexing
 * {@code MessageType.values()}, so an unknown code (or a peer-dependent code
 * received without a peer) ends up in the final branch.
 *
 * @param cmd command code read from the child process; -1 is ignored
 * @return false once {@code DONE} was received, true otherwise
 * @throws Exception whatever the handler throws, or a plain Exception for an
 *           unrecognized command code
 */
private boolean handleCommand(int cmd) throws Exception {
  final boolean hasPeer = isPeerAvailable();

  if (cmd == -1) {
    // nothing to dispatch, read the next command
  } else if (cmd == MessageType.WRITE_KEYVALUE.code && hasPeer) { // INCOMING
    writeKeyValue();
  } else if (cmd == MessageType.READ_KEYVALUE.code && hasPeer) { // OUTGOING
    readKeyValue();
  } else if (cmd == MessageType.INCREMENT_COUNTER.code && hasPeer) { // INCOMING
    incrementCounter();
  } else if (cmd == MessageType.REGISTER_COUNTER.code && hasPeer) { // INCOMING
    /*
     * Is not used in Hama. Hadoop Pipes uses it - maybe for performance
     * issue, skip transferring group and name each INCREMENT
     */
  } else if (cmd == MessageType.TASK_DONE.code) { // INCOMING
    synchronized (binProtocol.hasTaskLock) {
      binProtocol.setHasTask(false);
      LOG.debug("Got MessageType.TASK_DONE");
      binProtocol.hasTaskLock.notify();
    }
  } else if (cmd == MessageType.DONE.code) { // INCOMING
    LOG.debug("Pipe child done");
    return false;
  } else if (cmd == MessageType.SEND_MSG.code && hasPeer) { // INCOMING
    sendMessage();
  } else if (cmd == MessageType.GET_MSG_COUNT.code && hasPeer) { // OUTGOING
    getMessageCount();
  } else if (cmd == MessageType.GET_MSG.code && hasPeer) { // OUTGOING
    getMessage();
  } else if (cmd == MessageType.SYNC.code && hasPeer) { // INCOMING
    sync();
  } else if (cmd == MessageType.GET_ALL_PEERNAME.code && hasPeer) { // OUTGOING
    getAllPeerNames();
  } else if (cmd == MessageType.GET_PEERNAME.code && hasPeer) { // OUTGOING
    getPeerName();
  } else if (cmd == MessageType.GET_PEER_INDEX.code && hasPeer) { // OUTGOING
    getPeerIndex();
  } else if (cmd == MessageType.GET_PEER_COUNT.code && hasPeer) { // OUTGOING
    getPeerCount();
  } else if (cmd == MessageType.GET_SUPERSTEP_COUNT.code && hasPeer) { // OUTGOING
    getSuperstepCount();
  } else if (cmd == MessageType.REOPEN_INPUT.code && hasPeer) { // INCOMING
    reopenInput();
  } else if (cmd == MessageType.CLEAR.code && hasPeer) { // INCOMING
    clear();
    /* SequenceFileConnector Implementation */
  } else if (cmd == MessageType.SEQFILE_OPEN.code) { // OUTGOING
    seqFileOpen();
  } else if (cmd == MessageType.SEQFILE_READNEXT.code) { // OUTGOING
    seqFileReadNext();
  } else if (cmd == MessageType.SEQFILE_APPEND.code) { // INCOMING
    seqFileAppend();
  } else if (cmd == MessageType.SEQFILE_CLOSE.code) { // OUTGOING
    seqFileClose();
    /* SequenceFileConnector Implementation */
  } else if (cmd == MessageType.PARTITION_RESPONSE.code) { // INCOMING
    partitionResponse();
  } else {
    throw new Exception("Bad command code: " + cmd);
  }
  return true;
}
/**
 * Logs the failure and hands it to the binary protocol: the exception is
 * stored, the "has task" flag is cleared, and one thread waiting on
 * {@code hasTaskLock} is notified.
 * <p>
 * onError is overridden by StreamingProtocol in Hama Streaming.
 *
 * @param e the failure that stopped command processing
 */
protected void onError(Throwable e) {
  final String stackTrace = StringUtils.stringifyException(e);
  LOG.error(stackTrace);

  // notify binaryProtocol and set Exception
  synchronized (binProtocol.hasTaskLock) {
    binProtocol.setUplinkException(e);
    binProtocol.setHasTask(false);
    binProtocol.hasTaskLock.notify();
  }
}
/**
 * Reads the next command code from the input stream as a variable-length
 * int.
 * <p>
 * readCommand is overridden by StreamingProtocol in Hama Streaming.
 *
 * @return the command code
 * @throws IOException if the code cannot be read
 */
protected int readCommand() throws IOException {
  final int commandCode = WritableUtils.readVInt(inStream);
  return commandCode;
}
/**
 * Closes the input stream and every SequenceFile reader and writer that is
 * still open.
 * <p>
 * Every resource is closed even if an earlier close fails, so one broken
 * handle cannot leak the others. The first failure is rethrown once all
 * resources have been processed; later failures are logged. The bookkeeping
 * maps and the writer path set are emptied afterwards, so calling this
 * method again does not close the same handles twice.
 *
 * @throws IOException the first failure raised while closing a resource
 */
public void closeConnection() throws IOException {
  IOException firstFailure = null;

  // close input stream
  try {
    this.inStream.close();
  } catch (IOException e) {
    firstFailure = recordCloseFailure(firstFailure, e, "input stream");
  }

  // close open SequenceFileReaders
  for (Entry<Integer, Entry<SequenceFile.Reader, Entry<Writable, Writable>>> open : this.sequenceFileReaders
      .entrySet()) {
    LOG.debug("close SequenceFileReader: " + open.getKey());
    try {
      open.getValue().getKey().close();
    } catch (IOException e) {
      firstFailure = recordCloseFailure(firstFailure, e,
          "SequenceFileReader " + open.getKey());
    }
  }
  this.sequenceFileReaders.clear();

  // close open SequenceFileWriters
  for (Entry<Integer, Entry<SequenceFile.Writer, Entry<Writable, Writable>>> open : this.sequenceFileWriters
      .entrySet()) {
    LOG.debug("close SequenceFileWriter: " + open.getKey());
    try {
      open.getValue().getKey().close();
    } catch (IOException e) {
      firstFailure = recordCloseFailure(firstFailure, e,
          "SequenceFileWriter " + open.getKey());
    }
  }
  this.sequenceFileWriters.clear();
  this.sequenceFileWriterPaths.clear();

  if (firstFailure != null) {
    throw firstFailure;
  }
}

/**
 * Keeps the first close failure for rethrowing and logs any later one.
 *
 * @param first failure remembered so far, or null
 * @param current failure that just occurred
 * @param what short description of the resource that failed to close
 * @return the failure to keep remembering
 */
private IOException recordCloseFailure(IOException first,
    IOException current, String what) {
  if (first == null) {
    return current;
  }
  LOG.error("closeConnection - failed to close " + what, current);
  return first;
}
/**
 * Handles {@code REOPEN_INPUT}: reopens the peer's input and acknowledges
 * with the same command code.
 *
 * @throws IOException if reopening the input or writing the response fails
 */
public void reopenInput() throws IOException {
  LOG.debug("Got MessageType.REOPEN_INPUT");
  peer.reopenInput();

  // acknowledge
  final int ackCode = MessageType.REOPEN_INPUT.code;
  WritableUtils.writeVInt(outStream, ackCode);
  binProtocol.flush();
  LOG.debug("Responded MessageType.REOPEN_INPUT");
}
/**
 * Handles {@code CLEAR}: calls {@code peer.clear()} and acknowledges with
 * the same command code.
 *
 * @throws IOException if writing the response fails
 */
public void clear() throws IOException {
  LOG.debug("Got MessageType.CLEAR");
  peer.clear();

  // acknowledge
  final int ackCode = MessageType.CLEAR.code;
  WritableUtils.writeVInt(outStream, ackCode);
  binProtocol.flush();
  LOG.debug("Responded MessageType.CLEAR");
}
/**
 * Handles {@code GET_SUPERSTEP_COUNT}: responds with the command code
 * followed by the peer's superstep count as a variable-length long.
 *
 * @throws IOException if writing the response fails
 */
public void getSuperstepCount() throws IOException {
  final int responseCode = MessageType.GET_SUPERSTEP_COUNT.code;
  WritableUtils.writeVInt(outStream, responseCode);
  WritableUtils.writeVLong(outStream, peer.getSuperstepCount());
  binProtocol.flush();

  final String logLine = "Responded MessageType.GET_SUPERSTEP_COUNT - SuperstepCount: "
      + peer.getSuperstepCount();
  LOG.debug(logLine);
}
/**
 * Handles {@code GET_PEER_COUNT}: responds with the command code followed
 * by the number of peers as a variable-length int.
 *
 * @throws IOException if writing the response fails
 */
public void getPeerCount() throws IOException {
  final int responseCode = MessageType.GET_PEER_COUNT.code;
  WritableUtils.writeVInt(outStream, responseCode);
  WritableUtils.writeVInt(outStream, peer.getNumPeers());
  binProtocol.flush();

  final String logLine = "Responded MessageType.GET_PEER_COUNT - NumPeers: "
      + peer.getNumPeers();
  LOG.debug(logLine);
}
/**
 * Handles {@code GET_PEER_INDEX}: responds with the command code followed
 * by this peer's index as a variable-length int.
 *
 * @throws IOException if writing the response fails
 */
public void getPeerIndex() throws IOException {
  final int responseCode = MessageType.GET_PEER_INDEX.code;
  WritableUtils.writeVInt(outStream, responseCode);
  WritableUtils.writeVInt(outStream, peer.getPeerIndex());
  binProtocol.flush();

  final String logLine = "Responded MessageType.GET_PEER_INDEX - PeerIndex: "
      + peer.getPeerIndex();
  LOG.debug(logLine);
}
/**
 * Handles {@code GET_PEERNAME}: reads a peer index and responds with the
 * command code followed by a peer name.
 * <ul>
 * <li>index -1: this peer's own name</li>
 * <li>index in [0, numPeers): the name of that peer</li>
 * <li>anything else: an empty string</li>
 * </ul>
 *
 * @throws IOException if reading the index or writing the response fails
 */
public void getPeerName() throws IOException {
  final int id = WritableUtils.readVInt(inStream);
  LOG.debug("Got MessageType.GET_PEERNAME id: " + id);

  WritableUtils.writeVInt(outStream, MessageType.GET_PEERNAME.code);

  if (id == -1) { // -1 indicates get own PeerName
    Text.writeString(outStream, peer.getPeerName());
    LOG.debug("Responded MessageType.GET_PEERNAME - Get Own PeerName: "
        + peer.getPeerName());
  } else if (id >= -1 && id < peer.getNumPeers()) {
    Text.writeString(outStream, peer.getPeerName(id));
    LOG.debug("Responded MessageType.GET_PEERNAME - PeerName: "
        + peer.getPeerName(id));
  } else {
    // if no PeerName for this index is found write emptyString
    Text.writeString(outStream, "");
    LOG.debug("Responded MessageType.GET_PEERNAME - Empty PeerName!");
  }

  binProtocol.flush();
}
/**
 * Handles {@code GET_ALL_PEERNAME}: responds with the command code, the
 * number of peer names, and then each name as a string.
 *
 * @throws IOException if writing the response fails
 */
public void getAllPeerNames() throws IOException {
  LOG.debug("Got MessageType.GET_ALL_PEERNAME");
  final String[] peerNames = peer.getAllPeerNames();
  final int nameCount = peerNames.length;

  WritableUtils.writeVInt(outStream, MessageType.GET_ALL_PEERNAME.code);
  WritableUtils.writeVInt(outStream, nameCount);
  for (int i = 0; i < nameCount; i++) {
    Text.writeString(outStream, peerNames[i]);
  }
  binProtocol.flush();

  LOG.debug("Responded MessageType.GET_ALL_PEERNAME - peerNamesCount: "
      + nameCount);
}
/**
 * Handles {@code SYNC}: runs the peer's barrier synchronization and then
 * acknowledges with the same command code.
 *
 * @throws IOException if writing the response fails
 * @throws SyncException if thrown by {@code peer.sync()}
 * @throws InterruptedException if thrown by {@code peer.sync()}
 */
public void sync() throws IOException, SyncException, InterruptedException {
  LOG.debug("Got MessageType.SYNC");

  // this call blocks
  peer.sync();

  final int ackCode = MessageType.SYNC.code;
  WritableUtils.writeVInt(outStream, ackCode);
  binProtocol.flush();
  LOG.debug("Responded MessageType.SYNC");
}
/**
 * Handles {@code GET_MSG}: fetches the peer's current message. When one is
 * available the response is the command code followed by the message;
 * otherwise {@code END_OF_DATA} is sent.
 *
 * @throws IOException if fetching the message or writing the response fails
 */
public void getMessage() throws IOException {
  LOG.debug("Got MessageType.GET_MSG");
  final Writable message = peer.getCurrentMessage();

  if (message == null) {
    WritableUtils.writeVInt(outStream, MessageType.END_OF_DATA.code);
    LOG.debug("Responded MessageType.END_OF_DATA");
  } else {
    WritableUtils.writeVInt(outStream, MessageType.GET_MSG.code);
    binProtocol.writeObject(message);

    // log only a short preview of the message
    final String text = message.toString();
    final String preview = (text.length() < 10) ? text : text.substring(0, 9)
        + "...";
    LOG.debug("Responded MessageType.GET_MSG - Message: " + preview);
  }

  binProtocol.flush();
}
/**
 * Handles {@code GET_MSG_COUNT}: responds with the command code followed by
 * the number of current messages as a variable-length int.
 *
 * @throws IOException if writing the response fails
 */
public void getMessageCount() throws IOException {
  final int responseCode = MessageType.GET_MSG_COUNT.code;
  WritableUtils.writeVInt(outStream, responseCode);
  WritableUtils.writeVInt(outStream, peer.getNumCurrentMessages());
  binProtocol.flush();

  final String logLine = "Responded MessageType.GET_MSG_COUNT - Count: "
      + peer.getNumCurrentMessages();
  LOG.debug(logLine);
}
/**
 * Handles {@code INCREMENT_COUNTER}: reads the counter group, counter name
 * and amount, applies the increment on the peer, and acknowledges with the
 * same command code.
 *
 * @throws IOException if reading the request or writing the response fails
 */
public void incrementCounter() throws IOException {
  LOG.debug("Got MessageType.INCREMENT_COUNTER");

  // request payload: group, name, amount
  final String group = Text.readString(inStream);
  final String name = Text.readString(inStream);
  final long amount = WritableUtils.readVLong(inStream);
  LOG.debug("Got MessageType.INCREMENT_COUNTER group: " + group + " name: "
      + name + " amount: " + amount);

  peer.incrementCounter(group, name, amount);

  final int ackCode = MessageType.INCREMENT_COUNTER.code;
  WritableUtils.writeVInt(outStream, ackCode);
  binProtocol.flush();
  LOG.debug("Responded MessageType.INCREMENT_COUNTER");
}
/**
 * Handles {@code SEND_MSG}: reads the target peer name, instantiates the
 * message class configured under {@code Constants.MESSAGE_CLASS}
 * (defaulting to {@link BytesWritable}), fills it from the stream, sends it
 * through the peer, and acknowledges with the same command code.
 *
 * @throws IOException if reading the request, sending the message, or
 *           writing the response fails
 * @throws InstantiationException declared for callers; not thrown directly
 *           by this body
 * @throws IllegalAccessException declared for callers; not thrown directly
 *           by this body
 */
@SuppressWarnings("unchecked")
public void sendMessage() throws IOException, InstantiationException,
    IllegalAccessException {
  final String peerName = Text.readString(inStream);

  final Class<? extends M> messageClass = (Class<? extends M>) conf.getClass(
      Constants.MESSAGE_CLASS, BytesWritable.class);
  final M message = (M) ReflectionUtils.newInstance(messageClass, conf);
  LOG.debug("Got MessageType.SEND_MSG peerName: " + peerName
      + " messageClass: " + message.getClass().getName());

  readObject(message);
  peer.send(peerName, message);

  WritableUtils.writeVInt(outStream, MessageType.SEND_MSG.code);
  binProtocol.flush();
  LOG.debug("Responded MessageType.SEND_MSG");

  // log only a short preview of the message
  final String text = message.toString();
  final String preview = (text.length() < 10) ? text : text.substring(0, 9)
      + "...";
  LOG.debug("Sent message to peerName: " + peerName + " messageClass: "
      + message.getClass().getName() + " Message: " + preview);
}
/**
 * Handles {@code READ_KEYVALUE}: responds with the next input key/value
 * pair, or with {@code END_OF_DATA} when the input is exhausted or when no
 * input format (or {@code org.apache.hama.bsp.NullInputFormat}) is
 * configured.
 *
 * @throws IOException if reading the input or writing the response fails
 */
public void readKeyValue() throws IOException {
  final String inputFormat = peer.getConfiguration().get(
      Constants.INPUT_FORMAT_CLASS);
  final boolean nullinput = inputFormat == null
      || inputFormat.equals("org.apache.hama.bsp.NullInputFormat");

  if (nullinput) {
    // there is no input to read from
    WritableUtils.writeVInt(outStream, MessageType.END_OF_DATA.code);
    binProtocol.flush();
    LOG.debug("Responded MessageType.READ_KEYVALUE - END_OF_DATA");
    return;
  }

  final KeyValuePair<KEYIN, VALUEIN> pair = peer.readNext();
  if (pair == null) {
    WritableUtils.writeVInt(outStream, MessageType.END_OF_DATA.code);
    LOG.debug("Responded MessageType.READ_KEYVALUE - END_OF_DATA");
  } else {
    WritableUtils.writeVInt(outStream, MessageType.READ_KEYVALUE.code);
    binProtocol.writeObject((Writable) pair.getKey());
    binProtocol.writeObject((Writable) pair.getValue());

    // log only short previews of key and value
    final String keyText = pair.getKey().toString();
    final String valueText = pair.getValue().toString();
    final String keyPreview = (keyText.length() < 10) ? keyText : keyText
        .substring(0, 9) + "...";
    final String valuePreview = (valueText.length() < 10) ? valueText
        : valueText.substring(0, 9) + "...";
    LOG.debug("Responded MessageType.READ_KEYVALUE -" + " Key: " + keyPreview
        + " Value: " + valuePreview);
  }
  binProtocol.flush();
}
/**
 * Handles {@code WRITE_KEYVALUE}: instantiates the classes configured under
 * {@code bsp.output.key.class} and {@code bsp.output.value.class}
 * (defaulting to {@code Object.class}), fills both instances from the
 * stream, writes them through the peer, and acknowledges with the same
 * command code.
 *
 * @throws IOException if reading the pair, writing it to the peer, or
 *           writing the response fails
 */
@SuppressWarnings("unchecked")
public void writeKeyValue() throws IOException {
  final Class<? extends KEYOUT> keyOutClass = (Class<? extends KEYOUT>) conf
      .getClass("bsp.output.key.class", Object.class);
  final KEYOUT keyOut = (KEYOUT) ReflectionUtils.newInstance(keyOutClass,
      conf);

  final Class<? extends VALUEOUT> valueOutClass = (Class<? extends VALUEOUT>) conf
      .getClass("bsp.output.value.class", Object.class);
  final VALUEOUT valueOut = (VALUEOUT) ReflectionUtils.newInstance(
      valueOutClass, conf);

  LOG.debug("Got MessageType.WRITE_KEYVALUE keyOutClass: "
      + keyOut.getClass().getName() + " valueOutClass: "
      + valueOut.getClass().getName());

  // fill key and value from the stream, then hand them to the peer
  readObject((Writable) keyOut);
  readObject((Writable) valueOut);
  peer.write(keyOut, valueOut);

  WritableUtils.writeVInt(outStream, MessageType.WRITE_KEYVALUE.code);
  binProtocol.flush();
  LOG.debug("Responded MessageType.WRITE_KEYVALUE");

  // log only short previews of key and value
  final String keyText = keyOut.toString();
  final String valueText = valueOut.toString();
  final String keyPreview = (keyText.length() < 10) ? keyText : keyText
      .substring(0, 9) + "...";
  final String valuePreview = (valueText.length() < 10) ? valueText
      : valueText.substring(0, 9) + "...";
  LOG.debug("Done MessageType.WRITE_KEYVALUE -" + " Key: " + keyPreview
      + " Value: " + valuePreview);
}
/**
 * Handles {@code SEQFILE_OPEN}: reads a path, an option ("r" for read, "w"
 * for write) and the expected key/value class names, opens the SequenceFile
 * accordingly, and responds with the command code followed by a file ID.
 * <p>
 * The file ID is the {@code hashCode()} of the opened reader or writer; -1
 * is sent when the file could not be opened (I/O error, unknown class,
 * class mismatch, path already opened for writing, or unknown option).
 *
 * @throws IOException if reading the request or writing the response fails
 */
public void seqFileOpen() throws IOException {
  String path = Text.readString(this.inStream);
  // option - read = "r" or write = "w"
  String option = Text.readString(this.inStream);
  // key and value class stored in the SequenceFile
  String keyClass = Text.readString(this.inStream);
  String valueClass = Text.readString(this.inStream);
  LOG.debug("GOT MessageType.SEQFILE_OPEN - Path: " + path);
  LOG.debug("GOT MessageType.SEQFILE_OPEN - Option: " + option);
  LOG.debug("GOT MessageType.SEQFILE_OPEN - KeyClass: " + keyClass);
  LOG.debug("GOT MessageType.SEQFILE_OPEN - ValueClass: " + valueClass);

  int fileID = -1;
  if (option.equals("r")) {
    fileID = openSequenceFileReader(path, keyClass, valueClass);
  } else if (option.equals("w")) {
    fileID = openSequenceFileWriter(path, keyClass, valueClass);
  } else { // wrong option
    LOG.error("SEQFILE_OPEN - Wrong option: '" + option + "'");
  }

  WritableUtils.writeVInt(this.outStream, MessageType.SEQFILE_OPEN.code);
  WritableUtils.writeVInt(this.outStream, fileID);
  binProtocol.flush();
  LOG.debug("Responded MessageType.SEQFILE_OPEN - FileID: " + fileID);
}

/**
 * Opens a SequenceFile for reading and registers it under a new file ID.
 * The reader is closed again whenever it does not end up registered, so a
 * class mismatch or a failed class lookup/instantiation does not leak it.
 *
 * @param path path of the SequenceFile
 * @param keyClass key class name the caller expects the file to contain
 * @param valueClass value class name the caller expects the file to contain
 * @return the new file ID, or -1 if the file could not be opened
 */
private int openSequenceFileReader(String path, String keyClass,
    String valueClass) {
  SequenceFile.Reader reader = null;
  boolean registered = false;
  try {
    reader = new SequenceFile.Reader(fs, new Path(path), conf);

    // keyClass or valueClass is wrong
    if (!reader.getKeyClassName().equals(keyClass)) {
      LOG.error("SEQFILE_OPEN - Wrong KeyClass: " + keyClass
          + " File KeyClass: " + reader.getKeyClassName());
      return -1;
    }
    if (!reader.getValueClassName().equals(valueClass)) {
      LOG.error("SEQFILE_OPEN - Wrong ValueClass: " + valueClass
          + " File ValueClass: " + reader.getValueClassName());
      return -1;
    }

    // try to load key and value class
    Class<?> sequenceKeyClass = conf.getClassLoader().loadClass(keyClass);
    Class<?> sequenceValueClass = conf.getClassLoader()
        .loadClass(valueClass);
    // try to instantiate key and value class
    Writable sequenceKeyWritable = (Writable) ReflectionUtils.newInstance(
        sequenceKeyClass, conf);
    Writable sequenceValueWritable = (Writable) ReflectionUtils.newInstance(
        sequenceValueClass, conf);

    // put new fileID and key and value Writable instances into HashMap
    // NOTE(review): hashCode() is not guaranteed unique; a collision would
    // silently replace an already registered reader.
    int fileID = reader.hashCode();
    this.sequenceFileReaders
        .put(
            fileID,
            new AbstractMap.SimpleEntry<SequenceFile.Reader, Entry<Writable, Writable>>(
                reader, new AbstractMap.SimpleEntry<Writable, Writable>(
                    sequenceKeyWritable, sequenceValueWritable)));
    registered = true;
    return fileID;
  } catch (IOException e) {
    LOG.error("SEQFILE_OPEN - " + e.getMessage(), e);
    return -1;
  } catch (ClassNotFoundException e) {
    LOG.error("SEQFILE_OPEN - " + e.getMessage(), e);
    return -1;
  } finally {
    if (!registered && reader != null) {
      try {
        reader.close();
      } catch (IOException e) {
        LOG.warn("SEQFILE_OPEN - could not close reader for path: " + path,
            e);
      }
    }
  }
}

/**
 * Opens a SequenceFile for writing and registers it under a new file ID. A
 * path that was already opened for writing by this reader is rejected.
 *
 * @param path path of the SequenceFile
 * @param keyClass name of the key class to store
 * @param valueClass name of the value class to store
 * @return the new file ID, or -1 if the file could not be opened
 */
private int openSequenceFileWriter(String path, String keyClass,
    String valueClass) {
  // SequenceFile.Writer has an exclusive lease for a file
  // No other client can write to this file until other Writer has
  // completed
  if (this.sequenceFileWriterPaths.contains(path)) {
    // Path was already opened by another SequenceFile.Writer
    LOG.error("SEQFILE_OPEN - Path: " + path
        + " is already used by another Writer!");
    return -1;
  }

  try {
    // try to load key and value class
    Class<?> sequenceKeyClass = conf.getClassLoader().loadClass(keyClass);
    Class<?> sequenceValueClass = conf.getClassLoader()
        .loadClass(valueClass);
    // try to instantiate key and value class
    Writable sequenceKeyWritable = (Writable) ReflectionUtils.newInstance(
        sequenceKeyClass, conf);
    Writable sequenceValueWritable = (Writable) ReflectionUtils.newInstance(
        sequenceValueClass, conf);

    // the writer is created last, so nothing after it can fail and leak it
    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(
        path), sequenceKeyClass, sequenceValueClass);

    // put new fileID and key and value Writable instances into HashMap
    int fileID = writer.hashCode();
    this.sequenceFileWriters
        .put(
            fileID,
            new AbstractMap.SimpleEntry<SequenceFile.Writer, Entry<Writable, Writable>>(
                writer, new AbstractMap.SimpleEntry<Writable, Writable>(
                    sequenceKeyWritable, sequenceValueWritable)));
    // add path to set (exclusive access)
    this.sequenceFileWriterPaths.add(path);
    return fileID;
  } catch (IOException e) {
    LOG.error("SEQFILE_OPEN - " + e.getMessage(), e);
    return -1;
  } catch (ClassNotFoundException e) {
    LOG.error("SEQFILE_OPEN - " + e.getMessage(), e);
    return -1;
  }
}
/**
 * Handles {@code SEQFILE_READNEXT}: reads a file ID and responds with the
 * command code followed by the next key/value pair of that reader.
 * {@code END_OF_DATA} is sent when the reader is at the end of the file or
 * when the file ID is unknown.
 *
 * @throws IOException if reading the request, reading the SequenceFile, or
 *           writing the response fails
 */
public void seqFileReadNext() throws IOException {
  final int fileID = WritableUtils.readVInt(inStream);
  LOG.debug("GOT MessageType.SEQFILE_READNEXT - FileID: " + fileID);

  // check if fileID is available in sequenceFileReader
  final Entry<SequenceFile.Reader, Entry<Writable, Writable>> open = sequenceFileReaders
      .get(fileID);
  if (open == null) { // no fileID stored
    LOG.error("MessageType.SEQFILE_READNEXT: FileID " + fileID
        + " not found!");
    WritableUtils.writeVInt(outStream, MessageType.END_OF_DATA.code);
    LOG.debug("Responded MessageType.SEQFILE_READNEXT - END_OF_DATA");
    binProtocol.flush();
    return;
  }

  final SequenceFile.Reader reader = open.getKey();
  final Writable key = open.getValue().getKey();
  final Writable value = open.getValue().getValue();

  // try to read next key/value pair from SequenceFile.Reader
  final boolean hasNext = reader.next(key, value);
  if (!hasNext) { // false when at end of file
    WritableUtils.writeVInt(outStream, MessageType.END_OF_DATA.code);
    LOG.debug("Responded MessageType.SEQFILE_READNEXT - END_OF_DATA");
  } else {
    WritableUtils.writeVInt(outStream, MessageType.SEQFILE_READNEXT.code);
    binProtocol.writeObject(key);
    binProtocol.writeObject(value);

    // log only short previews of key and value
    final String keyText = key.toString();
    final String valueText = value.toString();
    final String keyPreview = (keyText.length() < 10) ? keyText : keyText
        .substring(0, 9) + "...";
    final String valuePreview = (valueText.length() < 10) ? valueText
        : valueText.substring(0, 9) + "...";
    LOG.debug("Responded MessageType.SEQFILE_READNEXT -" + " Key: "
        + keyPreview + " Value: " + valuePreview);
  }
  binProtocol.flush();
}
/**
 * Handles {@code SEQFILE_APPEND}: reads a file ID followed by a key/value
 * pair, appends the pair to the matching writer, and responds with the
 * command code followed by 1 (appended) or 0 (unknown file ID).
 * <p>
 * For an unknown file ID the key/value types are not known, so the payload
 * cannot be parsed; the bytes currently available on the input stream are
 * discarded instead.
 *
 * @throws IOException if reading the request, appending to the file, or
 *           writing the response fails
 */
public void seqFileAppend() throws IOException {
  int fileID = WritableUtils.readVInt(this.inStream);
  LOG.debug("GOT MessageType.SEQFILE_APPEND - FileID: " + fileID);
  boolean result = false;

  // check if fileID is available in sequenceFileWriter (single lookup)
  Entry<SequenceFile.Writer, Entry<Writable, Writable>> open = this.sequenceFileWriters
      .get(fileID);
  if (open != null) {
    Writable sequenceKeyWritable = open.getValue().getKey();
    Writable sequenceValueWritable = open.getValue().getValue();

    // try to read key and value
    readObject(sequenceKeyWritable);
    readObject(sequenceValueWritable);

    // append to sequenceFile
    open.getKey().append(sequenceKeyWritable, sequenceValueWritable);

    String keyText = sequenceKeyWritable.toString();
    String valueText = sequenceValueWritable.toString();
    LOG.debug("Stored data: Key: "
        + ((keyText.length() < 10) ? keyText : keyText.substring(0, 9)
            + "...")
        + " Value: "
        + ((valueText.length() < 10) ? valueText : valueText.substring(0, 9)
            + "..."));
    result = true;
  } else { // no fileID stored
    // Skip written data from InputStream.
    // skip() may skip fewer bytes than requested, so repeat until all bytes
    // that were available have been discarded or no progress is made.
    // NOTE(review): available() is only an estimate of the pending payload;
    // it may cover less or more than this request's key/value bytes.
    long remaining = this.inStream.available();
    long skippedBytes = 0;
    while (remaining > 0) {
      long skippedNow = this.inStream.skip(remaining);
      if (skippedNow <= 0) {
        break;
      }
      skippedBytes += skippedNow;
      remaining -= skippedNow;
    }
    LOG.debug("MessageType.SEQFILE_APPEND: skip " + skippedBytes + " bytes");
    LOG.error("MessageType.SEQFILE_APPEND: FileID " + fileID + " not found!");
  }

  // RESPOND
  WritableUtils.writeVInt(this.outStream, MessageType.SEQFILE_APPEND.code);
  WritableUtils.writeVInt(this.outStream, result ? 1 : 0);
  binProtocol.flush();
  LOG.debug("Responded MessageType.SEQFILE_APPEND - Result: " + result);
}
/**
 * Handles {@code SEQFILE_CLOSE}: reads a file ID, closes and unregisters
 * the matching reader (checked first) or writer, and responds with the
 * command code followed by 1 (closed) or 0 (unknown file ID).
 *
 * @throws IOException if reading the request, closing the file, or writing
 *           the response fails
 */
public void seqFileClose() throws IOException {
  final int fileID = WritableUtils.readVInt(inStream);
  LOG.debug("GOT MessageType.SEQFILE_CLOSE - FileID: " + fileID);

  final Entry<SequenceFile.Reader, Entry<Writable, Writable>> openReader = sequenceFileReaders
      .get(fileID);
  final Entry<SequenceFile.Writer, Entry<Writable, Writable>> openWriter = (openReader != null) ? null
      : sequenceFileWriters.get(fileID);

  final boolean result;
  if (openReader != null) {
    openReader.getKey().close();
    sequenceFileReaders.remove(fileID);
    result = true;
  } else if (openWriter != null) {
    // NOTE(review): the writer's path stays in sequenceFileWriterPaths, so
    // the same path cannot be opened for writing again by this reader.
    openWriter.getKey().close();
    sequenceFileWriters.remove(fileID);
    result = true;
  } else { // no fileID stored
    LOG.error("MessageType.SEQFILE_CLOSE: FileID " + fileID + " not found!");
    result = false;
  }

  // RESPOND
  WritableUtils.writeVInt(outStream, MessageType.SEQFILE_CLOSE.code);
  WritableUtils.writeVInt(outStream, result ? 1 : 0);
  binProtocol.flush();
  LOG.debug("Responded MessageType.SEQFILE_CLOSE - Result: " + result);
}
/**
 * Handles {@code PARTITION_RESPONSE}: reads the partition result, stores it
 * on the binary protocol, and notifies one thread waiting on
 * {@code resultLock}.
 *
 * @throws IOException if the result cannot be read
 */
public void partitionResponse() throws IOException {
  final int partitionResult = WritableUtils.readVInt(inStream);

  synchronized (binProtocol.resultLock) {
    binProtocol.setResult(partitionResult);
    LOG.debug("Received MessageType.PARTITION_RESPONSE - Result: "
        + partitionResult);
    binProtocol.resultLock.notify();
  }
}
/**
 * Fills the given object from the input stream.
 * <ul>
 * <li>{@link Text} and {@link BytesWritable}: a variable-length int giving
 * the byte count, followed by that many raw bytes. Using the explicit
 * length makes the "obvious" translations work, so that a string "abc"
 * emitted from C++ shows up as "abc".</li>
 * <li>{@link IntWritable}: a variable-length int.</li>
 * <li>{@link LongWritable}: a variable-length long.</li>
 * <li>any other type: delegated to {@code obj.readFields}.</li>
 * </ul>
 *
 * @param obj the object to read into
 * @throws IOException if the stream cannot be read, if a length prefix is
 *           negative, or if {@code readFields} fails (wrapped with the
 *           class name of {@code obj})
 */
protected void readObject(Writable obj) throws IOException {
  if (obj instanceof Text) {
    ((Text) obj).set(readLengthPrefixedBytes());
  } else if (obj instanceof BytesWritable) {
    byte[] payload = readLengthPrefixedBytes();
    ((BytesWritable) obj).set(payload, 0, payload.length);
  } else if (obj instanceof IntWritable) {
    ((IntWritable) obj).set(WritableUtils.readVInt(this.inStream));
  } else if (obj instanceof LongWritable) {
    ((LongWritable) obj).set(WritableUtils.readVLong(this.inStream));
  } else {
    try {
      LOG.debug("reading type: " + obj.getClass().getName());
      // try reading object
      obj.readFields(this.inStream);
    } catch (IOException e) {
      throw new IOException("Hama Pipes is not able to read "
          + obj.getClass().getName(), e);
    }
  }
}

/**
 * Reads a variable-length int byte count followed by that many bytes.
 *
 * @return the payload bytes (possibly empty)
 * @throws IOException if the byte count is negative or the stream ends
 *           before the payload is complete
 */
private byte[] readLengthPrefixedBytes() throws IOException {
  int numBytes = WritableUtils.readVInt(this.inStream);
  if (numBytes < 0) {
    // a corrupt prefix would otherwise surface as NegativeArraySizeException
    throw new IOException("Hama Pipes received a negative payload length: "
        + numBytes);
  }
  byte[] buffer = new byte[numBytes];
  this.inStream.readFully(buffer);
  return buffer;
}
}