| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.s4.core.ft; |
| |
| import java.lang.Thread.UncaughtExceptionHandler; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.s4.core.ProcessingElement; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import com.google.inject.Inject; |
| import com.google.inject.name.Named; |
| |
| /** |
| * |
| * <p> |
| * This class is responsible for coordinating interactions between the S4 event processor and the checkpoint storage |
| * backend. In particular, it schedules asynchronous save tasks to be executed on the backend. |
| * </p> |
| * |
| * |
| * |
| */ |
| public final class SafeKeeper implements CheckpointingFramework { |
| |
| private static final class UncaughtExceptionLogger implements UncaughtExceptionHandler { |
| String operationType; |
| |
| public UncaughtExceptionLogger(String operationType) { |
| this.operationType = operationType; |
| } |
| |
| @Override |
| public void uncaughtException(Thread t, Throwable e) { |
| logger.error("Cannot execute checkpointing " + operationType + " operation", e); |
| } |
| } |
| |
| private static Logger logger = LoggerFactory.getLogger(SafeKeeper.class); |
| |
| @Inject |
| private StateStorage stateStorage; |
| @Inject(optional = true) |
| private StorageCallbackFactory storageCallbackFactory = new LoggingStorageCallbackFactory(); |
| |
| private ThreadPoolExecutor storageThreadPool; |
| private ThreadPoolExecutor serializationThreadPool; |
| private ThreadPoolExecutor fetchingThreadPool; |
| |
| @Inject(optional = true) |
| @Named("s4.checkpointing.storageMaxThreads") |
| int storageMaxThreads = 1; |
| |
| @Inject(optional = true) |
| @Named("s4.checkpointing.storageThreadKeepAliveSeconds") |
| int storageThreadKeepAliveSeconds = 120; |
| |
| @Inject(optional = true) |
| @Named("s4.checkpointing.storageMaxOutstandingRequests") |
| int storageMaxOutstandingRequests = 1000; |
| |
| @Inject(optional = true) |
| @Named("s4.checkpointing.serializationMaxThreads") |
| int serializationMaxThreads = 1; |
| |
| @Inject(optional = true) |
| @Named("s4.checkpointing.serializationThreadKeepAliveSeconds") |
| int serializationThreadKeepAliveSeconds = 120; |
| |
| @Inject(optional = true) |
| @Named("s4.checkpointing.serializationMaxOutstandingRequests") |
| int serializationMaxOutstandingRequests = 1000; |
| |
| @Inject(optional = true) |
| @Named("s4.checkpointing.maxSerializationLockTime") |
| long maxSerializationLockTime = 1000; |
| |
| @Inject(optional = true) |
| @Named("s4.checkpointing.fetchingMaxThreads") |
| int fetchingMaxThreads = 1; |
| |
| @Inject(optional = true) |
| @Named("s4.checkpointing.fetchingThreadKeepAliveSeconds") |
| int fetchingThreadKeepAliveSeconds = 120; |
| |
| @Inject(optional = true) |
| @Named("s4.checkpointing.fetchingMaxWaitMs") |
| long fetchingMaxWaitMs = 1000; |
| |
| @Inject(optional = true) |
| @Named("s4.checkpointing.fetchingMaxConsecutiveFailuresBeforeDisabling") |
| int fetchingMaxConsecutiveFailuresBeforeDisabling = 10; |
| |
| @Inject(optional = true) |
| @Named("s4.checkpointing.fetchingDisabledDurationMs") |
| long fetchingDisabledDurationMs = 600000; |
| |
| @Inject(optional = true) |
| @Named("s4.checkpointing.fetchingQueueSize") |
| int fetchingQueueSize = 100; |
| |
| long fetchingDisabledInitTime = -1; |
| AtomicInteger fetchingCurrentConsecutiveFailures = new AtomicInteger(); |
| |
| public SafeKeeper() { |
| } |
| |
| @Inject |
| private void init() { |
| |
| // NOTE: those thread pools should be fine tuned according to backend and application load/requirements. |
| // For now: |
| // - number of threads and work queue size have overridable defaults |
| // - failures are logged |
| // - when storage queue is full, we throttle backwards to the serialization threadpool |
| // - when serialization queue is full, we abort execution for new entries |
| // - fetching uses a synchronous queue and therefore is a blocking operation, with a timeout |
| |
| ThreadFactory storageThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-storage-%d") |
| .setUncaughtExceptionHandler(new UncaughtExceptionLogger("storage")).build(); |
| storageThreadPool = new ThreadPoolExecutor(1, storageMaxThreads, storageThreadKeepAliveSeconds, |
| TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(storageMaxOutstandingRequests), |
| storageThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); |
| storageThreadPool.allowCoreThreadTimeOut(true); |
| |
| ThreadFactory serializationThreadFactory = new ThreadFactoryBuilder() |
| .setNameFormat("Checkpointing-serialization-%d") |
| .setUncaughtExceptionHandler(new UncaughtExceptionLogger("serialization")).build(); |
| serializationThreadPool = new ThreadPoolExecutor(1, serializationMaxThreads, |
| serializationThreadKeepAliveSeconds, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>( |
| serializationMaxOutstandingRequests), serializationThreadFactory, |
| new ThreadPoolExecutor.AbortPolicy()); |
| serializationThreadPool.allowCoreThreadTimeOut(true); |
| |
| ThreadFactory fetchingThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-fetching-%d") |
| .setUncaughtExceptionHandler(new UncaughtExceptionLogger("fetching")).build(); |
| fetchingThreadPool = new ThreadPoolExecutor(0, fetchingMaxThreads, fetchingThreadKeepAliveSeconds, |
| TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(fetchingQueueSize), fetchingThreadFactory); |
| fetchingThreadPool.allowCoreThreadTimeOut(true); |
| |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.s4.core.ft.CheckpointingFramework#saveState(org.apache.s4.core.ProcessingElement) |
| */ |
| @Override |
| public StorageCallback saveState(ProcessingElement pe) { |
| StorageCallback storageCallback = storageCallbackFactory.createStorageCallback(); |
| Future<byte[]> futureSerializedState = null; |
| try { |
| futureSerializedState = serializeState(pe); |
| } catch (RejectedExecutionException e) { |
| // if (monitor != null) { |
| // monitor.increment(MetricsName.checkpointing_dropped_from_serialization_queue.toString(), 1, |
| // S4_CORE_METRICS.toString()); |
| // } |
| storageCallback.storageOperationResult(StorageResultCode.FAILURE, |
| "Serialization task queue is full. An older serialization task was dumped in order to serialize PE [" |
| + pe.getId() + "]" + " Remaining capacity for the serialization task queue is [" |
| + serializationThreadPool.getQueue().remainingCapacity() + "] ; number of elements is [" |
| + serializationThreadPool.getQueue().size() + "] ; maximum capacity is [" |
| + serializationThreadPool + "]"); |
| return storageCallback; |
| } |
| submitSaveStateTask(new SaveStateTask(new CheckpointId(pe), futureSerializedState, storageCallback, |
| stateStorage), storageCallback); |
| return storageCallback; |
| } |
| |
| private Future<byte[]> serializeState(ProcessingElement pe) { |
| Future<byte[]> future = serializationThreadPool.submit(new SerializeTask(pe)); |
| // if (monitor != null) { |
| // monitor.increment(MetricsName.checkpointing_added_to_serialization_queue.toString(), 1, |
| // S4_CORE_METRICS.toString()); |
| // } |
| return future; |
| } |
| |
| private void submitSaveStateTask(SaveStateTask task, StorageCallback storageCallback) { |
| try { |
| storageThreadPool.execute(task); |
| // if (monitor != null) { |
| // monitor.increment(MetricsName.checkpointing_added_to_storage_queue.toString(), 1); |
| // } |
| } catch (RejectedExecutionException e) { |
| // if (monitor != null) { |
| // monitor.increment(MetricsName.checkpointing_dropped_from_storage_queue.toString(), 1); |
| // } |
| storageCallback.storageOperationResult(StorageResultCode.FAILURE, |
| "Storage checkpoint queue is full. Removed an old task to handle latest task. Remaining capacity for task queue is [" |
| + storageThreadPool.getQueue().remainingCapacity() + "] ; number of elements is [" |
| + storageThreadPool.getQueue().size() + "] ; maximum capacity is [" |
| + storageMaxOutstandingRequests + "]"); |
| } |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.s4.core.ft.CheckpointingFramework#fetchSerializedState(org.apache.s4.core.ft.SafeKeeperId) |
| */ |
| @Override |
| public byte[] fetchSerializedState(CheckpointId key) { |
| |
| byte[] result = null; |
| |
| if (fetchingCurrentConsecutiveFailures.get() == fetchingMaxConsecutiveFailuresBeforeDisabling) { |
| if ((fetchingDisabledInitTime + fetchingDisabledDurationMs) < System.currentTimeMillis()) { |
| return null; |
| } else { |
| // reached time, reinit |
| fetchingCurrentConsecutiveFailures.set(0); |
| } |
| } |
| Future<byte[]> fetched = fetchingThreadPool.submit(new FetchTask(stateStorage, key)); |
| try { |
| result = fetched.get(fetchingMaxWaitMs, TimeUnit.MILLISECONDS); |
| fetchingCurrentConsecutiveFailures.set(0); |
| return result; |
| } catch (TimeoutException te) { |
| logger.error("Cannot fetch checkpoint from backend for key [{}] before timeout of {} ms", |
| key.getStringRepresentation(), fetchingMaxWaitMs); |
| } catch (InterruptedException e) { |
| logger.error( |
| "Cannot fetch checkpoint from backend for key [{}] before timeout of {} ms because of an interruption", |
| key.getStringRepresentation(), fetchingMaxWaitMs); |
| Thread.currentThread().interrupt(); |
| } catch (ExecutionException e) { |
| logger.error("Cannot fetch checkpoint from backend for key [{}] due to {}", key.getStringRepresentation(), |
| e.getCause().getClass().getName() + "/" + e.getCause().getMessage()); |
| } |
| if (fetchingCurrentConsecutiveFailures.incrementAndGet() == fetchingMaxConsecutiveFailuresBeforeDisabling) { |
| logger.trace( |
| "Due to {} successive checkpoint fetching failures, fetching is temporarily disabled for {} ms", |
| fetchingMaxConsecutiveFailuresBeforeDisabling, fetchingDisabledDurationMs); |
| fetchingDisabledInitTime = System.currentTimeMillis(); |
| } |
| |
| return result; |
| } |
| |
| @Override |
| public boolean isCheckpointable(ProcessingElement pe) { |
| if (pe.getCheckpointingConfig().mode.equals(CheckpointingConfig.CheckpointingMode.NONE)) { |
| return false; |
| } |
| if (pe.getCheckpointingConfig().frequency > 0 && pe.isDirty()) { |
| if (pe.getCheckpointingConfig().mode.equals(CheckpointingConfig.CheckpointingMode.EVENT_COUNT)) { |
| if (pe.getEventCount() % pe.getCheckpointingConfig().frequency == 0) { |
| return true; |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| } |