/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.startpoint;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.system.SystemStreamPartition;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The StartpointManager reads and writes {@link Startpoint}s to the provided {@link MetadataStore}.
*
* The intention for the StartpointManager is to maintain a strong contract between the caller
* and how Startpoints are stored in the underlying MetadataStore.
*
* Startpoints are written in the MetadataStore using keys of two different formats:
* 1) {@link SystemStreamPartition} only
* 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
*
* Startpoints are then fanned out to a fan out namespace in the MetadataStore by the
* {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the standalone
* {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the
* {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set the starting offsets per task and per
* {@link SystemStreamPartition}. The fan outs are deleted once the offsets are committed to the checkpoint.
*
* The read, write and delete methods are intended for external callers.
* The fan out methods are intended to be used within a job coordinator.
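*
* A minimal end-to-end sketch for an external caller (assumes a fully instantiated {@link MetadataStore};
* the SSP and startpoint values are illustrative):
* <pre>{@code
* StartpointManager startpointManager = new StartpointManager(metadataStore);
* startpointManager.start();
* SystemStreamPartition ssp = new SystemStreamPartition("kafka", "PageViewEvent", new Partition(0));
* startpointManager.writeStartpoint(ssp, new StartpointOldest());
* Optional<Startpoint> startpoint = startpointManager.readStartpoint(ssp);
* startpointManager.deleteStartpoint(ssp);
* startpointManager.stop();
* }</pre>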
*/
public class StartpointManager {
private static final Integer VERSION = 1;
public static final String NAMESPACE = "samza-startpoint-v" + VERSION;
static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12);
private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class);
private static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out";
private final NamespaceAwareCoordinatorStreamStore fanOutStore;
private final NamespaceAwareCoordinatorStreamStore readWriteStore;
private final ObjectMapper objectMapper = StartpointObjectMapper.getObjectMapper();
private boolean stopped = true;
/**
* Builds the StartpointManager based upon the provided, fully instantiated {@link MetadataStore}.
* Setting up a metadata store instance is expensive: it requires opening multiple connections
* and reading large amounts of information. The fully instantiated metadata store is therefore passed in
* as a constructor argument so that it can be reused across different utility classes.
*
* @param metadataStore an instance of {@link MetadataStore} used to read/write the start-points.
*/
public StartpointManager(MetadataStore metadataStore) {
Preconditions.checkNotNull(metadataStore, "MetadataStore cannot be null");
this.readWriteStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, NAMESPACE);
this.fanOutStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, NAMESPACE_FAN_OUT);
LOG.info("Startpoints are written to namespace: {} and fanned out to namespace: {} in the metadata store of type: {}",
NAMESPACE, NAMESPACE_FAN_OUT, metadataStore.getClass().getCanonicalName());
}
/**
* Perform startup operations. Method is idempotent.
*/
public void start() {
if (stopped) {
LOG.info("starting");
readWriteStore.init();
fanOutStore.init();
stopped = false;
} else {
LOG.warn("already started");
}
}
/**
* Perform teardown operations. Method is idempotent.
*/
public void stop() {
if (!stopped) {
LOG.info("stopping");
readWriteStore.close();
fanOutStore.close();
stopped = true;
} else {
LOG.warn("already stopped");
}
}
/**
* Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
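*
* For example, to start consumption at the oldest available offset for an SSP (a sketch; assumes a started
* manager and uses the {@link StartpointOldest} subtype):
* <pre>{@code
* startpointManager.writeStartpoint(ssp, new StartpointOldest());
* }</pre>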
* @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
* @param startpoint The {@link Startpoint} to write.
*/
public void writeStartpoint(SystemStreamPartition ssp, Startpoint startpoint) {
writeStartpoint(ssp, null, startpoint);
}
/**
* Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition} and {@link TaskName}.
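*
* A task-scoped write narrows the startpoint to one task; a sketch using the {@link StartpointTimestamp}
* subtype (the task name and timestamp are illustrative):
* <pre>{@code
* startpointManager.writeStartpoint(ssp, new TaskName("Partition 0"), new StartpointTimestamp(1584115200000L));
* }</pre>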
* @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
* @param taskName The {@link TaskName} to map the {@link Startpoint} against.
* @param startpoint The {@link Startpoint} to write.
*/
public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startpoint startpoint) {
Preconditions.checkState(!stopped, "Underlying metadata store not available");
Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
Preconditions.checkNotNull(startpoint, "Startpoint cannot be null");
try {
readWriteStore.put(toReadWriteStoreKey(ssp, taskName), objectMapper.writeValueAsBytes(startpoint));
readWriteStore.flush();
} catch (Exception ex) {
throw new SamzaException(String.format(
"Startpoint for SSP: %s and task: %s may not have been written to the metadata store.", ssp, taskName), ex);
}
}
/**
* Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
* @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
* @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}.
* It is empty if it does not exist or if it is too stale.
*/
@VisibleForTesting
public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp) {
Map<String, byte[]> startpointBytes = readWriteStore.all();
// there is no task-name to use as key for the startpoint in this case (only the ssp), so we use a null task-name
return readStartpoint(startpointBytes, ssp, null);
}
/**
* Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition} and {@link TaskName}.
* @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
* @param taskName the {@link TaskName} to fetch the {@link Startpoint} for.
* @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}.
* It is empty if it does not exist or if it is too stale.
*/
@VisibleForTesting
public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp, TaskName taskName) {
Map<String, byte[]> startpointBytes = readWriteStore.all();
return readStartpoint(startpointBytes, ssp, taskName);
}
/**
* Returns the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName} from the provided
* map of startpoint keys to serialized values.
* @param startpointMap Startpoint keys mapped to serialized {@link Startpoint}s, as read from the read-write store.
* @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
* @param taskName The {@link TaskName} to fetch the {@link Startpoint} for.
* @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}.
* It is empty if it does not exist or if it is too stale.
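*
* A read sketch (the startpoint map is typically the result of reading all entries from the read-write store;
* {@code ssp} and {@code taskName} are illustrative):
* <pre>{@code
* Optional<Startpoint> startpoint = startpointManager.readStartpoint(startpointMap, ssp, taskName);
* startpoint.ifPresent(sp -> LOG.info("Resolved startpoint: {}", sp));
* }</pre>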
*/
public Optional<Startpoint> readStartpoint(Map<String, byte[]> startpointMap, SystemStreamPartition ssp, TaskName taskName) {
Preconditions.checkState(!stopped, "Underlying metadata store not available");
Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
byte[] startpointBytes = startpointMap.get(toReadWriteStoreKey(ssp, taskName));
if (ArrayUtils.isNotEmpty(startpointBytes)) {
try {
Startpoint startpoint = objectMapper.readValue(startpointBytes, Startpoint.class);
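// Staleness check: a startpoint whose creation timestamp is older than
// DEFAULT_EXPIRATION_DURATION (12 hours) is considered expired and is ignored.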
if (Instant.now().minus(DEFAULT_EXPIRATION_DURATION).isBefore(Instant.ofEpochMilli(startpoint.getCreationTimestamp()))) {
return Optional.of(startpoint); // return if deserializable and if not stale
}
LOG.warn("Creation timestamp: {} of startpoint: {} has crossed the expiration duration: {}. Ignoring it",
startpoint.getCreationTimestamp(), startpoint, DEFAULT_EXPIRATION_DURATION);
} catch (IOException ex) {
throw new SamzaException(ex);
}
}
return Optional.empty();
}
/**
* Deletes the {@link Startpoint} for a {@link SystemStreamPartition}
* @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
*/
public void deleteStartpoint(SystemStreamPartition ssp) {
deleteStartpoint(ssp, null);
}
/**
* Deletes the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName}.
* @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
* @param taskName The {@link TaskName} to delete the {@link Startpoint} for.
*/
public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) {
Preconditions.checkState(!stopped, "Underlying metadata store not available");
Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
readWriteStore.delete(toReadWriteStoreKey(ssp, taskName));
readWriteStore.flush();
}
/**
* Deletes all {@link Startpoint}s
*/
public void deleteAllStartpoints() {
Set<String> readWriteKeys = readWriteStore.all().keySet();
for (String key : readWriteKeys) {
readWriteStore.delete(key);
}
readWriteStore.flush();
}
/**
* The Startpoints that are written with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with
* {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are moved from a "read-write" namespace
* to a "fan out" namespace.
* This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this
* method to assign the Startpoints to the appropriate tasks.
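*
* A fan out sketch (the task-to-SSP grouping is illustrative and would normally come from the job model):
* <pre>{@code
* Map<TaskName, Set<SystemStreamPartition>> taskToSSPs = ImmutableMap.of(
*     new TaskName("Partition 0"), ImmutableSet.of(ssp));
* Map<TaskName, Map<SystemStreamPartition, Startpoint>> fanOuts = startpointManager.fanOut(taskToSSPs);
* }</pre>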
* @param taskToSSPs Determines which {@link TaskName} each {@link SystemStreamPartition} maps to.
* @return The map of each fanned out {@link TaskName} to its {@link Startpoint}s keyed by {@link SystemStreamPartition}.
* @throws IOException if the fan outs could not be serialized and written to the metadata store.
*/
public Map<TaskName, Map<SystemStreamPartition, Startpoint>> fanOut(Map<TaskName, Set<SystemStreamPartition>> taskToSSPs) throws IOException {
Preconditions.checkState(!stopped, "Underlying metadata store not available");
Preconditions.checkArgument(MapUtils.isNotEmpty(taskToSSPs), "taskToSSPs cannot be null or empty");
// construct fan out with the existing readWriteStore entries and mark the entries for deletion after fan out
Instant now = Instant.now();
HashMultimap<SystemStreamPartition, TaskName> deleteKeys = HashMultimap.create();
HashMap<TaskName, StartpointFanOutPerTask> fanOuts = new HashMap<>();
Map<String, byte[]> startpointMap = readWriteStore.all();
for (TaskName taskName : taskToSSPs.keySet()) {
Set<SystemStreamPartition> ssps = taskToSSPs.get(taskName);
if (CollectionUtils.isEmpty(ssps)) {
LOG.warn("No SSPs are mapped to taskName: {}", taskName.getTaskName());
continue;
}
for (SystemStreamPartition ssp : ssps) {
Optional<Startpoint> startpoint = readStartpoint(startpointMap, ssp, null); // Read SSP-only key
startpoint.ifPresent(sp -> deleteKeys.put(ssp, null));
Optional<Startpoint> startpointForTask = readStartpoint(startpointMap, ssp, taskName); // Read SSP+taskName key
startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName));
Optional<Startpoint> startpointWithPrecedence = resolveStartpointPrecedence(startpoint, startpointForTask);
if (!startpointWithPrecedence.isPresent()) {
continue;
}
fanOuts.putIfAbsent(taskName, new StartpointFanOutPerTask(now));
fanOuts.get(taskName).getFanOuts().put(ssp, startpointWithPrecedence.get());
}
}
if (fanOuts.isEmpty()) {
LOG.debug("No fan outs created.");
return ImmutableMap.of();
}
LOG.info("Fanning out to {} tasks", fanOuts.size());
// Fan out to store
for (TaskName taskName : fanOuts.keySet()) {
String fanOutKey = toFanOutStoreKey(taskName);
StartpointFanOutPerTask newFanOut = fanOuts.get(taskName);
fanOutStore.put(fanOutKey, objectMapper.writeValueAsBytes(newFanOut));
}
fanOutStore.flush();
for (SystemStreamPartition ssp : deleteKeys.keySet()) {
for (TaskName taskName : deleteKeys.get(ssp)) {
if (taskName != null) {
deleteStartpoint(ssp, taskName);
} else {
deleteStartpoint(ssp);
}
}
}
return ImmutableMap.copyOf(fanOuts.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, fanOut -> fanOut.getValue().getFanOuts())));
}
/**
* Read the fanned out {@link Startpoint}s for the given {@link TaskName}
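*
* A consumption sketch, mirroring the fan out lifecycle described in the class documentation (names are
* illustrative):
* <pre>{@code
* Map<SystemStreamPartition, Startpoint> fanOutForTask = startpointManager.getFanOutForTask(taskName);
* fanOutForTask.forEach((ssp, startpoint) -> resolveStartingOffset(ssp, startpoint)); // hypothetical helper
* startpointManager.removeFanOutForTask(taskName); // once starting offsets are committed to the checkpoint
* }</pre>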
* @param taskName to read the fan out Startpoints for
* @return fanned out Startpoints
* @throws IOException if the fan out could not be read or deserialized from the metadata store.
*/
public Map<SystemStreamPartition, Startpoint> getFanOutForTask(TaskName taskName) throws IOException {
Preconditions.checkState(!stopped, "Underlying metadata store not available");
Preconditions.checkNotNull(taskName, "TaskName cannot be null");
byte[] fanOutBytes = fanOutStore.get(toFanOutStoreKey(taskName));
if (ArrayUtils.isEmpty(fanOutBytes)) {
return ImmutableMap.of();
}
StartpointFanOutPerTask startpointFanOutPerTask = objectMapper.readValue(fanOutBytes, StartpointFanOutPerTask.class);
return ImmutableMap.copyOf(startpointFanOutPerTask.getFanOuts());
}
/**
* Deletes the fanned out {@link Startpoint} for the given {@link TaskName}
* @param taskName to delete the fan out Startpoints for
*/
public void removeFanOutForTask(TaskName taskName) {
Preconditions.checkState(!stopped, "Underlying metadata store not available");
Preconditions.checkNotNull(taskName, "TaskName cannot be null");
fanOutStore.delete(toFanOutStoreKey(taskName));
fanOutStore.flush();
}
/**
* Deletes all fanned out {@link Startpoint}s
*/
public void removeAllFanOuts() {
Set<String> fanOutKeys = fanOutStore.all().keySet();
for (String key : fanOutKeys) {
fanOutStore.delete(key);
}
fanOutStore.flush();
}
@VisibleForTesting
MetadataStore getReadWriteStore() {
return readWriteStore;
}
@VisibleForTesting
MetadataStore getFanOutStore() {
return fanOutStore;
}
@VisibleForTesting
ObjectMapper getObjectMapper() {
return objectMapper;
}
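// Precedence sketch: given an SSP-only startpoint created at t=100 and an SSP+taskName startpoint
// created at t=200, the t=200 startpoint wins because it is newer; when only one is present, it is
// returned as-is. (Timestamps are illustrative.)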
private static Optional<Startpoint> resolveStartpointPrecedence(Optional<Startpoint> startpoint1, Optional<Startpoint> startpoint2) {
if (startpoint1.isPresent() && startpoint2.isPresent()) {
// if SSP-only and SSP+taskName startpoints both exist, resolve to the one with the latest timestamp
if (startpoint1.get().getCreationTimestamp() > startpoint2.get().getCreationTimestamp()) {
return startpoint1;
}
return startpoint2;
}
return startpoint1.isPresent() ? startpoint1 : startpoint2;
}
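// Key format: "<system>.<stream>.<partitionId>" for SSP-only startpoints, with ".<taskName>"
// appended for task-scoped startpoints, e.g. "kafka.PageViewEvent.0" or
// "kafka.PageViewEvent.0.Partition 0" (illustrative values).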
private static String toReadWriteStoreKey(SystemStreamPartition ssp, TaskName taskName) {
Preconditions.checkArgument(ssp != null, "SystemStreamPartition should be defined");
Preconditions.checkArgument(StringUtils.isNotBlank(ssp.getSystem()), "System should be defined");
Preconditions.checkArgument(StringUtils.isNotBlank(ssp.getStream()), "Stream should be defined");
Preconditions.checkArgument(ssp.getPartition() != null, "Partition should be defined");
String storeKey = ssp.getSystem() + "." + ssp.getStream() + "." + String.valueOf(ssp.getPartition().getPartitionId());
if (taskName != null) {
storeKey += "." + taskName.getTaskName();
}
return storeKey;
}
private static String toFanOutStoreKey(TaskName taskName) {
Preconditions.checkArgument(taskName != null, "TaskName should be defined");
Preconditions.checkArgument(StringUtils.isNotBlank(taskName.getTaskName()), "TaskName should not be blank");
return taskName.getTaskName();
}
}