Fix parent POM reference: point to streams-examples-local and add relativePath
Simplify thread start: run MongoElasticsearchSync on a plain Thread instead of a static executor
diff --git a/local/mongo-elasticsearch-sync/pom.xml b/local/mongo-elasticsearch-sync/pom.xml
index 79470ee..4d271e0 100644
--- a/local/mongo-elasticsearch-sync/pom.xml
+++ b/local/mongo-elasticsearch-sync/pom.xml
@@ -22,8 +22,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.streams</groupId>
- <artifactId>streams-examples</artifactId>
+ <artifactId>streams-examples-local</artifactId>
<version>0.2-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java
index 568a1a9..fccbf47 100644
--- a/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java
+++ b/local/mongo-elasticsearch-sync/src/main/java/org/apache/streams/example/elasticsearch/MongoElasticsearchSync.java
@@ -45,19 +45,10 @@
private final static Logger LOGGER = LoggerFactory.getLogger(MongoElasticsearchSync.class);
- protected static ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
- private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 5000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
- }
-
MongoElasticsearchSyncConfiguration config;
public MongoElasticsearchSync() {
this(new ComponentConfigurator<MongoElasticsearchSyncConfiguration>(MongoElasticsearchSyncConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
}
public MongoElasticsearchSync(MongoElasticsearchSyncConfiguration config) {
@@ -68,7 +59,9 @@
{
LOGGER.info(StreamsConfigurator.config.toString());
- executor.submit(new MongoElasticsearchSync());
+ MongoElasticsearchSync sync = new MongoElasticsearchSync();
+
+ new Thread(sync).start();
}
@@ -80,6 +73,7 @@
ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
Map<String, Object> streamConfig = Maps.newHashMap();
+ streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);