STREAMS-677,STREAMS-680 (#516)
* resolves STREAMS-677 and STREAMS-680
resolves STREAMS-677
resolves STREAMS-680
* reorganize imports and fix a few compile problems
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
index ebb9c64..cf3e963 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
@@ -58,6 +58,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* Implementation of all twitter interfaces using juneau.
@@ -110,8 +112,8 @@
.setDefaultRequestConfig(
RequestConfig.custom()
.setConnectionRequestTimeout(5000)
- .setConnectTimeout(5000)
- .setSocketTimeout(5000)
+ .setConnectTimeout(60000)
+ .setSocketTimeout(60000)
.setCookieSpec("easy")
.build()
)
@@ -122,6 +124,7 @@
.addInterceptorLast((HttpResponseInterceptor) (httpResponse, httpContext) -> LOGGER.debug(httpResponse.getStatusLine().toString()))
.build();
this.restClientBuilder = RestClient.create()
+ .executorService(Executors.newCachedThreadPool(), false)
.httpClient(httpclient, true)
.parser(
JsonParser.DEFAULT.builder()
@@ -138,7 +141,8 @@
.retryable(
configuration.getRetryMax().intValue(),
configuration.getRetrySleepMs().intValue(),
- new TwitterRetryHandler());
+ new TwitterRetryHandler()
+ );
if( configuration.getDebug() ) {
restClientBuilder = restClientBuilder.debug();
}
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
index 2a59cba..86fd465 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
@@ -42,6 +42,7 @@
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
+import org.apache.commons.collections.iterators.IteratorChain;
import org.apache.commons.lang.NotImplementedException;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -59,6 +60,8 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -97,8 +100,9 @@
StreamsConfiguration streamsConfiguration;
- private List<Callable<Object>> tasks = new ArrayList<>();
- private List<Future<Object>> futures = new ArrayList<>();
+ private List<Callable<Iterator<Tweet>>> tasks = new ArrayList<>();
+ private List<Future<Iterator<Tweet>>> futures = new ArrayList<>();
+ private CompletionService<Iterator<Tweet>> completionService;
protected final AtomicBoolean running = new AtomicBoolean();
@@ -199,7 +203,15 @@
request = new SevenDaySearchRequest();
request.setQ(config.getQ());
-
+ request.setGeocode(config.getGeocode());
+ if( !Objects.isNull(config.getIncludeEntities()) ) {
+ request.setIncludeEntities(config.getIncludeEntities().toString());
+ }
+ request.setLang(config.getLang());
+ request.setLocale(config.getLocale());
+ if( !Objects.isNull(config.getResultType())) {
+ request.setResultType(config.getResultType());
+ }
streamsConfiguration = StreamsConfigurator.detectConfiguration();
try {
@@ -217,6 +229,8 @@
)
);
+ completionService = new ExecutorCompletionService<>(executor);
+
submitSearchThread();
}
@@ -240,16 +254,16 @@
protected void submitSearchThread() {
- Callable providerTask = new SevenDaySearchProviderTask(
- this,
- client,
- request
- );
- LOGGER.info("Thread Created: {}", request);
- tasks.add(providerTask);
- Future future = executor.submit(providerTask);
- futures.add(future);
- LOGGER.info("Thread Submitted: {}", request);
+ Callable providerTask = new SevenDaySearchProviderTask(
+ this,
+ client,
+ request
+ );
+ LOGGER.info("Thread Created: {}", request);
+ tasks.add(providerTask);
+ Future<Iterator<Tweet>> future = completionService.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("Thread Submitted: {}", request);
}
@@ -324,8 +338,15 @@
do {
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
} while ( isRunning());
+ IteratorChain chain = new IteratorChain();
+ int received = 0;
+ while(received < tasks.size()) {
+ Future<Iterator<Tweet>> resultFuture = completionService.take();
+ Iterator<Tweet> result = resultFuture.get();
+ chain.addIterator(result);
+ received ++;
+ }
cleanUp();
- return providerQueue.stream().map( x -> ((Tweet)x.getDocument())).distinct().iterator();
-
+ return chain;
}
}
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
index d5b6d8d..719b734 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
@@ -18,14 +18,12 @@
package org.apache.streams.twitter.provider;
-import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.api.SevenDaySearchRequest;
import org.apache.streams.twitter.api.SevenDaySearchResponse;
import org.apache.streams.twitter.api.Twitter;
import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.util.ComponentUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -40,7 +38,7 @@
import java.util.stream.Stream;
/**
- * Retrieve recent posts for a single user id.
+ * Retrieve recent posts from standard seven day search.
*/
public class SevenDaySearchProviderTask implements Callable<Iterator<Tweet>>, Runnable {
@@ -51,7 +49,7 @@
protected SevenDaySearchProvider provider;
protected Twitter client;
protected SevenDaySearchRequest request;
- protected List<Tweet> responseList;
+ protected List<Tweet> responseList = new ArrayList<>();
/**
* SevenDaySearchProviderTask constructor.
@@ -63,15 +61,15 @@
this.provider = provider;
this.client = twitter;
this.request = request;
- this.responseList = new ArrayList<>();
}
int item_count = 0;
int last_count = 0;
int page_count = 0;
+ Long maxId = null;
@Override
- public void run() {
+ public Iterator<Tweet> call() throws Exception {
LOGGER.info("Thread Starting: {}", request.toString());
@@ -81,48 +79,55 @@
List<Tweet> statuses = response.getStatuses();
+ last_count = statuses.size();
+
+ page_count++;
+
responseList.addAll(statuses);
- last_count = statuses.size();
if( statuses.size() > 0 ) {
for (Tweet status : statuses) {
if (item_count < provider.getConfig().getMaxItems()) {
- ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue);
+ responseList.add(status);
item_count++;
}
}
Stream<Long> statusIds = statuses.stream().map(status -> status.getId());
- long minId = statusIds.reduce(Math::min).get();
- page_count++;
- request.setMaxId(new Long(minId).toString());
+ maxId = statusIds.reduce(Math::min).get();
+ request.setMaxId(maxId.toString());
}
+ LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, last_count, page_count);
+
}
while (shouldContinuePulling(last_count, page_count, item_count));
- LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, last_count, page_count);
-
+ return responseList.iterator();
}
- public boolean shouldContinuePulling(int count, int page_count, int item_count) {
+ public boolean shouldContinuePulling(int item_count, int last_count, int page_count) {
+ boolean shouldContinuePulling = last_count > 0;
if ( item_count >= provider.getConfig().getMaxItems() ) {
return false;
} else if (page_count >= provider.getConfig().getMaxPages()) {
return false;
- } else {
- return ( count > 0 );
}
+ LOGGER.info("shouldContinuePulling: ", shouldContinuePulling);
+ return shouldContinuePulling;
}
@Override
- public Iterator<Tweet> call() throws Exception {
- run();
- return responseList.iterator();
+ public void run() {
+ try {
+ this.call();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
index 5e0a8ec..6340c55 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
@@ -21,11 +21,7 @@
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.core.util.ExecutorUtils;
-import org.apache.streams.core.util.QueueUtils;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
import org.apache.streams.twitter.api.ThirtyDaySearchRequest;
@@ -42,8 +38,7 @@
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.lang.NotImplementedException;
-import org.joda.time.DateTime;
+import org.apache.commons.collections.iterators.IteratorChain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,13 +47,14 @@
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.Serializable;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
-import java.util.Queue;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -71,7 +67,7 @@
/**
* Retrieve recent posts from a list of user ids or names.
*/
-public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, StreamsProvider, Serializable {
+public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, Serializable {
private static final String STREAMS_ID = "ThirtyDaySearchProvider";
@@ -87,16 +83,15 @@
return config;
}
- protected volatile Queue<StreamsDatum> providerQueue;
-
protected ThirtyDaySearchRequest request;
protected Twitter client;
protected ExecutorService executor;
- private List<Callable<Object>> tasks = new ArrayList<>();
- private List<Future<Object>> futures = new ArrayList<>();
+ private List<Callable<Iterator<Tweet>>> tasks = new ArrayList<>();
+ private List<Future<Iterator<Tweet>>> futures = new ArrayList<>();
+ private CompletionService<Iterator<Tweet>> completionService;
StreamsConfiguration streamsConfiguration;
@@ -131,14 +126,12 @@
String configfile = args[0];
String outfile = args[1];
- Config reference = ConfigFactory.load();
File file = new File(configfile);
assert (file.exists());
Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
+ StreamsConfigurator.addConfig(testResourceConfig);
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
-
- StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
ThirtyDaySearchProviderConfiguration config = new ComponentConfigurator<>(ThirtyDaySearchProviderConfiguration.class).detectConfiguration();
ThirtyDaySearchProvider provider = new ThirtyDaySearchProvider(config);
@@ -163,30 +156,16 @@
this.config = config;
}
- public Queue<StreamsDatum> getProviderQueue() {
- return this.providerQueue;
- }
-
- @Override
public String getId() {
return STREAMS_ID;
}
- @Override
public void prepare(Object configurationObject) {
if( !(configurationObject instanceof ThirtyDaySearchProviderConfiguration ) ) {
this.config = (ThirtyDaySearchProviderConfiguration)configurationObject;
}
- try {
- lock.writeLock().lock();
- providerQueue = QueueUtils.constructQueue();
- } finally {
- lock.writeLock().unlock();
- }
-
- Objects.requireNonNull(providerQueue);
Objects.requireNonNull(config.getOauth().getConsumerKey());
Objects.requireNonNull(config.getOauth().getConsumerSecret());
Objects.requireNonNull(config.getOauth().getAccessToken());
@@ -196,7 +175,8 @@
request = new ThirtyDaySearchRequest();
request.setQuery(config.getQuery());
-
+ request.setTag(config.getTag());
+ request.setMaxResults(config.getPageSize());
streamsConfiguration = StreamsConfigurator.detectConfiguration();
try {
@@ -214,11 +194,12 @@
)
);
+ completionService = new ExecutorCompletionService<>(executor);
+
submitSearchThread();
}
- @Override
public void startStream() {
Objects.requireNonNull(executor);
@@ -238,54 +219,17 @@
protected void submitSearchThread() {
Callable providerTask = new ThirtyDaySearchProviderTask(
- this,
- client,
- request
- );
+ this,
+ client,
+ request
+ );
LOGGER.info("Thread Created: {}", request);
tasks.add(providerTask);
- Future future = executor.submit(providerTask);
+ Future<Iterator<Tweet>> future = completionService.submit(providerTask);
futures.add(future);
LOGGER.info("Thread Submitted: {}", request);
}
- @Override
- public StreamsResultSet readCurrent() {
-
- StreamsResultSet result;
-
- LOGGER.debug("Providing {} docs", providerQueue.size());
-
- try {
- lock.writeLock().lock();
- result = new StreamsResultSet(providerQueue);
- providerQueue = QueueUtils.constructQueue();
- } finally {
- lock.writeLock().unlock();
- }
-
- if ( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) {
- LOGGER.info("Finished. Cleaning up...");
-
- running.set(false);
-
- LOGGER.info("Exiting");
- }
-
- return result;
-
- }
-
- public StreamsResultSet readNew(BigInteger sequence) {
- LOGGER.debug("{} readNew", STREAMS_ID);
- throw new NotImplementedException();
- }
-
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- LOGGER.debug("{} readRange", STREAMS_ID);
- throw new NotImplementedException();
- }
-
/**
* get Twitter Client from TwitterUserInformationConfiguration.
* @return result
@@ -296,12 +240,10 @@
}
- @Override
public void cleanUp() {
ExecutorUtils.shutdownAndAwaitTermination(executor);
}
- @Override
public boolean isRunning() {
LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
LOGGER.debug("tasks.size(): {}", tasks.size());
@@ -314,13 +256,21 @@
}
@Override
- public Iterator<Tweet> call() throws Exception {
+ public Iterator<Tweet> call() throws InterruptedException, ExecutionException {
prepare(config);
startStream();
do {
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
- } while ( isRunning());
+ } while (isRunning());
+ IteratorChain chain = new IteratorChain();
+ int received = 0;
+ while (received < tasks.size()) {
+ Future<Iterator<Tweet>> resultFuture = completionService.take();
+ Iterator<Tweet> result = resultFuture.get();
+ chain.addIterator(result);
+ received++;
+ }
cleanUp();
- return providerQueue.stream().map( x -> ((Tweet)x.getDocument())).iterator();
+ return chain;
}
}
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
index 92f1c64..7ef0493 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
@@ -18,6 +18,7 @@
package org.apache.streams.twitter.provider;
+import com.google.common.base.Strings;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.api.ThirtyDaySearchRequest;
@@ -32,15 +33,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
/**
- * Retrieve recent posts for a single user id.
+ * Retrieve recent posts from premium thirty day search.
*/
public class ThirtyDaySearchProviderTask implements Callable<Iterator<Tweet>>, Runnable {
@@ -51,7 +51,7 @@
protected ThirtyDaySearchProvider provider;
protected Twitter client;
protected ThirtyDaySearchRequest request;
- protected List<Tweet> responseList;
+ protected List<Tweet> responseList = new ArrayList<>();
/**
* ThirtyDaySearchProviderTask constructor.
@@ -63,15 +63,15 @@
this.provider = provider;
this.client = twitter;
this.request = request;
- this.responseList = new ArrayList<>();
}
int item_count = 0;
int last_count = 0;
int page_count = 0;
+ String next = null;
@Override
- public void run() {
+ public Iterator<Tweet> call() throws Exception {
LOGGER.info("Thread Starting: {}", request.toString());
@@ -81,46 +81,47 @@
List<Tweet> statuses = response.getResults();
+ last_count = statuses.size();
+
+ // count items but dont truncate response b/c we already paid for them
+ item_count += statuses.size();
+
+ page_count++;
+
responseList.addAll(statuses);
- last_count = statuses.size();
- if( statuses.size() > 0 ) {
+ next = response.getNext();
- for (Tweet status : statuses) {
+ request.setNext(next);
- if (item_count < provider.getConfig().getMaxItems()) {
- ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue);
- item_count++;
- }
-
- }
-
- Stream<Long> statusIds = statuses.stream().map(status -> status.getId());
- page_count++;
- request.setNext(response.getNext());
-
- }
+ LOGGER.info("item_count: {} last_count: {} page_count: {} next: {} ", item_count, last_count, page_count, next);
}
- while (shouldContinuePulling(last_count, page_count, item_count));
+ while (shouldContinuePulling(last_count, page_count, item_count, next));
- LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, last_count, page_count);
-
+ return responseList.iterator();
}
- public boolean shouldContinuePulling(int count, int page_count, int item_count) {
- if (item_count >= provider.getConfig().getMaxItems()) {
- return false;
- } else if (page_count >= provider.getConfig().getMaxPages()) {
- return false;
- } else {
- return (count > 0);
+ public boolean shouldContinuePulling(int count, int page_count, int item_count, String next) {
+ boolean shouldContinuePulling = count > 0;
+ if (Strings.isNullOrEmpty(next)) {
+ shouldContinuePulling = false;
}
+ if (item_count >= provider.getConfig().getMaxItems()) {
+ shouldContinuePulling = false;
+ } else if (page_count >= provider.getConfig().getMaxPages()) {
+ shouldContinuePulling = false;
+ }
+ LOGGER.info("shouldContinuePulling: ", shouldContinuePulling);
+ return shouldContinuePulling;
}
@Override
- public Iterator<Tweet> call() throws Exception {
- run();
- return responseList.iterator();
+ public void run() {
+ try {
+ this.call();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index 3afb785..d3ff1b9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -31,7 +31,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Instant;
import java.util.ArrayList;
+import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
@@ -67,6 +69,7 @@
int item_count = 0;
int last_count = 0;
int page_count = 0;
+ Date earliest_timestamp = new Date(Long.MAX_VALUE);
@Override
public void run() {
@@ -86,9 +89,19 @@
for (Tweet status : statuses) {
+ earliest_timestamp = Date.from(Instant.ofEpochMilli(Math.min(earliest_timestamp.getTime(), status.getCreatedAt().getTime())));
+
if (item_count < provider.getConfig().getMaxItems()) {
- ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue);
- item_count++;
+
+ if( (provider.getConfig().getMinTimestamp() != null &&
+ earliest_timestamp.after(provider.getConfig().getMinTimestamp())) ||
+ provider.getConfig().getMinTimestamp() == null ) {
+
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue);
+ item_count++;
+
+ }
+
}
}
@@ -101,14 +114,17 @@
}
}
- while (shouldContinuePulling(last_count, page_count, item_count));
+ while (shouldContinuePulling(last_count, page_count, item_count, earliest_timestamp));
LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, last_count, page_count);
}
- public boolean shouldContinuePulling(int count, int page_count, int item_count) {
- if (item_count == provider.getConfig().getMaxItems()) {
+ public boolean shouldContinuePulling(int count, int page_count, int item_count, Date earliest_timestamp) {
+ if (provider.getConfig().getMinTimestamp() != null &&
+ earliest_timestamp.before(provider.getConfig().getMinTimestamp())) {
+ return false;
+ } else if (item_count == provider.getConfig().getMaxItems()) {
return false;
} else if (page_count == provider.getConfig().getMaxPages()) {
return false;
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json
index 41f2dd3..3bca4e4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json
@@ -16,6 +16,15 @@
"max_pages": {
"type": "integer",
"description": "Max items per page to request"
+ },
+ "min_timestamp": {
+ "type": "string",
+ "format": "date-time",
+ "description": "Earliest timestamp permitted"
+ },
+ "page_size": {
+ "type": "integer",
+ "description": "Max items per page to request"
}
}
}
\ No newline at end of file