| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.serializers.model; |
| |
| import com.fasterxml.jackson.core.JsonGenerator; |
| import com.fasterxml.jackson.core.JsonParser; |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.core.ObjectCodec; |
| import com.fasterxml.jackson.core.Version; |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.DeserializationContext; |
| import com.fasterxml.jackson.databind.DeserializationFeature; |
| import com.fasterxml.jackson.databind.JsonDeserializer; |
| import com.fasterxml.jackson.databind.JsonNode; |
| import com.fasterxml.jackson.databind.JsonSerializer; |
| import com.fasterxml.jackson.databind.KeyDeserializer; |
| import com.fasterxml.jackson.databind.SerializationFeature; |
| import com.fasterxml.jackson.databind.SerializerProvider; |
| import com.fasterxml.jackson.databind.module.SimpleModule; |
| import com.fasterxml.jackson.databind.node.ArrayNode; |
| import com.fasterxml.jackson.databind.node.ObjectNode; |
| import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import org.apache.samza.Partition; |
| import org.apache.samza.SamzaException; |
| import org.apache.samza.checkpoint.CheckpointId; |
| import org.apache.samza.checkpoint.CheckpointV2; |
| import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.config.MapConfig; |
| import org.apache.samza.container.TaskName; |
| import org.apache.samza.job.JobCoordinatorMetadata; |
| import org.apache.samza.job.model.ContainerModel; |
| import org.apache.samza.job.model.JobModel; |
| import org.apache.samza.job.model.LocalityModel; |
| import org.apache.samza.job.model.ProcessorLocality; |
| import org.apache.samza.job.model.TaskMode; |
| import org.apache.samza.job.model.TaskModel; |
| import org.apache.samza.system.SystemStream; |
| import org.apache.samza.system.SystemStreamPartition; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import static org.junit.Assert.*; |
| |
| |
| public class TestSamzaObjectMapper { |
| private JobModel jobModel; |
| private ObjectMapper samzaObjectMapper; |
| |
| @Before |
| public void setup() { |
| Config config = new MapConfig(ImmutableMap.of("a", "b")); |
| TaskName taskName = new TaskName("test"); |
| Set<SystemStreamPartition> ssps = ImmutableSet.of(new SystemStreamPartition("foo", "bar", new Partition(1))); |
| TaskModel taskModel = new TaskModel(taskName, ssps, new Partition(2)); |
| Map<TaskName, TaskModel> tasks = ImmutableMap.of(taskName, taskModel); |
| ContainerModel containerModel = new ContainerModel("1", tasks); |
| Map<String, ContainerModel> containerMap = ImmutableMap.of("1", containerModel); |
| this.jobModel = new JobModel(config, containerMap); |
| this.samzaObjectMapper = SamzaObjectMapper.getObjectMapper(); |
| } |
| |
| @Test |
| public void testSerializeJobModel() throws IOException { |
| String serializedString = this.samzaObjectMapper.writeValueAsString(this.jobModel); |
| // use a plain ObjectMapper to read JSON to make comparison easier |
| ObjectNode serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString); |
| ObjectNode expectedJson = buildJobModelJson(); |
| |
| /* |
| * Jackson serializes all get* methods even if they aren't regular getters. We only care about certain fields now |
| * since those are the only ones that get deserialized. |
| */ |
| assertEquals(expectedJson.get("config"), serializedAsJson.get("config")); |
| assertEquals(expectedJson.get("containers"), serializedAsJson.get("containers")); |
| } |
| |
| @Test |
| public void testSerializeTaskModel() throws IOException { |
| TaskModel taskModel = new TaskModel(new TaskName("Standby Partition 0"), new HashSet<>(), new Partition(0), |
| TaskMode.Standby); |
| String serializedString = this.samzaObjectMapper.writeValueAsString(taskModel); |
| TaskModel deserializedTaskModel = this.samzaObjectMapper.readValue(serializedString, TaskModel.class); |
| assertEquals(taskModel, deserializedTaskModel); |
| |
| String sampleSerializedString = "{\"task-name\":\"Partition 0\",\"system-stream-partitions\":[],\"changelog-partition\":0}"; |
| deserializedTaskModel = this.samzaObjectMapper.readValue(sampleSerializedString, TaskModel.class); |
| taskModel = new TaskModel(new TaskName("Partition 0"), new HashSet<>(), new Partition(0), TaskMode.Active); |
| assertEquals(taskModel, deserializedTaskModel); |
| } |
| |
| @Test |
| public void testDeserializeJobModel() throws IOException { |
| ObjectNode asJson = buildJobModelJson(); |
| assertEquals(this.jobModel, deserializeFromObjectNode(asJson)); |
| } |
| |
| /** |
| * Deserialization should not fail if there are fields which are ignored. |
| */ |
| @Test |
| public void testDeserializeWithIgnoredFields() throws IOException { |
| ObjectNode jobModelJson = buildJobModelJson(); |
| // JobModel ignores all unknown fields |
| jobModelJson.put("unknown_job_model_key", "unknown_job_model_value"); |
| ObjectNode taskPartitionMappings = new ObjectMapper().createObjectNode(); |
| taskPartitionMappings.put("1", (Integer) null); |
| // old key that used to be serialized |
| jobModelJson.put("task-partition-mappings", taskPartitionMappings); |
| ObjectNode allContainerLocality = new ObjectMapper().createObjectNode(); |
| allContainerLocality.put("1", (Integer) null); |
| // currently gets serialized since there is a getAllContainerLocality |
| jobModelJson.put("all-container-locality", allContainerLocality); |
| assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson)); |
| } |
| |
| /** |
| * Given a {@link ContainerModel} JSON with a processor-id and a container-id, deserialization should properly ignore |
| * the container-id. |
| */ |
| @Test |
| public void testDeserializeContainerIdAndProcessorId() throws IOException { |
| ObjectNode jobModelJson = buildJobModelJson(); |
| ObjectNode containerModelJson = (ObjectNode) jobModelJson.get("containers").get("1"); |
| containerModelJson.put("container-id", 123); |
| assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson)); |
| } |
| |
| /** |
| * Given a {@link ContainerModel} JSON with an unknown field, deserialization should properly ignore it. |
| */ |
| @Test |
| public void testDeserializeUnknownContainerModelField() throws IOException { |
| ObjectNode jobModelJson = buildJobModelJson(); |
| ObjectNode containerModelJson = (ObjectNode) jobModelJson.get("containers").get("1"); |
| containerModelJson.put("unknown_container_model_key", "unknown_container_model_value"); |
| assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson)); |
| } |
| |
| /** |
| * Given a {@link ContainerModel} JSON without a processor-id but with a container-id, deserialization should use the |
| * container-id to calculate the processor-id. |
| */ |
| @Test |
| public void testDeserializeContainerModelOnlyContainerId() throws IOException { |
| ObjectNode jobModelJson = buildJobModelJson(); |
| ObjectNode containerModelJson = (ObjectNode) jobModelJson.get("containers").get("1"); |
| containerModelJson.remove("processor-id"); |
| containerModelJson.put("container-id", 1); |
| assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson)); |
| } |
| |
| /** |
| * Given a {@link ContainerModel} JSON with an unknown field, deserialization should properly ignore it. |
| */ |
| @Test |
| public void testDeserializeUnknownTaskModelField() throws IOException { |
| ObjectNode jobModelJson = buildJobModelJson(); |
| ObjectNode taskModelJson = (ObjectNode) jobModelJson.get("containers").get("1").get("tasks").get("test"); |
| taskModelJson.put("unknown_task_model_key", "unknown_task_model_value"); |
| assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson)); |
| } |
| |
| /** |
| * Given a {@link ContainerModel} JSON with neither a processor-id nor a container-id, deserialization should fail. |
| */ |
| @Test(expected = SamzaException.class) |
| public void testDeserializeContainerModelMissingProcessorIdAndContainerId() throws IOException { |
| ObjectNode jobModelJson = buildJobModelJson(); |
| ObjectNode containerModelJson = (ObjectNode) jobModelJson.get("containers").get("1"); |
| containerModelJson.remove("processor-id"); |
| deserializeFromObjectNode(jobModelJson); |
| } |
| |
| /** |
| * Given a {@link ContainerModel} JSON with only an "id" field, deserialization should fail. |
| * This verifies that even though {@link ContainerModel} has a getId method, the "id" field is not used, since |
| * "processor-id" is the field that is supposed to be used. |
| */ |
| @Test(expected = SamzaException.class) |
| public void testDeserializeContainerModelIdFieldOnly() throws IOException { |
| ObjectNode jobModelJson = buildJobModelJson(); |
| ObjectNode containerModelJson = (ObjectNode) jobModelJson.get("containers").get("1"); |
| containerModelJson.remove("processor-id"); |
| containerModelJson.put("id", 1); |
| deserializeFromObjectNode(jobModelJson); |
| } |
| |
| @Test |
| public void testSerializeSystemStreamPartition() throws IOException { |
| // case 1: keyBucket not explicitly mentioned |
| SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1)); |
| String serializedString = this.samzaObjectMapper.writeValueAsString(ssp); |
| |
| ObjectMapper objectMapper = new ObjectMapper(); |
| |
| ObjectNode sspJson = objectMapper.createObjectNode(); |
| sspJson.put("system", "foo"); |
| sspJson.put("stream", "bar"); |
| sspJson.put("partition", 1); |
| |
| // use a plain ObjectMapper to read JSON to make comparison easier |
| ObjectNode serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString); |
| ObjectNode expectedJson = sspJson; |
| |
| assertEquals(expectedJson.get("system"), serializedAsJson.get("system")); |
| assertEquals(expectedJson.get("stream"), serializedAsJson.get("stream")); |
| assertEquals(expectedJson.get("partition"), serializedAsJson.get("partition")); |
| |
| //Case 2: with non-null keyBucket |
| ssp = new SystemStreamPartition("foo", "bar", new Partition(1), 1); |
| serializedString = this.samzaObjectMapper.writeValueAsString(ssp); |
| |
| sspJson = objectMapper.createObjectNode(); |
| sspJson.put("system", "foo"); |
| sspJson.put("stream", "bar"); |
| sspJson.put("partition", 1); |
| sspJson.put("keyBucket", 1); |
| |
| // use a plain ObjectMapper to read JSON to make comparison easier |
| serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString); |
| expectedJson = sspJson; |
| |
| assertEquals(expectedJson.get("system"), serializedAsJson.get("system")); |
| assertEquals(expectedJson.get("stream"), serializedAsJson.get("stream")); |
| assertEquals(expectedJson.get("partition"), serializedAsJson.get("partition")); |
| assertEquals(expectedJson.get("keyBucket"), serializedAsJson.get("keyBucket")); |
| } |
| |
| @Test |
| public void testDeserializeSystemStreamPartition() throws IOException { |
| ObjectMapper objectMapper = new ObjectMapper(); |
| |
| // case 1: keyBucket not explicitly mentioned |
| ObjectNode sspJson = objectMapper.createObjectNode(); |
| sspJson.put("system", "foo"); |
| sspJson.put("stream", "bar"); |
| sspJson.put("partition", 1); |
| |
| SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1)); |
| String jsonString = new ObjectMapper().writeValueAsString(sspJson); |
| SystemStreamPartition deserSSP = this.samzaObjectMapper.readValue(jsonString, SystemStreamPartition.class); |
| |
| assertEquals(ssp, deserSSP); |
| |
| // case 2: explicitly set key bucket |
| sspJson = objectMapper.createObjectNode(); |
| sspJson.put("system", "foo"); |
| sspJson.put("stream", "bar"); |
| sspJson.put("partition", 1); |
| sspJson.put("keyBucket", 1); |
| |
| ssp = new SystemStreamPartition("foo", "bar", new Partition(1), 1); |
| jsonString = new ObjectMapper().writeValueAsString(sspJson); |
| deserSSP = this.samzaObjectMapper.readValue(jsonString, SystemStreamPartition.class); |
| |
| assertEquals(ssp, deserSSP); |
| } |
| |
| @Test |
| public void testSerdeSystemStreamPartitionKey() throws IOException { |
| |
| // get object mapper which has old deserialization logic for SSP as Key |
| ObjectMapper oldDeserForSSPObjectMapper = getOldDeserForSSpKeyObjectMapper(); |
| |
| SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1)); |
| String offset = "100"; |
| |
| String sspmapString = this.samzaObjectMapper.writeValueAsString(ImmutableMap.of(ssp, offset)); |
| |
| TypeReference<HashMap<SystemStreamPartition, String>> typeRef |
| = new TypeReference<HashMap<SystemStreamPartition, String>>() { }; |
| |
| // case 1: deserialize with the new logic for SSP key deser |
| Map<SystemStreamPartition, String> deserSSPMap = this.samzaObjectMapper.readValue(sspmapString, typeRef); |
| SystemStreamPartition deserSSP = deserSSPMap.keySet().stream().findAny().get(); |
| String deserOffset = deserSSPMap.values().stream().findFirst().get(); |
| assertEquals(ssp.getSystem(), deserSSP.getSystem()); |
| assertEquals(ssp.getStream(), deserSSP.getStream()); |
| assertEquals(ssp.getPartition(), deserSSP.getPartition()); |
| assertEquals(ssp.getKeyBucket(), deserSSP.getKeyBucket()); |
| assertEquals(offset, deserOffset); |
| |
| // case 2: deserialize with the OLD logic for SSP key deser |
| deserSSPMap = oldDeserForSSPObjectMapper.readValue(sspmapString, typeRef); |
| deserSSP = deserSSPMap.keySet().stream().findAny().get(); |
| deserOffset = deserSSPMap.values().stream().findFirst().get(); |
| assertEquals(ssp.getSystem(), deserSSP.getSystem()); |
| assertEquals(ssp.getStream(), deserSSP.getStream()); |
| assertEquals(ssp.getPartition(), deserSSP.getPartition()); |
| assertEquals(ssp.getKeyBucket(), deserSSP.getKeyBucket()); |
| assertEquals(offset, deserOffset); |
| } |
| |
| @Test |
| public void testSerDePreElasticSystemStreamPartition() throws IOException { |
| ObjectMapper preElasticObjectMapper = getPreEleasticObjectMapper(); |
| ObjectMapper elasticObjectMapper = SamzaObjectMapper.getObjectMapper(); |
| |
| |
| //Scenario 1: ssp serialized with preElasticMapper and deserialized by new Mapper with elasticity |
| SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1)); |
| String serializedString = preElasticObjectMapper.writeValueAsString(ssp); |
| |
| ObjectMapper objectMapper = new ObjectMapper(); |
| ObjectNode serializedSSPAsJson = objectMapper.createObjectNode(); |
| serializedSSPAsJson.put("system", "foo"); |
| serializedSSPAsJson.put("stream", "bar"); |
| serializedSSPAsJson.put("partition", 1); |
| |
| JsonNode deserSSPAsJson = elasticObjectMapper.readTree(serializedString); |
| |
| assertEquals(serializedSSPAsJson.get("system"), deserSSPAsJson.get("system")); |
| assertEquals(serializedSSPAsJson.get("stream"), deserSSPAsJson.get("stream")); |
| assertEquals(serializedSSPAsJson.get("partition"), deserSSPAsJson.get("partition")); |
| assertEquals(serializedSSPAsJson.get("keyBucket"), deserSSPAsJson.get("-1")); |
| |
| //Scenario 1: ssp serialized with new elasticMapper and deserialized by old preElastic Mapper |
| SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition("foo", "bar", new Partition(1), 1); |
| serializedString = elasticObjectMapper.writeValueAsString(sspWithKeyBucket); |
| |
| |
| serializedSSPAsJson = objectMapper.createObjectNode(); |
| serializedSSPAsJson.put("system", "foo"); |
| serializedSSPAsJson.put("stream", "bar"); |
| serializedSSPAsJson.put("partition", 1); |
| serializedSSPAsJson.put("keyBucket", 1); |
| |
| deserSSPAsJson = preElasticObjectMapper.readTree(serializedString); |
| |
| assertEquals(serializedSSPAsJson.get("system"), deserSSPAsJson.get("system")); |
| assertEquals(serializedSSPAsJson.get("stream"), deserSSPAsJson.get("stream")); |
| assertEquals(serializedSSPAsJson.get("partition"), deserSSPAsJson.get("partition")); |
| } |
| |
| private JobModel deserializeFromObjectNode(ObjectNode jobModelJson) throws IOException { |
| // use plain ObjectMapper to get JSON string |
| String jsonString = new ObjectMapper().writeValueAsString(jobModelJson); |
| return this.samzaObjectMapper.readValue(jsonString, JobModel.class); |
| } |
| |
| /** |
| * Builds {@link ObjectNode} which matches the {@link JobModel} built in setup. |
| */ |
| private static ObjectNode buildJobModelJson() { |
| ObjectMapper objectMapper = new ObjectMapper(); |
| |
| ObjectNode configJson = objectMapper.createObjectNode(); |
| configJson.put("a", "b"); |
| |
| ObjectNode containerModel1TaskTestSSPJson = objectMapper.createObjectNode(); |
| containerModel1TaskTestSSPJson.put("system", "foo"); |
| containerModel1TaskTestSSPJson.put("stream", "bar"); |
| containerModel1TaskTestSSPJson.put("partition", 1); |
| containerModel1TaskTestSSPJson.put("keyBucket", -1); |
| |
| ArrayNode containerModel1TaskTestSSPsJson = objectMapper.createArrayNode(); |
| containerModel1TaskTestSSPsJson.add(containerModel1TaskTestSSPJson); |
| |
| ObjectNode containerModel1TaskTestJson = objectMapper.createObjectNode(); |
| containerModel1TaskTestJson.put("task-name", "test"); |
| containerModel1TaskTestJson.put("system-stream-partitions", containerModel1TaskTestSSPsJson); |
| containerModel1TaskTestJson.put("changelog-partition", 2); |
| containerModel1TaskTestJson.put("task-mode", "Active"); |
| |
| ObjectNode containerModel1TasksJson = objectMapper.createObjectNode(); |
| containerModel1TasksJson.put("test", containerModel1TaskTestJson); |
| |
| ObjectNode containerModel1Json = objectMapper.createObjectNode(); |
| // important: needs to be "processor-id" for compatibility between Samza 0.14 and 1.0 |
| containerModel1Json.put("processor-id", "1"); |
| containerModel1Json.put("tasks", containerModel1TasksJson); |
| |
| ObjectNode containersJson = objectMapper.createObjectNode(); |
| containersJson.put("1", containerModel1Json); |
| |
| ObjectNode jobModelJson = objectMapper.createObjectNode(); |
| jobModelJson.put("config", configJson); |
| jobModelJson.put("containers", containersJson); |
| |
| return jobModelJson; |
| } |
| |
| private static class OldSystemStreamPartitionKeyDeserializer extends KeyDeserializer { |
| @Override |
| public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException { |
| int idx = sspString.indexOf('.'); |
| int lastIdx = sspString.lastIndexOf('.'); |
| if (idx < 0 || lastIdx < 0) { |
| throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition"); |
| } |
| return new SystemStreamPartition( |
| new SystemStream(sspString.substring(0, idx), sspString.substring(idx + 1, lastIdx)), |
| new Partition(Integer.parseInt(sspString.substring(lastIdx + 1)))); |
| } |
| } |
| public static ObjectMapper getOldDeserForSSpKeyObjectMapper() { |
| ObjectMapper mapper = new ObjectMapper(); |
| mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, false); |
| mapper.configure(SerializationFeature.WRAP_EXCEPTIONS, false); |
| SimpleModule module = new SimpleModule("SamzaModule", new Version(1, 0, 0, "")); |
| |
| module.addKeyDeserializer(SystemStreamPartition.class, new OldSystemStreamPartitionKeyDeserializer()); |
| mapper.registerModules(module, new Jdk8Module()); |
| return mapper; |
| } |
| |
| private static class PreElasticitySystemStreamPartitionSerializer extends JsonSerializer<SystemStreamPartition> { |
| @Override |
| public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, |
| JsonProcessingException { |
| Map<String, Object> systemStreamPartitionMap = new HashMap<String, Object>(); |
| systemStreamPartitionMap.put("system", systemStreamPartition.getSystem()); |
| systemStreamPartitionMap.put("stream", systemStreamPartition.getStream()); |
| systemStreamPartitionMap.put("partition", systemStreamPartition.getPartition()); |
| jsonGenerator.writeObject(systemStreamPartitionMap); |
| } |
| } |
| |
| private static class PreElasticitySystemStreamPartitionDeserializer extends JsonDeserializer<SystemStreamPartition> { |
| @Override |
| public SystemStreamPartition deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException, JsonProcessingException { |
| ObjectCodec oc = jsonParser.getCodec(); |
| JsonNode node = oc.readTree(jsonParser); |
| String system = node.get("system").textValue(); |
| String stream = node.get("stream").textValue(); |
| Partition partition = new Partition(node.get("partition").intValue()); |
| return new SystemStreamPartition(system, stream, partition); |
| } |
| } |
| |
  // Shared pre-elasticity mapper; referenced by the anonymous ContainerModel deserializer below to
  // recursively deserialize the task mapping.
  private static final ObjectMapper OBJECT_MAPPER = getPreEleasticObjectMapper();

  /**
   * Builds an {@link ObjectMapper} configured like the pre-elasticity {@link SamzaObjectMapper}:
   * {@link SystemStreamPartition} is serialized and deserialized without a keyBucket field, while
   * all other serdes and mixins mirror the production mapper. Used to verify backwards
   * compatibility between pre- and post-elasticity serialized forms.
   * NOTE(review): "Eleastic" in the method name is a typo, but the method is public, so the name is
   * left unchanged to avoid breaking callers.
   */
  public static ObjectMapper getPreEleasticObjectMapper() {
    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, false);
    mapper.configure(SerializationFeature.WRAP_EXCEPTIONS, false);
    SimpleModule module = new SimpleModule("SamzaModule", new Version(1, 0, 0, ""));

    // Setup custom serdes for simple data types.
    // Note: the SSP serializer/deserializer are the pre-elasticity variants defined in this class;
    // everything else reuses the production serdes from SamzaObjectMapper.
    module.addSerializer(Partition.class, new SamzaObjectMapper.PartitionSerializer());
    module.addSerializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionSerializer());
    module.addKeySerializer(SystemStreamPartition.class, new SamzaObjectMapper.SystemStreamPartitionKeySerializer());
    module.addSerializer(TaskName.class, new SamzaObjectMapper.TaskNameSerializer());
    module.addSerializer(TaskMode.class, new SamzaObjectMapper.TaskModeSerializer());
    module.addDeserializer(TaskName.class, new SamzaObjectMapper.TaskNameDeserializer());
    module.addDeserializer(Partition.class, new SamzaObjectMapper.PartitionDeserializer());
    module.addDeserializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionDeserializer());
    module.addKeyDeserializer(SystemStreamPartition.class, new SamzaObjectMapper.SystemStreamPartitionKeyDeserializer());
    module.addDeserializer(Config.class, new SamzaObjectMapper.ConfigDeserializer());
    module.addDeserializer(TaskMode.class, new SamzaObjectMapper.TaskModeDeserializer());
    module.addSerializer(CheckpointId.class, new SamzaObjectMapper.CheckpointIdSerializer());
    module.addDeserializer(CheckpointId.class, new SamzaObjectMapper.CheckpointIdDeserializer());

    // Setup mixins for data models.
    mapper.addMixIn(TaskModel.class, JsonTaskModelMixIn.class);
    mapper.addMixIn(ContainerModel.class, JsonContainerModelMixIn.class);
    mapper.addMixIn(JobModel.class, JsonJobModelMixIn.class);
    mapper.addMixIn(CheckpointV2.class, JsonCheckpointV2Mixin.class);
    mapper.addMixIn(KafkaStateCheckpointMarker.class, KafkaStateCheckpointMarkerMixin.class);

    module.addDeserializer(ContainerModel.class, new JsonDeserializer<ContainerModel>() {
      @Override
      public ContainerModel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
        ObjectCodec oc = jp.getCodec();
        JsonNode node = oc.readTree(jp);
        /*
         * Before Samza 0.13, "container-id" was used.
         * In Samza 0.13, "processor-id" was added to be the id to use and "container-id" was deprecated. However,
         * "container-id" still needed to be checked for backwards compatibility in case "processor-id" was missing
         * (i.e. from a job model corresponding to a version of the job that was on a pre Samza 0.13 version).
         * In Samza 1.0, "container-id" was further cleaned up from ContainerModel. This logic is still being left here
         * as a fallback for backwards compatibility with pre Samza 0.13. ContainerModel.getProcessorId was changed to
         * ContainerModel.getId in the Java API, but "processor-id" still needs to be used as the JSON key for backwards
         * compatibility with Samza 0.13 and Samza 0.14.
         */
        String id;
        if (node.get(JsonContainerModelMixIn.PROCESSOR_ID_KEY) == null) {
          if (node.get(JsonContainerModelMixIn.CONTAINER_ID_KEY) == null) {
            throw new SamzaException(
                String.format("JobModel was missing %s and %s. This should never happen. JobModel corrupt!",
                    JsonContainerModelMixIn.PROCESSOR_ID_KEY, JsonContainerModelMixIn.CONTAINER_ID_KEY));
          }
          // legacy container-id is numeric; normalize it to the string processor id
          id = String.valueOf(node.get(JsonContainerModelMixIn.CONTAINER_ID_KEY).intValue());
        } else {
          id = node.get(JsonContainerModelMixIn.PROCESSOR_ID_KEY).textValue();
        }
        // delegate the nested task map to the shared static mapper (safe: the field is fully
        // initialized before any deserialization can run)
        Map<TaskName, TaskModel> tasksMapping =
            OBJECT_MAPPER.readValue(OBJECT_MAPPER.treeAsTokens(node.get(JsonContainerModelMixIn.TASKS_KEY)),
                new TypeReference<Map<TaskName, TaskModel>>() { });
        return new ContainerModel(id, tasksMapping);
      }
    });

    mapper.addMixIn(LocalityModel.class, JsonLocalityModelMixIn.class);
    mapper.addMixIn(ProcessorLocality.class, JsonProcessorLocalityMixIn.class);

    // Register mixins for job coordinator metadata model
    mapper.addMixIn(JobCoordinatorMetadata.class, JsonJobCoordinatorMetadataMixIn.class);

    // Convert camel case to hyphenated field names, and register the module.
    mapper.setPropertyNamingStrategy(new SamzaObjectMapper.CamelCaseToDashesStrategy());
    mapper.registerModule(module);

    return mapper;
  }
| |
| } |