SAMZA-1050: Make samza-operator independent of avro version
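Prepare for merging the samza-operator APIs to master by removing samza-operator's direct dependency on Avro: the Avro dependency and the Avro/string-based data and serde classes move from samza-operator to samza-sql-calcite.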
Author: Yi Pan (Data Infrastructure) <nickpan47@gmail.com>
Reviewers: Jagadish <jagadish1989@gmail.com>, Prateek Maheshiwari <pmaheshiwari@linkedin.com>
Closes #22 from nickpan47/SAMZA-1050
diff --git a/build.gradle b/build.gradle
index 28c2dcf..1b3c278 100644
--- a/build.gradle
+++ b/build.gradle
@@ -278,7 +278,12 @@
// Force scala joint compilation
sourceSets.main.scala.srcDir "src/main/java"
+ sourceSets.test.scala.srcDir "src/test/java"
+
+ // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
+ // tasks.compileTestJava.enabled = false
sourceSets.main.java.srcDirs = []
+ sourceSets.test.java.srcDirs = []
dependencies {
compile project(':samza-api')
@@ -304,6 +309,10 @@
// Exclude because YARN's 3.4.5 ZK version is incompatbile with Kafka's 3.3.4.
exclude module: 'zookeeper'
}
+ compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
+ exclude module: 'slf4j-log4j12'
+ exclude module: 'servlet-api'
+ }
compile("org.scalatra:scalatra_$scalaVersion:$scalatraVersion") {
exclude module: 'scala-compiler'
exclude module: 'slf4j-api'
@@ -317,6 +326,7 @@
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
+ testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
}
repositories {
@@ -346,7 +356,6 @@
compile project(":samza-core_$scalaVersion")
compile "commons-collections:commons-collections:$commonsCollectionVersion"
compile "org.apache.commons:commons-lang3:$commonsLang3Version"
- compile "org.apache.avro:avro:$avroVersion"
compile "org.reactivestreams:reactive-streams:$reactiveStreamVersion"
testCompile project(":samza-api").sourceSets.test.output
@@ -368,6 +377,7 @@
dependencies {
compile project(":samza-operator")
+ compile "org.apache.avro:avro:$avroVersion"
compile "org.apache.calcite:calcite-core:$calciteVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
@@ -470,7 +480,13 @@
project(":samza-kv-rocksdb_$scalaVersion") {
apply plugin: 'scala'
+ // Force scala joint compilation
+ sourceSets.main.scala.srcDir "src/main/java"
sourceSets.test.scala.srcDir "src/test/java"
+
+ // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
+ // tasks.compileTestJava.enabled = false
+ sourceSets.main.java.srcDirs = []
sourceSets.test.java.srcDirs = []
dependencies {
diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
index 493a688..a6d57da 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
@@ -19,11 +19,10 @@
package org.apache.samza.task;
-import org.apache.avro.generic.GenericRecord;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.Windows;
import org.apache.samza.operators.TriggerBuilder;
+import org.apache.samza.operators.Windows;
import org.apache.samza.operators.data.IncomingSystemMessage;
import org.apache.samza.operators.data.Offset;
import org.apache.samza.operators.task.StreamOperatorTask;
@@ -82,16 +81,7 @@
}
JsonMessage getInputMessage(IncomingSystemMessage m1) {
- return new JsonMessage(
- m1.getKey().toString(),
- (MessageType) m1.getMessage(),
- m1.getOffset(),
- this.getEventTime((GenericRecord)m1.getMessage()),
- m1.getSystemStreamPartition());
- }
-
- long getEventTime(GenericRecord msg) {
- return (Long) msg.get("event_time");
+ return (JsonMessage) m1.getMessage();
}
boolean myFilter1(JsonMessage m1) {
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java
similarity index 96%
rename from samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java
index 69a3bee..7d6ee79 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.samza.operators.api.data;
+package org.apache.samza.sql.calcite.data;
import java.util.List;
import java.util.Map;
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java
similarity index 96%
rename from samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java
index dc3f8f4..e2a79cf 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.samza.operators.api.data;
+package org.apache.samza.sql.calcite.data;
import java.util.Map;
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java
similarity index 97%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java
index e4f5d79..91d26a2 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.samza.operators.impl.data.avro;
+package org.apache.samza.sql.calcite.data.avro;
import java.nio.ByteBuffer;
import java.util.List;
@@ -25,8 +25,8 @@
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericRecord;
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
public class AvroData implements Data {
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java
similarity index 98%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java
index c04e4f6..c3bb150 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java
@@ -17,14 +17,14 @@
* under the License.
*/
-package org.apache.samza.operators.impl.data.avro;
+package org.apache.samza.sql.calcite.data.avro;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema.Field;
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
public class AvroSchema implements Schema {
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
similarity index 94%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
index 2432aca..97a3b6c 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.samza.operators.impl.data.serializers;
+package org.apache.samza.sql.calcite.data.serializers;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
@@ -27,8 +27,8 @@
import org.apache.avro.io.BinaryEncoder;
import org.apache.samza.SamzaException;
import org.apache.samza.serializers.Serde;
-import org.apache.samza.operators.impl.data.avro.AvroData;
-import org.apache.samza.operators.impl.data.avro.AvroSchema;
+import org.apache.samza.sql.calcite.data.avro.AvroData;
+import org.apache.samza.sql.calcite.data.avro.AvroSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java
similarity index 92%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java
index edd8859..caf4009 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.samza.operators.impl.data.serializers;
+package org.apache.samza.sql.calcite.data.serializers;
import org.apache.avro.Schema;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.operators.impl.data.avro.AvroData;
+import org.apache.samza.sql.calcite.data.avro.AvroData;
public class SqlAvroSerdeFactory implements SerdeFactory<AvroData> {
public static final String PROP_AVRO_SCHEMA = "serializers.%s.schema";
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java
similarity index 91%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java
index 1267ab6..6651e97 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java
@@ -17,11 +17,11 @@
* under the License.
*/
-package org.apache.samza.operators.impl.data.serializers;
+package org.apache.samza.sql.calcite.data.serializers;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.operators.impl.data.string.StringData;
+import org.apache.samza.sql.calcite.data.string.StringData;
public class SqlStringSerde implements Serde<StringData> {
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java
similarity index 90%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java
index 3b6a3e0..80fb542 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java
@@ -17,13 +17,13 @@
* under the License.
*/
-package org.apache.samza.operators.impl.data.serializers;
+package org.apache.samza.sql.calcite.data.serializers;
import org.apache.samza.config.Config;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.operators.impl.data.string.StringData;
+import org.apache.samza.sql.calcite.data.string.StringData;
public class SqlStringSerdeFactory implements SerdeFactory<StringData> {
@Override
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java
similarity index 94%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java
index 86e9917..f7c8121 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java
@@ -17,10 +17,10 @@
* under the License.
*/
-package org.apache.samza.operators.impl.data.string;
+package org.apache.samza.sql.calcite.data.string;
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
import java.util.List;
import java.util.Map;
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java
similarity index 93%
rename from samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java
rename to samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java
index b19dfeb..829d61f 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java
+++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java
@@ -17,10 +17,10 @@
* under the License.
*/
-package org.apache.samza.operators.impl.data.string;
+package org.apache.samza.sql.calcite.data.string;
-import org.apache.samza.operators.api.data.Data;
-import org.apache.samza.operators.api.data.Schema;
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
import java.util.Map;
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java
similarity index 85%
rename from samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
rename to samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java
index 5aa28bb..b891ddc 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
+++ b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.samza.operators.impl.data.serializers;
+package org.apache.samza.sql.calcite.data.serializers;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -27,7 +27,7 @@
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.serializers.Serde;
-import org.apache.samza.operators.impl.data.avro.AvroData;
+import org.apache.samza.sql.calcite.data.avro.AvroData;
import org.junit.Assert;
import org.junit.Test;
@@ -55,10 +55,10 @@
public void testSqlAvroSerdeDeserialization() throws IOException {
AvroData decodedDatum = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
- Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRUCT);
- Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
- Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
- Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING);
+ Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRUCT);
+ Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER);
+ Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER);
+ Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRING);
}
@Test
@@ -69,10 +69,10 @@
AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum);
- Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRUCT);
- Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
- Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER);
- Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING);
+ Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRUCT);
+ Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER);
+ Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER);
+ Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRING);
}
private static Config sqlAvroSerdeTestConfig(){