fixed delete verb definition
cleaned up documentation
confirmed working
diff --git a/local/twitter-userstream-elasticsearch/TwitterUserstreamElasticsearch.png b/local/twitter-userstream-elasticsearch/TwitterUserstreamElasticsearch.png
new file mode 100644
index 0000000..003a002
--- /dev/null
+++ b/local/twitter-userstream-elasticsearch/TwitterUserstreamElasticsearch.png
Binary files differ
diff --git a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
index 2e7d2cd..fb9d1ed 100644
--- a/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
+++ b/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java
@@ -18,11 +18,15 @@
package org.apache.streams.twitter.example;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.ActivityConverterProcessor;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
@@ -31,12 +35,16 @@
import org.apache.streams.filters.VerbDefinitionKeepFilter;
import org.apache.streams.local.builders.LocalStreamBuilder;
import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.TwitterStreamConfiguration;
import org.apache.streams.twitter.provider.TwitterStreamProvider;
+import org.apache.streams.verbs.ObjectCombination;
import org.apache.streams.verbs.VerbDefinition;
+import org.elasticsearch.common.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
import java.util.Map;
/**
@@ -48,7 +56,11 @@
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
- private static VerbDefinition deleteVerbDefinition = new VerbDefinition().withValue("post");
+ /*
+  * Verb definition intended to match any, and only, "delete" activities.
+  * The same definition is handed to both the drop filter (NoDeletesProcessor)
+  * and the keep filter (DeleteOnlyProcessor), so the two branches of the
+  * stream split on it.
+  * NOTE(review): presumably a default ObjectCombination places no constraint
+  * on the activity's actor/object/target -- confirm against streams-verbs.
+  */
+ private static VerbDefinition deleteVerbDefinition =
+ new VerbDefinition()
+ .withValue("delete")
+ .withObjects(Lists.newArrayList(new ObjectCombination()));
TwitterUserstreamElasticsearchConfiguration config;
@@ -81,6 +93,7 @@
VerbDefinitionDropFilter noDeletesProcessor = new VerbDefinitionDropFilter(Sets.newHashSet(deleteVerbDefinition));
ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(elasticsearchWriterConfiguration);
VerbDefinitionKeepFilter deleteOnlyProcessor = new VerbDefinitionKeepFilter(Sets.newHashSet(deleteVerbDefinition));
+ SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor();
ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(elasticsearchWriterConfiguration);
Map<String, Object> streamConfig = Maps.newHashMap();
@@ -92,10 +105,38 @@
builder.addStreamsProcessor("NoDeletesProcessor", noDeletesProcessor, 1, "converter");
builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, writer, 1, "NoDeletesProcessor");
builder.addStreamsProcessor("DeleteOnlyProcessor", deleteOnlyProcessor, 1, "converter");
- builder.addStreamsPersistWriter("deleter", deleter, 1, "DeleteOnlyProcessor");
+ builder.addStreamsProcessor("SetDeleteIdProcessor", setDeleteIdProcessor, 1, "DeleteOnlyProcessor");
+ builder.addStreamsPersistWriter("deleter", deleter, 1, "SetDeleteIdProcessor");
builder.start();
}
+ protected class SetDeleteIdProcessor implements StreamsProcessor {
+
+     /**
+      * Rewrites the id of a delete datum so that it names the corresponding post.
+      *
+      * @param entry datum whose document must be an {@link Activity}
+      * @return a new list holding only the given datum, whose id has been updated in place
+      * @throws IllegalArgumentException if the document is not an {@link Activity}
+      */
+     @Override
+     public List<StreamsDatum> process(StreamsDatum entry) {
+
+         Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+
+         // replace delete with post in id
+         // ensure ElasticsearchPersistDeleter will remove original post if present
+         entry.setId(Strings.replace(entry.getId(), "delete", "post"));
+
+         List<StreamsDatum> result = Lists.newArrayList();
+         result.add(entry);
+         return result;
+     }
+
+     /** No preparation is performed; the configuration object is ignored. */
+     @Override
+     public void prepare(Object configurationObject) {
+     }
+
+     /** No clean-up is performed. */
+     @Override
+     public void cleanUp() {
+     }
+ }
+
}
diff --git a/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot b/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
index 62aa637..c876176 100644
--- a/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
+++ b/local/twitter-userstream-elasticsearch/src/main/resources/TwitterUserstreamElasticsearch.dot
@@ -4,23 +4,25 @@
TwitterStreamProvider [label="TwitterStreamProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java"];
//processors
- TwitterTypeConverter [label="TwitterTypeConverter",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java"];
- DeleteOnlyProcessor [label="VerbDefinitionKeepFilter (verb:post)",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java"];
- NoDeletesProcessor [label="VerbDefinitionDropFilter (verb:post)",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java"];
-
+ ActivityConverterProcessor [label="ActivityConverterProcessor",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java"];
+ DeleteOnlyProcessor [label="VerbDefinitionKeepFilter (verb:delete)",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionKeepFilter.java"];
+ NoDeletesProcessor [label="VerbDefinitionDropFilter (verb:delete)",shape=box,URL="https://github.com/apache/incubator-streams/blob/master/streams-components/streams-filters/src/main/java/org/apache/streams/filters/VerbDefinitionDropFilter.java"];
+ SetDeleteIdProcessor [label="SetDeleteIdProcessor (verb:post)",shape=box,URL="https://github.com/apache/incubator-streams-examples/blob/master/local/twitter-userstream-elasticsearch/src/main/java/org/apache/streams/twitter/example/TwitterUserstreamElasticsearch.java"];
+
//persisters
ElasticsearchPersistWriter [label="ElasticsearchPersistWriter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java"];
- ElasticsearchPersistDeleter [label="ElasticsearchPersistDeleter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java"];
+ ElasticsearchPersistDeleter [label="ElasticsearchPersistDeleter",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java"];
//data
es [label="es://{index}/{type}",shape=box];
//stream
- TwitterStreamProvider -> TwitterTypeConverter [label="ObjectNode"];
- TwitterTypeConverter -> DeleteOnlyProcessor [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
- TwitterTypeConverter -> NoDeletesProcessor [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
- DeleteOnlyProcessor -> ElasticsearchPersistDeleter [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+ TwitterStreamProvider -> ActivityConverterProcessor [label="ObjectNode"];
+ ActivityConverterProcessor -> DeleteOnlyProcessor [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+ ActivityConverterProcessor -> NoDeletesProcessor [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+ DeleteOnlyProcessor -> SetDeleteIdProcessor [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
NoDeletesProcessor -> ElasticsearchPersistWriter [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
ElasticsearchPersistWriter -> es [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
- ElasticsearchPersistDeleter -> es [label="Activity",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json"];
+ SetDeleteIdProcessor -> ElasticsearchPersistDeleter [label="Delete",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/verbs/delete.json"];
+ ElasticsearchPersistDeleter -> es [label="Delete",URL="https://github.com/apache/incubator-streams/blob/master/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/verbs/delete.json"];
}
\ No newline at end of file
diff --git a/local/twitter-userstream-elasticsearch/twitter-userstream-elasticsearch.png b/local/twitter-userstream-elasticsearch/twitter-userstream-elasticsearch.png
deleted file mode 100644
index de2b99f..0000000
--- a/local/twitter-userstream-elasticsearch/twitter-userstream-elasticsearch.png
+++ /dev/null
Binary files differ