STREAMS-490: Upgrade Flink to 1.2.0 and remove the remaining Guava references in streams-examples
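
Bumps flink.version from 1.1.2 to 1.2.0, replaces the deprecated RollingSink with
BucketingSink, swaps the Guava Preconditions/Strings checks for java.util.Objects,
commons-lang3 StringUtils, and Hamcrest MatcherAssert in the four Twitter collection
pipelines, and drops unused Guava imports from the local elasticsearch examples.
A minimal sketch of the shared pattern follows; the OAuth case class and the
validate/writeJson helpers are illustrative placeholders, not code taken from this patch.

    import java.util.Objects

    import org.apache.commons.lang3.StringUtils
    import org.apache.flink.streaming.api.scala.DataStream
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
    import org.hamcrest.MatcherAssert

    object GuavaMigrationSketch {

      // Placeholder for the generated Twitter OAuth configuration bean.
      case class OAuth(consumerKey: String, consumerSecret: String,
                       accessToken: String, accessTokenSecret: String)

      // Guava Preconditions.checkNotNull / !Strings.isNullOrEmpty become
      // Objects.requireNonNull and MatcherAssert.assertThat(msg, StringUtils.isNotEmpty(...)).
      def validate(oauth: OAuth): Boolean = {
        Objects.requireNonNull(oauth)
        MatcherAssert.assertThat("OAuth Consumer Key is not Empty",
          StringUtils.isNotEmpty(oauth.consumerKey))
        MatcherAssert.assertThat("OAuth Access Token is not Empty",
          StringUtils.isNotEmpty(oauth.accessToken))
        true
      }

      // RollingSink was deprecated in Flink 1.2; BucketingSink takes the same base path.
      def writeJson(jsons: DataStream[String], outPath: String): Unit = {
        jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3)
      }
    }

The sink is used with its defaults here; bucketing behaviour and batch size could
presumably be tuned through BucketingSink's setters, but this patch does not touch them.
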
diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml
index 2d06c86..ff682c7 100644
--- a/flink/flink-twitter-collection/pom.xml
+++ b/flink/flink-twitter-collection/pom.xml
@@ -34,7 +34,7 @@
     <properties>
         <testng.version>6.9.10</testng.version>
         <hdfs.version>2.7.0</hdfs.version>
-        <flink.version>1.1.2</flink.version>
+        <flink.version>1.2.0</flink.version>
         <scala.version>2.10.6</scala.version>
         <scalatest.version>2.2.5</scalatest.version>
         <scala.suffix>2.10</scala.suffix>
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index b6d806c..f4ce6dd 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -20,8 +20,8 @@
 
 import java.net.MalformedURLException
 
-import com.google.common.base.Strings
 import com.typesafe.config.Config
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.streaming.api.CheckpointingMode
@@ -63,7 +63,7 @@
 
   def setup(configUrl : String): Boolean =  {
     BASELOGGER.info("StreamsConfigurator.config: {}", StreamsConfigurator.config)
-    if( !Strings.isNullOrEmpty(configUrl)) {
+    if(StringUtils.isNotEmpty(configUrl)) {
       BASELOGGER.info("StreamsConfigurator.resolveConfig(configUrl): {}", StreamsConfigurator.resolveConfig(configUrl))
       try {
         typesafe = StreamsConfigurator.resolveConfig(configUrl).withFallback(StreamsConfigurator.config).resolve()
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 17246e5..5e8ccfd 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -18,17 +18,18 @@
 
 package org.apache.streams.examples.flink.twitter.collection
 
+import java.util.Objects
 import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.functions.RichFlatMapFunction
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
 import org.apache.flink.util.Collector
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
 import org.apache.streams.core.StreamsDatum
@@ -39,6 +40,7 @@
 import org.apache.streams.twitter.TwitterFollowingConfiguration
 import org.apache.streams.twitter.pojo.Follow
 import org.apache.streams.twitter.provider.TwitterFollowingProvider
+import org.hamcrest.MatcherAssert
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConversions._
@@ -92,11 +94,15 @@
       return false
     }
 
-    Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+    Objects.requireNonNull(jobConfig.getTwitter.getOauth)
+    MatcherAssert.assertThat("OAuth Access Token is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+    MatcherAssert.assertThat("OAuth Access Secret is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+    MatcherAssert.assertThat("OAuth Consumer Key is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+    MatcherAssert.assertThat("OAuth Consumer Secret is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
 
     true
 
@@ -135,7 +141,7 @@
       })
 
     if( config.getTest == false )
-      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
+      jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3)
     else
       jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
         .setParallelism(env.getParallelism)
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index eb7f2c1..f9b033e 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -18,17 +18,18 @@
 
 package org.apache.streams.examples.flink.twitter.collection
 
+import java.util.Objects
 import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.functions.RichFlatMapFunction
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
 import org.apache.flink.util.Collector
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
 import org.apache.streams.core.StreamsDatum
@@ -36,9 +37,9 @@
 import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
 import org.apache.streams.flink.FlinkStreamingConfiguration
 import org.apache.streams.jackson.StreamsJacksonMapper
-import org.apache.streams.twitter.TwitterTimelineProviderConfiguration
 import org.apache.streams.twitter.pojo.Tweet
 import org.apache.streams.twitter.provider.TwitterTimelineProvider
+import org.hamcrest.MatcherAssert
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConversions._
@@ -92,11 +93,15 @@
       return false
     }
 
-    Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+    Objects.requireNonNull(jobConfig.getTwitter.getOauth)
+    MatcherAssert.assertThat("OAuth Access Token is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+    MatcherAssert.assertThat("OAuth Access Secret is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+    MatcherAssert.assertThat("OAuth Consumer Key is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+    MatcherAssert.assertThat("OAuth Consumer Secret is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
 
     true
 
@@ -137,7 +142,7 @@
       }).name("json")
 
     if( config.getTest == false )
-      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+      jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs")
     else
       jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
         .setParallelism(env.getParallelism)
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index dbb8a33..f4379c1 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -19,11 +19,12 @@
 package org.apache.streams.examples.flink.twitter.collection
 
 import java.io.Serializable
+import java.util.Objects
 import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.functions.StoppableFunction
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
@@ -31,7 +32,7 @@
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
 import org.apache.streams.core.StreamsDatum
 import org.apache.streams.examples.flink.FlinkBase
@@ -41,6 +42,7 @@
 import org.apache.streams.twitter.TwitterStreamConfiguration
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat
 import org.apache.streams.twitter.provider.TwitterStreamProvider
+import org.hamcrest.MatcherAssert
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConversions._
@@ -88,11 +90,15 @@
       return false
     }
 
-    Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+    Objects.requireNonNull(jobConfig.getTwitter.getOauth)
+    MatcherAssert.assertThat("OAuth Access Token is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+    MatcherAssert.assertThat("OAuth Access Secret is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+    MatcherAssert.assertThat("OAuth Consumer Key is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+    MatcherAssert.assertThat("OAuth Consumer Secret is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
 
     true
 
@@ -118,7 +124,7 @@
     val streamSource : DataStream[String] = env.addSource(spritzerSource)
 
     if( config.getTest == false )
-      streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+      streamSource.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs")
     else
       streamSource.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
         .setParallelism(env.getParallelism)
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index c180089..e3160f1 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -18,11 +18,12 @@
 
 package org.apache.streams.examples.flink.twitter.collection
 
+import java.util.Objects
 import java.util.concurrent.TimeUnit
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import com.google.common.base.{Preconditions, Strings}
 import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.functions.RichFlatMapFunction
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem
@@ -30,7 +31,7 @@
 import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
-import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
 import org.apache.flink.util.Collector
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
 import org.apache.streams.core.StreamsDatum
@@ -40,6 +41,7 @@
 import org.apache.streams.jackson.StreamsJacksonMapper
 import org.apache.streams.twitter.pojo.User
 import org.apache.streams.twitter.provider.TwitterUserInformationProvider
+import org.hamcrest.MatcherAssert
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConversions._
@@ -93,11 +95,15 @@
       return false
     }
 
-    Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+    Objects.requireNonNull(jobConfig.getTwitter.getOauth)
+    MatcherAssert.assertThat("OAuth Access Token is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+    MatcherAssert.assertThat("OAuth Access Secret is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+    MatcherAssert.assertThat("OAuth Consumer Key is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+    MatcherAssert.assertThat("OAuth Consumer Secret is not Empty",
+      StringUtils.isNotEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
 
     true
 
@@ -139,7 +145,7 @@
       }).name("jsons")
 
     if( config.getTest == false )
-      jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+      jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs")
     else
       jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
         .setParallelism(env.getParallelism)
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
index b859d60..b4485b1 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
@@ -27,12 +27,9 @@
 import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 
-import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 /**
  * Copies documents from an elasticsearch index to new-line delimited json on dfs.
  */
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
index caf9cbc..4fac11d 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
@@ -27,12 +27,9 @@
 import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 
-import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 /**
  * Copies documents from new-line delimited json on dfs to an elasticsearch index.
  */
diff --git a/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
index 476e369..8b92c03 100644
--- a/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
+++ b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
@@ -27,12 +27,9 @@
 import org.apache.streams.local.LocalRuntimeConfiguration;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 
-import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 /**
  * Copies documents from the source index to the destination index.
  */