| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.samza.storage; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import org.apache.samza.config.RunLoopConfig; |
| import scala.collection.JavaConversions; |
| |
| import java.io.File; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import org.apache.samza.SamzaException; |
| import org.apache.samza.config.ApplicationConfig; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.config.JobConfig; |
| import org.apache.samza.config.StorageConfig; |
| import org.apache.samza.config.TaskConfig; |
| import org.apache.samza.container.RunLoop; |
| import org.apache.samza.container.RunLoopTask; |
| import org.apache.samza.container.SamzaContainerMetrics; |
| import org.apache.samza.container.TaskInstanceMetrics; |
| import org.apache.samza.container.TaskName; |
| import org.apache.samza.context.ContainerContext; |
| import org.apache.samza.context.JobContext; |
| import org.apache.samza.job.model.ContainerModel; |
| import org.apache.samza.job.model.TaskMode; |
| import org.apache.samza.serializers.Serde; |
| import org.apache.samza.serializers.SerdeManager; |
| import org.apache.samza.storage.kv.Entry; |
| import org.apache.samza.storage.kv.KeyValueStore; |
| import org.apache.samza.system.IncomingMessageEnvelope; |
| import org.apache.samza.system.StreamMetadataCache; |
| import org.apache.samza.system.SystemAdmins; |
| import org.apache.samza.system.SystemConsumer; |
| import org.apache.samza.system.SystemConsumers; |
| import org.apache.samza.system.SystemConsumersMetrics; |
| import org.apache.samza.system.SystemFactory; |
| import org.apache.samza.system.SystemStream; |
| import org.apache.samza.system.SystemStreamMetadata; |
| import org.apache.samza.system.SystemStreamPartition; |
| import org.apache.samza.system.chooser.DefaultChooser; |
| import org.apache.samza.system.chooser.MessageChooser; |
| import org.apache.samza.system.chooser.RoundRobinChooserFactory; |
| import org.apache.samza.table.utils.SerdeUtils; |
| import org.apache.samza.task.TaskInstanceCollector; |
| import org.apache.samza.util.Clock; |
| import org.apache.samza.util.ReflectionUtil; |
| import org.apache.samza.util.ScalaJavaUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| public class SideInputsManager { |
| private static final Logger LOG = LoggerFactory.getLogger(SideInputsManager.class); |
| |
| private static final String SIDE_INPUTS_THREAD_NAME = "SideInputs Thread"; |
| // Prefix that differentiates the side input metrics (consumer, task and container) from the ones in SamzaContainer |
| private static final String SIDE_INPUTS_METRICS_PREFIX = "side-inputs-"; |
| |
| // Timeout for one round of waiting on the side input latches; between rounds start() re-checks for exceptions and shutdown |
| private static final int SIDE_INPUT_CHECK_TIMEOUT_SECONDS = 10; |
| private static final int SIDE_INPUT_SHUTDOWN_TIMEOUT_SECONDS = 60; |
| private static final int DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR = 1; |
| |
| private final SamzaContainerMetrics samzaContainerMetrics; |
| private final Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics; |
| private final Config config; |
| |
| /* Side input related state */ |
| private final boolean hasSideInputs; |
| private final Map<TaskName, Map<String, StorageEngine>> sideInputStores; |
| // side input SSPs indexed first by task, then by store name |
| private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs; |
| private final Set<String> sideInputStoreNames; |
| private final Map<SystemStreamPartition, TaskSideInputHandler> sspSideInputHandlers; |
| private SystemConsumers sideInputSystemConsumers; |
| |
| // One latch per task with side inputs, handed to that task's TaskSideInputHandler and awaited by start(). Map's contents are mutated. |
| private final Map<TaskName, CountDownLatch> sideInputTaskLatches; |
| private final ExecutorService sideInputsExecutor = Executors.newSingleThreadExecutor( |
| new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDE_INPUTS_THREAD_NAME).build()); |
| private RunLoop sideInputRunLoop; // created in start(); null until then |
| |
| private volatile boolean shouldShutdown = false; |
| private volatile Throwable sideInputException = null; |
| |
| /** |
|  * Creates the side input stores and their per-SSP handlers and, when at least one task has a side input SSP, |
|  * the {@link SystemConsumers} used to read them. Registering SSPs and starting the consumers is left to |
|  * {@link #start()}. |
|  * |
|  * @param sideInputSystemStreams side input system streams, keyed by store name |
|  * @param changelogSystemStreams changelog system stream, keyed by store name |
|  * @param containerModel the container model whose tasks are inspected for side input SSPs |
|  * @param config job config, also used later by {@link #start()} |
|  */ |
| public SideInputsManager(Map<String, Set<SystemStream>> sideInputSystemStreams, |
|     Map<String, SystemFactory> systemFactories, |
|     Map<String, SystemStream> changelogSystemStreams, |
|     Map<String, SystemStream> activeTaskChangelogSystemStreams, |
|     Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories, |
|     ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext, |
|     SamzaContainerMetrics samzaContainerMetrics, |
|     Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, |
|     Map<TaskName, TaskInstanceCollector> taskInstanceCollectors, |
|     StreamMetadataCache streamMetadataCache, |
|     SystemAdmins systemAdmins, |
|     SerdeManager serdeManager, Map<String, Serde<Object>> serdes, |
|     StorageManagerUtil storageManagerUtil, |
|     File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory, |
|     Config config, Clock clock) { |
|   // plain references first; none of the steps below depend on the order of these assignments |
|   this.config = config; |
|   this.samzaContainerMetrics = samzaContainerMetrics; |
|   this.taskInstanceMetrics = taskInstanceMetrics; |
|   this.sideInputTaskLatches = new HashMap<>(); |
| |
|   this.taskSideInputStoreSSPs = getTaskSideInputSSPs(sideInputSystemStreams, changelogSystemStreams, containerModel); |
|   this.sideInputStoreNames = ContainerStorageManagerUtil.getSideInputStoreNames( |
|       sideInputSystemStreams, changelogSystemStreams, containerModel); |
| |
|   // side inputs are in use as soon as any store of any task has at least one SSP |
|   this.hasSideInputs = this.taskSideInputStoreSSPs.values().stream() |
|       .flatMap(storeToSSPs -> storeToSSPs.values().stream()) |
|       .anyMatch(ssps -> !ssps.isEmpty()); |
| |
|   // side input task stores: one per side input store name, for the tasks of the containerModel |
|   this.sideInputStores = ContainerStorageManagerUtil.createTaskStores( |
|       this.sideInputStoreNames, storageEngineFactories, this.sideInputStoreNames, |
|       activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes, |
|       taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil, |
|       loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config); |
| |
|   this.sspSideInputHandlers = createSideInputHandlers(this.hasSideInputs, this.sideInputStores, |
|       this.taskSideInputStoreSSPs, this.sideInputTaskLatches, taskInstanceMetrics, containerModel, |
|       streamMetadataCache, systemAdmins, serdes, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, |
|       config, clock); |
| |
|   // the consumers are only needed when side inputs are being used |
|   if (!this.hasSideInputs) { |
|     return; |
|   } |
| |
|   Set<SystemStream> sideInputStreams = this.taskSideInputStoreSSPs.values().stream() |
|       .flatMap(storeToSSPs -> storeToSSPs.values().stream()) |
|       .flatMap(Collection::stream) |
|       .map(SystemStreamPartition::getSystemStream) |
|       .collect(Collectors.toSet()); |
|   Set<String> sideInputSystemNames = sideInputStreams.stream() |
|       .map(SystemStream::getSystem) |
|       .collect(Collectors.toSet()); |
| |
|   // one SystemConsumer per system name |
|   Map<String, SystemConsumer> consumersBySystem = ContainerStorageManagerUtil.createSystemConsumers( |
|       sideInputSystemNames, systemFactories, samzaContainerMetrics.registry(), config); |
| |
|   scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> streamMetadata = |
|       streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(sideInputStreams).toSet(), false); |
| |
|   // same registry as samza-container-metrics, told apart by the side inputs prefix |
|   SystemConsumersMetrics consumersMetrics = |
|       new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX); |
| |
|   MessageChooser messageChooser = DefaultChooser.apply(streamMetadata, new RoundRobinChooserFactory(), config, |
|       consumersMetrics.registry(), systemAdmins); |
| |
|   this.sideInputSystemConsumers = new SystemConsumers( |
|       messageChooser, |
|       ScalaJavaUtil.toScalaMap(consumersBySystem), |
|       systemAdmins, |
|       serdeManager, |
|       consumersMetrics, |
|       SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), |
|       SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(), |
|       TaskConfig.DEFAULT_POLL_INTERVAL_MS, |
|       ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()), |
|       JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, |
|       new ApplicationConfig(config).getRunId()); |
| } |
| |
| // read sideInputs until all sideInputStreams are caught up, then return |
| public void start() { |
| if (this.hasSideInputs) { |
| LOG.info("SideInput Restore started"); |
| |
| // initialize the sideInputStorageManagers |
| this.sspSideInputHandlers.values().forEach(TaskSideInputHandler::init); |
| |
| Map<TaskName, TaskSideInputHandler> taskSideInputHandlers = this.sspSideInputHandlers.values().stream() |
| .distinct() |
| .collect(Collectors.toMap(TaskSideInputHandler::getTaskName, Function.identity())); |
| |
| Map<TaskName, TaskInstanceMetrics> sideInputTaskMetrics = new HashMap<>(); |
| Map<TaskName, RunLoopTask> sideInputTasks = new HashMap<>(); |
| this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> { |
| Set<SystemStreamPartition> taskSSPs = this.taskSideInputStoreSSPs.get(taskName).values().stream() |
| .flatMap(Set::stream) |
| .collect(Collectors.toSet()); |
| |
| if (!taskSSPs.isEmpty()) { |
| String sideInputSource = SIDE_INPUTS_METRICS_PREFIX + this.taskInstanceMetrics.get(taskName).source(); |
| TaskInstanceMetrics sideInputMetrics = new TaskInstanceMetrics( |
| sideInputSource, this.taskInstanceMetrics.get(taskName).registry(), SIDE_INPUTS_METRICS_PREFIX); |
| sideInputTaskMetrics.put(taskName, sideInputMetrics); |
| |
| RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs, |
| taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName), |
| new TaskConfig(config).getCommitMs()); |
| sideInputTasks.put(taskName, sideInputTask); |
| } |
| }); |
| |
| // register all sideInput SSPs with the consumers |
| for (SystemStreamPartition ssp : this.sspSideInputHandlers.keySet()) { |
| String startingOffset = this.sspSideInputHandlers.get(ssp).getStartingOffset(ssp); |
| |
| if (startingOffset == null) { |
| throw new SamzaException( |
| "No starting offset could be obtained for SideInput SystemStreamPartition : " + ssp + ". Consumer cannot start."); |
| } |
| |
| // register startingOffset with the sysConsumer and register a metric for it |
| sideInputSystemConsumers.register(ssp, startingOffset); |
| taskInstanceMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge( |
| ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp))); |
| sideInputTaskMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge( |
| ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp))); |
| } |
| |
| // start the systemConsumers for consuming input |
| this.sideInputSystemConsumers.start(); |
| |
| TaskConfig taskConfig = new TaskConfig(this.config); |
| SamzaContainerMetrics sideInputContainerMetrics = |
| new SamzaContainerMetrics(SIDE_INPUTS_METRICS_PREFIX + this.samzaContainerMetrics.source(), |
| this.samzaContainerMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX); |
| |
| final ApplicationConfig applicationConfig = new ApplicationConfig(config); |
| |
| SideInputRunLoopConfig runLoopConfig = new SideInputRunLoopConfig(config); |
| this.sideInputRunLoop = new RunLoop(sideInputTasks, |
| null, // all operations are executed in the main runloop thread |
| this.sideInputSystemConsumers, |
| sideInputContainerMetrics, |
| System::nanoTime, |
| runLoopConfig); |
| |
| try { |
| sideInputsExecutor.submit(() -> { |
| try { |
| sideInputRunLoop.run(); |
| } catch (Exception e) { |
| LOG.error("Exception in reading sideInputs", e); |
| sideInputException = e; |
| } |
| }); |
| |
| // Make the main thread wait until all sideInputs have been caughtup or an exception was thrown |
| while (!shouldShutdown && sideInputException == null && |
| !awaitSideInputTasks(sideInputTaskLatches)) { |
| LOG.debug("Waiting for SideInput bootstrap to complete"); |
| } |
| |
| if (sideInputException != null) { // Throw exception if there was an exception in catching-up sideInputs |
| throw new SamzaException("Exception in restoring sideInputs", sideInputException); |
| } |
| |
| } catch (InterruptedException e) { |
| LOG.warn("Received an interrupt during side inputs store restoration." |
| + " Exiting prematurely without completing store restore."); |
| /* |
| * We want to stop side input restoration and rethrow the exception upstream. Container should handle the |
| * interrupt exception and shutdown the components and cleaning up the resource. We don't want to clean up the |
| * resources prematurely here. |
| */ |
| shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence? |
| throw new SamzaException("Side inputs read was interrupted", e); |
| } |
| |
| LOG.info("SideInput Restore complete"); |
| } |
| } |
| |
| /** |
|  * Returns an immutable copy of the task-to-stores map. Only the outer map is copied; the per-task store maps are |
|  * the same instances held by this manager. |
|  * |
|  * @return side input stores indexed first by task name, then by store name |
|  */ |
| public Map<TaskName, Map<String, StorageEngine>> getSideInputStores() { |
|   final Map<TaskName, Map<String, StorageEngine>> storesByTask = ImmutableMap.copyOf(this.sideInputStores); |
|   return storesByTask; |
| } |
| |
| public void shutdown() { |
| this.shouldShutdown = true; |
| |
| // stop all side input consumers and stores |
| if (this.hasSideInputs) { |
| this.sideInputRunLoop.shutdown(); |
| this.sideInputsExecutor.shutdown(); |
| try { |
| this.sideInputsExecutor.awaitTermination(SIDE_INPUT_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| throw new SamzaException("Exception while shutting down sideInputs", e); |
| } |
| |
| this.sideInputSystemConsumers.stop(); |
| |
| // stop all side input handlers -- this will perform one last flush on the KV stores, and write the offset file |
| this.sspSideInputHandlers.values().forEach(TaskSideInputHandler::stop); |
| } |
| } |
| |
| /** |
|  * Builds the side input SSPs as a map of maps, keyed first by task name and then by side input store name. |
|  * |
|  * <p>For every task, each store in {@code sideInputSystemStreams} is mapped to the task's SSPs whose system stream |
|  * belongs to that store (possibly an empty set). For standby tasks, each store in {@code changelogSystemStreams} |
|  * is additionally mapped to the single SSP made of the changelog stream and the task's changelog partition, |
|  * replacing any entry already present for that store. |
|  * |
|  * @param sideInputSystemStreams the map of store to sideInput system stream |
|  * @param changelogSystemStreams the map of store to changelog system stream |
|  * @param containerModel the containerModel to use |
|  * @return taskSideInputSSPs map |
|  */ |
| @VisibleForTesting |
| static Map<TaskName, Map<String, Set<SystemStreamPartition>>> getTaskSideInputSSPs( |
|     Map<String, Set<SystemStream>> sideInputSystemStreams, |
|     Map<String, SystemStream> changelogSystemStreams, |
|     ContainerModel containerModel) { |
|   Map<TaskName, Map<String, Set<SystemStreamPartition>>> sspsByTaskAndStore = new HashMap<>(); |
| |
|   containerModel.getTasks().forEach((taskName, taskModel) -> { |
|     Map<String, Set<SystemStreamPartition>> sspsByStore = |
|         sspsByTaskAndStore.computeIfAbsent(taskName, ignored -> new HashMap<>()); |
|     for (Map.Entry<String, Set<SystemStream>> storeAndStreams : sideInputSystemStreams.entrySet()) { |
|       Set<SystemStreamPartition> matchingSSPs = taskModel.getSystemStreamPartitions().stream() |
|           .filter(ssp -> storeAndStreams.getValue().contains(ssp.getSystemStream())) |
|           .collect(Collectors.toSet()); |
|       sspsByStore.put(storeAndStreams.getKey(), matchingSSPs); |
|     } |
|   }); |
| |
|   ContainerStorageManagerUtil.getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> { |
|     Map<String, Set<SystemStreamPartition>> sspsByStore = |
|         sspsByTaskAndStore.computeIfAbsent(taskName, ignored -> new HashMap<>()); |
|     for (Map.Entry<String, SystemStream> storeAndChangelog : changelogSystemStreams.entrySet()) { |
|       SystemStreamPartition changelogSSP = |
|           new SystemStreamPartition(storeAndChangelog.getValue(), taskModel.getChangelogPartition()); |
|       sspsByStore.put(storeAndChangelog.getKey(), Collections.singleton(changelogSSP)); |
|     } |
|   }); |
| |
|   return sspsByTaskAndStore; |
| } |
| |
| /** |
|  * Creates one {@link TaskSideInputHandler} per task that has at least one side input SSP and indexes it by every |
|  * SSP it is responsible for consuming. |
|  * |
|  * <p>Mutates {@code sideInputTaskLatches}: a new latch with count 1 is added for each task that gets a handler. |
|  * The side input processors are created regardless of {@code hasSideInputs}; handlers only when it is true. |
|  * |
|  * @return handlers keyed by SSP; empty when {@code hasSideInputs} is false |
|  */ |
| private static Map<SystemStreamPartition, TaskSideInputHandler> createSideInputHandlers( |
|     boolean hasSideInputs, Map<TaskName, Map<String, StorageEngine>> sideInputStores, |
|     Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs, |
|     Map<TaskName, CountDownLatch> sideInputTaskLatches, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, |
|     ContainerModel containerModel, StreamMetadataCache streamMetadataCache, SystemAdmins systemAdmins, |
|     Map<String, Serde<Object>> serdes, File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory, |
|     Config config, Clock clock) { |
|   // sideInput store processors, one per store per task |
|   Map<TaskName, Map<String, SideInputsProcessor>> processorsByTask = createSideInputProcessors( |
|       taskInstanceMetrics, taskSideInputStoreSSPs, containerModel, serdes, new StorageConfig(config)); |
| |
|   Map<SystemStreamPartition, TaskSideInputHandler> handlersBySSP = new HashMap<>(); |
|   if (!hasSideInputs) { |
|     return handlersBySSP; |
|   } |
| |
|   containerModel.getTasks().forEach((taskName, taskModel) -> { |
|     Map<String, StorageEngine> storesOfTask = sideInputStores.get(taskName); |
|     Map<String, Set<SystemStreamPartition>> knownSSPsByStore = taskSideInputStoreSSPs.get(taskName); |
| |
|     // collect the SSPs of each store of this task, noting whether any store has SSPs at all |
|     Map<String, Set<SystemStreamPartition>> sspsByStore = new HashMap<>(); |
|     boolean taskHasSideInputs = false; |
|     for (String storeName : storesOfTask.keySet()) { |
|       Set<SystemStreamPartition> sspsOfStore = knownSSPsByStore.get(storeName); |
|       if (!taskHasSideInputs && !sspsOfStore.isEmpty()) { |
|         taskHasSideInputs = true; |
|       } |
|       sspsByStore.put(storeName, sspsOfStore); |
|     } |
| |
|     if (!taskHasSideInputs) { |
|       return; |
|     } |
| |
|     CountDownLatch caughtUpLatch = new CountDownLatch(1); |
|     sideInputTaskLatches.put(taskName, caughtUpLatch); |
| |
|     TaskSideInputHandler handler = new TaskSideInputHandler(taskName, |
|         taskModel.getTaskMode(), |
|         loggedStoreBaseDirectory, |
|         storesOfTask, |
|         sspsByStore, |
|         processorsByTask.get(taskName), |
|         systemAdmins, |
|         streamMetadataCache, |
|         caughtUpLatch, |
|         clock); |
| |
|     // the same handler instance serves every SSP of the task |
|     for (Set<SystemStreamPartition> sspsOfStore : sspsByStore.values()) { |
|       for (SystemStreamPartition ssp : sspsOfStore) { |
|         handlersBySSP.put(ssp, handler); |
|       } |
|     } |
| |
|     LOG.info("Created TaskSideInputHandler for task {}, taskSideInputStores {} and loggedStoreBaseDirectory {}", |
|         taskName, storesOfTask, loggedStoreBaseDirectory); |
|   }); |
| |
|   return handlersBySSP; |
| } |
| |
| /** |
|  * Creates the side input processors, one per store per task, keyed by task name and then by store name. |
|  * |
|  * <p>For each store the processor is chosen in this order: the serialized processor instance from config, the |
|  * processor built by the configured processor factory, otherwise an identity processor that deserializes key and |
|  * message with the store's configured serdes. |
|  * |
|  * @throws SamzaException if the identity processor is needed and the store has no key or msg serde configured |
|  */ |
| private static Map<TaskName, Map<String, SideInputsProcessor>> createSideInputProcessors( |
|     Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, |
|     Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs, |
|     ContainerModel containerModel, Map<String, Serde<Object>> serdes, StorageConfig config) { |
| |
|   Map<TaskName, Map<String, SideInputsProcessor>> processorsByTask = new HashMap<>(); |
|   containerModel.getTasks().forEach((taskName, taskModel) -> { |
|     Map<String, SideInputsProcessor> processorsByStore = new HashMap<>(); |
|     processorsByTask.put(taskName, processorsByStore); |
|     TaskMode taskMode = taskModel.getTaskMode(); |
| |
|     for (String storeName : taskSideInputStoreSSPs.get(taskName).keySet()) { |
|       SideInputsProcessor processor; |
|       Optional<String> serializedProcessor = config.getSideInputsProcessorSerializedInstance(storeName); |
| |
|       if (serializedProcessor.isPresent()) { |
|         processor = SerdeUtils.deserialize("Side Inputs Processor", serializedProcessor.get()); |
|         LOG.info("Using serialized side-inputs-processor for store: {}, task: {}", storeName, taskName); |
|       } else { |
|         Optional<String> factoryClassName = config.getSideInputsProcessorFactory(storeName); |
|         if (factoryClassName.isPresent()) { |
|           SideInputsProcessorFactory factory = |
|               ReflectionUtil.getObj(factoryClassName.get(), SideInputsProcessorFactory.class); |
|           processor = factory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry()); |
|           LOG.info("Using side-inputs-processor from factory: {} for store: {}, task: {}", |
|               factoryClassName.get(), storeName, taskName); |
|         } else { |
|           // if this is a active-task with a side-input store but no sideinput-processor-factory defined in config, |
|           // we rely on upstream validations to fail the deploy |
| |
|           // if this is a standby-task and the store is a non-side-input changelog store, an identity processor |
|           // is created for it |
|           processor = createIdentityProcessor(storeName, serdes, config); |
|           LOG.info("Using identity side-inputs-processor for store: {}, task: {}", storeName, taskName); |
|         } |
|       } |
| |
|       processorsByStore.put(storeName, processor); |
|     } |
|   }); |
| |
|   return processorsByTask; |
| } |
| |
| // Builds a processor that turns each envelope with a non-null key into a single deserialized key/value entry. |
| private static SideInputsProcessor createIdentityProcessor( |
|     String storeName, Map<String, Serde<Object>> serdes, StorageConfig config) { |
|   // the store's own serdes are looked up by their configured names |
|   Serde keySerde = serdes.get(config.getStorageKeySerde(storeName) |
|       .orElseThrow(() -> new SamzaException("Could not find storage key serde for store: " + storeName))); |
|   Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName) |
|       .orElseThrow(() -> new SamzaException("Could not find storage msg serde for store: " + storeName))); |
| |
|   return new SideInputsProcessor() { |
|     @Override |
|     public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) { |
|       // Ignore message if the key is null |
|       if (message.getKey() == null) { |
|         return ImmutableList.of(); |
|       } |
|       // Skip serde if the message is null |
|       return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[]) message.getKey()), |
|           message.getMessage() == null ? null : msgSerde.fromBytes((byte[]) message.getMessage()))); |
|     } |
|   }; |
| } |
| |
| /** |
| * Waits for all side input tasks to catch up until a timeout. |
| * |
| * @return False if waiting on any latch timed out, true otherwise |
| * |
| * @throws InterruptedException if waiting any of the latches is interrupted |
| */ |
| private static boolean awaitSideInputTasks(Map<TaskName, CountDownLatch> sideInputTaskLatches) throws InterruptedException { |
| long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(SIDE_INPUT_CHECK_TIMEOUT_SECONDS); |
| for (CountDownLatch latch : sideInputTaskLatches.values()) { |
| long remainingMillisToWait = endTime - System.currentTimeMillis(); |
| if (remainingMillisToWait <= 0 || !latch.await(remainingMillisToWait, TimeUnit.MILLISECONDS)) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
|  * {@link RunLoopConfig} decorator that pins the {@link RunLoop} settings used by the side inputs flow in samza: |
|  * <ol> |
|  *   <li>Max concurrency within a side input task is always <i>1</i>. This is critical as the ordering of OPs |
|  *       (CRUD) for side input stores needs to be followed to recreate the correct snapshot of the external |
|  *       data.</li> |
|  *   <li>Side input tasks don't have any windows. Users may only plug in process functions.</li> |
|  *   <li>Commits are synchronous, as data integrity must be ensured upon state flushes.</li> |
|  * </ol> |
|  * The elasticity factor is fixed to {@code DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR}. |
|  */ |
| private static class SideInputRunLoopConfig extends RunLoopConfig { |
|   private static final int SINGLE_TASK_CONCURRENCY = 1; |
|   private static final long WINDOW_MS_VALUE = -1; |
|   private static final boolean ASYNC_COMMIT_ENABLED = false; |
| |
|   public SideInputRunLoopConfig(Config config) { |
|     super(config); |
|   } |
| |
|   /** Always 1, so operations on a side input store keep their order. */ |
|   @Override |
|   public int getMaxConcurrency() { |
|     return SINGLE_TASK_CONCURRENCY; |
|   } |
| |
|   /** Always -1; side input tasks have no windows. */ |
|   @Override |
|   public long getWindowMs() { |
|     return WINDOW_MS_VALUE; |
|   } |
| |
|   /** Always the side input default elasticity factor. */ |
|   @Override |
|   public int getElasticityFactor() { |
|     return DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR; |
|   } |
| |
|   /** Always false; commit must be synchronous to ensure integrity of state flush. */ |
|   @Override |
|   public boolean asyncCommitEnabled() { |
|     return ASYNC_COMMIT_ENABLED; |
|   } |
| } |
| } |