blob: 767b9ce208a8cf60d59bf19ed4ea4dd797c3b8d0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
/**
 * This class encapsulates all processing logic / state for all side input SSPs within a task.
 *
 * <p>{@link #process} and {@link #flush} are synchronized with each other so that a flush never interleaves with
 * the processing of an envelope.
 */
public class TaskSideInputHandler {
  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);

  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
  // last processed offset per side input SSP; seeded from the local offset files in init() and updated by process()
  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();

  private final TaskName taskName;
  private final TaskSideInputStorageManager taskSideInputStorageManager;
  // inverse of the store -> SSPs mapping; one SSP may feed several stores
  private final Map<SystemStreamPartition, Set<String>> sspToStores;
  private final Map<String, SideInputsProcessor> storeToProcessor;
  private final SystemAdmins systemAdmins;
  private final StreamMetadataCache streamMetadataCache;
  // indicates to ContainerStorageManager that all side input ssps in this task are caught up
  private final CountDownLatch taskCaughtUpLatch;

  // SSPs that have not caught up yet, mapped to the metadata captured in init(); entries are removed as SSPs catch up
  private Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata;
  private Map<SystemStreamPartition, String> startingOffsets;

  /**
   * Creates the side input handler for a single task.
   *
   * @param taskName the task whose side inputs are handled
   * @param taskMode passed through to the underlying {@link TaskSideInputStorageManager}
   * @param storeBaseDir passed through to the underlying {@link TaskSideInputStorageManager}
   * @param storeToStorageEngines passed through to the underlying {@link TaskSideInputStorageManager}
   * @param storeToSSPs the side input SSPs of each store
   * @param storeToProcessor the {@link SideInputsProcessor} of each store; must cover every store in {@code storeToSSPs}
   * @param systemAdmins used to resolve starting offsets and to compare offsets
   * @param streamMetadataCache source of the oldest offsets and of the offsets to catch up to
   * @param taskCaughtUpLatch counted down once all side input SSPs of this task have caught up
   * @param clock passed through to the underlying {@link TaskSideInputStorageManager}
   * @throws SamzaException if a store in {@code storeToSSPs} has no associated {@link SideInputsProcessor}
   */
  public TaskSideInputHandler(TaskName taskName, TaskMode taskMode, File storeBaseDir,
      Map<String, StorageEngine> storeToStorageEngines, Map<String, Set<SystemStreamPartition>> storeToSSPs,
      Map<String, SideInputsProcessor> storeToProcessor, SystemAdmins systemAdmins,
      StreamMetadataCache streamMetadataCache, CountDownLatch taskCaughtUpLatch, Clock clock) {
    validateProcessorConfiguration(storeToSSPs.keySet(), storeToProcessor);

    this.taskName = taskName;
    this.systemAdmins = systemAdmins;
    this.streamMetadataCache = streamMetadataCache;
    this.storeToProcessor = storeToProcessor;
    this.taskCaughtUpLatch = taskCaughtUpLatch;

    this.sspToStores = new HashMap<>();
    storeToSSPs.forEach((store, ssps) -> {
      for (SystemStreamPartition ssp : ssps) {
        this.sspToStores.computeIfAbsent(ssp, key -> new HashSet<>()).add(store);
      }
    });

    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
        taskMode,
        storeBaseDir,
        storeToStorageEngines,
        storeToSSPs,
        clock);
  }

  /**
   * The {@link TaskName} associated with this {@link TaskSideInputHandler}
   *
   * @return the task name for this handler
   */
  public TaskName getTaskName() {
    return this.taskName;
  }

  /**
   * Initializes the underlying {@link TaskSideInputStorageManager} and determines starting offsets for each SSP.
   * Also captures the offsets each SSP must reach to be considered caught up, and counts down the task latch right
   * away when there is nothing to catch up to.
   */
  public void init() {
    this.taskSideInputStorageManager.init();

    Map<SystemStreamPartition, String> fileOffsets = this.taskSideInputStorageManager.getFileOffsets();
    LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);

    this.lastProcessedOffsets.putAll(fileOffsets);
    LOG.info("Last processed offsets for the task {}: {}", taskName, lastProcessedOffsets);

    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
    LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);

    this.initialSideInputSSPMetadata = getInitialSideInputSSPMetadata();
    LOG.info("Task {} will catch up to offsets {}", this.taskName, this.initialSideInputSSPMetadata);

    if (this.initialSideInputSSPMetadata.isEmpty()) {
      // checkCaughtUp only counts the latch down when it removes the last tracked SSP; with nothing tracked that
      // can never happen, so release the latch here.
      LOG.info("Task {} has no side input SSPs to catch up to.", this.taskName);
      this.taskCaughtUpLatch.countDown();
    } else {
      this.startingOffsets.forEach(
          (ssp, offset) -> checkCaughtUp(ssp, offset, SystemStreamMetadata.OffsetType.UPCOMING));
    }
  }

  /**
   * Retrieves the partition metadata (oldest/newest/upcoming offsets) for each side input SSP from the
   * {@link StreamMetadataCache}. SSPs for which no stream or partition metadata is available are left out of the
   * result, so they are not waited on when determining catch up.
   *
   * @return a map of SSP to its partition metadata
   */
  private Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> getInitialSideInputSSPMetadata() {
    Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata =
        new HashMap<>();
    for (SystemStreamPartition ssp : this.sspToStores.keySet()) {
      boolean partitionsMetadataOnly = false;
      SystemStreamMetadata systemStreamMetadata =
          this.streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), partitionsMetadataOnly);
      SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata = systemStreamMetadata == null
          ? null
          : systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition());
      if (sspMetadata != null) {
        initialSideInputSSPMetadata.put(ssp, sspMetadata);
      } else {
        // a null entry could never be removed by checkCaughtUp and would keep the latch from counting down
        LOG.warn("No metadata found for side input ssp {} of task {}; it will not be waited on for catch up.",
            ssp, this.taskName);
      }
    }
    return initialSideInputSSPMetadata;
  }

  /**
   * Processes the incoming side input message envelope and updates the last processed offset for its SSP.
   * Synchronized in order to be exclusive with flush().
   *
   * @param envelope incoming envelope to be processed
   */
  public synchronized void process(IncomingMessageEnvelope envelope) {
    SystemStreamPartition envelopeSSP = envelope.getSystemStreamPartition();
    String envelopeOffset = envelope.getOffset();

    for (String store : this.sspToStores.get(envelopeSSP)) {
      SideInputsProcessor storeProcessor = this.storeToProcessor.get(store);
      KeyValueStore keyValueStore = (KeyValueStore) this.taskSideInputStorageManager.getStore(store);
      Collection<Entry<?, ?>> entriesToBeWritten = storeProcessor.process(envelope, keyValueStore);

      // TODO: SAMZA-2255: optimize writes to side input stores
      for (Entry<?, ?> entry : entriesToBeWritten) {
        // If the key is null we ignore, if the value is null, we issue a delete, else we issue a put
        if (entry.getKey() != null) {
          if (entry.getValue() != null) {
            keyValueStore.put(entry.getKey(), entry.getValue());
          } else {
            keyValueStore.delete(entry.getKey());
          }
        }
      }
    }

    this.lastProcessedOffsets.put(envelopeSSP, envelopeOffset);
    checkCaughtUp(envelopeSSP, envelopeOffset, SystemStreamMetadata.OffsetType.NEWEST);
  }

  /**
   * Flushes the underlying {@link TaskSideInputStorageManager} at the last processed offsets.
   * Synchronized in order to be exclusive with process().
   */
  public synchronized void flush() {
    this.taskSideInputStorageManager.flush(this.lastProcessedOffsets);
  }

  /**
   * Gets the starting offset for the given side input {@link SystemStreamPartition}.
   *
   * Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT} and
   * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET} configurations. It will use the local offset
   * file if it is valid, else it will fall back to oldest offset in the stream.
   *
   * @param ssp side input system stream partition to get the starting offset for
   * @return the starting offset
   */
  public String getStartingOffset(SystemStreamPartition ssp) {
    return this.startingOffsets.get(ssp);
  }

  /**
   * Gets the last processed offset for the given side input {@link SystemStreamPartition}.
   *
   * @param ssp side input system stream partition to get the last processed offset for
   * @return the last processed offset
   */
  public String getLastProcessedOffset(SystemStreamPartition ssp) {
    return this.lastProcessedOffsets.get(ssp);
  }

  /**
   * Stops the underlying storage manager at the last processed offsets. Any pending and upcoming invocations
   * of {@link #process} and {@link #flush} are assumed to have completed or ceased prior to calling this method.
   */
  public void stop() {
    this.taskSideInputStorageManager.stop(this.lastProcessedOffsets);
  }

  /**
   * Gets the starting offsets for the {@link SystemStreamPartition}s belonging to all the side input stores. See doc
   * of {@link StorageManagerUtil#getStartingOffset} for how file offsets and oldest offsets for each SSP are
   * reconciled.
   *
   * @param fileOffsets offsets from the local offset file
   * @param oldestOffsets oldest offsets from the source
   * @return a {@link Map} of {@link SystemStreamPartition} to offset
   */
  @VisibleForTesting
  Map<SystemStreamPartition, String> getStartingOffsets(
      Map<SystemStreamPartition, String> fileOffsets, Map<SystemStreamPartition, String> oldestOffsets) {
    Map<SystemStreamPartition, String> startingOffsets = new HashMap<>();

    this.sspToStores.keySet().forEach(ssp -> {
      String fileOffset = fileOffsets.get(ssp);
      String oldestOffset = oldestOffsets.get(ssp);

      startingOffsets.put(ssp,
          this.storageManagerUtil.getStartingOffset(
              ssp, this.systemAdmins.getSystemAdmin(ssp.getSystem()), fileOffset, oldestOffset));
    });

    return startingOffsets;
  }

  /**
   * Gets the oldest offset for the {@link SystemStreamPartition}s associated with all the store side inputs.
   *   1. Groups the list of the SSPs based on system stream
   *   2. Fetches the {@link SystemStreamMetadata} from {@link StreamMetadataCache}
   *   3. Fetches the partition metadata for each system stream and fetch the corresponding partition metadata
   *      and populates the oldest offset for SSPs belonging to the system stream.
   *
   * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset. If partitionMetadata could not be
   * obtained for any {@link SystemStreamPartition} the offset for it is populated as null.
   */
  @VisibleForTesting
  Map<SystemStreamPartition, String> getOldestOffsets() {
    Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();

    // Step 1
    Map<SystemStream, List<SystemStreamPartition>> systemStreamToSsp = this.sspToStores.keySet().stream()
        .collect(Collectors.groupingBy(SystemStreamPartition::getSystemStream));

    // Step 2
    Map<SystemStream, SystemStreamMetadata> metadata = JavaConverters.mapAsJavaMapConverter(
        this.streamMetadataCache.getStreamMetadata(
            JavaConverters.asScalaSetConverter(systemStreamToSsp.keySet()).asScala().toSet(), false)).asJava();

    // Step 3
    metadata.forEach((systemStream, systemStreamMetadata) -> {
      // get the partition metadata for each system stream
      Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata =
          systemStreamMetadata.getSystemStreamPartitionMetadata();

      // For SSPs belonging to the system stream, use the partition metadata to get the oldest offset
      // if partitionMetadata was not obtained for any SSP, populate oldest-offset as null
      // Because of https://bugs.openjdk.java.net/browse/JDK-8148463 using lambda will NPE when getOldestOffset() is null
      for (SystemStreamPartition ssp : systemStreamToSsp.get(systemStream)) {
        SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata = partitionMetadata.get(ssp.getPartition());
        oldestOffsets.put(ssp, sspMetadata == null ? null : sspMetadata.getOldestOffset());
      }
    });

    return oldestOffsets;
  }

  /**
   * An SSP is considered caught up once the offset indicated for it in {@link #initialSideInputSSPMetadata} has been
   * processed. Once the set of SSPs to catch up becomes empty, the latch for the task will count down, notifying
   * {@link ContainerStorageManager} that it is caught up.
   *
   * @param ssp The SSP to be checked
   * @param currentOffset The offset to be checked
   * @param offsetTypeToCheck The type offset to compare {@code currentOffset} to.
   */
  private void checkCaughtUp(SystemStreamPartition ssp, String currentOffset, SystemStreamMetadata.OffsetType offsetTypeToCheck) {
    SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata = this.initialSideInputSSPMetadata.get(ssp);
    // null once the SSP has already caught up (its entry was removed) or if it was never tracked
    String offsetToCheck = sspMetadata == null ? null : sspMetadata.getOffset(offsetTypeToCheck);

    LOG.trace("Checking offset {} against {} offset {} for {}.", currentOffset, offsetTypeToCheck, offsetToCheck, ssp);

    Integer comparatorResult;
    if (currentOffset == null || offsetToCheck == null) {
      comparatorResult = -1;
    } else {
      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
      // may be null if the admin cannot compare the two offsets; treated as "not caught up" below
      comparatorResult = systemAdmin.offsetComparator(currentOffset, offsetToCheck);
    }

    // The SSP is no longer lagging if the envelope's offset is greater than or equal to the
    // latest offset.
    if (comparatorResult != null && comparatorResult.intValue() >= 0) {
      LOG.info("Side input ssp {} has caught up to offset {}.", ssp, offsetToCheck);
      // if its caught up, we remove the ssp from the map
      this.initialSideInputSSPMetadata.remove(ssp);
      if (this.initialSideInputSSPMetadata.isEmpty()) {
        // if the metadata list is now empty, all SSPs in the task are caught up so count down the latch
        // this will only happen once, when the last ssp catches up
        this.taskCaughtUpLatch.countDown();
      }
    }
  }

  /**
   * Validates that each store has an associated {@link SideInputsProcessor}
   *
   * @throws SamzaException if any store in {@code stores} is missing from {@code storeToProcessor}
   */
  private void validateProcessorConfiguration(Set<String> stores, Map<String, SideInputsProcessor> storeToProcessor) {
    stores.forEach(storeName -> {
      if (!storeToProcessor.containsKey(storeName)) {
        throw new SamzaException(
            String.format("Side inputs processor missing for store: %s.", storeName));
      }
    });
  }
}