blob: ccab4ff8cd69b371f5d5f21e25b3ab2e4f140573 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.serializers
import org.apache.samza.SamzaException
import org.apache.samza.config.StorageConfig
import org.apache.samza.system.ControlMessage
import org.apache.samza.system.SystemStream
import org.apache.samza.system.OutgoingMessageEnvelope
import org.apache.samza.system.IncomingMessageEnvelope
class SerdeManager(
serdes: Map[String, Serde[Object]] = Map(),
systemKeySerdes: Map[String, Serde[Object]] = Map(),
systemMessageSerdes: Map[String, Serde[Object]] = Map(),
systemStreamKeySerdes: Map[SystemStream, Serde[Object]] = Map(),
systemStreamMessageSerdes: Map[SystemStream, Serde[Object]] = Map(),
changeLogSystemStreams: Set[SystemStream] = Set(),
controlMessageKeySerdes: Map[SystemStream, Serde[String]] = Map(),
intermediateMessageSerdes: Map[SystemStream, Serde[Object]] = Map()) {
def toBytes(obj: Object, serializerName: String) = serdes
.getOrElse(serializerName, throw new SamzaException("No serde defined for %s" format serializerName))
.toBytes(obj)
def toBytes(envelope: OutgoingMessageEnvelope): OutgoingMessageEnvelope = {
val key = if (changeLogSystemStreams.contains(envelope.getSystemStream)
|| envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
// If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
envelope.getKey
} else if (envelope.getMessage.isInstanceOf[ControlMessage]
&& controlMessageKeySerdes.contains(envelope.getSystemStream)) {
// If the message is a control message and the key needs to serialize
controlMessageKeySerdes(envelope.getSystemStream).toBytes(envelope.getKey.asInstanceOf[String])
} else if (envelope.getKeySerializerName != null) {
// If a serde is defined, use it.
toBytes(envelope.getKey, envelope.getKeySerializerName)
} else if (systemStreamKeySerdes.contains(envelope.getSystemStream)) {
// If the stream has a serde defined, use it.
systemStreamKeySerdes(envelope.getSystemStream).toBytes(envelope.getKey)
} else if (systemKeySerdes.contains(envelope.getSystemStream.getSystem)) {
// If the system has a serde defined, use it.
systemKeySerdes(envelope.getSystemStream.getSystem).toBytes(envelope.getKey)
} else {
// Just use the object.
envelope.getKey
}
val message = if (changeLogSystemStreams.contains(envelope.getSystemStream)
|| envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
// If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
envelope.getMessage
} else if (intermediateMessageSerdes.contains(envelope.getSystemStream)) {
// If the stream is an intermediate stream, use the intermediate message serde
intermediateMessageSerdes(envelope.getSystemStream).toBytes(envelope.getMessage)
} else if (envelope.getMessageSerializerName != null) {
// If a serde is defined, use it.
toBytes(envelope.getMessage, envelope.getMessageSerializerName)
} else if (systemStreamMessageSerdes.contains(envelope.getSystemStream)) {
// If the stream has a serde defined, use it.
systemStreamMessageSerdes(envelope.getSystemStream).toBytes(envelope.getMessage)
} else if (systemMessageSerdes.contains(envelope.getSystemStream.getSystem)) {
// If the system has a serde defined, use it.
systemMessageSerdes(envelope.getSystemStream.getSystem).toBytes(envelope.getMessage)
} else {
// Just use the object.
envelope.getMessage
}
if ((key eq envelope.getKey) && (message eq envelope.getMessage)) {
envelope
} else {
new OutgoingMessageEnvelope(
envelope.getSystemStream,
null,
null,
envelope.getPartitionKey,
key,
message)
}
}
def fromBytes(bytes: Array[Byte], deserializerName: String) = serdes
.getOrElse(deserializerName, throw new SamzaException("No serde defined for %s" format deserializerName))
.fromBytes(bytes)
def fromBytes(envelope: IncomingMessageEnvelope) = {
val systemStream = envelope.getSystemStreamPartition.getSystemStream
val message = if (changeLogSystemStreams.contains(systemStream)
|| systemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
// If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
envelope.getMessage
} else if (intermediateMessageSerdes.contains(systemStream)) {
// If the stream is an intermediate stream, use the intermediate message serde
intermediateMessageSerdes(systemStream).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]])
} else if (systemStreamMessageSerdes.contains(systemStream)) {
// If the stream has a serde defined, use it.
systemStreamMessageSerdes(systemStream).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]])
} else if (systemMessageSerdes.contains(systemStream.getSystem)) {
// If the system has a serde defined, use it.
systemMessageSerdes(systemStream.getSystem).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]])
} else {
// Just use the object.
envelope.getMessage
}
val key = if (changeLogSystemStreams.contains(systemStream)
|| systemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX) ) {
// If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
envelope.getKey
} else if (message.isInstanceOf[ControlMessage]
&& controlMessageKeySerdes.contains(systemStream)) {
// If the message is a control message and the key needs to deserialize
controlMessageKeySerdes(systemStream).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]])
} else if (systemStreamKeySerdes.contains(systemStream)) {
// If the stream has a serde defined, use it.
systemStreamKeySerdes(systemStream).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]])
} else if (systemKeySerdes.contains(systemStream.getSystem)) {
// If the system has a serde defined, use it.
systemKeySerdes(systemStream.getSystem).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]])
} else {
// Just use the object.
envelope.getKey
}
if ((key eq envelope.getKey) && (message eq envelope.getMessage)) {
envelope
} else {
new IncomingMessageEnvelope(
envelope.getSystemStreamPartition,
envelope.getOffset,
key,
message,
envelope.getSize,
envelope.getEventTime(),
envelope.getArrivalTime)
}
}
}