refactored to use streams-filters for routing Deletes to PersistDeleter
diff --git a/local/twitter-userstream-elasticsearch/pom.xml b/local/twitter-userstream-elasticsearch/pom.xml
index 1a3074c..f438e77 100644
--- a/local/twitter-userstream-elasticsearch/pom.xml
+++ b/local/twitter-userstream-elasticsearch/pom.xml
@@ -81,6 +81,11 @@
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
+ <artifactId>streams-filters</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
<artifactId>streams-provider-twitter</artifactId>
<version>${project.version}</version>
</dependency>
@@ -240,4 +245,44 @@
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>docker</id>
+ <build>
+ <plugins>
+ <plugin>
+ <!-- The Docker Maven plugin is used to create docker image with the fat jar -->
+ <groupId>org.jolokia</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.11.0</version>
+ <configuration>
+ <images>
+
+ <image>
+ <alias>${project.artifactId}</alias>
+ <name>${project.artifactId}:${project.version}</name>
+ <build>
+ <from>dockerfile/java:oracle-java8</from>
+ <assembly>
+ <basedir>/</basedir>
+ <descriptorRef>artifact</descriptorRef>
+ </assembly>
+ <!-- Default command for the build image -->
+ </build>
+
+ </image>
+
+ </images>
+ </configuration>
+
+ </plugin>
+
+ </plugins>
+ </build>
+ <activation>
+ <activeByDefault/>
+ </activation>
+ </profile>
+ </profiles>
</project>
diff --git a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
index f24b1c4..2e7d2cd 100644
--- a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
+++ b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
@@ -18,45 +18,26 @@
package org.apache.streams.twitter.example;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
+import com.google.common.collect.Sets;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.util.ActivityUtil;
-import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.converter.ActivityConverterProcessor;
import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
import org.apache.streams.example.twitter.TwitterUserstreamElasticsearchConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.filters.VerbDefinitionDropFilter;
+import org.apache.streams.filters.VerbDefinitionKeepFilter;
import org.apache.streams.local.builders.LocalStreamBuilder;
import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.pojo.json.Delete;
-import org.apache.streams.pojo.json.Follow;
-import org.apache.streams.pojo.json.Page;
import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.twitter.pojo.FriendList;
-import org.apache.streams.twitter.pojo.UserstreamEvent;
-import org.apache.streams.twitter.processor.TwitterTypeConverter;
-import org.apache.streams.twitter.provider.TwitterConfigurator;
import org.apache.streams.twitter.provider.TwitterStreamProvider;
-import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
-import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
+import org.apache.streams.verbs.VerbDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
/**
* Example stream that populates elasticsearch with activities from twitter userstream in real-time
@@ -67,6 +48,8 @@
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
+ private static VerbDefinition deleteVerbDefinition = new VerbDefinition().withValue("post");
+
TwitterUserstreamElasticsearchConfiguration config;
public TwitterUserstreamElasticsearch() {
@@ -94,67 +77,25 @@
ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = config.getElasticsearch();
TwitterStreamProvider stream = new TwitterStreamProvider(twitterStreamConfiguration);
- TwitterTypeConverter converter = new TwitterTypeConverter(ObjectNode.class, Activity.class);
+ ActivityConverterProcessor converter = new ActivityConverterProcessor();
+ VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition));
ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
- DeleteOnlyProcessor deleteOnlyProcessor = new DeleteOnlyProcessor();
- NoDeletesProcessor noDeletesProcessor = new NoDeletesProcessor();
+ VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition));
ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
Map<String, Object> streamConfig = Maps.newHashMap();
streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 12 * 60 * 1000);
- StreamBuilder builder = new LocalStreamBuilder(25);
+ StreamBuilder builder = new LocalStreamBuilder(25, streamConfig);
builder.newPerpetualStream(TwitterStreamProvider.STREAMS_ID, stream);
builder.addStreamsProcessor("converter", converter, 2, TwitterStreamProvider.STREAMS_ID);
builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter");
builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, "NoDeletesProcessor");
builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter");
- builder.addStreamsPersistWriter(ElasticsearchPersistDeleter.STREAMS_ID, deleter, 1, "DeleteOnlyProcessor");
+ builder.addStreamsPersistWriter("deleter", deleter, 1, "DeleteOnlyProcessor");
builder.start();
}
- private class DeleteOnlyProcessor implements StreamsProcessor
- {
- String delete = new Delete().getVerb();
-
- @Override
- public void prepare(Object configurationObject) {}
-
- @Override
- public void cleanUp() {}
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- Preconditions.checkArgument(entry.getDocument() instanceof Activity);
- Activity activity = (Activity) entry.getDocument();
- if( activity.getVerb().equals(delete))
- return Lists.newArrayList(entry);
- else
- return Lists.newArrayList();
- }
- }
-
- private class NoDeletesProcessor implements StreamsProcessor
- {
- String delete = new Delete().getVerb();
-
- @Override
- public void prepare(Object configurationObject) {}
-
- @Override
- public void cleanUp() {}
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- Preconditions.checkArgument(entry.getDocument() instanceof Activity);
- Activity activity = (Activity) entry.getDocument();
- if( activity.getVerb().equals(delete))
- return Lists.newArrayList();
- else
- return Lists.newArrayList(entry);
- }
- }
-
}
diff --git a/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot b/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
index 5302bf9..62aa637 100644
--- a/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
+++ b/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
@@ -5,8 +5,8 @@
//processors
TwitterTypeConverter [label="TwitterTypeConverter",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java"];
- DeleteOnlyProcessor [label="DeleteOnlyProcessor ",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java"];
- NoDeletesProcessor [label="NoDeletes",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java"];
+ DeleteOnlyProcessor [label="VerbDefinitionKeepFilter (verb:post)",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java"];
+ NoDeletesProcessor [label="VerbDefinitionDropFilter (verb:post)",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java"];
//persisters
ElasticsearchPersistWriter [label="ElasticsearchPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java"];