STREAMS-677,STREAMS-680 (#516)

* resolves STREAMS-677 and STREAMS-680

resolves STREAMS-677
resolves STREAMS-680

* reorganize imports and fix a few compile problems
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
index ebb9c64..cf3e963 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
@@ -58,6 +58,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * Implementation of all twitter interfaces using juneau.
@@ -110,8 +112,8 @@
       .setDefaultRequestConfig(
         RequestConfig.custom()
           .setConnectionRequestTimeout(5000)
-          .setConnectTimeout(5000)
-          .setSocketTimeout(5000)
+          .setConnectTimeout(60000)
+          .setSocketTimeout(60000)
           .setCookieSpec("easy")
           .build()
       )
@@ -122,6 +124,7 @@
       .addInterceptorLast((HttpResponseInterceptor) (httpResponse, httpContext) -> LOGGER.debug(httpResponse.getStatusLine().toString()))
       .build();
     this.restClientBuilder = RestClient.create()
+      .executorService(Executors.newCachedThreadPool(), false)
       .httpClient(httpclient, true)
       .parser(
         JsonParser.DEFAULT.builder()
@@ -138,7 +141,8 @@
       .retryable(
         configuration.getRetryMax().intValue(),
         configuration.getRetrySleepMs().intValue(),
-        new TwitterRetryHandler());
+        new TwitterRetryHandler()
+      );
     if( configuration.getDebug() ) {
       restClientBuilder = restClientBuilder.debug();
     }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
index 2a59cba..86fd465 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
@@ -42,6 +42,7 @@
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
 
+import org.apache.commons.collections.iterators.IteratorChain;
 import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -59,6 +60,8 @@
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -97,8 +100,9 @@
 
   StreamsConfiguration streamsConfiguration;
 
-  private List<Callable<Object>> tasks = new ArrayList<>();
-  private List<Future<Object>> futures = new ArrayList<>();
+  private List<Callable<Iterator<Tweet>>> tasks = new ArrayList<>();
+  private List<Future<Iterator<Tweet>>> futures = new ArrayList<>();
+  private CompletionService<Iterator<Tweet>> completionService;
 
   protected final AtomicBoolean running = new AtomicBoolean();
 
@@ -199,7 +203,15 @@
 
     request = new SevenDaySearchRequest();
     request.setQ(config.getQ());
-
+    request.setGeocode(config.getGeocode());
+    if( !Objects.isNull(config.getIncludeEntities()) ) {
+      request.setIncludeEntities(config.getIncludeEntities().toString());
+    }
+    request.setLang(config.getLang());
+    request.setLocale(config.getLocale());
+    if( !Objects.isNull(config.getResultType())) {
+      request.setResultType(config.getResultType());
+    }
     streamsConfiguration = StreamsConfigurator.detectConfiguration();
 
     try {
@@ -217,6 +229,8 @@
         )
     );
 
+    completionService = new ExecutorCompletionService<>(executor);
+
     submitSearchThread();
 
   }
@@ -240,16 +254,16 @@
 
   protected void submitSearchThread() {
 
-      Callable providerTask = new SevenDaySearchProviderTask(
-          this,
-          client,
-        request
-      );
-      LOGGER.info("Thread Created: {}", request);
-      tasks.add(providerTask);
-      Future future = executor.submit(providerTask);
-      futures.add(future);
-      LOGGER.info("Thread Submitted: {}", request);
+    Callable providerTask = new SevenDaySearchProviderTask(
+      this,
+      client,
+      request
+    );
+    LOGGER.info("Thread Created: {}", request);
+    tasks.add(providerTask);
+    Future<Iterator<Tweet>> future = completionService.submit(providerTask);
+    futures.add(future);
+    LOGGER.info("Thread Submitted: {}", request);
 
   }
 
@@ -324,8 +338,15 @@
     do {
       Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
     } while ( isRunning());
+    IteratorChain chain = new IteratorChain();
+    int received = 0;
+    while(received < tasks.size()) {
+      Future<Iterator<Tweet>> resultFuture = completionService.take();
+      Iterator<Tweet> result = resultFuture.get();
+      chain.addIterator(result);
+      received ++;
+    }
     cleanUp();
-    return providerQueue.stream().map( x -> ((Tweet)x.getDocument())).distinct().iterator();
-
+    return chain;
   }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
index d5b6d8d..719b734 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
@@ -18,14 +18,12 @@
 
 package org.apache.streams.twitter.provider;
 
-import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.api.SevenDaySearchRequest;
 import org.apache.streams.twitter.api.SevenDaySearchResponse;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
 import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.util.ComponentUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -40,7 +38,7 @@
 import java.util.stream.Stream;
 
 /**
- *  Retrieve recent posts for a single user id.
+ *  Retrieve recent posts from standard seven day search.
  */
 public class SevenDaySearchProviderTask implements Callable<Iterator<Tweet>>, Runnable {
 
@@ -51,7 +49,7 @@
   protected SevenDaySearchProvider provider;
   protected Twitter client;
   protected SevenDaySearchRequest request;
-  protected List<Tweet> responseList;
+  protected List<Tweet> responseList = new ArrayList<>();
 
   /**
    * SevenDaySearchProviderTask constructor.
@@ -63,15 +61,15 @@
     this.provider = provider;
     this.client = twitter;
     this.request = request;
-    this.responseList = new ArrayList<>();
   }
 
   int item_count = 0;
   int last_count = 0;
   int page_count = 0;
+  Long maxId = null;
 
   @Override
-  public void run() {
+  public Iterator<Tweet> call() throws Exception {
 
     LOGGER.info("Thread Starting: {}", request.toString());
 
@@ -81,48 +79,55 @@
 
       List<Tweet> statuses = response.getStatuses();
 
+      last_count = statuses.size();
+
+      page_count++;
+
       responseList.addAll(statuses);
 
-      last_count = statuses.size();
       if( statuses.size() > 0 ) {
 
         for (Tweet status : statuses) {
 
           if (item_count < provider.getConfig().getMaxItems()) {
-            ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue);
+            responseList.add(status);
             item_count++;
           }
 
         }
 
         Stream<Long> statusIds = statuses.stream().map(status -> status.getId());
-        long minId = statusIds.reduce(Math::min).get();
-        page_count++;
-        request.setMaxId(new Long(minId).toString());
+        maxId = statusIds.reduce(Math::min).get();
+        request.setMaxId(maxId.toString());
 
       }
 
+      LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, last_count, page_count);
+
     }
     while (shouldContinuePulling(last_count, page_count, item_count));
 
-    LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, last_count, page_count);
-    
+    return responseList.iterator();
   }
 
-  public boolean shouldContinuePulling(int count, int page_count, int item_count) {
+  public boolean shouldContinuePulling(int item_count, int last_count, int page_count) {
+    boolean shouldContinuePulling = last_count > 0;
     if ( item_count >= provider.getConfig().getMaxItems() ) {
       return false;
     } else if (page_count >= provider.getConfig().getMaxPages()) {
       return false;
-    } else {
-      return ( count > 0 );
     }
+    LOGGER.info("shouldContinuePulling: ", shouldContinuePulling);
+    return shouldContinuePulling;
   }
 
   @Override
-  public Iterator<Tweet> call() throws Exception {
-    run();
-    return responseList.iterator();
+  public void run() {
+    try {
+      this.call();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
   }
 
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
index 5e0a8ec..6340c55 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
@@ -21,11 +21,7 @@
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.core.util.ExecutorUtils;
-import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
 import org.apache.streams.twitter.api.ThirtyDaySearchRequest;
@@ -42,8 +38,7 @@
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
 
-import org.apache.commons.lang.NotImplementedException;
-import org.joda.time.DateTime;
+import org.apache.commons.collections.iterators.IteratorChain;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,13 +47,14 @@
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.io.Serializable;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.Queue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -71,7 +67,7 @@
 /**
  * Retrieve recent posts from a list of user ids or names.
  */
-public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, StreamsProvider, Serializable {
+public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, Serializable {
 
   private static final String STREAMS_ID = "ThirtyDaySearchProvider";
 
@@ -87,16 +83,15 @@
     return config;
   }
 
-  protected volatile Queue<StreamsDatum> providerQueue;
-
   protected ThirtyDaySearchRequest request;
 
   protected Twitter client;
 
   protected ExecutorService executor;
 
-  private List<Callable<Object>> tasks = new ArrayList<>();
-  private List<Future<Object>> futures = new ArrayList<>();
+  private List<Callable<Iterator<Tweet>>> tasks = new ArrayList<>();
+  private List<Future<Iterator<Tweet>>> futures = new ArrayList<>();
+  private CompletionService<Iterator<Tweet>> completionService;
 
   StreamsConfiguration streamsConfiguration;
 
@@ -131,14 +126,12 @@
     String configfile = args[0];
     String outfile = args[1];
 
-    Config reference = ConfigFactory.load();
     File file = new File(configfile);
     assert (file.exists());
     Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
+    StreamsConfigurator.addConfig(testResourceConfig);
 
-    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
     ThirtyDaySearchProviderConfiguration config = new ComponentConfigurator<>(ThirtyDaySearchProviderConfiguration.class).detectConfiguration();
     ThirtyDaySearchProvider provider = new ThirtyDaySearchProvider(config);
 
@@ -163,30 +156,16 @@
     this.config = config;
   }
 
-  public Queue<StreamsDatum> getProviderQueue() {
-    return this.providerQueue;
-  }
-
-  @Override
   public String getId() {
     return STREAMS_ID;
   }
 
-  @Override
   public void prepare(Object configurationObject) {
 
     if( !(configurationObject instanceof ThirtyDaySearchProviderConfiguration ) ) {
       this.config = (ThirtyDaySearchProviderConfiguration)configurationObject;
     }
 
-    try {
-      lock.writeLock().lock();
-      providerQueue = QueueUtils.constructQueue();
-    } finally {
-      lock.writeLock().unlock();
-    }
-
-    Objects.requireNonNull(providerQueue);
     Objects.requireNonNull(config.getOauth().getConsumerKey());
     Objects.requireNonNull(config.getOauth().getConsumerSecret());
     Objects.requireNonNull(config.getOauth().getAccessToken());
@@ -196,7 +175,8 @@
 
     request = new ThirtyDaySearchRequest();
     request.setQuery(config.getQuery());
-
+    request.setTag(config.getTag());
+    request.setMaxResults(config.getPageSize());
     streamsConfiguration = StreamsConfigurator.detectConfiguration();
 
     try {
@@ -214,11 +194,12 @@
         )
     );
 
+    completionService = new ExecutorCompletionService<>(executor);
+
     submitSearchThread();
 
   }
 
-  @Override
   public void startStream() {
 
     Objects.requireNonNull(executor);
@@ -238,54 +219,17 @@
   protected void submitSearchThread() {
 
     Callable providerTask = new ThirtyDaySearchProviderTask(
-          this,
-          client,
-        request
-      );
+      this,
+      client,
+      request
+    );
     LOGGER.info("Thread Created: {}", request);
     tasks.add(providerTask);
-    Future future = executor.submit(providerTask);
+    Future<Iterator<Tweet>> future = completionService.submit(providerTask);
     futures.add(future);
     LOGGER.info("Thread Submitted: {}", request);
   }
 
-  @Override
-  public StreamsResultSet readCurrent() {
-
-    StreamsResultSet result;
-
-    LOGGER.debug("Providing {} docs", providerQueue.size());
-
-    try {
-      lock.writeLock().lock();
-      result = new StreamsResultSet(providerQueue);
-      providerQueue = QueueUtils.constructQueue();
-    } finally {
-      lock.writeLock().unlock();
-    }
-
-    if ( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) {
-      LOGGER.info("Finished.  Cleaning up...");
-
-      running.set(false);
-
-      LOGGER.info("Exiting");
-    }
-
-    return result;
-
-  }
-
-  public StreamsResultSet readNew(BigInteger sequence) {
-    LOGGER.debug("{} readNew", STREAMS_ID);
-    throw new NotImplementedException();
-  }
-
-  public StreamsResultSet readRange(DateTime start, DateTime end) {
-    LOGGER.debug("{} readRange", STREAMS_ID);
-    throw new NotImplementedException();
-  }
-
   /**
    * get Twitter Client from TwitterUserInformationConfiguration.
    * @return result
@@ -296,12 +240,10 @@
 
   }
 
-  @Override
   public void cleanUp() {
     ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
-  @Override
   public boolean isRunning() {
     LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
     LOGGER.debug("tasks.size(): {}", tasks.size());
@@ -314,13 +256,21 @@
   }
 
   @Override
-  public Iterator<Tweet> call() throws Exception {
+  public Iterator<Tweet> call() throws InterruptedException, ExecutionException {
     prepare(config);
     startStream();
     do {
       Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-    } while ( isRunning());
+    } while (isRunning());
+    IteratorChain chain = new IteratorChain();
+    int received = 0;
+    while (received < tasks.size()) {
+      Future<Iterator<Tweet>> resultFuture = completionService.take();
+      Iterator<Tweet> result = resultFuture.get();
+      chain.addIterator(result);
+      received++;
+    }
     cleanUp();
-    return providerQueue.stream().map( x -> ((Tweet)x.getDocument())).iterator();
+    return chain;
   }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
index 92f1c64..7ef0493 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.google.common.base.Strings;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.api.ThirtyDaySearchRequest;
@@ -32,15 +33,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /**
- *  Retrieve recent posts for a single user id.
+ *  Retrieve recent posts from premium thirty day search.
  */
 public class ThirtyDaySearchProviderTask implements Callable<Iterator<Tweet>>, Runnable {
 
@@ -51,7 +51,7 @@
   protected ThirtyDaySearchProvider provider;
   protected Twitter client;
   protected ThirtyDaySearchRequest request;
-  protected List<Tweet> responseList;
+  protected List<Tweet> responseList = new ArrayList<>();
 
   /**
    * ThirtyDaySearchProviderTask constructor.
@@ -63,15 +63,15 @@
     this.provider = provider;
     this.client = twitter;
     this.request = request;
-    this.responseList = new ArrayList<>();
   }
 
   int item_count = 0;
   int last_count = 0;
   int page_count = 0;
+  String next = null;
 
   @Override
-  public void run() {
+  public Iterator<Tweet> call() throws Exception {
 
     LOGGER.info("Thread Starting: {}", request.toString());
 
@@ -81,46 +81,47 @@
 
       List<Tweet> statuses = response.getResults();
 
+      last_count = statuses.size();
+
+      // count items but dont truncate response b/c we already paid for them
+      item_count += statuses.size();
+
+      page_count++;
+
       responseList.addAll(statuses);
 
-      last_count = statuses.size();
-      if( statuses.size() > 0 ) {
+      next = response.getNext();
 
-        for (Tweet status : statuses) {
+      request.setNext(next);
 
-          if (item_count < provider.getConfig().getMaxItems()) {
-            ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue);
-            item_count++;
-          }
-
-        }
-
-        Stream<Long> statusIds = statuses.stream().map(status -> status.getId());
-        page_count++;
-        request.setNext(response.getNext());
-
-      }
+      LOGGER.info("item_count: {} last_count: {} page_count: {} next: {} ", item_count, last_count, page_count, next);
 
     }
-    while (shouldContinuePulling(last_count, page_count, item_count));
+    while (shouldContinuePulling(last_count, page_count, item_count, next));
 
-    LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, last_count, page_count);
-
+    return responseList.iterator();
   }
 
-  public boolean shouldContinuePulling(int count, int page_count, int item_count) {
-    if (item_count >= provider.getConfig().getMaxItems()) {
-      return false;
-    } else if (page_count >= provider.getConfig().getMaxPages()) {
-      return false;
-    } else {
-      return (count > 0);
+  public boolean shouldContinuePulling(int count, int page_count, int item_count, String next) {
+    boolean shouldContinuePulling = count > 0;
+    if (Strings.isNullOrEmpty(next)) {
+      shouldContinuePulling = false;
     }
+    if (item_count >= provider.getConfig().getMaxItems()) {
+      shouldContinuePulling = false;
+    } else if (page_count >= provider.getConfig().getMaxPages()) {
+      shouldContinuePulling = false;
+    }
+    LOGGER.info("shouldContinuePulling: ", shouldContinuePulling);
+    return shouldContinuePulling;
   }
 
   @Override
-  public Iterator<Tweet> call() throws Exception {
-    run();
-    return responseList.iterator();
+  public void run() {
+    try {
+      this.call();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
   }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index 3afb785..d3ff1b9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -31,7 +31,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -67,6 +69,7 @@
   int item_count = 0;
   int last_count = 0;
   int page_count = 0;
+  Date earliest_timestamp = new Date(Long.MAX_VALUE);
 
   @Override
   public void run() {
@@ -86,9 +89,19 @@
 
         for (Tweet status : statuses) {
 
+          earliest_timestamp = Date.from(Instant.ofEpochMilli(Math.min(earliest_timestamp.getTime(), status.getCreatedAt().getTime())));
+
           if (item_count < provider.getConfig().getMaxItems()) {
-            ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue);
-            item_count++;
+
+            if( (provider.getConfig().getMinTimestamp() != null &&
+                  earliest_timestamp.after(provider.getConfig().getMinTimestamp())) ||
+                provider.getConfig().getMinTimestamp() == null ) {
+
+              ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue);
+              item_count++;
+
+            }
+
           }
 
         }
@@ -101,14 +114,17 @@
       }
 
     }
-    while (shouldContinuePulling(last_count, page_count, item_count));
+    while (shouldContinuePulling(last_count, page_count, item_count, earliest_timestamp));
 
     LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, last_count, page_count);
     
   }
 
-  public boolean shouldContinuePulling(int count, int page_count, int item_count) {
-    if (item_count == provider.getConfig().getMaxItems()) {
+  public boolean shouldContinuePulling(int count, int page_count, int item_count, Date earliest_timestamp) {
+    if (provider.getConfig().getMinTimestamp() != null &&
+          earliest_timestamp.before(provider.getConfig().getMinTimestamp())) {
+      return false;
+    } else if (item_count == provider.getConfig().getMaxItems()) {
       return false;
     } else if (page_count == provider.getConfig().getMaxPages()) {
       return false;
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json
index 41f2dd3..3bca4e4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json
@@ -16,6 +16,15 @@
         "max_pages": {
             "type": "integer",
             "description": "Max items per page to request"
+        },
+        "min_timestamp": {
+            "type": "string",
+            "format": "date-time",
+            "description": "Earliest timestamp permitted"
+        },
+        "page_size": {
+            "type": "integer",
+            "description": "Max items per page to request"
         }
     }
 }
\ No newline at end of file