blob: 0520ed8128c1df98f61316c2756a5ba5ddabf7a3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.kafka.record;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.MapFn;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.io.BytesWritable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
/**
 * Serializer/De-Serializer for Kafka's {@link ConsumerRecord}.
 *
 * <p>The binary layout produced by {@link #serialize(ConsumerRecord)} and consumed by
 * {@link #deserialize(byte[])} is, in order: topic (modified UTF-8), partition (int), offset (long),
 * timestamp (long), timestamp type name (modified UTF-8), checksum (long), serialized key size (int),
 * serialized value size (int), then the key and the value, each written as an int length prefix followed
 * by that many bytes. A length prefix of {@code -1} marks a {@code null} key or value.
 */
public class ConsumerRecordHelper {

  /**
   * Length prefix written in place of a payload when the key or value is {@code null}.
   */
  private static final int NULL_LENGTH = -1;

  /**
   * PType for {@link ConsumerRecord}
   */
  @SuppressWarnings("unchecked")
  public static final PType<ConsumerRecord<BytesWritable, BytesWritable>> CONSUMER_RECORD_P_TYPE = Writables
      .derived((Class<ConsumerRecord<BytesWritable, BytesWritable>>) (Object) ConsumerRecord.class,
          new ConsumerRecordHelper.BytesToConsumerRecord(), new ConsumerRecordHelper.ConsumerRecordToBytes(),
          Writables.writables(BytesWritable.class));

  /**
   * Serializes the record into {@code byte[]}s
   *
   * @param record the record to serialize
   * @return the record in {@code byte[]}s
   * @throws IllegalArgumentException if record is {@code null}
   * @throws IOException if there is an issue during serialization
   */
  public static byte[] serialize(ConsumerRecord<BytesWritable, BytesWritable> record) throws IOException {
    if (record == null) {
      throw new IllegalArgumentException("record cannot be null");
    }

    ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
    try (DataOutputStream dataOut = new DataOutputStream(byteOut)) {
      dataOut.writeUTF(record.topic());
      dataOut.writeInt(record.partition());
      dataOut.writeLong(record.offset());
      dataOut.writeLong(record.timestamp());
      dataOut.writeUTF(record.timestampType().name);
      dataOut.writeLong(record.checksum());
      dataOut.writeInt(record.serializedKeySize());
      dataOut.writeInt(record.serializedValueSize());
      writeBytes(dataOut, record.key());
      writeBytes(dataOut, record.value());
    }
    return byteOut.toByteArray();
  }

  /**
   * De-serializes the bytes into a {@link ConsumerRecord}
   *
   * <p>Only the leading bytes that make up one record are consumed; any bytes after the value payload
   * are ignored.
   *
   * @param bytes the bytes of a {@link ConsumerRecord}
   * @return a {@link ConsumerRecord} from the bytes
   * @throws IllegalArgumentException if bytes is {@code null}
   * @throws IOException if there is an issue de-serializing the bytes, including a truncated input or an
   *                     invalid key/value length prefix
   */
  public static ConsumerRecord<BytesWritable, BytesWritable> deserialize(byte[] bytes) throws IOException {
    if (bytes == null) {
      throw new IllegalArgumentException("bytes cannot be null");
    }

    try (DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(bytes))) {
      String topic = dataIn.readUTF();
      int partition = dataIn.readInt();
      long offset = dataIn.readLong();
      long timestamp = dataIn.readLong();
      String timestampTypeName = dataIn.readUTF();
      long checksum = dataIn.readLong();
      int serializedKeySize = dataIn.readInt();
      int serializedValueSize = dataIn.readInt();
      BytesWritable key = readBytes(dataIn, "key");
      BytesWritable value = readBytes(dataIn, "value");

      return new ConsumerRecord<>(topic, partition, offset, timestamp, TimestampType.forName(timestampTypeName), checksum,
          serializedKeySize, serializedValueSize, key, value);
    }
  }

  /**
   * Writes a length-prefixed payload, or {@link #NULL_LENGTH} alone when {@code bytes} is {@code null}.
   *
   * @param dataOut the stream to write to
   * @param bytes the payload to write, may be {@code null}
   * @throws IOException if the underlying stream fails
   */
  private static void writeBytes(DataOutputStream dataOut, BytesWritable bytes) throws IOException {
    if (bytes == null) {
      dataOut.writeInt(NULL_LENGTH);
      return;
    }
    // BytesWritable#getBytes() exposes the backing buffer, which may be larger than the logical content;
    // only the first getLength() bytes are valid, so write exactly that many instead of the whole array.
    int length = bytes.getLength();
    dataOut.writeInt(length);
    dataOut.write(bytes.getBytes(), 0, length);
  }

  /**
   * Reads a payload written by {@link #writeBytes(DataOutputStream, BytesWritable)}.
   *
   * @param dataIn the stream to read from; expected to be backed by an in-memory array so that
   *               {@code available()} reports the exact number of remaining bytes
   * @param fieldName name of the field being read, used only in error messages
   * @return the payload, or {@code null} if the length prefix was {@link #NULL_LENGTH}
   * @throws IOException if the length prefix is negative (other than {@link #NULL_LENGTH}) or larger than
   *                     the remaining input
   */
  private static BytesWritable readBytes(DataInputStream dataIn, String fieldName) throws IOException {
    int length = dataIn.readInt();
    if (length == NULL_LENGTH) {
      return null;
    }
    // Validate before allocating so a corrupt prefix fails with the documented IOException rather than a
    // NegativeArraySizeException or an oversized allocation.
    int remaining = dataIn.available();
    if (length < 0 || length > remaining) {
      throw new IOException("Invalid " + fieldName + " length " + length + " with " + remaining
          + " byte(s) remaining");
    }
    byte[] payload = new byte[length];
    dataIn.readFully(payload);
    return new BytesWritable(payload);
  }

  /**
   * {@link MapFn} to convert {@link ConsumerRecord} to {@link BytesWritable}
   */
  public static class ConsumerRecordToBytes extends MapFn<ConsumerRecord<BytesWritable, BytesWritable>, BytesWritable> {
    private static final long serialVersionUID = -6821080008375335537L;

    /**
     * Serializes the record with {@link ConsumerRecordHelper#serialize(ConsumerRecord)}.
     *
     * @param record the record to convert
     * @return the serialized record wrapped in a {@link BytesWritable}
     * @throws CrunchRuntimeException if serialization fails with an {@link IOException}
     */
    @Override
    public BytesWritable map(ConsumerRecord<BytesWritable, BytesWritable> record) {
      try {
        return new BytesWritable(ConsumerRecordHelper.serialize(record));
      } catch (IOException e) {
        throw new CrunchRuntimeException("Error serializing consumer record " + record, e);
      }
    }
  }

  /**
   * {@link MapFn} to convert {@link BytesWritable} to {@link ConsumerRecord}
   */
  public static class BytesToConsumerRecord extends MapFn<BytesWritable, ConsumerRecord<BytesWritable, BytesWritable>> {
    private static final long serialVersionUID = -6545017910063252322L;

    /**
     * De-serializes the bytes with {@link ConsumerRecordHelper#deserialize(byte[])}.
     *
     * @param bytesWritable the serialized record
     * @return the de-serialized {@link ConsumerRecord}
     * @throws CrunchRuntimeException if de-serialization fails with an {@link IOException}
     */
    @Override
    public ConsumerRecord<BytesWritable, BytesWritable> map(BytesWritable bytesWritable) {
      try {
        // The backing buffer may carry padding past getLength(); that is harmless here because
        // deserialize() only consumes the length-prefixed fields at the front of the array.
        return ConsumerRecordHelper.deserialize(bytesWritable.getBytes());
      } catch (IOException e) {
        throw new CrunchRuntimeException("Error deserializing consumer record", e);
      }
    }
  }
}