SAMZA-1050: Make samza-operator independent of avro version

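Preparation for merging the samza-operator APIs to master: remove samza-operator's direct dependency on Avro by moving the Avro-backed data and serde classes (and the avro dependency itself) into samza-sql-calcite.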

Author: Yi Pan (Data Infrastructure) <nickpan47@gmail.com>

Reviewers: Jagadish <jagadish1989@gmail.com>, Prateek Maheshiwari <pmaheshiwari@linkedin.com>

Closes #22 from nickpan47/SAMZA-1050
diff --git a/build.gradle b/build.gradle
index 28c2dcf..1b3c278 100644
--- a/build.gradle
+++ b/build.gradle
@@ -278,7 +278,12 @@
 
   // Force scala joint compilation
   sourceSets.main.scala.srcDir "src/main/java"
+  sourceSets.test.scala.srcDir "src/test/java"
+
+  // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
+  // tasks.compileJava.enabled = false and tasks.compileTestJava.enabled = false
   sourceSets.main.java.srcDirs = []
+  sourceSets.test.java.srcDirs = []
 
   dependencies {
     compile project(':samza-api')
@@ -304,6 +309,10 @@
       // Exclude because YARN's 3.4.5 ZK version is incompatbile with Kafka's 3.3.4.
       exclude module: 'zookeeper'
     }
+    compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
+      exclude module: 'slf4j-log4j12'
+      exclude module: 'servlet-api'
+    }
     compile("org.scalatra:scalatra_$scalaVersion:$scalatraVersion") {
       exclude module: 'scala-compiler'
       exclude module: 'slf4j-api'
@@ -317,6 +326,7 @@
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
     testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
+    testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
   }
 
   repositories {
@@ -346,7 +356,6 @@
       compile project(":samza-core_$scalaVersion")
       compile "commons-collections:commons-collections:$commonsCollectionVersion"
       compile "org.apache.commons:commons-lang3:$commonsLang3Version"
-      compile "org.apache.avro:avro:$avroVersion"
       compile "org.reactivestreams:reactive-streams:$reactiveStreamVersion"
 
       testCompile project(":samza-api").sourceSets.test.output
@@ -368,6 +377,7 @@
 
     dependencies {
       compile project(":samza-operator")
+      compile "org.apache.avro:avro:$avroVersion"
       compile "org.apache.calcite:calcite-core:$calciteVersion"
       testCompile "junit:junit:$junitVersion"
       testCompile "org.mockito:mockito-all:$mockitoVersion"
@@ -470,7 +480,13 @@
 project(":samza-kv-rocksdb_$scalaVersion") {
   apply plugin: 'scala'
 
+  // Force scala joint compilation
+  sourceSets.main.scala.srcDir "src/main/java"
   sourceSets.test.scala.srcDir "src/test/java"
+
+  // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
+  // tasks.compileJava.enabled = false and tasks.compileTestJava.enabled = false
+  sourceSets.main.java.srcDirs = []
   sourceSets.test.java.srcDirs = []
 
   dependencies {
diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
index 493a688..a6d57da 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
@@ -19,11 +19,10 @@
 
 package org.apache.samza.task;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.Windows;
 import org.apache.samza.operators.TriggerBuilder;
+import org.apache.samza.operators.Windows;
 import org.apache.samza.operators.data.IncomingSystemMessage;
 import org.apache.samza.operators.data.Offset;
 import org.apache.samza.operators.task.StreamOperatorTask;
@@ -82,16 +81,7 @@
   }
 
   JsonMessage getInputMessage(IncomingSystemMessage m1) {
-    return new JsonMessage(
-        m1.getKey().toString(),
-        (MessageType) m1.getMessage(),
-        m1.getOffset(),
-        this.getEventTime((GenericRecord)m1.getMessage()),
-        m1.getSystemStreamPartition());
-  }
-
-  long getEventTime(GenericRecord msg) {
-    return (Long) msg.get("event_time");
+    return (JsonMessage) m1.getMessage();
   }
 
   boolean myFilter1(JsonMessage m1) {
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java
similarity index 96%
rename from samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java
index 69a3bee..7d6ee79 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.samza.operators.api.data;
+package org.apache.samza.sql.calcite.data;
 
 import java.util.List;
 import java.util.Map;
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java
similarity index 96%
rename from samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java
index dc3f8f4..e2a79cf 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.samza.operators.api.data;
+package org.apache.samza.sql.calcite.data;
 
 import java.util.Map;
 
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java
similarity index 97%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java
index e4f5d79..91d26a2 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.samza.operators.impl.data.avro;
+package org.apache.samza.sql.calcite.data.avro;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -25,8 +25,8 @@
 
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
 
 
 public class AvroData implements Data {
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java
similarity index 98%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java
index c04e4f6..c3bb150 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java
@@ -17,14 +17,14 @@
  * under the License.
  */
 
-package org.apache.samza.operators.impl.data.avro;
+package org.apache.samza.sql.calcite.data.avro;
 
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.avro.Schema.Field;
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
 
 
 public class AvroSchema implements Schema {
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
similarity index 94%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
index 2432aca..97a3b6c 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.samza.operators.impl.data.serializers;
+package org.apache.samza.sql.calcite.data.serializers;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
@@ -27,8 +27,8 @@
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.samza.SamzaException;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.operators.impl.data.avro.AvroData;
-import org.apache.samza.operators.impl.data.avro.AvroSchema;
+import org.apache.samza.sql.calcite.data.avro.AvroData;
+import org.apache.samza.sql.calcite.data.avro.AvroSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java
similarity index 92%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java
index edd8859..caf4009 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java
@@ -16,14 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.samza.operators.impl.data.serializers;
+package org.apache.samza.sql.calcite.data.serializers;
 
 import org.apache.avro.Schema;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.operators.impl.data.avro.AvroData;
+import org.apache.samza.sql.calcite.data.avro.AvroData;
 
 public class SqlAvroSerdeFactory implements SerdeFactory<AvroData> {
   public static final String PROP_AVRO_SCHEMA = "serializers.%s.schema";
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java
similarity index 91%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java
index 1267ab6..6651e97 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java
@@ -17,11 +17,11 @@
  * under the License.
  */
 
-package org.apache.samza.operators.impl.data.serializers;
+package org.apache.samza.sql.calcite.data.serializers;
 
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.operators.impl.data.string.StringData;
+import org.apache.samza.sql.calcite.data.string.StringData;
 
 
 public class SqlStringSerde implements Serde<StringData> {
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java
similarity index 90%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java
index 3b6a3e0..80fb542 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java
@@ -17,13 +17,13 @@
  * under the License.
  */
 
-package org.apache.samza.operators.impl.data.serializers;
+package org.apache.samza.sql.calcite.data.serializers;
 
 
 import org.apache.samza.config.Config;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.operators.impl.data.string.StringData;
+import org.apache.samza.sql.calcite.data.string.StringData;
 
 public class SqlStringSerdeFactory implements SerdeFactory<StringData> {
     @Override
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java
similarity index 94%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java
index 86e9917..f7c8121 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java
@@ -17,10 +17,10 @@
  * under the License.
  */
 
-package org.apache.samza.operators.impl.data.string;
+package org.apache.samza.sql.calcite.data.string;
 
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
 
 import java.util.List;
 import java.util.Map;
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java
similarity index 93%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java
index b19dfeb..829d61f 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java
@@ -17,10 +17,10 @@
  * under the License.
  */
 
-package org.apache.samza.operators.impl.data.string;
+package org.apache.samza.sql.calcite.data.string;
 
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
 
 import java.util.Map;
 
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java
similarity index 85%
rename from samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
rename to samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java
index 5aa28bb..b891ddc 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
+++ b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.samza.operators.impl.data.serializers;
+package org.apache.samza.sql.calcite.data.serializers;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -27,7 +27,7 @@
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.operators.impl.data.avro.AvroData;
+import org.apache.samza.sql.calcite.data.avro.AvroData;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -55,10 +55,10 @@
   public void testSqlAvroSerdeDeserialization() throws IOException {
     AvroData decodedDatum = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
 
-    Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRUCT);
-    Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
-    Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
-    Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING);
+    Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRUCT);
+    Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER);
+    Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER);
+    Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRING);
   }
 
   @Test
@@ -69,10 +69,10 @@
 
     AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum);
 
-    Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRUCT);
-    Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
-    Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
-    Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING);
+    Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRUCT);
+    Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER);
+    Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER);
+    Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRING);
   }
 
   private static Config sqlAvroSerdeTestConfig(){