blob: 24de8f3202f9d5ffe77884306e513e2c90ca48b6 [file] [log] [blame]
package backtype.storm.serialization;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.CRC32OutputStream;
import backtype.storm.utils.WritableUtils;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class TupleSerializer {
ByteArrayOutputStream _outputter;
DataOutputStream _dataOutputter;
Map<Integer, Map<Integer, ValuesSerializer>> _fieldSerializers = new HashMap<Integer, Map<Integer, ValuesSerializer>>();
Map _conf;
public TupleSerializer(Map conf) {
_outputter = new ByteArrayOutputStream();
_dataOutputter = new DataOutputStream(_outputter);
_conf = conf;
}
public byte[] serialize(Tuple tuple) throws IOException {
_outputter.reset();
WritableUtils.writeVInt(_dataOutputter, tuple.getSourceTask());
WritableUtils.writeVInt(_dataOutputter, tuple.getSourceStreamId());
tuple.getMessageId().serialize(_dataOutputter);
ValuesSerializer streamSerializers = getValuesSerializer(_fieldSerializers, tuple);
streamSerializers.serializeInto(tuple.getValues(), _dataOutputter);
return _outputter.toByteArray();
}
public long crc32(Tuple tuple) {
CRC32OutputStream hasher = new CRC32OutputStream();
try {
getValuesSerializer(_fieldSerializers, tuple).serializeInto(tuple.getValues(), new DataOutputStream(hasher));
} catch (IOException e) {
throw new RuntimeException(e);
}
return hasher.getValue();
}
private ValuesSerializer getValuesSerializer(Map<Integer, Map<Integer, ValuesSerializer>> serializers, Tuple tuple) {
Map<Integer, ValuesSerializer> streamToSerializers = serializers.get(tuple.getSourceComponent());
if(streamToSerializers==null) {
streamToSerializers = new HashMap<Integer, ValuesSerializer>();
serializers.put(tuple.getSourceComponent(), streamToSerializers);
}
ValuesSerializer streamSerializers = streamToSerializers.get(tuple.getSourceStreamId());
if(streamSerializers==null) {
streamSerializers = new ValuesSerializer(_conf);
streamToSerializers.put(tuple.getSourceStreamId(), streamSerializers);
}
return streamSerializers;
}
}