[BAHIR-207] Add tests for Scala 2.12 on Travis (#59)
diff --git a/.travis.yml b/.travis.yml
index aaab50d..fd8c519 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -28,6 +28,8 @@
directories:
- $HOME/.m2
+install: true
+
services:
- docker
@@ -36,26 +38,19 @@
- openjdk8
env:
- - |
- FLINK_VERSION="1.8.0" SCALA_VERSION="2.11" DOCKER="false"
- PROJECTS="flink-connector-activemq,flink-connector-akka,flink-connector-influxdb,flink-connector-netty,flink-connector-redis,flink-library-siddhi,flink-connector-kudu"
- - |
- FLINK_VERSION="1.8.0" SCALA_VERSION="2.11" DOCKER="true"
- PROJECTS="flink-connector-flume"
+ - FLINK_VERSION="1.8.0" SCALA_VERSION="2.11"
+ - FLINK_VERSION="1.8.0" SCALA_VERSION="2.11" PROJECT="flink-connector-flume" DOCKER="true"
+ - FLINK_VERSION="1.8.0" SCALA_VERSION="2.12"
+ - FLINK_VERSION="1.8.0" SCALA_VERSION="2.12" PROJECT="flink-connector-flume" DOCKER="true"
before_install:
- ./dev/change-scala-version.sh $SCALA_VERSION
-install: true
-
-before_script:
+script:
- if [[ $DOCKER == "true" ]]; then
- docker-compose -f "$PROJECTS/dockers/docker-compose.yml" up -d;
- fi
-
-script: mvn clean verify -pl $PROJECTS -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION
-
-after_script:
- - if [[ $DOCKER == "true" ]]; then
- docker-compose -f "$PROJECTS/dockers/docker-compose.yml" down;
+ docker-compose -f "$PROJECT/dockers/docker-compose.yml" up -d;
+ mvn clean verify -pl $PROJECT -Dscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION ;
+ docker-compose -f "$PROJECT/dockers/docker-compose.yml" down;
+ else
+ mvn clean verify -Dscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION ;
fi
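
The Travis matrix above now expands to four jobs: Scala 2.11 and 2.12, each once as a plain build and once as the docker-backed flink-connector-flume build. The separate before_script/script/after_script phases collapse into a single script block, so the docker-compose up, the Maven run, and the docker-compose down share one conditional; note also that the non-docker jobs no longer pass an explicit -pl module list, so they build every module. A minimal Java sketch of the command each job ends up running (illustration only, not part of the build; env-var names match the YAML above):

```java
// Illustration only: reconstructs the command each Travis matrix job runs.
public class TravisMatrixSketch {
    public static void main(String[] args) {
        String scala   = System.getenv().getOrDefault("SCALA_VERSION", "2.11");
        String flink   = System.getenv().getOrDefault("FLINK_VERSION", "1.8.0");
        String project = System.getenv("PROJECT"); // only set for the docker jobs
        boolean docker = "true".equals(System.getenv("DOCKER"));

        String mvn = "mvn clean verify"
                + (docker ? " -pl " + project : "") // docker jobs build a single module
                + " -Dscala-" + scala               // activates the matching profile, see pom.xml below
                + " -Dflink.version=" + flink;

        System.out.println(docker
                ? "docker-compose up -d && " + mvn + " && docker-compose down"
                : mvn);
    }
}
```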
diff --git a/flink-connector-activemq/pom.xml b/flink-connector-activemq/pom.xml
index 8f7cdaf..8efbedb 100644
--- a/flink-connector-activemq/pom.xml
+++ b/flink-connector-activemq/pom.xml
@@ -78,13 +78,6 @@
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 992d19f..13136ba 100644
--- a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -47,7 +47,7 @@
private int maxRetryAttempts;
private long waitTimeMs;
private List<IN> incomingList;
- private FlumeEventBuilder eventBuilder;
+ private FlumeEventBuilder<IN> eventBuilder;
private RpcClient client;
public FlumeSink(String clientType, String hostname, int port, FlumeEventBuilder<IN> eventBuilder) {
@@ -71,7 +71,7 @@
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- incomingList = new ArrayList();
+ incomingList = new ArrayList<>();
client = FlumeUtils.getRpcClient(clientType, hostname, port, batchSize);
}
@@ -104,7 +104,7 @@
return;
}
toFlushList = incomingList;
- incomingList = new ArrayList();
+ incomingList = new ArrayList<>();
}
for (IN value: toFlushList) {
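
The FlumeSink changes are generics hygiene: the eventBuilder field now carries the sink's IN parameter instead of a raw type, and the ArrayList allocations use the diamond operator. A standalone sketch of what the raw field gave up (the EventBuilder interface below is hypothetical, not the real FlumeEventBuilder signature):

```java
// Hypothetical builder interface, for illustration; the real FlumeEventBuilder differs.
interface EventBuilder<IN> {
    byte[] build(IN value);
}

class RawVsTyped {
    @SuppressWarnings({"rawtypes", "unchecked"})
    static void withRawBuilder(EventBuilder builder) {
        builder.build(42); // any argument compiles: there is no IN to check against
    }

    static <IN> void withTypedBuilder(EventBuilder<IN> builder, IN value) {
        builder.build(value); // the compiler enforces the element type
    }
}
```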
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
index b9aaa40..4e91310 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
@@ -45,7 +45,7 @@
public void testNotTableExist() throws IOException {
String masterAddresses = harness.getMasterAddressesAsString();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
- KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe());
+ KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe());
Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0,1));
}
@@ -54,7 +54,7 @@
String masterAddresses = harness.getMasterAddressesAsString();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
- KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe())
+ KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe())
.withStrongConsistency();
outputFormat.open(0,1);
@@ -74,7 +74,7 @@
String masterAddresses = harness.getMasterAddressesAsString();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
- KuduOutputFormat outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe())
+ KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(masterAddresses, tableInfo, new DefaultSerDe())
.withEventualConsistency();
outputFormat.open(0,1);
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
index 83e060d..225bf7c 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
@@ -58,7 +58,7 @@
public void testNotTableExist() throws IOException {
String masterAddresses = harness.getMasterAddressesAsString();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
- KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe());
+ KuduSink<KuduRow> sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe());
sink.setRuntimeContext(context);
Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
}
@@ -68,7 +68,7 @@
String masterAddresses = harness.getMasterAddressesAsString();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
- KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe())
+ KuduSink<KuduRow> sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe())
.withStrongConsistency();
sink.setRuntimeContext(context);
sink.open(new Configuration());
@@ -88,7 +88,7 @@
String masterAddresses = harness.getMasterAddressesAsString();
KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
- KuduSink sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe())
+ KuduSink<KuduRow> sink = new KuduSink<>(masterAddresses, tableInfo, new DefaultSerDe())
.withEventualConsistency();
sink.setRuntimeContext(context);
sink.open(new Configuration());
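
Both Kudu test files get the same cleanup: the raw KuduOutputFormat and KuduSink declarations gain the explicit <KuduRow> type argument that the diamond on the right-hand side was already implying. Behavior is unchanged; the parameterized declarations remove the raw-type warnings and document which record type the format and sink consume.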
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index c9b7a63..46d546b 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -78,6 +78,7 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
+ <version>4.2.0</version>
<inherited>true</inherited>
<extensions>true</extensions>
</plugin>
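
Pinning maven-bundle-plugin to an explicit version makes the OSGi bundling step reproducible: without a <version> element the resolved plugin version can drift between Maven installations, and Maven itself warns about unversioned build plugins.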
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
index 88c15eb..6e96345 100644
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
@@ -61,55 +61,55 @@
case 0:
return setTupleValue(new Tuple0(), row);
case 1:
- return setTupleValue(new Tuple1(), row);
+ return setTupleValue(new Tuple1<>(), row);
case 2:
- return setTupleValue(new Tuple2(), row);
+ return setTupleValue(new Tuple2<>(), row);
case 3:
- return setTupleValue(new Tuple3(), row);
+ return setTupleValue(new Tuple3<>(), row);
case 4:
- return setTupleValue(new Tuple4(), row);
+ return setTupleValue(new Tuple4<>(), row);
case 5:
- return setTupleValue(new Tuple5(), row);
+ return setTupleValue(new Tuple5<>(), row);
case 6:
- return setTupleValue(new Tuple6(), row);
+ return setTupleValue(new Tuple6<>(), row);
case 7:
- return setTupleValue(new Tuple7(), row);
+ return setTupleValue(new Tuple7<>(), row);
case 8:
- return setTupleValue(new Tuple8(), row);
+ return setTupleValue(new Tuple8<>(), row);
case 9:
- return setTupleValue(new Tuple9(), row);
+ return setTupleValue(new Tuple9<>(), row);
case 10:
- return setTupleValue(new Tuple10(), row);
+ return setTupleValue(new Tuple10<>(), row);
case 11:
- return setTupleValue(new Tuple11(), row);
+ return setTupleValue(new Tuple11<>(), row);
case 12:
- return setTupleValue(new Tuple12(), row);
+ return setTupleValue(new Tuple12<>(), row);
case 13:
- return setTupleValue(new Tuple13(), row);
+ return setTupleValue(new Tuple13<>(), row);
case 14:
- return setTupleValue(new Tuple14(), row);
+ return setTupleValue(new Tuple14<>(), row);
case 15:
- return setTupleValue(new Tuple15(), row);
+ return setTupleValue(new Tuple15<>(), row);
case 16:
- return setTupleValue(new Tuple16(), row);
+ return setTupleValue(new Tuple16<>(), row);
case 17:
- return setTupleValue(new Tuple17(), row);
+ return setTupleValue(new Tuple17<>(), row);
case 18:
- return setTupleValue(new Tuple18(), row);
+ return setTupleValue(new Tuple18<>(), row);
case 19:
- return setTupleValue(new Tuple19(), row);
+ return setTupleValue(new Tuple19<>(), row);
case 20:
- return setTupleValue(new Tuple20(), row);
+ return setTupleValue(new Tuple20<>(), row);
case 21:
- return setTupleValue(new Tuple21(), row);
+ return setTupleValue(new Tuple21<>(), row);
case 22:
- return setTupleValue(new Tuple22(), row);
+ return setTupleValue(new Tuple22<>(), row);
case 23:
- return setTupleValue(new Tuple23(), row);
+ return setTupleValue(new Tuple23<>(), row);
case 24:
- return setTupleValue(new Tuple24(), row);
+ return setTupleValue(new Tuple24<>(), row);
case 25:
- return setTupleValue(new Tuple25(), row);
+ return setTupleValue(new Tuple25<>(), row);
default:
throw new IllegalArgumentException("Too long row: " + row.length + ", unable to convert to Tuple");
}
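
All twenty-five branches change the same way: new TupleN() becomes new TupleN<>(). In argument position the diamond's type arguments are inferred from the generic method being called (target typing), so nothing needs to be spelled out. A self-contained sketch of the same inference, using a stand-in Pair class rather than Flink's tuples:

```java
// Stand-in for Flink's TupleN classes, for illustration only.
class Pair<A, B> {
    A first;
    B second;
}

class DiamondSketch {
    // Mirrors setTupleValue(new Tuple2<>(), row): the type arguments are
    // inferred at the call site from the method's signature.
    static <A, B> Pair<A, B> fill(Pair<A, B> p, A a, B b) {
        p.first = a;  // typed assignments, no unchecked warnings
        p.second = b;
        return p;
    }

    public static void main(String[] args) {
        Pair<Integer, String> p = fill(new Pair<>(), 1, "x"); // <> inferred as <Integer, String>
        System.out.println(p.first + " " + p.second);
    }
}
```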
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
index 651288f..9b38539 100755
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
@@ -253,7 +253,7 @@
DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1");
DataStream<Event> input2 = env.addSource(new RandomEventSource(5), "input2");
- DataStream<? extends Map> output = SiddhiCEP
+ DataStream<? extends Map<?,?>> output = SiddhiCEP
.define("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp")
.union("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp")
.cql(
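
Map<?,?> here is the unbounded-wildcard form: unlike the raw Map it replaces, it stays inside the generics system (reads are safe, unchecked writes are rejected) without committing the output stream to concrete key and value types.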
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
index b9dcac7..73888fe 100644
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
@@ -41,8 +41,8 @@
@Test
public void testStreamSchemaWithTuple() {
- TypeInformation<Tuple4> typeInfo = Types.TUPLE(Types.INT, Types.LONG, Types.STRING, Types.DOUBLE);
- StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
+ TypeInformation<Tuple4<Integer,Long,String,Double>> typeInfo = Types.TUPLE(Types.INT, Types.LONG, Types.STRING, Types.DOUBLE);
+ StreamSchema<Tuple4<Integer,Long,String,Double>> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
assertEquals(4, schema.getFieldIndexes().length);
assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
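
This one works the other way around from the diamond cases: Types.TUPLE infers its result type from the assignment target, so spelling out Tuple4<Integer,Long,String,Double> on the left is what binds the schema to concrete element types instead of a raw Tuple4.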
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java
index 357e1d2..125b771 100644
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java
@@ -83,7 +83,7 @@
}
public static TypeSerializer<Event> createTypeSerializer() {
- TypeInformation<Event> typeInformation = (TypeInformation<Event>) TypeExtractor.createTypeInfo(Event.class);
+ TypeInformation<Event> typeInformation = TypeExtractor.createTypeInfo(Event.class);
return typeInformation.createSerializer(new ExecutionConfig());
}
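
The removed cast was redundant: createTypeInfo takes a Class<T> and already returns TypeInformation<T>, so passing Event.class yields a TypeInformation<Event> directly (the line compiling without the cast confirms this). The same shape as a standalone sketch, with a stand-in class rather than Flink's:

```java
// Stand-in generic container, for illustration only.
class TypeInfo<T> {
    // Mirrors TypeExtractor.createTypeInfo(Class<T>): T comes from the class literal.
    static <T> TypeInfo<T> of(Class<T> clazz) {
        return new TypeInfo<>();
    }

    public static void main(String[] args) {
        TypeInfo<String> info = TypeInfo.of(String.class); // no cast required
    }
}
```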
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java
index 4753a3f..2d8f04b 100644
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java
@@ -27,7 +27,7 @@
@Test
public void testConvertObjectArrayToTuple() {
Object[] row = new Object[]{1, "message", 1234567L, true, new Object()};
- Tuple5 tuple5 = SiddhiTupleFactory.newTuple(row);
+ Tuple5<Integer,String,Long,Boolean,Object> tuple5 = SiddhiTupleFactory.newTuple(row);
assertEquals(5, tuple5.getArity());
assertArrayEquals(row, new Object[]{
tuple5.f0,
diff --git a/pom.xml b/pom.xml
index 9f735d2..111e8a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,8 +88,7 @@
<!-- General project dependencies version -->
<java.version>1.8</java.version>
- <scala.version>2.11.8</scala.version>
-
+ <scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<slf4j.version>1.7.16</slf4j.version>
@@ -703,7 +702,7 @@
<id>scala-2.11</id>
<activation>
<property>
- <name>!scala-2.12</name>
+ <name>scala-2.11</name>
</property>
</activation>
<properties>
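
The profile change is what makes the Travis script's -Dscala-$SCALA_VERSION flag work: previously the scala-2.11 profile activated whenever the scala-2.12 property was absent (<name>!scala-2.12</name>), so it was on by default; now both profiles are activated symmetrically by passing -Dscala-2.11 or -Dscala-2.12 explicitly. Builds that pass neither property fall back to the top-level defaults, which this commit also bumps from Scala 2.11.8 to 2.11.12, the final 2.11 maintenance release.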