blob: 02246675e25bd84f57018593c8f6460ede1bbc68 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.serializers
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.samza.util.Logging
import java.util
import org.apache.samza.checkpoint.Checkpoint
import org.apache.samza.container.TaskName
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.{SamzaException, Partition}
import scala.collection.JavaConverters._
/**
* Write out the Checkpoint object in JSON. The underlying map of SSP => Offset cannot be stored directly because
* JSON only allows strings as map types, so we would need to separately serialize the SSP to a string that doesn't
* then interfere with JSON's decoding of the overall map. We'll sidestep the whole issue by turning the
* map into a list[String] of (System, Stream, Partition, Offset) serializing that.
*/
class CheckpointSerde extends Serde[Checkpoint] with Logging {
import CheckpointSerde._
// TODO: Elucidate the CheckpointSerde relationshiop to Serde. Should Serde also have keyTo/FromBytes? Should
// we just take CheckpointSerde here as interface and have this be JSONCheckpointSerde?
// TODO: Add more tests. This class currently only has direct test and is mainly tested by the other checkpoint managers
val jsonMapper = new ObjectMapper()
// Jackson absolutely hates Scala types and hidden conversions hate you, so we're going to be very, very
// explicit about the Java (not Scala) types used here and never let Scala get its grubby little hands
// on any instance.
// Store checkpoint as maps keyed of the SSP.toString to the another map of the constituent SSP components
// and offset. Jackson can't automatically serialize the SSP since it's not a POJO and this avoids
// having to wrap it another class while maintaing readability.
def fromBytes(bytes: Array[Byte]): Checkpoint = {
try {
val jMap = jsonMapper.readValue(bytes, classOf[util.HashMap[String, util.HashMap[String, String]]])
def deserializeJSONMap(m:util.HashMap[String, String]) = {
require(m.size() == 4, "All JSON-encoded SystemStreamPartitions must have four keys")
val system = m.get("system")
require(system != null, "System must be present in JSON-encoded SystemStreamPartition")
val stream = m.get("stream")
require(stream != null, "Stream must be present in JSON-encoded SystemStreamPartition")
val partition = m.get("partition")
require(partition != null, "Partition must be present in JSON-encoded SystemStreamPartition")
val offset = m.get("offset")
// allow null offsets, e.g. for changelog ssps
new SystemStreamPartition(system, stream, new Partition(partition.toInt)) -> offset
}
val cpMap = jMap.values.asScala.map(deserializeJSONMap).toMap
new Checkpoint(cpMap.asJava)
}catch {
case e : Exception =>
warn("Exception while deserializing checkpoint: " + e)
debug("Exception detail:", e)
null
}
}
def toBytes(checkpoint: Checkpoint): Array[Byte] = {
val offsets = checkpoint.getOffsets
val asMap = new util.HashMap[String, util.HashMap[String, String]](offsets.size())
offsets.asScala.foreach {
case (ssp, offset) =>
val jMap = new util.HashMap[String, String](4)
jMap.put("system", ssp.getSystemStream.getSystem)
jMap.put("stream", ssp.getSystemStream.getStream)
jMap.put("partition", ssp.getPartition.getPartitionId.toString)
jMap.put("offset", offset)
asMap.put(ssp.toString, jMap)
}
jsonMapper.writeValueAsBytes(asMap)
}
def changelogPartitionMappingFromBytes(bytes: Array[Byte]): util.Map[TaskName, java.lang.Integer] = {
try {
jsonMapper.readValue(bytes, PARTITION_MAPPING_TYPEREFERENCE)
} catch {
case e : Exception =>
throw new SamzaException("Exception while deserializing changelog partition mapping", e)
}
}
def changelogPartitionMappingToBytes(mapping: util.Map[TaskName, java.lang.Integer]) = {
jsonMapper.writeValueAsBytes(new util.HashMap[TaskName, java.lang.Integer](mapping))
}
}
object CheckpointSerde {
val PARTITION_MAPPING_TYPEREFERENCE = new TypeReference[util.HashMap[TaskName, java.lang.Integer]]() {}
}