Merge pull request #216 from metamx/announce-fix
Make batch data segment announcer thread safe
diff --git a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
index 15fa7c8..84ce35d 100644
--- a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
+++ b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
@@ -53,7 +53,7 @@
* Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
* found, etc) are not retried.
*/
- public static <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException
+ public static <T> T retryS3Operation(Callable<T> f) throws IOException, ServiceException, InterruptedException
{
int nTry = 0;
final int maxTries = 3;
@@ -66,7 +66,7 @@
if (nTry <= maxTries) {
awaitNextRetry(e, nTry);
} else {
- throw Throwables.propagate(e);
+ throw e;
}
}
catch (ServiceException e) {
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java
index a09f2ef..89c1b6d 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java
@@ -5,6 +5,7 @@
import com.google.common.base.Throwables;
import com.google.common.io.InputSupplier;
import com.metamx.common.logger.Logger;
+import com.metamx.druid.common.s3.S3Utils;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageService;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@@ -13,6 +14,7 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.util.concurrent.Callable;
/**
* Provides task logs archived on S3.
@@ -86,16 +88,25 @@
}
}
- public void pushTaskLog(String taskid, File logFile) throws IOException
+ public void pushTaskLog(final String taskid, final File logFile) throws IOException
{
final String taskKey = getTaskLogKey(taskid);
+ log.info("Pushing task log %s to: %s", logFile, taskKey);
try {
- log.info("Pushing task log %s to: %s", logFile, taskKey);
-
- final StorageObject object = new StorageObject(logFile);
- object.setKey(taskKey);
- service.putObject(bucket, object);
+ S3Utils.retryS3Operation(
+ new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ final StorageObject object = new StorageObject(logFile);
+ object.setKey(taskKey);
+ service.putObject(bucket, object);
+ return null;
+ }
+ }
+ );
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e, IOException.class);
diff --git a/pom.xml b/pom.xml
index 3b81a46..fd320e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,7 +38,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <metamx.java-util.version>0.22.6</metamx.java-util.version>
+ <metamx.java-util.version>0.23.0</metamx.java-util.version>
<apache.curator.version>2.1.0-incubating</apache.curator.version>
</properties>
diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
index 75d2128..c6a82ff 100644
--- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
+++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
@@ -40,7 +40,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.Callable;
import java.util.zip.GZIPInputStream;
@@ -154,6 +153,9 @@
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
+ catch (IOException e) {
+ throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
+ }
catch (ServiceException e) {
throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
}
@@ -179,6 +181,9 @@
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
+ catch (IOException e) {
+ throw new SegmentLoadingException(e, e.getMessage());
+ }
catch (ServiceException e) {
throw new SegmentLoadingException(e, e.getMessage());
}
diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java
index d9ac69e..a5462a8 100644
--- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java
+++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java
@@ -21,21 +21,23 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.common.s3.S3Utils;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.utils.CompressionUtils;
import com.metamx.emitter.EmittingLogger;
-import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.ServiceException;
import org.jets3t.service.acl.gs.GSAccessControlList;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import java.io.File;
import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.Callable;
public class S3DataSegmentPusher implements DataSegmentPusher
{
@@ -63,61 +65,76 @@
}
@Override
- public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
+ public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
{
log.info("Uploading [%s] to S3", indexFilesDir);
- String outputKey = JOINER.join(
+ final String outputKey = JOINER.join(
config.getBaseKey().isEmpty() ? null : config.getBaseKey(),
- DataSegmentPusherUtil.getStorageDir(segment)
+ DataSegmentPusherUtil.getStorageDir(inSegment)
);
-
final File zipOutFile = File.createTempFile("druid", "index.zip");
- long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
+ final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
try {
- S3Object toPush = new S3Object(zipOutFile);
+ return S3Utils.retryS3Operation(
+ new Callable<DataSegment>()
+ {
+ @Override
+ public DataSegment call() throws Exception
+ {
+ S3Object toPush = new S3Object(zipOutFile);
- final String outputBucket = config.getBucket();
- toPush.setBucketName(outputBucket);
- toPush.setKey(outputKey + "/index.zip");
- if (!config.getDisableAcl()) {
- toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
- }
+ final String outputBucket = config.getBucket();
+ toPush.setBucketName(outputBucket);
+ toPush.setKey(outputKey + "/index.zip");
+ if (!config.getDisableAcl()) {
+ toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
+ }
- log.info("Pushing %s.", toPush);
- s3Client.putObject(outputBucket, toPush);
+ log.info("Pushing %s.", toPush);
+ s3Client.putObject(outputBucket, toPush);
- segment = segment.withSize(indexSize)
- .withLoadSpec(
- ImmutableMap.<String, Object>of("type", "s3_zip", "bucket", outputBucket, "key", toPush.getKey())
- )
- .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
+ final DataSegment outSegment = inSegment.withSize(indexSize)
+ .withLoadSpec(
+ ImmutableMap.<String, Object>of(
+ "type",
+ "s3_zip",
+ "bucket",
+ outputBucket,
+ "key",
+ toPush.getKey()
+ )
+ )
+ .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
- File descriptorFile = File.createTempFile("druid", "descriptor.json");
- Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
- S3Object descriptorObject = new S3Object(descriptorFile);
- descriptorObject.setBucketName(outputBucket);
- descriptorObject.setKey(outputKey + "/descriptor.json");
- if (!config.getDisableAcl()) {
- descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
- }
+ File descriptorFile = File.createTempFile("druid", "descriptor.json");
+ Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(outSegment)), descriptorFile); // serialize outSegment (not inSegment) so the descriptor carries size, loadSpec and binaryVersion, matching the returned segment
+ S3Object descriptorObject = new S3Object(descriptorFile);
+ descriptorObject.setBucketName(outputBucket);
+ descriptorObject.setKey(outputKey + "/descriptor.json");
+ if (!config.getDisableAcl()) {
+ descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
+ }
- log.info("Pushing %s", descriptorObject);
- s3Client.putObject(outputBucket, descriptorObject);
+ log.info("Pushing %s", descriptorObject);
+ s3Client.putObject(outputBucket, descriptorObject);
- log.info("Deleting zipped index File[%s]", zipOutFile);
- zipOutFile.delete();
+ log.info("Deleting zipped index File[%s]", zipOutFile);
+ zipOutFile.delete();
- log.info("Deleting descriptor file[%s]", descriptorFile);
- descriptorFile.delete();
+ log.info("Deleting descriptor file[%s]", descriptorFile);
+ descriptorFile.delete();
- return segment;
+ return outSegment;
+ }
+ }
+ );
}
- catch (NoSuchAlgorithmException e) {
+ catch (ServiceException e) {
throw new IOException(e);
}
- catch (S3ServiceException e) {
- throw new IOException(e);
+ catch (InterruptedException e) {
+ throw Throwables.propagate(e);
}
}
}
\ No newline at end of file
diff --git a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java
index 3c123bd..ff773fd 100644
--- a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java
+++ b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java
@@ -88,44 +88,48 @@
final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();
return Sequences.concat(
- new BaseSequence<Sequence<Row>, Iterator<Sequence<Row>>>(new BaseSequence.IteratorMaker<Sequence<Row>, Iterator<Sequence<Row>>>()
- {
- @Override
- public Iterator<Sequence<Row>> make()
- {
- return FunctionalIterator
- .create(cursors.iterator())
- .transform(new Function<Cursor, Sequence<Row>>()
+ new BaseSequence<Sequence<Row>, Iterator<Sequence<Row>>>(
+ new BaseSequence.IteratorMaker<Sequence<Row>, Iterator<Sequence<Row>>>()
+ {
+ @Override
+ public Iterator<Sequence<Row>> make()
+ {
+ return FunctionalIterator
+ .create(cursors.iterator())
+ .transform(
+ new Function<Cursor, Sequence<Row>>()
{
@Override
public Sequence<Row> apply(@Nullable final Cursor cursor)
{
- return new BaseSequence<Row, CloseableIterator<Row>>(
- new BaseSequence.IteratorMaker<Row, CloseableIterator<Row>>()
+ return new BaseSequence<Row, RowIterator>(
+ new BaseSequence.IteratorMaker<Row, RowIterator>()
{
@Override
- public CloseableIterator<Row> make()
+ public RowIterator make()
{
return new RowIterator(query, cursor, bufferHolder.get());
}
@Override
- public void cleanup(CloseableIterator iterFromMake)
+ public void cleanup(RowIterator iterFromMake)
{
Closeables.closeQuietly(iterFromMake);
}
}
);
}
- });
- }
+ }
+ );
+ }
- @Override
- public void cleanup(Iterator<Sequence<Row>> iterFromMake)
- {
- Closeables.closeQuietly(bufferHolder);
- }
- })
+ @Override
+ public void cleanup(Iterator<Sequence<Row>> iterFromMake)
+ {
+ Closeables.closeQuietly(bufferHolder);
+ }
+ }
+ )
);
}