SAMZA-1050: Make samza-operator independent of avro version. Prep for merging the samza-operator APIs to master: removing the direct dependency on avro. Author: Yi Pan (Data Infrastructure) <nickpan47@gmail.com> Reviewers: Jagadish <jagadish1989@gmail.com>, Prateek Maheshiwari <pmaheshiwari@linkedin.com> Closes #22 from nickpan47/SAMZA-1050
diff --git a/build.gradle b/build.gradle index 28c2dcf..1b3c278 100644 --- a/build.gradle +++ b/build.gradle
@@ -278,7 +278,12 @@ // Force scala joint compilation sourceSets.main.scala.srcDir "src/main/java" + sourceSets.test.scala.srcDir "src/test/java" + + // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting + // tasks.compileTestJava.enabled = false sourceSets.main.java.srcDirs = [] + sourceSets.test.java.srcDirs = [] dependencies { compile project(':samza-api') @@ -304,6 +309,10 @@ // Exclude because YARN's 3.4.5 ZK version is incompatbile with Kafka's 3.3.4. exclude module: 'zookeeper' } + compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") { + exclude module: 'slf4j-log4j12' + exclude module: 'servlet-api' + } compile("org.scalatra:scalatra_$scalaVersion:$scalatraVersion") { exclude module: 'scala-compiler' exclude module: 'slf4j-api' @@ -317,6 +326,7 @@ testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" testCompile project(":samza-core_$scalaVersion").sourceSets.test.output + testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" } repositories { @@ -346,7 +356,6 @@ compile project(":samza-core_$scalaVersion") compile "commons-collections:commons-collections:$commonsCollectionVersion" compile "org.apache.commons:commons-lang3:$commonsLang3Version" - compile "org.apache.avro:avro:$avroVersion" compile "org.reactivestreams:reactive-streams:$reactiveStreamVersion" testCompile project(":samza-api").sourceSets.test.output @@ -368,6 +377,7 @@ dependencies { compile project(":samza-operator") + compile "org.apache.avro:avro:$avroVersion" compile "org.apache.calcite:calcite-core:$calciteVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" @@ -470,7 +480,13 @@ project(":samza-kv-rocksdb_$scalaVersion") { apply plugin: 'scala' + // Force scala joint compilation + sourceSets.main.scala.srcDir "src/main/java" sourceSets.test.scala.srcDir "src/test/java" + + // Disable the Javac compiler by forcing joint compilation by 
scalac. This is equivalent to setting + // tasks.compileTestJava.enabled = false + sourceSets.main.java.srcDirs = [] sourceSets.test.java.srcDirs = [] dependencies {
diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java index 493a688..a6d57da 100644 --- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java +++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
@@ -19,11 +19,10 @@ package org.apache.samza.task; -import org.apache.avro.generic.GenericRecord; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreams.SystemMessageStream; -import org.apache.samza.operators.Windows; import org.apache.samza.operators.TriggerBuilder; +import org.apache.samza.operators.Windows; import org.apache.samza.operators.data.IncomingSystemMessage; import org.apache.samza.operators.data.Offset; import org.apache.samza.operators.task.StreamOperatorTask; @@ -82,16 +81,7 @@ } JsonMessage getInputMessage(IncomingSystemMessage m1) { - return new JsonMessage( - m1.getKey().toString(), - (MessageType) m1.getMessage(), - m1.getOffset(), - this.getEventTime((GenericRecord)m1.getMessage()), - m1.getSystemStreamPartition()); - } - - long getEventTime(GenericRecord msg) { - return (Long) msg.get("event_time"); + return (JsonMessage) m1.getMessage(); } boolean myFilter1(JsonMessage m1) {
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java similarity index 96% rename from samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java index 69a3bee..7d6ee79 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java
@@ -17,7 +17,7 @@ * under the License. */ -package org.apache.samza.operators.api.data; +package org.apache.samza.sql.calcite.data; import java.util.List; import java.util.Map;
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java similarity index 96% rename from samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java index dc3f8f4..e2a79cf 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java
@@ -17,7 +17,7 @@ * under the License. */ -package org.apache.samza.operators.api.data; +package org.apache.samza.sql.calcite.data; import java.util.Map;
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java similarity index 97% rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java index e4f5d79..91d26a2 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java
@@ -17,7 +17,7 @@ * under the License. */ -package org.apache.samza.operators.impl.data.avro; +package org.apache.samza.sql.calcite.data.avro; import java.nio.ByteBuffer; import java.util.List; @@ -25,8 +25,8 @@ import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericRecord; -import org.apache.samza.operators.api.data.Data; -import org.apache.samza.operators.api.data.Schema; +import org.apache.samza.sql.calcite.data.Data; +import org.apache.samza.sql.calcite.data.Schema; public class AvroData implements Data {
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java similarity index 98% rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java index c04e4f6..c3bb150 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java
@@ -17,14 +17,14 @@ * under the License. */ -package org.apache.samza.operators.impl.data.avro; +package org.apache.samza.sql.calcite.data.avro; import java.util.HashMap; import java.util.Map; import org.apache.avro.Schema.Field; -import org.apache.samza.operators.api.data.Data; -import org.apache.samza.operators.api.data.Schema; +import org.apache.samza.sql.calcite.data.Data; +import org.apache.samza.sql.calcite.data.Schema; public class AvroSchema implements Schema {
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java similarity index 94% rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java index 2432aca..97a3b6c 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
@@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.samza.operators.impl.data.serializers; +package org.apache.samza.sql.calcite.data.serializers; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -27,8 +27,8 @@ import org.apache.avro.io.BinaryEncoder; import org.apache.samza.SamzaException; import org.apache.samza.serializers.Serde; -import org.apache.samza.operators.impl.data.avro.AvroData; -import org.apache.samza.operators.impl.data.avro.AvroSchema; +import org.apache.samza.sql.calcite.data.avro.AvroData; +import org.apache.samza.sql.calcite.data.avro.AvroSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java similarity index 92% rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java index edd8859..caf4009 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java
@@ -16,14 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.samza.operators.impl.data.serializers; +package org.apache.samza.sql.calcite.data.serializers; import org.apache.avro.Schema; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.SerdeFactory; -import org.apache.samza.operators.impl.data.avro.AvroData; +import org.apache.samza.sql.calcite.data.avro.AvroData; public class SqlAvroSerdeFactory implements SerdeFactory<AvroData> { public static final String PROP_AVRO_SCHEMA = "serializers.%s.schema";
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java similarity index 91% rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java index 1267ab6..6651e97 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java
@@ -17,11 +17,11 @@ * under the License. */ -package org.apache.samza.operators.impl.data.serializers; +package org.apache.samza.sql.calcite.data.serializers; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; -import org.apache.samza.operators.impl.data.string.StringData; +import org.apache.samza.sql.calcite.data.string.StringData; public class SqlStringSerde implements Serde<StringData> {
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java similarity index 90% rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java index 3b6a3e0..80fb542 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java
@@ -17,13 +17,13 @@ * under the License. */ -package org.apache.samza.operators.impl.data.serializers; +package org.apache.samza.sql.calcite.data.serializers; import org.apache.samza.config.Config; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.SerdeFactory; -import org.apache.samza.operators.impl.data.string.StringData; +import org.apache.samza.sql.calcite.data.string.StringData; public class SqlStringSerdeFactory implements SerdeFactory<StringData> { @Override
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java similarity index 94% rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java index 86e9917..f7c8121 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java
@@ -17,10 +17,10 @@ * under the License. */ -package org.apache.samza.operators.impl.data.string; +package org.apache.samza.sql.calcite.data.string; -import org.apache.samza.operators.api.data.Data; -import org.apache.samza.operators.api.data.Schema; +import org.apache.samza.sql.calcite.data.Data; +import org.apache.samza.sql.calcite.data.Schema; import java.util.List; import java.util.Map;
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java similarity index 93% rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java index b19dfeb..829d61f 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java
@@ -17,10 +17,10 @@ * under the License. */ -package org.apache.samza.operators.impl.data.string; +package org.apache.samza.sql.calcite.data.string; -import org.apache.samza.operators.api.data.Data; -import org.apache.samza.operators.api.data.Schema; +import org.apache.samza.sql.calcite.data.Data; +import org.apache.samza.sql.calcite.data.Schema; import java.util.Map;
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java similarity index 85% rename from samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java rename to samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java index 5aa28bb..b891ddc 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java +++ b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java
@@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.samza.operators.impl.data.serializers; +package org.apache.samza.sql.calcite.data.serializers; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -27,7 +27,7 @@ import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.serializers.Serde; -import org.apache.samza.operators.impl.data.avro.AvroData; +import org.apache.samza.sql.calcite.data.avro.AvroData; import org.junit.Assert; import org.junit.Test; @@ -55,10 +55,10 @@ public void testSqlAvroSerdeDeserialization() throws IOException { AvroData decodedDatum = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema)); - Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRUCT); - Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER); - Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER); - Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING); + Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRUCT); + Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER); + Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER); + Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRING); } @Test @@ -69,10 +69,10 @@ AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum); - Assert.assertTrue(decodedDatum.schema().getType() == 
org.apache.samza.operators.api.data.Schema.Type.STRUCT); - Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER); - Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER); - Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING); + Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRUCT); + Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER); + Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER); + Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRING); } private static Config sqlAvroSerdeTestConfig(){