Implement get methods for PrefetchableFirehose (#4956)
diff --git a/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java b/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java
index 96e2715..367f737 100644
--- a/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java
+++ b/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java
@@ -19,6 +19,7 @@
package io.druid.data.input.impl;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@@ -164,6 +165,36 @@
this.maxFetchRetry = maxFetchRetry == null ? DEFAULT_MAX_FETCH_RETRY : maxFetchRetry;
}
+ @JsonProperty
+ public long getMaxCacheCapacityBytes()
+ {
+ return maxCacheCapacityBytes;
+ }
+
+ @JsonProperty
+ public long getMaxFetchCapacityBytes()
+ {
+ return maxFetchCapacityBytes;
+ }
+
+ @JsonProperty
+ public long getPrefetchTriggerBytes()
+ {
+ return prefetchTriggerBytes;
+ }
+
+ @JsonProperty
+ public long getFetchTimeout()
+ {
+ return fetchTimeout;
+ }
+
+ @JsonProperty
+ public int getMaxFetchRetry()
+ {
+ return maxFetchRetry;
+ }
+
@Override
public Firehose connect(StringInputRowParser firehoseParser, File temporaryDirectory) throws IOException
{
diff --git a/extensions-contrib/azure-extensions/pom.xml b/extensions-contrib/azure-extensions/pom.xml
index f259a97..e51c52b 100644
--- a/extensions-contrib/azure-extensions/pom.xml
+++ b/extensions-contrib/azure-extensions/pom.xml
@@ -61,6 +61,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-guice</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
<!-- Tests -->
<dependency>
diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java
index 6a4eca1..8e7a100 100644
--- a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java
+++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
+import java.util.Objects;
public class AzureBlob
@@ -62,4 +63,26 @@
+ ",path=" + path
+ "}";
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o == this) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final AzureBlob that = (AzureBlob) o;
+ return Objects.equals(container, that.container) &&
+ Objects.equals(path, that.path);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(container, path);
+ }
}
diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java
index c891c84..986c259 100644
--- a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java
+++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java
@@ -31,6 +31,7 @@
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
/**
* This class is heavily inspired by the StaticS3FirehoseFactory class in the io.druid.firehose.s3 package
@@ -89,4 +90,38 @@
return new AzureByteSource(azureStorage, container, path);
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final StaticAzureBlobStoreFirehoseFactory that = (StaticAzureBlobStoreFirehoseFactory) o;
+
+ return Objects.equals(blobs, that.blobs) &&
+ getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
+ getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
+ getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
+ getFetchTimeout() == that.getFetchTimeout() &&
+ getMaxFetchRetry() == that.getMaxFetchRetry();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ blobs,
+ getMaxCacheCapacityBytes(),
+ getMaxFetchCapacityBytes(),
+ getPrefetchTriggerBytes(),
+ getFetchTimeout(),
+ getMaxFetchRetry()
+ );
+ }
}
diff --git a/extensions-contrib/azure-extensions/src/test/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactoryTest.java b/extensions-contrib/azure-extensions/src/test/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactoryTest.java
new file mode 100644
index 0000000..d805230
--- /dev/null
+++ b/extensions-contrib/azure-extensions/src/test/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactoryTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.firehose.azure;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.module.guice.ObjectMapperModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Provides;
+import io.druid.initialization.DruidModule;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.storage.azure.AzureStorage;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class StaticAzureBlobStoreFirehoseFactoryTest
+{
+ private static final AzureStorage STORAGE = new AzureStorage(null);
+
+ @Test
+ public void testSerde() throws IOException
+ {
+ final ObjectMapper mapper = createObjectMapper(new TestModule());
+
+ final List<AzureBlob> blobs = ImmutableList.of(
+ new AzureBlob("foo", "bar"),
+ new AzureBlob("foo", "bar2")
+ );
+
+ final StaticAzureBlobStoreFirehoseFactory factory = new StaticAzureBlobStoreFirehoseFactory(
+ STORAGE,
+ blobs,
+ 2048L,
+ 1024L,
+ 512L,
+ 100L,
+ 5
+ );
+
+ final StaticAzureBlobStoreFirehoseFactory outputFact = mapper.readValue(
+ mapper.writeValueAsString(factory),
+ StaticAzureBlobStoreFirehoseFactory.class
+ );
+
+ Assert.assertEquals(factory, outputFact);
+ }
+
+ private static ObjectMapper createObjectMapper(DruidModule baseModule)
+ {
+ final ObjectMapper baseMapper = new DefaultObjectMapper();
+ baseModule.getJacksonModules().forEach(baseMapper::registerModule);
+
+ final Injector injector = Guice.createInjector(
+ new ObjectMapperModule(),
+ baseModule
+ );
+ return injector.getInstance(ObjectMapper.class);
+ }
+
+ private static class TestModule implements DruidModule
+ {
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return ImmutableList.of(new SimpleModule());
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+
+ }
+
+ @Provides
+ public AzureStorage getRestS3Service()
+ {
+ return STORAGE;
+ }
+ }
+}
diff --git a/extensions-contrib/cloudfiles-extensions/pom.xml b/extensions-contrib/cloudfiles-extensions/pom.xml
index 1958236..dd70ff5 100644
--- a/extensions-contrib/cloudfiles-extensions/pom.xml
+++ b/extensions-contrib/cloudfiles-extensions/pom.xml
@@ -112,6 +112,11 @@
<artifactId>rackspace-cloudfiles-uk</artifactId>
<version>${jclouds.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-guice</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
<!-- Tests -->
<dependency>
diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java
index fb013ed..925a7c3 100644
--- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java
+++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
+import java.util.Objects;
public class CloudFilesBlob
{
@@ -72,4 +73,27 @@
+ ",region=" + region
+ "}";
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final CloudFilesBlob that = (CloudFilesBlob) o;
+ return Objects.equals(container, that.container) &&
+ Objects.equals(path, that.path) &&
+ Objects.equals(region, that.region);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(container, path, region);
+ }
}
diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java
index 29ecfb3..c94c173 100644
--- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java
+++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java
@@ -33,6 +33,7 @@
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<CloudFilesBlob>
{
@@ -91,4 +92,37 @@
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o == this) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final StaticCloudFilesFirehoseFactory that = (StaticCloudFilesFirehoseFactory) o;
+ return Objects.equals(blobs, that.blobs) &&
+ getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
+ getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
+ getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
+ getFetchTimeout() == that.getFetchTimeout() &&
+ getMaxFetchRetry() == that.getMaxFetchRetry();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ blobs,
+ getMaxCacheCapacityBytes(),
+ getMaxFetchCapacityBytes(),
+ getPrefetchTriggerBytes(),
+ getFetchTimeout(),
+ getMaxFetchRetry()
+ );
+ }
}
diff --git a/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactoryTest.java b/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactoryTest.java
new file mode 100644
index 0000000..a3eaf62
--- /dev/null
+++ b/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactoryTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.firehose.cloudfiles;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.module.guice.ObjectMapperModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Provides;
+import io.druid.initialization.DruidModule;
+import io.druid.jackson.DefaultObjectMapper;
+import org.easymock.EasyMock;
+import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class StaticCloudFilesFirehoseFactoryTest
+{
+ private static final CloudFilesApi API = EasyMock.niceMock(CloudFilesApi.class);
+
+ @Test
+ public void testSerde() throws IOException
+ {
+ final ObjectMapper mapper = createObjectMapper(new TestModule());
+
+ final List<CloudFilesBlob> blobs = ImmutableList.of(
+ new CloudFilesBlob("container", "foo", "bar"),
+ new CloudFilesBlob("container", "foo", "bar2")
+ );
+
+ final StaticCloudFilesFirehoseFactory factory = new StaticCloudFilesFirehoseFactory(
+ API,
+ blobs,
+ 2048L,
+ 1024L,
+ 512L,
+ 100L,
+ 5
+ );
+
+ final StaticCloudFilesFirehoseFactory outputFact = mapper.readValue(
+ mapper.writeValueAsString(factory),
+ StaticCloudFilesFirehoseFactory.class
+ );
+
+ Assert.assertEquals(factory, outputFact);
+ }
+
+ private static ObjectMapper createObjectMapper(DruidModule baseModule)
+ {
+ final ObjectMapper baseMapper = new DefaultObjectMapper();
+ baseModule.getJacksonModules().forEach(baseMapper::registerModule);
+
+ final Injector injector = Guice.createInjector(
+ new ObjectMapperModule(),
+ baseModule
+ );
+ return injector.getInstance(ObjectMapper.class);
+ }
+
+ private static class TestModule implements DruidModule
+ {
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return ImmutableList.of(new SimpleModule());
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+
+ }
+
+ @Provides
+ public CloudFilesApi getRestS3Service()
+ {
+ return API;
+ }
+ }
+}
diff --git a/extensions-contrib/google-extensions/pom.xml b/extensions-contrib/google-extensions/pom.xml
index d990a37..55c50fb 100644
--- a/extensions-contrib/google-extensions/pom.xml
+++ b/extensions-contrib/google-extensions/pom.xml
@@ -51,6 +51,11 @@
<artifactId>google-http-client-jackson2</artifactId>
<version>1.22.0</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-guice</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
<!-- Tests -->
<dependency>
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java
index 053a456..ea1e395 100644
--- a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java
@@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Objects;
+
public class GoogleBlob
{
private final String bucket;
@@ -54,5 +56,21 @@
+ ",path=" + path
+ "}";
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final GoogleBlob that = (GoogleBlob) o;
+ return Objects.equals(bucket, that.bucket) &&
+ Objects.equals(path, that.path);
+ }
}
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java
index 632850f..965fb0f 100644
--- a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java
@@ -31,6 +31,7 @@
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<GoogleBlob>
{
@@ -81,5 +82,38 @@
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final StaticGoogleBlobStoreFirehoseFactory that = (StaticGoogleBlobStoreFirehoseFactory) o;
+
+ return Objects.equals(blobs, that.blobs) &&
+ getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
+ getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
+ getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
+ getFetchTimeout() == that.getFetchTimeout() &&
+ getMaxFetchRetry() == that.getMaxFetchRetry();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ blobs,
+ getMaxCacheCapacityBytes(),
+ getMaxFetchCapacityBytes(),
+ getPrefetchTriggerBytes(),
+ getFetchTimeout(),
+ getMaxFetchRetry()
+ );
+ }
}
diff --git a/extensions-contrib/google-extensions/src/test/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java b/extensions-contrib/google-extensions/src/test/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java
new file mode 100644
index 0000000..511b50d
--- /dev/null
+++ b/extensions-contrib/google-extensions/src/test/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.firehose.google;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.module.guice.ObjectMapperModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Provides;
+import io.druid.initialization.DruidModule;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.storage.google.GoogleStorage;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class StaticGoogleBlobStoreFirehoseFactoryTest
+{
+ private static final GoogleStorage STORAGE = new GoogleStorage(null);
+
+ @Test
+ public void testSerde() throws IOException
+ {
+ final ObjectMapper mapper = createObjectMapper(new TestGoogleModule());
+
+ final List<GoogleBlob> blobs = ImmutableList.of(
+ new GoogleBlob("foo", "bar"),
+ new GoogleBlob("foo", "bar2")
+ );
+
+ final StaticGoogleBlobStoreFirehoseFactory factory = new StaticGoogleBlobStoreFirehoseFactory(
+ STORAGE,
+ blobs,
+ 2048L,
+ 1024L,
+ 512L,
+ 100L,
+ 5
+ );
+
+ final StaticGoogleBlobStoreFirehoseFactory outputFact = mapper.readValue(
+ mapper.writeValueAsString(factory),
+ StaticGoogleBlobStoreFirehoseFactory.class
+ );
+
+ Assert.assertEquals(factory, outputFact);
+ }
+
+ private static ObjectMapper createObjectMapper(DruidModule baseModule)
+ {
+ final ObjectMapper baseMapper = new DefaultObjectMapper();
+ baseModule.getJacksonModules().forEach(baseMapper::registerModule);
+
+ final Injector injector = Guice.createInjector(
+ new ObjectMapperModule(),
+ baseModule
+ );
+ return injector.getInstance(ObjectMapper.class);
+ }
+
+ private static class TestGoogleModule implements DruidModule
+ {
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return ImmutableList.of(new SimpleModule());
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+
+ }
+
+ @Provides
+ public GoogleStorage getRestS3Service()
+ {
+ return STORAGE;
+ }
+ }
+}
diff --git a/extensions-core/s3-extensions/pom.xml b/extensions-core/s3-extensions/pom.xml
index b12dd46..2a23d9c 100644
--- a/extensions-core/s3-extensions/pom.xml
+++ b/extensions-core/s3-extensions/pom.xml
@@ -61,6 +61,12 @@
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-guice</artifactId>
+ <version>${jackson.version}</version>
+ <scope>provided</scope>
+ </dependency>
<!-- Tests -->
<dependency>
diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java
index 89e281e..7ffaae8 100644
--- a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java
+++ b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java
@@ -40,6 +40,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -205,15 +206,28 @@
return false;
}
- StaticS3FirehoseFactory factory = (StaticS3FirehoseFactory) o;
+ StaticS3FirehoseFactory that = (StaticS3FirehoseFactory) o;
- return getUris().equals(factory.getUris());
-
+ return Objects.equals(uris, that.uris) &&
+ Objects.equals(prefixes, that.prefixes) &&
+ getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
+ getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
+ getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
+ getFetchTimeout() == that.getFetchTimeout() &&
+ getMaxFetchRetry() == that.getMaxFetchRetry();
}
@Override
public int hashCode()
{
- return getUris().hashCode();
+ return Objects.hash(
+ uris,
+ prefixes,
+ getMaxCacheCapacityBytes(),
+ getMaxFetchCapacityBytes(),
+ getPrefetchTriggerBytes(),
+ getFetchTimeout(),
+ getMaxFetchRetry()
+ );
}
}
diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java
index 79a0355..3b18984 100644
--- a/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java
+++ b/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java
@@ -19,11 +19,19 @@
package io.druid.firehose.s3;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.module.guice.ObjectMapperModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Provides;
+import io.druid.initialization.DruidModule;
import io.druid.jackson.DefaultObjectMapper;
-import org.easymock.EasyMock;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.junit.Assert;
import org.junit.Test;
@@ -36,38 +44,67 @@
*/
public class StaticS3FirehoseFactoryTest
{
+ private static final RestS3Service SERVICE = new RestS3Service(null);
+
@Test
public void testSerde() throws Exception
{
- ObjectMapper mapper = new DefaultObjectMapper();
+ final ObjectMapper mapper = createObjectMapper(new TestS3Module());
final List<URI> uris = Arrays.asList(
new URI("s3://foo/bar/file.gz"),
new URI("s3://bar/foo/file2.gz")
);
- TestStaticS3FirehoseFactory factory = new TestStaticS3FirehoseFactory(
- uris
+ final StaticS3FirehoseFactory factory = new StaticS3FirehoseFactory(
+ SERVICE,
+ uris,
+ null,
+ 2048L,
+ 1024L,
+ 512L,
+ 100L,
+ 5
);
- TestStaticS3FirehoseFactory outputFact = mapper.readValue(
+ final StaticS3FirehoseFactory outputFact = mapper.readValue(
mapper.writeValueAsString(factory),
- TestStaticS3FirehoseFactory.class
+ StaticS3FirehoseFactory.class
);
Assert.assertEquals(factory, outputFact);
- Assert.assertEquals(uris, outputFact.getUris());
}
- // This class is a workaround for the injectable value that StaticS3FirehoseFactory requires
- private static class TestStaticS3FirehoseFactory extends StaticS3FirehoseFactory
+ private static ObjectMapper createObjectMapper(DruidModule baseModule)
{
- @JsonCreator
- public TestStaticS3FirehoseFactory(
- @JsonProperty("uris") List<URI> uris
- )
+ final ObjectMapper baseMapper = new DefaultObjectMapper();
+ baseModule.getJacksonModules().forEach(baseMapper::registerModule);
+
+ final Injector injector = Guice.createInjector(
+ new ObjectMapperModule(),
+ baseModule
+ );
+ return injector.getInstance(ObjectMapper.class);
+ }
+
+ private static class TestS3Module implements DruidModule
+ {
+ @Override
+ public List<? extends Module> getJacksonModules()
{
- super(EasyMock.niceMock(RestS3Service.class), uris, null, null, null, null, null, null);
+ return ImmutableList.of(new SimpleModule());
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+
+ }
+
+ @Provides
+ public RestS3Service getRestS3Service()
+ {
+ return SERVICE;
}
}
}
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java
index 4a604ae..f112a0d 100644
--- a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java
@@ -29,6 +29,7 @@
import java.net.URI;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<URI>
{
@@ -71,4 +72,37 @@
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final HttpFirehoseFactory that = (HttpFirehoseFactory) o;
+ return Objects.equals(uris, that.uris) &&
+ getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
+ getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
+ getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
+ getFetchTimeout() == that.getFetchTimeout() &&
+ getMaxFetchRetry() == that.getMaxFetchRetry();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ uris,
+ getMaxCacheCapacityBytes(),
+ getMaxFetchCapacityBytes(),
+ getPrefetchTriggerBytes(),
+ getFetchTimeout(),
+ getMaxFetchRetry()
+ );
+ }
}
diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
new file mode 100644
index 0000000..7ca132b
--- /dev/null
+++ b/server/src/test/java/io/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.firehose;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import io.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+
+public class HttpFirehoseFactoryTest
+{
+ @Test
+ public void testSerde() throws IOException
+ {
+ final ObjectMapper mapper = new DefaultObjectMapper();
+ final HttpFirehoseFactory factory = new HttpFirehoseFactory(
+ ImmutableList.of(URI.create("http://foo/bar"), URI.create("http://foo/bar2")),
+ 2048L,
+ 1024L,
+ 512L,
+ 100L,
+ 5
+ );
+
+ final HttpFirehoseFactory outputFact = mapper.readValue(
+ mapper.writeValueAsString(factory),
+ HttpFirehoseFactory.class
+ );
+
+ Assert.assertEquals(factory, outputFact);
+ }
+}