APEX-9 #resolve Delete checkpoints asynchronously on a separate thread pool
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index fc0ce9b..5c7fe56 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -171,6 +171,7 @@
   private List<AppDataSource> appDataSources = null;
   private final Cache<Long, Object> commandResponse = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
   private long lastLatencyWarningTime;
+  private transient ExecutorService poolExecutor;
 
   //logic operator name to a queue of logical customMetrics. this gets cleared periodically
   private final Map<String, Queue<Pair<Long, Map<String, Object>>>> logicalMetrics = Maps.newConcurrentMap();
@@ -321,6 +322,7 @@
   {
     this.clock = clock;
     this.vars = new FinalVars(dag, clock.getTime());
+    poolExecutor = Executors.newFixedThreadPool(4);
     // setup prior to plan creation for event recording
     if (enableEventRecording) {
       this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 1));
@@ -348,6 +350,7 @@
   {
     this.vars = checkpointedState.finals;
     this.clock = new SystemClock();
+    poolExecutor = Executors.newFixedThreadPool(4);
     this.plan = checkpointedState.physicalPlan;
     this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 1));
     setupWsClient();
@@ -494,6 +497,9 @@
     for (FSJsonLineFile operatorFile : operatorFiles.values()) {
       IOUtils.closeQuietly(operatorFile);
     }
+    if(poolExecutor != null) {
+      poolExecutor.shutdown();
+    }
   }
 
   public void subscribeToEvents(Object listener)
@@ -1965,15 +1971,23 @@
   private void purgeCheckpoints()
   {
     for (Pair<PTOperator, Long> p : purgeCheckpoints) {
-      PTOperator operator = p.getFirst();
+      final PTOperator operator = p.getFirst();
       if (!operator.isOperatorStateLess()) {
-        try {
-          operator.getOperatorMeta().getValue(OperatorContext.STORAGE_AGENT).delete(operator.getId(), p.getSecond());
-          //LOG.debug("Purged checkpoint {} {}", operator.getId(), p.getSecond());
-        }
-        catch (Exception e) {
-          LOG.error("Failed to purge checkpoint {}", p, e);
-        }
+        final long windowId = p.getSecond();
+        Runnable r = new Runnable() // purges one checkpoint; handed to poolExecutor below
+        {
+          @Override
+          public void run()
+          {
+            try {
+              operator.getOperatorMeta().getValue(OperatorContext.STORAGE_AGENT).delete(operator.getId(), windowId);
+            }
+            catch (Exception ex) { // broad on purpose: submit() would hide unchecked failures in its unread Future
+              LOG.error("Failed to purge checkpoint for operator {} for windowId {}", operator, windowId, ex);
+            }
+          }
+        };
+        poolExecutor.submit(r);
       }
       // delete stream state when using buffer server
       for (PTOperator.PTOutput out : operator.getOutputs()) {