APEX-9 #resolve deleting checkpoint in different thread
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index fc0ce9b..5c7fe56 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -171,6 +171,7 @@
private List<AppDataSource> appDataSources = null;
private final Cache<Long, Object> commandResponse = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
private long lastLatencyWarningTime;
+ private transient ExecutorService poolExecutor;
//logic operator name to a queue of logical customMetrics. this gets cleared periodically
private final Map<String, Queue<Pair<Long, Map<String, Object>>>> logicalMetrics = Maps.newConcurrentMap();
@@ -321,6 +322,7 @@
{
this.clock = clock;
this.vars = new FinalVars(dag, clock.getTime());
+ poolExecutor = Executors.newFixedThreadPool(4);
// setup prior to plan creation for event recording
if (enableEventRecording) {
this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 1));
@@ -348,6 +350,7 @@
{
this.vars = checkpointedState.finals;
this.clock = new SystemClock();
+ poolExecutor = Executors.newFixedThreadPool(4);
this.plan = checkpointedState.physicalPlan;
this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 1));
setupWsClient();
@@ -494,6 +497,9 @@
for (FSJsonLineFile operatorFile : operatorFiles.values()) {
IOUtils.closeQuietly(operatorFile);
}
+ if(poolExecutor != null) {
+ poolExecutor.shutdown();
+ }
}
public void subscribeToEvents(Object listener)
@@ -1965,15 +1971,23 @@
private void purgeCheckpoints()
{
for (Pair<PTOperator, Long> p : purgeCheckpoints) {
- PTOperator operator = p.getFirst();
+ final PTOperator operator = p.getFirst();
if (!operator.isOperatorStateLess()) {
- try {
- operator.getOperatorMeta().getValue(OperatorContext.STORAGE_AGENT).delete(operator.getId(), p.getSecond());
- //LOG.debug("Purged checkpoint {} {}", operator.getId(), p.getSecond());
- }
- catch (Exception e) {
- LOG.error("Failed to purge checkpoint {}", p, e);
- }
+ final long windowId = p.getSecond();
+ Runnable r = new Runnable() // purges one checkpoint; operator and windowId are captured as finals above
+ {
+ @Override
+ public void run()
+ {
+ try {
+ operator.getOperatorMeta().getValue(OperatorContext.STORAGE_AGENT).delete(operator.getId(), windowId);
+ }
+ catch (Exception ex) { // also log unchecked failures: submit() would otherwise bury them in a Future nobody reads
+ LOG.error("Failed to purge checkpoint for operator {} for windowId {}", operator, windowId, ex);
+ }
+ }
+ };
+ poolExecutor.submit(r);
}
// delete stream state when using buffer server
for (PTOperator.PTOutput out : operator.getOutputs()) {