incorporated feedback from https://github.com/apache/incubator-streams-examples/pull/1
diff --git a/local/elasticsearch-reindex/pom.xml b/local/elasticsearch-reindex/pom.xml
index 4bb6be3..a75fa2a 100644
--- a/local/elasticsearch-reindex/pom.xml
+++ b/local/elasticsearch-reindex/pom.xml
@@ -99,6 +99,21 @@
<version>0.2-incubating-SNAPSHOT</version>
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
@@ -152,6 +167,10 @@
<bannedDependencies>
<excludes>
<exclude>org.slf4j:slf4j-log4j12</exclude>
+ <exclude>org.slf4j:slf4j-jcl</exclude>
+ <exclude>org.slf4j:slf4j-jdk14</exclude>
+ <exclude>org.log4j:log4j</exclude>
+ <exclude>commons-logging:commons-logging</exclude>
</excludes>
</bannedDependencies>
</rules>
diff --git a/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java b/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java
index 37e178c..dc94773 100644
--- a/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java
+++ b/local/elasticsearch-reindex/src/main/java/org/apache/streams/elasticsearch/example/ElasticsearchReindex.java
@@ -44,41 +44,36 @@
private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class);
- protected static ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
- private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 5000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
- }
-
- ElasticsearchReindexConfiguration reindex;
+ ElasticsearchReindexConfiguration config;
public ElasticsearchReindex() {
- this(new ComponentConfigurator<ElasticsearchReindexConfiguration>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
}
public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) {
- this.reindex = reindex;
+ this.config = reindex;
}
public static void main(String[] args)
{
LOGGER.info(StreamsConfigurator.config.toString());
- executor.submit(new ElasticsearchReindex());
+ ElasticsearchReindex reindex = new ElasticsearchReindex();
+
+ new Thread(reindex).start();
}
@Override
public void run() {
- ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(reindex.getSource());
+ ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
- ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(reindex.getDestination());
+ ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
Map<String, Object> streamConfig = Maps.newHashMap();
+ streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);