/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.container.grouper.task;

import com.google.common.base.Preconditions;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.SystemStreamPartition;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Used to persist and read the task-to-partition assignment information
* in the metadata store.
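*
* A minimal usage sketch, assuming an already-configured {@link MetadataStore} instance named
* {@code metadataStore} (the system, stream, and task names below are hypothetical):
* <pre>{@code
* TaskPartitionAssignmentManager manager = new TaskPartitionAssignmentManager(metadataStore);
* Map<SystemStreamPartition, List<String>> assignments = new HashMap<>();
* assignments.put(new SystemStreamPartition("kafka", "page-views", new Partition(0)),
*     Arrays.asList("task_0", "task_1"));
* manager.writeTaskPartitionAssignments(assignments);
* Map<SystemStreamPartition, List<String>> readBack = manager.readTaskPartitionAssignments();
* manager.close();
* }</pre>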
*/
public class TaskPartitionAssignmentManager {
private static final Logger LOG = LoggerFactory.getLogger(TaskPartitionAssignmentManager.class);
private final ObjectMapper taskNamesMapper = SamzaObjectMapper.getObjectMapper();
private final ObjectMapper sspMapper = SamzaObjectMapper.getObjectMapper();
private final Serde<String> valueSerde;
private final MetadataStore metadataStore;

/**
* Builds the TaskPartitionAssignmentManager based upon the provided {@link MetadataStore}.
* Setting up a metadata store instance is expensive, since it requires opening multiple connections
* and reading large amounts of metadata. A fully instantiated metadata store is therefore taken as a
* constructor argument so that it can be reused across different utility classes. Uses the
* {@link CoordinatorStreamValueSerde} to serialize messages before reading from and writing to the
* metadata store.
*
* @param metadataStore an instance of {@link MetadataStore} used to read/write the task to partition assignments.
*/
public TaskPartitionAssignmentManager(MetadataStore metadataStore) {
Preconditions.checkNotNull(metadataStore, "Metadata store cannot be null.");
this.metadataStore = metadataStore;
this.valueSerde = new CoordinatorStreamValueSerde(SetTaskPartitionMapping.TYPE);
}

/**
* Writes the {@link SystemStreamPartition} to task name assignments to the metadata store.
* @param sspToTaskNameMapping the assignments to write to the metadata store. If the task name list
*                             for a partition is null or empty, that entry is deleted from the metadata store.
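*
* For example (a sketch with hypothetical stream and task names, and a {@code manager} instance
* assumed to be constructed already):
* <pre>{@code
* Map<SystemStreamPartition, List<String>> assignments = new HashMap<>();
* // Stores the assignment for partition 0.
* assignments.put(new SystemStreamPartition("kafka", "page-views", new Partition(0)), Arrays.asList("task_0"));
* // An empty task list deletes any existing entry for partition 1.
* assignments.put(new SystemStreamPartition("kafka", "page-views", new Partition(1)), Collections.emptyList());
* manager.writeTaskPartitionAssignments(assignments);
* }</pre>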
*/
public void writeTaskPartitionAssignments(Map<SystemStreamPartition, List<String>> sspToTaskNameMapping) {
for (Map.Entry<SystemStreamPartition, List<String>> entry : sspToTaskNameMapping.entrySet()) {
SystemStreamPartition partition = entry.getKey();
List<String> taskNames = entry.getValue();
// For broadcast streams, an input system stream partition is mapped to more than one task in a
// SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions, the metadata store
// holds systemStreamPartition to list of taskNames, which keeps each value under the 1 MB limit on
// value size in Kafka.
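// For example (hypothetical names), the broadcast partition SSP[kafka, broadcast-stream, 0] would be
// stored as a single key mapping to the task names ["task_0", "task_1", "task_2"].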
String serializedSSPAsJson = serializeSSPToJson(partition);
if (taskNames == null || taskNames.isEmpty()) {
LOG.info("Deleting the key: {} from the metadata store.", partition);
metadataStore.delete(serializedSSPAsJson);
} else {
try {
String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames);
byte[] taskNamesAsBytes = valueSerde.toBytes(taskNamesAsString);
LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames);
metadataStore.put(serializedSSPAsJson, taskNamesAsBytes);
} catch (Exception e) {
throw new SamzaException("Exception occurred when writing task to partition assignment.", e);
}
}
}
metadataStore.flush();
}

/**
* Reads the task partition assignments from the underlying storage layer.
* @return the task partition assignments.
*/
public Map<SystemStreamPartition, List<String>> readTaskPartitionAssignments() {
try {
Map<SystemStreamPartition, List<String>> sspToTaskNamesMap = new HashMap<>();
Map<String, byte[]> allMetadataEntries = metadataStore.all();
for (Map.Entry<String, byte[]> entry : allMetadataEntries.entrySet()) {
SystemStreamPartition systemStreamPartition = deserializeSSPFromJson(entry.getKey());
String taskNamesAsJson = valueSerde.fromBytes(entry.getValue());
List<String> taskNames = taskNamesMapper.readValue(taskNamesAsJson, new TypeReference<List<String>>() { });
sspToTaskNamesMap.put(systemStreamPartition, taskNames);
}
return sspToTaskNamesMap;
} catch (Exception e) {
throw new SamzaException("Exception occurred when reading task partition assignments.", e);
}
}

/**
* Deletes the system stream partitions from the underlying metadata store.
* @param systemStreamPartitions the system stream partitions to delete.
*/
public void delete(Iterable<SystemStreamPartition> systemStreamPartitions) {
for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
LOG.info("Deleting the partition: {} from store.", systemStreamPartition);
String serializedSSPAsJson = serializeSSPToJson(systemStreamPartition);
metadataStore.delete(serializedSSPAsJson);
}
metadataStore.flush();
}

/**
* Closes the connections with the underlying metadata store.
*/
public void close() {
metadataStore.close();
}

/**
* Serializes the {@link SystemStreamPartition} to a JSON string.
* @param systemStreamPartition the input system stream partition.
* @return the SystemStreamPartition serialized to JSON.
*/
private String serializeSSPToJson(SystemStreamPartition systemStreamPartition) {
try {
return sspMapper.writeValueAsString(systemStreamPartition);
} catch (IOException e) {
throw new SamzaException(String.format("Exception occurred when serializing the partition: %s", systemStreamPartition), e);
}
}

/**
* Deserializes the JSON string {@code sspAsJson} to a {@link SystemStreamPartition}.
* @param sspAsJson the SystemStreamPartition serialized as a JSON string.
* @return the deserialized SystemStreamPartition.
*/
private SystemStreamPartition deserializeSSPFromJson(String sspAsJson) {
try {
return sspMapper.readValue(sspAsJson, SystemStreamPartition.class);
} catch (IOException e) {
throw new SamzaException(String.format("Exception occurred when deserializing the partition: %s", sspAsJson), e);
}
}
}