Merge pull request #216 from metamx/announce-fix

Make batch data segment announcer thread safe
diff --git a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
index 15fa7c8..84ce35d 100644
--- a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
+++ b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
@@ -53,7 +53,7 @@
    * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
    * found, etc) are not retried.
    */
-  public static <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException
+  public static <T> T retryS3Operation(Callable<T> f) throws IOException, ServiceException, InterruptedException
   {
     int nTry = 0;
     final int maxTries = 3;
@@ -66,7 +66,7 @@
         if (nTry <= maxTries) {
           awaitNextRetry(e, nTry);
         } else {
-          throw Throwables.propagate(e);
+          throw e;
         }
       }
       catch (ServiceException e) {
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java
index a09f2ef..89c1b6d 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java
@@ -5,6 +5,7 @@
 import com.google.common.base.Throwables;
 import com.google.common.io.InputSupplier;
 import com.metamx.common.logger.Logger;
+import com.metamx.druid.common.s3.S3Utils;
 import org.jets3t.service.ServiceException;
 import org.jets3t.service.StorageService;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@@ -13,6 +14,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.concurrent.Callable;
 
 /**
  * Provides task logs archived on S3.
@@ -86,16 +88,25 @@
     }
   }
 
-  public void pushTaskLog(String taskid, File logFile) throws IOException
+  public void pushTaskLog(final String taskid, final File logFile) throws IOException
   {
     final String taskKey = getTaskLogKey(taskid);
+    log.info("Pushing task log %s to: %s", logFile, taskKey);
 
     try {
-      log.info("Pushing task log %s to: %s", logFile, taskKey);
-
-      final StorageObject object = new StorageObject(logFile);
-      object.setKey(taskKey);
-      service.putObject(bucket, object);
+      S3Utils.retryS3Operation(
+          new Callable<Void>()
+          {
+            @Override
+            public Void call() throws Exception
+            {
+              final StorageObject object = new StorageObject(logFile);
+              object.setKey(taskKey);
+              service.putObject(bucket, object);
+              return null;
+            }
+          }
+      );
     }
     catch (Exception e) {
       Throwables.propagateIfInstanceOf(e, IOException.class);
diff --git a/pom.xml b/pom.xml
index 3b81a46..fd320e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,7 +38,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <metamx.java-util.version>0.22.6</metamx.java-util.version>
+        <metamx.java-util.version>0.23.0</metamx.java-util.version>
         <apache.curator.version>2.1.0-incubating</apache.curator.version>
     </properties>
 
diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
index 75d2128..c6a82ff 100644
--- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
+++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
@@ -40,7 +40,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.zip.GZIPInputStream;
 
@@ -154,6 +153,9 @@
     catch (InterruptedException e) {
       throw Throwables.propagate(e);
     }
+    catch (IOException e) {
+      throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
+    }
     catch (ServiceException e) {
       throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
     }
@@ -179,6 +181,9 @@
     catch (InterruptedException e) {
       throw Throwables.propagate(e);
     }
+    catch (IOException e) {
+      throw new SegmentLoadingException(e, e.getMessage());
+    }
     catch (ServiceException e) {
       throw new SegmentLoadingException(e, e.getMessage());
     }
diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java
index d9ac69e..a5462a8 100644
--- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java
+++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java
@@ -21,21 +21,23 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.common.s3.S3Utils;
 import com.metamx.druid.index.v1.IndexIO;
 import com.metamx.druid.utils.CompressionUtils;
 import com.metamx.emitter.EmittingLogger;
-import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.ServiceException;
 import org.jets3t.service.acl.gs.GSAccessControlList;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
 import org.jets3t.service.model.S3Object;
 
 import java.io.File;
 import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.Callable;
 
 public class S3DataSegmentPusher implements DataSegmentPusher
 {
@@ -63,61 +65,90 @@
   }
 
+  /**
+   * Zips {@code indexFilesDir} into a temp file and uploads it to S3 as {@code index.zip} under the segment's
+   * output key, then uploads a {@code descriptor.json} describing the pushed segment. Both uploads run inside a
+   * single {@link S3Utils#retryS3Operation} call, so they are re-attempted together when that helper retries.
+   *
+   * @param indexFilesDir directory holding the index files to zip and upload
+   * @param inSegment     segment metadata before size, load spec and binary version are filled in
+   *
+   * @return a copy of {@code inSegment} carrying the zip size, the "s3_zip" load spec and the binary version
+   *
+   * @throws IOException if the temp zip cannot be created, or the S3 operation fails with an I/O or service error
+   */
   @Override
-  public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
+  public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
   {
     log.info("Uploading [%s] to S3", indexFilesDir);
-    String outputKey = JOINER.join(
+    final String outputKey = JOINER.join(
         config.getBaseKey().isEmpty() ? null : config.getBaseKey(),
-        DataSegmentPusherUtil.getStorageDir(segment)
+        DataSegmentPusherUtil.getStorageDir(inSegment)
     );
-
     final File zipOutFile = File.createTempFile("druid", "index.zip");
-    long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
+    final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
 
     try {
-      S3Object toPush = new S3Object(zipOutFile);
+      return S3Utils.retryS3Operation(
+          new Callable<DataSegment>()
+          {
+            @Override
+            public DataSegment call() throws Exception
+            {
+              S3Object toPush = new S3Object(zipOutFile);
 
-      final String outputBucket = config.getBucket();
-      toPush.setBucketName(outputBucket);
-      toPush.setKey(outputKey + "/index.zip");
-      if (!config.getDisableAcl()) {
-        toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
-      }
+              final String outputBucket = config.getBucket();
+              toPush.setBucketName(outputBucket);
+              toPush.setKey(outputKey + "/index.zip");
+              if (!config.getDisableAcl()) {
+                toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
+              }
 
-      log.info("Pushing %s.", toPush);
-      s3Client.putObject(outputBucket, toPush);
+              log.info("Pushing %s.", toPush);
+              s3Client.putObject(outputBucket, toPush);
 
-      segment = segment.withSize(indexSize)
-                       .withLoadSpec(
-                           ImmutableMap.<String, Object>of("type", "s3_zip", "bucket", outputBucket, "key", toPush.getKey())
-                       )
-                       .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
+              final DataSegment outSegment = inSegment.withSize(indexSize)
+                                                      .withLoadSpec(
+                                                          ImmutableMap.<String, Object>of(
+                                                              "type",
+                                                              "s3_zip",
+                                                              "bucket",
+                                                              outputBucket,
+                                                              "key",
+                                                              toPush.getKey()
+                                                          )
+                                                      )
+                                                      .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
 
-      File descriptorFile = File.createTempFile("druid", "descriptor.json");
-      Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
-      S3Object descriptorObject = new S3Object(descriptorFile);
-      descriptorObject.setBucketName(outputBucket);
-      descriptorObject.setKey(outputKey + "/descriptor.json");
-      if (!config.getDisableAcl()) {
-        descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
-      }
+              File descriptorFile = File.createTempFile("druid", "descriptor.json");
+              // Serialize outSegment, not inSegment: the descriptor must carry the size, load spec and binary
+              // version computed above, matching what this method wrote before the retry wrapper was introduced.
+              Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(outSegment)), descriptorFile);
+              S3Object descriptorObject = new S3Object(descriptorFile);
+              descriptorObject.setBucketName(outputBucket);
+              descriptorObject.setKey(outputKey + "/descriptor.json");
+              if (!config.getDisableAcl()) {
+                descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
+              }
 
-      log.info("Pushing %s", descriptorObject);
-      s3Client.putObject(outputBucket, descriptorObject);
+              log.info("Pushing %s", descriptorObject);
+              s3Client.putObject(outputBucket, descriptorObject);
 
-      log.info("Deleting zipped index File[%s]", zipOutFile);
-      zipOutFile.delete();
+              log.info("Deleting zipped index File[%s]", zipOutFile);
+              zipOutFile.delete();
 
-      log.info("Deleting descriptor file[%s]", descriptorFile);
-      descriptorFile.delete();
+              log.info("Deleting descriptor file[%s]", descriptorFile);
+              descriptorFile.delete();
 
-      return segment;
+              return outSegment;
+            }
+          }
+      );
     }
-    catch (NoSuchAlgorithmException e) {
+    catch (ServiceException e) {
       throw new IOException(e);
     }
-    catch (S3ServiceException e) {
-      throw new IOException(e);
+    catch (InterruptedException e) {
+      throw Throwables.propagate(e);
     }
   }
 }
\ No newline at end of file
diff --git a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java
index 3c123bd..ff773fd 100644
--- a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java
+++ b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java
@@ -88,44 +88,48 @@
     final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();
 
     return Sequences.concat(
-        new BaseSequence<Sequence<Row>, Iterator<Sequence<Row>>>(new BaseSequence.IteratorMaker<Sequence<Row>, Iterator<Sequence<Row>>>()
-        {
-          @Override
-          public Iterator<Sequence<Row>> make()
-          {
-            return FunctionalIterator
-              .create(cursors.iterator())
-              .transform(new Function<Cursor, Sequence<Row>>()
+        new BaseSequence<Sequence<Row>, Iterator<Sequence<Row>>>(
+            new BaseSequence.IteratorMaker<Sequence<Row>, Iterator<Sequence<Row>>>()
+            {
+              @Override
+              public Iterator<Sequence<Row>> make()
+              {
+                return FunctionalIterator
+                    .create(cursors.iterator())
+                    .transform(
+                        new Function<Cursor, Sequence<Row>>()
                         {
                           @Override
                           public Sequence<Row> apply(@Nullable final Cursor cursor)
                           {
-                            return new BaseSequence<Row, CloseableIterator<Row>>(
-                                new BaseSequence.IteratorMaker<Row, CloseableIterator<Row>>()
+                            return new BaseSequence<Row, RowIterator>(
+                                new BaseSequence.IteratorMaker<Row, RowIterator>()
                                 {
                                   @Override
-                                  public  CloseableIterator<Row> make()
+                                  public RowIterator make()
                                   {
                                     return new RowIterator(query, cursor, bufferHolder.get());
                                   }
 
                                   @Override
-                                  public void cleanup(CloseableIterator iterFromMake)
+                                  public void cleanup(RowIterator iterFromMake)
                                   {
                                     Closeables.closeQuietly(iterFromMake);
                                   }
                                 }
                             );
                           }
-              });
-          }
+                        }
+                    );
+              }
 
-          @Override
-          public void cleanup(Iterator<Sequence<Row>> iterFromMake)
-          {
-            Closeables.closeQuietly(bufferHolder);
-          }
-        })
+              @Override
+              public void cleanup(Iterator<Sequence<Row>> iterFromMake)
+              {
+                Closeables.closeQuietly(bufferHolder);
+              }
+            }
+        )
     );
 
   }