blob: e9fcadb489e0b97591841a2ad64aef5594121adc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.container.grouper.task;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.serializers.Serde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Task assignment Manager is used to persist and read the task-to-container
* assignment information from the coordinator stream.
* */
public class TaskAssignmentManager {
private static final Logger LOG = LoggerFactory.getLogger(TaskAssignmentManager.class);
private final Map<String, String> taskNameToContainerId = new HashMap<>();
private final Serde<String> containerIdSerde;
private final Serde<String> taskModeSerde;
private MetadataStore taskContainerMappingMetadataStore;
private MetadataStore taskModeMappingMetadataStore;
/**
* Builds the TaskAssignmentManager based upon the provided {@link MetadataStore} that is instantiated.
* Setting up a metadata store instance is expensive which requires opening multiple connections
* and reading tons of information. Fully instantiated metadata store is taken as a constructor argument
* to reuse it across different utility classes. Uses the {@link CoordinatorStreamValueSerde} to serialize
* messages before reading/writing into metadata store.
*
* @param taskContainerMappingMetadataStore an instance of {@link MetadataStore} used to read/write the task to container assignments.
* @param taskModeMappingMetadataStore an instance of {@link MetadataStore} used to read/write the task to mode assignments.
*/
public TaskAssignmentManager(MetadataStore taskContainerMappingMetadataStore, MetadataStore taskModeMappingMetadataStore) {
Preconditions.checkNotNull(taskContainerMappingMetadataStore, "Metadata store cannot be null");
Preconditions.checkNotNull(taskModeMappingMetadataStore, "Metadata store cannot be null");
this.taskModeMappingMetadataStore = taskModeMappingMetadataStore;
this.taskContainerMappingMetadataStore = taskContainerMappingMetadataStore;
this.containerIdSerde = new CoordinatorStreamValueSerde(SetTaskContainerMapping.TYPE);
this.taskModeSerde = new CoordinatorStreamValueSerde(SetTaskModeMapping.TYPE);
}
/**
* Method to allow read container task information from {@link MetadataStore}. This method is used in {@link org.apache.samza.coordinator.JobModelManager}.
*
* @return the map of taskName: containerId
*/
public Map<String, String> readTaskAssignment() {
taskNameToContainerId.clear();
taskContainerMappingMetadataStore.all().forEach((taskName, valueBytes) -> {
String containerId = containerIdSerde.fromBytes(valueBytes);
if (containerId != null) {
taskNameToContainerId.put(taskName, containerId);
}
LOG.debug("Assignment for task {}: {}", taskName, containerId);
});
return Collections.unmodifiableMap(new HashMap<>(taskNameToContainerId));
}
public Map<TaskName, TaskMode> readTaskModes() {
Map<TaskName, TaskMode> taskModeMap = new HashMap<>();
taskModeMappingMetadataStore.all().forEach((taskName, valueBytes) -> {
String taskMode = taskModeSerde.fromBytes(valueBytes);
if (taskMode != null) {
taskModeMap.put(new TaskName(taskName), TaskMode.valueOf(taskMode));
}
LOG.debug("Task mode assignment for task {}: {}", taskName, taskMode);
});
return Collections.unmodifiableMap(new HashMap<>(taskModeMap));
}
/**
* Method to batch write task container info to {@link MetadataStore}.
* @param mappings the task and container mappings: (ContainerId, (TaskName, TaskMode))
*/
public void writeTaskContainerMappings(Map<String, Map<String, TaskMode>> mappings) {
for (String containerId : mappings.keySet()) {
Map<String, TaskMode> tasks = mappings.get(containerId);
for (String taskName : tasks.keySet()) {
TaskMode taskMode = tasks.get(taskName);
LOG.info("Storing task: {} and container ID: {} into metadata store", taskName, containerId);
String existingContainerId = taskNameToContainerId.get(taskName);
if (existingContainerId != null && !existingContainerId.equals(containerId)) {
LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", new Object[]{taskName, taskMode, existingContainerId, containerId});
} else {
LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId);
}
if (containerId == null) {
taskContainerMappingMetadataStore.delete(taskName);
taskModeMappingMetadataStore.delete(taskName);
taskNameToContainerId.remove(taskName);
} else {
taskContainerMappingMetadataStore.put(taskName, containerIdSerde.toBytes(containerId));
taskModeMappingMetadataStore.put(taskName, taskModeSerde.toBytes(taskMode.toString()));
taskNameToContainerId.put(taskName, containerId);
}
}
}
taskContainerMappingMetadataStore.flush();
taskModeMappingMetadataStore.flush();
}
/**
* Deletes the task container info from the {@link MetadataStore} for the task names.
*
* @param taskNames the task names for which the mapping will be deleted.
*/
public void deleteTaskContainerMappings(Iterable<String> taskNames) {
for (String taskName : taskNames) {
taskContainerMappingMetadataStore.delete(taskName);
taskModeMappingMetadataStore.delete(taskName);
taskNameToContainerId.remove(taskName);
}
taskContainerMappingMetadataStore.flush();
taskModeMappingMetadataStore.flush();
}
public void close() {
taskContainerMappingMetadataStore.close();
taskModeMappingMetadataStore.close();
}
}