blob: 3ffc91b73007c29ea0f3f820baf9d9feb4e897d3 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.s4.core.ft;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.s4.core.ProcessingElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* <p>
* This class is responsible for coordinating interactions between the S4 event processor and the checkpoint storage
* backend. In particular, it schedules asynchronous save tasks to be executed on the backend.
* </p>
public final class SafeKeeper implements CheckpointingFramework {
private static final class UncaughtExceptionLogger implements UncaughtExceptionHandler {
String operationType;
public UncaughtExceptionLogger(String operationType) {
this.operationType = operationType;
public void uncaughtException(Thread t, Throwable e) {
logger.error("Cannot execute checkpointing " + operationType + " operation", e);
private static Logger logger = LoggerFactory.getLogger(SafeKeeper.class);
private StateStorage stateStorage;
@Inject(optional = true)
private StorageCallbackFactory storageCallbackFactory = new LoggingStorageCallbackFactory();
private ThreadPoolExecutor storageThreadPool;
private ThreadPoolExecutor serializationThreadPool;
private ThreadPoolExecutor fetchingThreadPool;
@Inject(optional = true)
int storageMaxThreads = 1;
@Inject(optional = true)
int storageThreadKeepAliveSeconds = 120;
@Inject(optional = true)
int storageMaxOutstandingRequests = 1000;
@Inject(optional = true)
int serializationMaxThreads = 1;
@Inject(optional = true)
int serializationThreadKeepAliveSeconds = 120;
@Inject(optional = true)
int serializationMaxOutstandingRequests = 1000;
@Inject(optional = true)
long maxSerializationLockTime = 1000;
@Inject(optional = true)
int fetchingMaxThreads = 1;
@Inject(optional = true)
int fetchingThreadKeepAliveSeconds = 120;
@Inject(optional = true)
long fetchingMaxWaitMs = 1000;
@Inject(optional = true)
int fetchingMaxConsecutiveFailuresBeforeDisabling = 10;
@Inject(optional = true)
long fetchingDisabledDurationMs = 600000;
@Inject(optional = true)
int fetchingQueueSize = 100;
long fetchingDisabledInitTime = -1;
AtomicInteger fetchingCurrentConsecutiveFailures = new AtomicInteger();
public SafeKeeper() {
private void init() {
// NOTE: those thread pools should be fine tuned according to backend and application load/requirements.
// For now:
// - number of threads and work queue size have overridable defaults
// - failures are logged
// - when storage queue is full, we throttle backwards to the serialization threadpool
// - when serialization queue is full, we abort execution for new entries
// - fetching uses a synchronous queue and therefore is a blocking operation, with a timeout
ThreadFactory storageThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-storage-%d")
.setUncaughtExceptionHandler(new UncaughtExceptionLogger("storage")).build();
storageThreadPool = new ThreadPoolExecutor(1, storageMaxThreads, storageThreadKeepAliveSeconds,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(storageMaxOutstandingRequests),
storageThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
ThreadFactory serializationThreadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler(new UncaughtExceptionLogger("serialization")).build();
serializationThreadPool = new ThreadPoolExecutor(1, serializationMaxThreads,
serializationThreadKeepAliveSeconds, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(
serializationMaxOutstandingRequests), serializationThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
ThreadFactory fetchingThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-fetching-%d")
.setUncaughtExceptionHandler(new UncaughtExceptionLogger("fetching")).build();
fetchingThreadPool = new ThreadPoolExecutor(0, fetchingMaxThreads, fetchingThreadKeepAliveSeconds,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(fetchingQueueSize), fetchingThreadFactory);
* (non-Javadoc)
* @see org.apache.s4.core.ft.CheckpointingFramework#saveState(org.apache.s4.core.ProcessingElement)
public StorageCallback saveState(ProcessingElement pe) {
StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
Future<byte[]> futureSerializedState = null;
try {
futureSerializedState = serializeState(pe);
} catch (RejectedExecutionException e) {
// if (monitor != null) {
// monitor.increment(MetricsName.checkpointing_dropped_from_serialization_queue.toString(), 1,
// S4_CORE_METRICS.toString());
// }
"Serialization task queue is full. An older serialization task was dumped in order to serialize PE ["
+ pe.getId() + "]" + " Remaining capacity for the serialization task queue is ["
+ serializationThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+ serializationThreadPool.getQueue().size() + "] ; maximum capacity is ["
+ serializationThreadPool + "]");
return storageCallback;
submitSaveStateTask(new SaveStateTask(new CheckpointId(pe), futureSerializedState, storageCallback,
stateStorage), storageCallback);
return storageCallback;
private Future<byte[]> serializeState(ProcessingElement pe) {
Future<byte[]> future = serializationThreadPool.submit(new SerializeTask(pe));
// if (monitor != null) {
// monitor.increment(MetricsName.checkpointing_added_to_serialization_queue.toString(), 1,
// S4_CORE_METRICS.toString());
// }
return future;
private void submitSaveStateTask(SaveStateTask task, StorageCallback storageCallback) {
try {
// if (monitor != null) {
// monitor.increment(MetricsName.checkpointing_added_to_storage_queue.toString(), 1);
// }
} catch (RejectedExecutionException e) {
// if (monitor != null) {
// monitor.increment(MetricsName.checkpointing_dropped_from_storage_queue.toString(), 1);
// }
"Storage checkpoint queue is full. Removed an old task to handle latest task. Remaining capacity for task queue is ["
+ storageThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+ storageThreadPool.getQueue().size() + "] ; maximum capacity is ["
+ storageMaxOutstandingRequests + "]");
* (non-Javadoc)
* @see org.apache.s4.core.ft.CheckpointingFramework#fetchSerializedState(org.apache.s4.core.ft.SafeKeeperId)
public byte[] fetchSerializedState(CheckpointId key) {
byte[] result = null;
if (fetchingCurrentConsecutiveFailures.get() == fetchingMaxConsecutiveFailuresBeforeDisabling) {
if ((fetchingDisabledInitTime + fetchingDisabledDurationMs) < System.currentTimeMillis()) {
return null;
} else {
// reached time, reinit
Future<byte[]> fetched = fetchingThreadPool.submit(new FetchTask(stateStorage, key));
try {
result = fetched.get(fetchingMaxWaitMs, TimeUnit.MILLISECONDS);
return result;
} catch (TimeoutException te) {
logger.error("Cannot fetch checkpoint from backend for key [{}] before timeout of {} ms",
key.getStringRepresentation(), fetchingMaxWaitMs);
} catch (InterruptedException e) {
"Cannot fetch checkpoint from backend for key [{}] before timeout of {} ms because of an interruption",
key.getStringRepresentation(), fetchingMaxWaitMs);
} catch (ExecutionException e) {
logger.error("Cannot fetch checkpoint from backend for key [{}] due to {}", key.getStringRepresentation(),
e.getCause().getClass().getName() + "/" + e.getCause().getMessage());
if (fetchingCurrentConsecutiveFailures.incrementAndGet() == fetchingMaxConsecutiveFailuresBeforeDisabling) {
"Due to {} successive checkpoint fetching failures, fetching is temporarily disabled for {} ms",
fetchingMaxConsecutiveFailuresBeforeDisabling, fetchingDisabledDurationMs);
fetchingDisabledInitTime = System.currentTimeMillis();
return result;
public boolean isCheckpointable(ProcessingElement pe) {
if (pe.getCheckpointingConfig().mode.equals(CheckpointingConfig.CheckpointingMode.NONE)) {
return false;
if (pe.getCheckpointingConfig().frequency > 0 && pe.isDirty()) {
if (pe.getCheckpointingConfig().mode.equals(CheckpointingConfig.CheckpointingMode.EVENT_COUNT)) {
if (pe.getEventCount() % pe.getCheckpointingConfig().frequency == 0) {
return true;
return false;