[GRIFFIN-347] Updated with master
diff --git a/.gitignore b/.gitignore
index 93610da..8d3873e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@
*.iml
.idea/
.DS_Store
+.java-version
target/**
# Mobile Tools for Java (J2ME)
@@ -11,6 +12,7 @@
*.jar
*.war
*.ear
+*.cfg
target
service/src/main/resources/public/
ui/.tmp/
@@ -20,7 +22,7 @@
.settings/
.classpath
bin
-
+**/site-packages/**
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
# don't need binary
diff --git a/griffin-doc/deploy/measure-build-guide.md b/griffin-doc/deploy/measure-build-guide.md
new file mode 100644
index 0000000..8914bc6
--- /dev/null
+++ b/griffin-doc/deploy/measure-build-guide.md
@@ -0,0 +1,74 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Apache Griffin Build Guide - Measure Module
+
+Like the other modules of Apache Griffin, the `measure` module is built with Maven. Building the `measure` module
+requires Maven 3.5+ and Java 8.
+
+## Version Compatibility
+
+Starting with Apache Griffin 0.7, the `measure` module is cross-built for multiple Scala and Spark versions. Since both
+Scala and Spark are dependencies of Apache Griffin, the supported Spark-Scala combinations are listed below:
+
+| | Spark 2.3.x | Spark 2.4.x | Spark 3.0.x |
+| --------- |:-----------:|:-----------:|:-----------:|
+| Scala 2.11| ✓ | ✓ | x |
+| Scala 2.12| x | ✓ | ✓ |
+
+## Building a Distribution
+
+Execute the commands below to build `measure` with the desired versions of Spark and Scala.
+
+By default, the build is compiled with Scala 2.11 and Apache Spark 2.4.x.
+
+```
+# For measure module with Scala 2.11 and Spark 2.4.x
+mvn clean package
+```
+
+To change the Scala or Spark version, use the commands below:
+
+```
+# For measure module with Scala 2.12 and Spark 2.4.x
+mvn clean package -Dscala-2.12
+```
+
+```
+# For measure module with Scala 2.11 and Spark 2.3.x
+mvn clean package -Dspark-2.3
+```
+
+```
+# For measure module with Scala 2.12 and Spark 3.0.x
+mvn clean package -Dscala-2.12 -Dspark-3.0
+```
+
+Note:
+ - Using the `-Dscala-2.12` and `-Dspark-2.3` options together will cause a build failure due to missing dependencies,
+ as this combination is not cross compiled; see the details [here](#version-compatibility).
+ - Using the `-Dspark-3.0` option without `-Dscala-2.12` will cause a build failure due to missing dependencies,
+ as this combination is not cross compiled; see the details [here](#version-compatibility).
+
+### AVRO Source Support
+
+Starting with Spark 2.4.x, the AVRO source (`spark-avro` package) was migrated from the `com.databricks` group to
+`org.apache.spark`. Additionally, the older dependency does not support Scala 2.12.
+
+All builds of the `measure` module contain AVRO source support by default.
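+
+For reference, the build stamps the Spark/Scala combination into the artifact names (via the jar classifier and the
+assembly `finalName` configured in `measure/pom.xml`). As an illustrative sketch only (exact version strings depend
+on your checkout), a default build is expected to produce artifacts along the lines of:
+
+```
+measure/target/measure-0.7.0-SNAPSHOT-spark-2.4_scala-2.11.jar
+measure/target/measure-0.7.0-SNAPSHOT-spark-2.4_scala-2.11-jar-with-dependencies.jar
+```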
diff --git a/griffin-doc/measure/predicates.md b/griffin-doc/measure/predicates.md
index 87f1a0d..f9187b9 100644
--- a/griffin-doc/measure/predicates.md
+++ b/griffin-doc/measure/predicates.md
@@ -17,14 +17,13 @@
under the License.
-->
-#About predicates
+# About predicates
-##Overview
+## Overview
The purpose of predicates is to obligate Griffin to check certain conditions before starting a SparkSubmitJob.
Depending on these conditions, Griffin decides whether or not to start the measurement.
-##Configure predicates
-
+## Configure predicates
To configure predicates, add the following property to the measure JSON:
```
{
@@ -74,10 +73,10 @@
- implement interface **org.apache.griffin.core.job.Predicator**
- have constructor with argument of type **org.apache.griffin.core.job.entity.SegmentPredicate**
-##Deployment custom predicates
+## Deployment custom predicates
To create a custom predicate you need to:
1. Build the Griffin service using the Maven build command.
-As a result, two artifacts will be built
+As a result, two artifacts will be built,
- **service-VERSION.jar** - an executable Spring-Boot application
- **service-VERSION-lib.jar** - a jar that can be used as a dependency
This step is necessary because we can't use the executable Spring-Boot application as a dependency in our plugin.
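+
+As an illustration, a custom predicate could look like the Scala sketch below. This is a minimal sketch, not the
+canonical implementation: it assumes `Predicator` declares a single boolean-returning `predicate()` method and that
+`SegmentPredicate` exposes a `getConfigMap` accessor; verify both against the `service` sources before relying on it.
+```
+import org.apache.griffin.core.job.Predicator
+import org.apache.griffin.core.job.entity.SegmentPredicate
+
+// Hedged sketch: start the measurement only if a configured marker file exists.
+class FileExistsPredicator(segment: SegmentPredicate) extends Predicator {
+  override def predicate(): Boolean = {
+    // "path" is a hypothetical key in this predicate's configuration
+    val path = String.valueOf(segment.getConfigMap.get("path"))
+    new java.io.File(path).exists()
+  }
+}
+```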
diff --git a/measure/assembly.xml b/measure/assembly.xml
deleted file mode 100644
index 7890497..0000000
--- a/measure/assembly.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
- <id>package</id>
- <formats>
- <format>tar.gz</format>
- </formats>
- <fileSets>
- <fileSet>
- <directory>${project.basedir}/sbin</directory>
- <outputDirectory>/bin</outputDirectory>
- <includes>
- <include>/**</include>
- </includes>
- <lineEnding>unix</lineEnding>
- <fileMode>0777</fileMode>
- <directoryMode>0755</directoryMode>
- </fileSet>
- <fileSet>
- <directory>${project.basedir}/src/main/resources</directory>
- <outputDirectory>/conf</outputDirectory>
- <includes>
- <include>/**</include>
- </includes>
- </fileSet>
- </fileSets>
-
- <dependencySets>
- <dependencySet>
- <useProjectArtifact>true</useProjectArtifact>
- <outputDirectory>/lib</outputDirectory>
- <unpack>false</unpack>
- <scope>provided</scope>
- <excludes>
- <exclude>com.twitter:parquet-hadoop-bundle</exclude>
- <exclude>io.dropwizard.metrics:*</exclude>
- <exclude>org.glassfish.jersey.core:*</exclude>
- <exclude>org.glassfish.jersey.containers:*</exclude>
- <exclude>org.apache.thrift:*</exclude>
- <exclude>org.apache.parquet:*</exclude>
- <exclude>org.apache.hadoop:*</exclude>
- <exclude>org.apache.spark:*</exclude>
- </excludes>
- </dependencySet>
- </dependencySets>
-</assembly>
diff --git a/measure/pom.xml b/measure/pom.xml
index a1d8f8e..3cd8bc6 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -24,297 +24,314 @@
<parent>
<groupId>org.apache.griffin</groupId>
<artifactId>griffin</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
</parent>
<artifactId>measure</artifactId>
<packaging>jar</packaging>
- <name>Apache Griffin :: Measures</name>
- <url>http://maven.apache.org</url>
+ <name>Apache Griffin :: Measure</name>
+ <url>http://griffin.apache.org</url>
<properties>
- <scala.version>2.11.8</scala.version>
- <spark.version>2.2.1</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
- <avro.version>1.7.7</avro.version>
- <jackson.version>2.8.7</jackson.version>
+
+ <!-- Spark -->
+ <spark.major.version>2.4</spark.major.version>
+ <spark.version>${spark.major.version}.4</spark.version>
+ <spark.scope>${provided.scope}</spark.scope>
+ <spark-streaming-kafka.version>${spark.version}</spark-streaming-kafka.version>
+
+ <!-- Code Standardization -->
+ <scalastyle.version>1.0.0</scalastyle.version>
+ <scalafmt.version>1.0.3</scalafmt.version>
+
+ <!-- Data Connectors -->
+ <mysql.java.version>5.1.47</mysql.java.version>
+ <mariadb.version>2.7.0</mariadb.version>
+ <cassandra.connector.version>2.5.1</cassandra.connector.version>
+ <elasticsearch.version>6.4.1</elasticsearch.version>
<scalaj.version>2.3.0</scalaj.version>
<mongo.version>2.1.0</mongo.version>
- <scalatest.version>3.0.0</scalatest.version>
- <slf4j.version>1.7.21</slf4j.version>
- <log4j.version>1.2.16</log4j.version>
<curator.version>2.10.0</curator.version>
- <mockito.version>1.10.19</mockito.version>
- <mysql.java.version>5.1.47</mysql.java.version>
- <cassandra.connector.version>2.4.1</cassandra.connector.version>
- <scalastyle.version>1.0.0</scalastyle.version>
- <scalafmt.parameters>--diff --test</scalafmt.parameters>
- <scalafmt.skip>false</scalafmt.skip>
- <elasticsearch.version>6.4.1</elasticsearch.version>
- <spark.scope>provided</spark.scope>
+
+ <!-- Testing -->
+ <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
+ <mockito.version>3.2.2.0</mockito.version>
+ <scalatest.version>3.2.3</scalatest.version>
</properties>
<dependencies>
- <!--scala-->
+
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
- <!--spark, spark streaming, spark hive-->
+ <!-- Spark Dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>${spark.scope}</scope>
</dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>${spark.scope}</scope>
- </dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>${spark.scope}</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>${spark.scope}</scope>
+ </dependency>
+
+ <!-- Data Connectors -->
+
+ <!-- Spark Hive-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>${spark.scope}</scope>
+ </dependency>
+
+ <!-- Spark Kafka-->
+ <!-- TODO
+ Spark Streaming Kafka 08 is not cross compiled with scala 2.12.
+ Thus, this dependency has been temporary un-banned in the enforcer plugin.
+ This will be removed with structured streaming implementation
+ -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka-0-8_${scala211.binary.version}</artifactId>
+ <version>${spark-streaming-kafka.version}</version>
<exclusions>
<exclusion>
- <groupId>commons-httpclient</groupId>
- <artifactId>commons-httpclient</artifactId>
+ <groupId>org.scala-lang.modules</groupId>
+ <artifactId>scala-xml_2.11</artifactId>
</exclusion>
<exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
+ <groupId>org.scala-lang.modules</groupId>
+ <artifactId>scala-parser-combinators_2.11</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_2.11</artifactId>
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- </dependency>
- <!--jackson-->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
- <version>${jackson.version}</version>
- </dependency>
-
- <!--scalaj for http request-->
- <dependency>
- <groupId>org.scalaj</groupId>
- <artifactId>scalaj-http_${scala.binary.version}</artifactId>
- <version>${scalaj.version}</version>
- </dependency>
-
- <!--mongo request-->
+ <!-- MongoDB-->
<dependency>
<groupId>org.mongodb.scala</groupId>
- <artifactId>mongo-scala-driver_2.11</artifactId>
+ <artifactId>mongo-scala-driver_${scala.binary.version}</artifactId>
<version>${mongo.version}</version>
</dependency>
- <!--avro-->
+ <!-- MySql -->
<dependency>
- <groupId>com.databricks</groupId>
- <artifactId>spark-avro_${scala.binary.version}</artifactId>
- <version>4.0.0</version>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql.java.version}</version>
</dependency>
- <!--log4j-->
+ <!-- MariaDB -->
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ <version>${mariadb.version}</version>
</dependency>
+ <!-- Spark Cassandra-->
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>${log4j.version}</version>
+ <groupId>com.datastax.spark</groupId>
+ <artifactId>spark-cassandra-connector_${scala.binary.version}</artifactId>
+ <version>${cassandra.connector.version}</version>
</dependency>
- <!--curator-->
+ <!-- Spark Elasticsearch-->
+ <!-- TODO
+ Elasticsearch Spark has recently added support for scala 2.12 but it is not published in the Maven Central yet.
+ Thus, this dependency has been temporary un-banned in the enforcer plugin.
+ See https://github.com/elastic/elasticsearch-hadoop/pull/1589#issuecomment-776920189
+ -->
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch-spark-20_${scala211.binary.version}</artifactId>
+ <version>${elasticsearch.version}</version>
+ </dependency>
+
+ <!-- Miscellaneous-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
- <!--junit-->
+ <dependency>
+ <groupId>org.scalaj</groupId>
+ <artifactId>scalaj-http_${scala.binary.version}</artifactId>
+ <version>${scalaj.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.9</version>
+ </dependency>
+
+ <!-- Test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
+ <scope>test</scope>
</dependency>
- <!--scala test-->
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>${scalatest.version}</version>
<scope>test</scope>
</dependency>
+
<dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <version>${mockito.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
+ <groupId>org.scalatestplus</groupId>
+ <artifactId>mockito-3-4_${scala.binary.version}</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>${mysql.java.version}</version>
- </dependency>
- <dependency>
- <groupId>com.datastax.spark</groupId>
- <artifactId>spark-cassandra-connector_2.11</artifactId>
- <version>${cassandra.connector.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.5.9</version>
- </dependency>
- <dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.200</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch-spark-20_2.11</artifactId>
- <version>${elasticsearch.version}</version>
- </dependency>
+
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.14.3</version>
<scope>test</scope>
</dependency>
+
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
+
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
- <version>3.3.1</version>
+ <version>4.3.0</version>
<executions>
<execution>
- <id>compile</id>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
- <phase>compile</phase>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
+ <checkMultipleScalaVersions>true</checkMultipleScalaVersions>
+ <failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
+ <recompileMode>incremental</recompileMode>
<args>
+ <arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
- <arg>-unchecked</arg>
+ <arg>-explaintypes</arg>
</args>
+ <jvmArgs>
+ <jvmArg>-Xms64m</jvmArg>
+ <jvmArg>-Xmx1024m</jvmArg>
+ </jvmArgs>
+ <javacArgs>
+ <javacArg>-source</javacArg>
+ <javacArg>-target</javacArg>
+ <javacArg>${java.version}</javacArg>
+ <javacArg>-Xlint:all,-serial,-path,-try</javacArg>
+ </javacArgs>
</configuration>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- </plugin>
+
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <testFailureIgnore>false</testFailureIgnore>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <filereports>TestSuiteReport.txt</filereports>
+ <stderr/>
+ <systemProperties>
+ <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
+ </systemProperties>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
</plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>3.1.0</version>
- <configuration>
- <createDependencyReducedPom>false</createDependencyReducedPom>
- </configuration>
+ <version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
- <configuration>
- <transformers>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.griffin.measure.Application</mainClass>
- </transformer>
- </transformers>
- <relocations>
- <relocation>
- <pattern>org.apache.http</pattern>
- <shadedPattern>griffin.org.apache.http</shadedPattern>
- </relocation>
- </relocations>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/maven/**</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
</execution>
</executions>
+ <configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <minimizeJar>true</minimizeJar>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>griffin.org.apache.http</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
</plugin>
+
<plugin>
<groupId>org.antipathy</groupId>
<artifactId>mvn-scalafmt_${scala.binary.version}</artifactId>
- <version>0.12_1.5.1</version>
+ <version>${scalafmt.version}</version>
<configuration>
- <parameters>${scalafmt.parameters}</parameters>
- <skip>${scalafmt.skip}</skip>
- <skipSources>${scalafmt.skip}</skipSources>
- <skipTestSources>${scalafmt.skip}</skipTestSources>
<configLocation>${project.parent.basedir}/.scalafmt.conf</configLocation>
+ <skipTestSources>false</skipTestSources>
+ <skipSources>false</skipSources>
+ <respectVersion>false</respectVersion>
+ <validateOnly>false</validateOnly>
</configuration>
<executions>
<execution>
- <phase>validate</phase>
+ <phase>compile</phase>
<goals>
<goal>format</goal>
</goals>
</execution>
</executions>
</plugin>
+
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
@@ -333,21 +350,81 @@
</configuration>
<executions>
<execution>
- <phase>validate</phase>
+ <phase>compile</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <version>0.8.2</version>
+ <executions>
+ <execution>
+ <id>default-prepare-agent</id>
+ <goals>
+ <goal>prepare-agent</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>report</id>
+ <phase>test</phase>
+ <goals>
+ <goal>report</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.scoverage</groupId>
+ <artifactId>scoverage-maven-plugin</artifactId>
+ <version>${scoverage.plugin.version}</version>
+ <configuration>
+ <scalaVersion>${scala.version}</scalaVersion>
+ <aggregate>true</aggregate>
+ <highlighting>true</highlighting>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>3.2.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <classifier>spark-${spark.major.version}_scala-${scala.binary.version}</classifier>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
+ <version>3.3.0</version>
<configuration>
- <descriptors>
- <descriptor>assembly.xml</descriptor>
- </descriptors>
- <finalName>${artifactId}-${project.version}</finalName>
+ <tarLongFileMode>posix</tarLongFileMode>
+ <finalName>
+ ${project.artifactId}-${project.version}-spark-${spark.major.version}_scala-${scala.binary.version}
+ </finalName>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.griffin.measure.Application</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
</configuration>
<executions>
<execution>
@@ -359,6 +436,172 @@
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>3.0.0-M2</version>
+ <executions>
+ <execution>
+ <id>enforce-versions</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireJavaVersion>
+ <version>${java.version}</version>
+ </requireJavaVersion>
+ <bannedDependencies>
+ <excludes>
+ <exclude>*:*_2.10</exclude>
+ <exclude>*:*_2.12</exclude>
+ <exclude>*:*_2.13</exclude>
+ </excludes>
+ <searchTransitive>true</searchTransitive>
+ </bannedDependencies>
+ </rules>
+ </configuration>
+ </execution>
+ <execution>
+ <id>enforce-no-duplicate-dependencies</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <banDuplicatePomDependencyVersions/>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
+
+ <profiles>
+ <!-- Scala 2.12 -->
+ <profile>
+ <id>scala-2.12</id>
+ <activation>
+ <property>
+ <name>scala-2.12</name>
+ </property>
+ </activation>
+
+ <properties>
+ <scala.binary.version>2.12</scala.binary.version>
+ <scala.version>${scala.binary.version}.12</scala.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>enforce-versions</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireJavaVersion>
+ <version>${java.version}</version>
+ </requireJavaVersion>
+ <bannedDependencies>
+ <!-- TODO Temporary Fix: These will be removed in a future iteration. -->
+ <includes>
+ <include>org.elasticsearch:elasticsearch-spark-20_2.11:*</include>
+ <include>org.apache.spark:spark-streaming-kafka-0-8_2.11:*</include>
+ <include>org.apache.kafka:kafka_2.11:*</include>
+ </includes>
+ <excludes>
+ <exclude>*:*_2.10</exclude>
+ <exclude>*:*_2.11</exclude>
+ <exclude>*:*_2.13</exclude>
+ </excludes>
+ <searchTransitive>true</searchTransitive>
+ </bannedDependencies>
+ </rules>
+ </configuration>
+ </execution>
+ <execution>
+ <id>enforce-no-duplicate-dependencies</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <banDuplicatePomDependencyVersions/>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <!-- Spark 2.3.x -->
+ <profile>
+ <id>spark-2.3</id>
+ <activation>
+ <property>
+ <name>spark-2.3</name>
+ </property>
+ </activation>
+
+ <properties>
+ <spark.major.version>2.3</spark.major.version>
+ <spark.version>${spark.major.version}.4</spark.version>
+ </properties>
+
+ <dependencies>
+ <!-- Old Spark Avro-->
+ <dependency>
+ <groupId>com.databricks</groupId>
+ <artifactId>spark-avro_${scala211.binary.version}</artifactId>
+ <version>4.0.0</version>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <!-- Spark 3.0.x -->
+ <profile>
+ <id>spark-3.0</id>
+ <activation>
+ <property>
+ <name>spark-3.0</name>
+ </property>
+ </activation>
+
+ <properties>
+ <spark.major.version>3.0</spark.major.version>
+ <spark.version>${spark.major.version}.2</spark.version>
+ <spark-streaming-kafka.version>2.4.7</spark-streaming-kafka.version>
+ </properties>
+ </profile>
+
+ <!-- Spark 2.4+ Avro -->
+ <profile>
+ <id>spark-avro-2.4</id>
+ <activation>
+ <property>
+ <name>!spark-2.3</name>
+ </property>
+ </activation>
+
+ <dependencies>
+ <!-- New Spark Avro-->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-avro_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ </profiles>
</project>
diff --git a/measure/src/main/resources/env-batch.json b/measure/src/main/resources/env-batch.json
index bbec4e5..fd9261d 100644
--- a/measure/src/main/resources/env-batch.json
+++ b/measure/src/main/resources/env-batch.json
@@ -7,14 +7,14 @@
},
"sinks": [
{
- "name": "consoleSink",
+ "name": "console",
"type": "CONSOLE",
"config": {
"max.log.lines": 10
}
},
{
- "name": "hdfsSink",
+ "name": "hdfs",
"type": "HDFS",
"config": {
"path": "hdfs://localhost/griffin/batch/persist",
@@ -23,7 +23,7 @@
}
},
{
- "name": "elasticSink",
+ "name": "elasticsearch",
"type": "ELASTICSEARCH",
"config": {
"method": "post",
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamHttpReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamHttpReader.scala
new file mode 100644
index 0000000..e7c19a8
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamHttpReader.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.configuration.dqdefinition.reader
+
+import scala.reflect.ClassTag
+import scala.util.Try
+
+import org.apache.griffin.measure.configuration.dqdefinition.Param
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
+
+/**
+ * read params by http url
+ *
+ * @param httpUrl
+ */
+case class ParamHttpReader(httpUrl: String) extends ParamReader {
+
+ def readConfig[T <: Param](implicit m: ClassTag[T]): Try[T] = {
+ Try {
+ val params = Map[String, Object]()
+ val headers = Map[String, Object](("Content-Type", "application/json"))
+ val jsonString = HttpUtil.doHttpRequest(httpUrl, "get", params, headers, null)._2
+ val param = JsonUtil.fromJson[T](jsonString)
+ validate(param)
+ }
+ }
+}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
index 5067a9d..f4bfd28 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
@@ -23,6 +23,7 @@
val json = "json"
val file = "file"
+ val httpRegex = "^http[s]?://.*"
/**
* parse string content to get param reader
@@ -30,9 +31,13 @@
* @return
*/
def getParamReader(pathOrJson: String): ParamReader = {
- val strType = paramStrType(pathOrJson)
- if (json.equals(strType)) ParamJsonReader(pathOrJson)
- else ParamFileReader(pathOrJson)
+ if (pathOrJson.matches(httpRegex)) {
+ ParamHttpReader(pathOrJson)
+ } else {
+ val strType = paramStrType(pathOrJson)
+ if (json.equals(strType)) ParamJsonReader(pathOrJson)
+ else ParamFileReader(pathOrJson)
+ }
}
private def paramStrType(str: String): String = {
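
With this change, any config location matching `^http[s]?://.*` is fetched remotely through the new
`ParamHttpReader`, while everything else falls back to the existing JSON-string/file detection. A minimal usage
sketch (URL and paths are hypothetical):

```scala
// Dispatch behaviour of the updated factory, per the code above:
ParamReaderFactory.getParamReader("https://config-host/dq.json")  // -> ParamHttpReader
ParamReaderFactory.getParamReader("""{"name": "accuracy_job"}""") // -> ParamJsonReader
ParamReaderFactory.getParamReader("hdfs:///griffin/dq.json")      // -> ParamFileReader
```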
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index 04775c7..643ff5e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -184,14 +184,15 @@
}
// new cache data
- val newDfOpt = try {
- val dfr = sparkSession.read
- readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
- } catch {
- case e: Throwable =>
- warn(s"read data source cache warn: ${e.getMessage}")
- None
- }
+ val newDfOpt =
+ try {
+ val dfr = sparkSession.read
+ readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
+ } catch {
+ case e: Throwable =>
+ warn(s"read data source cache warn: ${e.getMessage}")
+ None
+ }
// old cache data
val oldCacheIndexOpt = if (updatable) readOldCacheIndex() else None
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index 39827cf..0b08dd4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -32,8 +32,15 @@
object DataConnectorFactory extends Loggable {
- @deprecated val AvroRegex: Regex = """^(?i)avro$""".r
- @deprecated val TextDirRegex: Regex = """^(?i)text-dir$""".r
+ @deprecated(
+ s"This class is deprecated. Use '${classOf[FileBasedDataConnector].getCanonicalName}' with correct format.",
+ "0.6.0")
+ val AvroRegex: Regex = """^(?i)avro$""".r
+
+ @deprecated(
+ s"This class is deprecated. Use '${classOf[FileBasedDataConnector].getCanonicalName}' with correct format.",
+ "0.6.0")
+ val TextDirRegex: Regex = """^(?i)text-dir$""".r
val HiveRegex: Regex = """^(?i)hive$""".r
val FileRegex: Regex = """^(?i)file$""".r
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
index d135f3b..2fd2912 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
@@ -52,33 +52,34 @@
override def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
- sparkSession.conf.set("spark.cassandra.connection.host", host)
- sparkSession.conf.set("spark.cassandra.connection.port", port)
- sparkSession.conf.set("spark.cassandra.auth.username", user)
- sparkSession.conf.set("spark.cassandra.auth.password", password)
+ val dfOpt =
+ try {
+ sparkSession.conf.set("spark.cassandra.connection.host", host)
+ sparkSession.conf.set("spark.cassandra.connection.port", port)
+ sparkSession.conf.set("spark.cassandra.auth.username", user)
+ sparkSession.conf.set("spark.cassandra.auth.password", password)
- val tableDef: DataFrameReader = sparkSession.read
- .format("org.apache.spark.sql.cassandra")
- .options(Map("table" -> tableName, "keyspace" -> database))
+ val tableDef: DataFrameReader = sparkSession.read
+ .format("org.apache.spark.sql.cassandra")
+ .options(Map("table" -> tableName, "keyspace" -> database))
- val dataWh: String = dataWhere()
+ val dataWh: String = dataWhere()
- var data: DataFrame = null
- if (wheres.length > 0) {
- data = tableDef.load().where(dataWh)
- } else {
- data = tableDef.load()
+ var data: DataFrame = null
+ if (wheres.length > 0) {
+ data = tableDef.load().where(dataWh)
+ } else {
+ data = tableDef.load()
+ }
+
+ val dfOpt = Some(data)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } catch {
+ case e: Throwable =>
+ error(s"load cassandra table $database.$TableName fails: ${e.getMessage}", e)
+ None
}
-
- val dfOpt = Some(data)
- val preDfOpt = preProcess(dfOpt, ms)
- preDfOpt
- } catch {
- case e: Throwable =>
- error(s"load cassandra table $database.$TableName fails: ${e.getMessage}", e)
- None
- }
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
index dfdacc8..597695e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
@@ -77,23 +77,24 @@
def dataBySql(ms: Long): (Option[DataFrame], TimeRange) = {
val path: String = s"/_sql?format=csv"
info(s"ElasticSearchGriffinDataConnector data : sql: $sql")
- val dfOpt = try {
- val answer = httpPost(path, sql)
- if (answer._1) {
- import sparkSession.implicits._
- val rdd: RDD[String] = sparkSession.sparkContext.parallelize(answer._2.lines.toList)
- val reader: DataFrameReader = sparkSession.read
- reader.option("header", true).option("inferSchema", true)
- val df: DataFrame = reader.csv(rdd.toDS())
- val dfOpt = Some(df)
- val preDfOpt = preProcess(dfOpt, ms)
- preDfOpt
- } else None
- } catch {
- case e: Throwable =>
- error(s"load ES by sql $host:$port $sql fails: ${e.getMessage}", e)
- None
- }
+ val dfOpt =
+ try {
+ val answer = httpPost(path, sql)
+ if (answer._1) {
+ import sparkSession.implicits._
+ val rdd: RDD[String] = sparkSession.sparkContext.parallelize(answer._2.lines.toList)
+ val reader: DataFrameReader = sparkSession.read
+ reader.option("header", true).option("inferSchema", true)
+ val df: DataFrame = reader.csv(rdd.toDS())
+ val dfOpt = Some(df)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } else None
+ } catch {
+ case e: Throwable =>
+ error(s"load ES by sql $host:$port $sql fails: ${e.getMessage}", e)
+ None
+ }
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
@@ -102,44 +103,44 @@
val path: String = s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
info(s"ElasticSearchGriffinDataConnector data : host: $host port: $port path:$path")
- val dfOpt = try {
- val answer = httpGet(path)
- val data = ArrayBuffer[Map[String, Number]]()
+ val dfOpt =
+ try {
+ val answer = httpGet(path)
+ val data = ArrayBuffer[Map[String, Number]]()
- if (answer._1) {
- val arrayAnswers: util.Iterator[JsonNode] =
- parseString(answer._2).get("hits").get("hits").elements()
+ if (answer._1) {
+ val arrayAnswers: util.Iterator[JsonNode] =
+ parseString(answer._2).get("hits").get("hits").elements()
- while (arrayAnswers.hasNext) {
- val answer = arrayAnswers.next()
- val values = answer.get("_source").get("value")
- val fields: util.Iterator[util.Map.Entry[String, JsonNode]] = values.fields()
- val fieldsMap = mutable.Map[String, Number]()
- while (fields.hasNext) {
- val fld: util.Map.Entry[String, JsonNode] = fields.next()
- fieldsMap.put(fld.getKey, fld.getValue.numberValue())
+ while (arrayAnswers.hasNext) {
+ val answer = arrayAnswers.next()
+ val values = answer.get("_source").get("value")
+ val fields: util.Iterator[util.Map.Entry[String, JsonNode]] = values.fields()
+ val fieldsMap = mutable.Map[String, Number]()
+ while (fields.hasNext) {
+ val fld: util.Map.Entry[String, JsonNode] = fields.next()
+ fieldsMap.put(fld.getKey, fld.getValue.numberValue())
+ }
+ data += fieldsMap.toMap
}
- data += fieldsMap.toMap
}
+ val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
+ val columns: Array[String] = fields.toArray
+ val defaultNumber: Number = 0.0
+ val rdd: RDD[Row] = rdd1
+ .map { x: Map[String, Number] =>
+ Row(columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
+ }
+ val schema = dfSchema(columns.toList)
+ val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
+ val dfOpt = Some(df)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } catch {
+ case e: Throwable =>
+ error(s"load ES table $host:$port $index/$dataType fails: ${e.getMessage}", e)
+ None
}
- val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
- val columns: Array[String] = fields.toArray
- val defaultNumber: Number = 0.0
- val rdd: RDD[Row] = rdd1
- .map { x: Map[String, Number] =>
- Row(columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
- }
- val schema = dfSchema(columns.toList)
- val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
- df.show(20)
- val dfOpt = Some(df)
- val preDfOpt = preProcess(dfOpt, ms)
- preDfOpt
- } catch {
- case e: Throwable =>
- error(s"load ES table $host:$port $index/$dataType fails: ${e.getMessage}", e)
- None
- }
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
index 86049bd..2e4f482 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
@@ -79,7 +79,14 @@
SupportedFormats.contains(format),
s"Invalid format '$format' specified. Must be one of ${SupportedFormats.mkString("['", "', '", "']")}")
- if (format == "csv") validateCSVOptions()
+ if (format.equalsIgnoreCase("avro") && sparkSession.version < "2.3.0") {
+ format = "com.databricks.spark.avro"
+ }
+
+ if (format == "csv") {
+ validateCSVOptions()
+ }
+
if (format == "tsv") {
format = "csv"
options.getOrElseUpdate(Delimiter, TabDelimiter)
@@ -169,7 +176,8 @@
private val TabDelimiter: String = "\t"
private val DefaultFormat: String = SQLConf.DEFAULT_DATA_SOURCE_NAME.defaultValueString
- private val SupportedFormats: Seq[String] = Seq("parquet", "orc", "avro", "text", "csv", "tsv")
+ private val SupportedFormats: Seq[String] =
+ Seq("parquet", "orc", "avro", "text", "csv", "tsv", "com.databricks.spark.avro")
/**
* Validates the existence of paths in a given sequence.
@@ -189,7 +197,7 @@
else throw new IllegalArgumentException(msg)
false
- })
+ })
assert(validPaths.nonEmpty, "No paths were given for the data source.")
validPaths
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
index 9fc21d7..2f77db3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
@@ -69,27 +69,28 @@
assert(isJDBCDriverLoaded(driver), s"JDBC driver $driver not present in classpath")
override def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
- val dtSql = createSqlStmt()
- val prop = new java.util.Properties
- prop.setProperty("user", user)
- prop.setProperty("password", password)
- prop.setProperty("driver", driver)
- val dfOpt = Try(sparkSession.read.jdbc(url, s"($dtSql) as t", prop))
+ val dfOpt =
+ try {
+ val dtSql = createSqlStmt()
+ val prop = new java.util.Properties
+ prop.setProperty("user", user)
+ prop.setProperty("password", password)
+ prop.setProperty("driver", driver)
+ val dfOpt = Try(sparkSession.read.jdbc(url, s"($dtSql) as t", prop))
- dfOpt match {
- case Success(_) =>
- case Failure(exception) =>
- griffinLogger.error("Error occurred while reading data set.", exception)
+ dfOpt match {
+ case Success(_) =>
+ case Failure(exception) =>
+ griffinLogger.error("Error occurred while reading data set.", exception)
+ }
+
+ val preDfOpt = preProcess(dfOpt.toOption, ms)
+ preDfOpt
+ } catch {
+ case e: Throwable =>
+ error(s"loading table $fullTableName fails: ${e.getMessage}", e)
+ None
}
-
- val preDfOpt = preProcess(dfOpt.toOption, ms)
- preDfOpt
- } catch {
- case e: Throwable =>
- error(s"loading table $fullTableName fails: ${e.getMessage}", e)
- None
- }
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
@@ -99,7 +100,7 @@
*/
private def createSqlStmt(): String = {
val tableClause = s"SELECT * FROM $fullTableName"
- if (whereString.length > 0) {
+ if (whereString.nonEmpty) {
s"$tableClause WHERE $whereString"
} else tableClause
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
index feacfc9..26321fc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
@@ -52,21 +52,22 @@
override def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
- val dtSql = dataSql()
- val prop = new java.util.Properties
- prop.setProperty("user", user)
- prop.setProperty("password", password)
- prop.setProperty("driver", driver)
- val df: DataFrame = sparkSession.read.jdbc(url, s"($dtSql) as t", prop)
- val dfOpt = Some(df)
- val preDfOpt = preProcess(dfOpt, ms)
- preDfOpt
- } catch {
- case e: Throwable =>
- error(s"load mysql table $fullTableName fails: ${e.getMessage}", e)
- None
- }
+ val dfOpt =
+ try {
+ val dtSql = dataSql()
+ val prop = new java.util.Properties
+ prop.setProperty("user", user)
+ prop.setProperty("password", password)
+ prop.setProperty("driver", driver)
+ val df: DataFrame = sparkSession.read.jdbc(url, s"($dtSql) as t", prop)
+ val dfOpt = Some(df)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } catch {
+ case e: Throwable =>
+ error(s"load mysql table $fullTableName fails: ${e.getMessage}", e)
+ None
+ }
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
index 0c501dc..b0be557 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -51,23 +51,24 @@
}
ds.foreachRDD((rdd, time) => {
val ms = time.milliseconds
- val saveDfOpt = try {
- // coalesce partition number
- val prlCount = rdd.sparkContext.defaultParallelism
- val ptnCount = rdd.getNumPartitions
- val repartitionedRdd = if (prlCount < ptnCount) {
- rdd.coalesce(prlCount)
- } else rdd
+ val saveDfOpt =
+ try {
+ // coalesce partition number
+ val prlCount = rdd.sparkContext.defaultParallelism
+ val ptnCount = rdd.getNumPartitions
+ val repartitionedRdd = if (prlCount < ptnCount) {
+ rdd.coalesce(prlCount)
+ } else rdd
- val dfOpt = transform(repartitionedRdd)
+ val dfOpt = transform(repartitionedRdd)
- // pre-process
- preProcess(dfOpt, ms)
- } catch {
- case e: Throwable =>
- error(s"streaming data connector error: ${e.getMessage}")
- None
- }
+ // pre-process
+ preProcess(dfOpt, ms)
+ } catch {
+ case e: Throwable =>
+ error(s"streaming data connector error: ${e.getMessage}")
+ None
+ }
// save data frame
streamingCacheClientOpt.foreach(_.saveData(saveDfOpt, ms))
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
index d8cce41..3134b44 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -63,7 +63,8 @@
def func(): (Long, Future[Boolean]) = {
import scala.concurrent.ExecutionContext.Implicits.global
- (timeStamp, Future(HttpUtil.doHttpRequest(api, method, params, header, data)))
+ val code = HttpUtil.doHttpRequest(api, method, params, header, data)._1
+ (timeStamp, Future(code >= 200 && code < 300))
}
if (block) SinkTaskRunner.addBlockTask(func _, retry, connectionTimeout)
else SinkTaskRunner.addNonBlockTask(func _, retry)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
index 743bba9..9aa27f7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
@@ -43,7 +43,7 @@
v :+ se
case _ => v
}
- }
+ }
val combSelectionExprs: (Seq[SelectionExpr], Seq[SelectionExpr]) => Seq[SelectionExpr] =
(a: Seq[SelectionExpr], b: Seq[SelectionExpr]) => a ++ b
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 4f4bf99..980bded 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -33,20 +33,21 @@
def execute(context: DQContext): Try[Boolean] = Try {
context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
val (t, metric) = pair
- val pr = try {
- context.getSinks(t).foreach { sink =>
- try {
- sink.sinkMetrics(metric)
- } catch {
- case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
+ val pr =
+ try {
+ context.getSinks(t).foreach { sink =>
+ try {
+ sink.sinkMetrics(metric)
+ } catch {
+ case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
+ }
}
+ true
+ } catch {
+ case e: Throwable =>
+ error(s"flush metrics error: ${e.getMessage}", e)
+ false
}
- true
- } catch {
- case e: Throwable =>
- error(s"flush metrics error: ${e.getMessage}", e)
- false
- }
ret && pr
}
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
index b7f50e8..680f7a8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
@@ -38,13 +38,14 @@
fsMap.get(uri.getScheme) match {
case Some(fs) => fs
case _ =>
- val fs = try {
- FileSystem.get(uri, getConfiguration)
- } catch {
- case e: Throwable =>
- error(s"get file system error: ${e.getMessage}", e)
- throw e
- }
+ val fs =
+ try {
+ FileSystem.get(uri, getConfiguration)
+ } catch {
+ case e: Throwable =>
+ error(s"get file system error: ${e.getMessage}", e)
+ throw e
+ }
fsMap += (uri.getScheme -> fs)
fs
}
@@ -60,11 +61,12 @@
}
private def getUriOpt(path: String): Option[URI] = {
- val uriOpt = try {
- Some(new URI(path))
- } catch {
- case _: Throwable => None
- }
+ val uriOpt =
+ try {
+ Some(new URI(path))
+ } catch {
+ case _: Throwable => None
+ }
uriOpt.flatMap { uri =>
if (uri.getScheme == null) {
try {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
index 66648d0..652bbf7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
@@ -19,10 +19,10 @@
import scala.util.matching.Regex
-import org.apache.http.client.methods.{HttpGet, HttpPost}
+import org.apache.http.client.methods.{HttpDelete, HttpGet, HttpPost, HttpPut}
+import org.apache.http.client.utils.URIBuilder
import org.apache.http.entity.{ContentType, StringEntity}
-import org.apache.http.impl.client.HttpClientBuilder
-import scalaj.http._
+import org.apache.http.impl.client.{BasicResponseHandler, HttpClientBuilder}
object HttpUtil {
@@ -31,65 +31,34 @@
val PUT_REGEX: Regex = """^(?i)put$""".r
val DELETE_REGEX: Regex = """^(?i)delete$""".r
- def postData(
- url: String,
- params: Map[String, Object],
- headers: Map[String, Object],
- data: String): Boolean = {
- val response = Http(url)
- .params(convertObjMap2StrMap(params))
- .headers(convertObjMap2StrMap(headers))
- .postData(data)
- .asString
-
- response.isSuccess
- }
-
def doHttpRequest(
url: String,
method: String,
params: Map[String, Object],
headers: Map[String, Object],
- data: String): Boolean = {
+ data: String): (Integer, String) = {
val client = HttpClientBuilder.create.build
- method match {
+ val uriBuilder = new URIBuilder(url)
+ convertObjMap2StrMap(params) foreach (param => uriBuilder.setParameter(param._1, param._2))
+ val handler = new BasicResponseHandler()
+ val request = method match {
case POST_REGEX() =>
- val post = new HttpPost(url)
- convertObjMap2StrMap(headers) foreach (header => post.addHeader(header._1, header._2))
+ val post = new HttpPost(uriBuilder.build())
post.setEntity(new StringEntity(data, ContentType.APPLICATION_JSON))
-
- // send the post request
- val response = client.execute(post)
- val code = response.getStatusLine.getStatusCode
- code >= 200 && code < 300
+ post
case PUT_REGEX() =>
- val get = new HttpGet(url)
- convertObjMap2StrMap(headers) foreach (header => get.addHeader(header._1, header._2))
- val response = client.execute(get)
- val code = response.getStatusLine.getStatusCode
- code >= 200 && code < 300
- case _ => false
+ val put = new HttpPut(uriBuilder.build())
+ put.setEntity(new StringEntity(data, ContentType.APPLICATION_JSON))
+ put
+ case GET_REGEX() =>
+ new HttpGet(uriBuilder.build())
+ case DELETE_REGEX() =>
+ new HttpDelete(uriBuilder.build())
+ case _ => throw new UnsupportedOperationException("Unsupported http method error!")
}
- }
-
- def httpRequest(
- url: String,
- method: String,
- params: Map[String, Object],
- headers: Map[String, Object],
- data: String): Boolean = {
- val httpReq = Http(url)
- .params(convertObjMap2StrMap(params))
- .headers(convertObjMap2StrMap(headers))
- method match {
- case POST_REGEX() =>
- val res = httpReq.postData(data).asString
- res.isSuccess
- case PUT_REGEX() =>
- val res = httpReq.put(data).asString
- res.isSuccess
- case _ => false
- }
+ convertObjMap2StrMap(headers) foreach (header => request.addHeader(header._1, header._2))
+ val response = client.execute(request)
+ (response.getStatusLine.getStatusCode, handler.handleResponse(response).trim)
}
private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, String] = {
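
The reworked `doHttpRequest` now returns the status code together with the response body instead of a bare boolean,
so callers (such as `ParamHttpReader` and `ElasticSearchSink` above) can pick the part they need. A hedged usage
sketch, with a hypothetical endpoint:

```scala
// (Integer, String) = (HTTP status code, response body)
val (code, body) = HttpUtil.doHttpRequest(
  "http://localhost:9200/_cluster/health", // hypothetical URL
  "get",
  Map.empty[String, Object],
  Map[String, Object]("Accept" -> "application/json"),
  null)
if (code >= 200 && code < 300) println(body)
```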
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
index 52c5f38..bc1e292 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
@@ -21,10 +21,6 @@
import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.duration.Duration
-import scala.concurrent.forkjoin.{
- ForkJoinPool => SForkJoinPool,
- ForkJoinWorkerThread => SForkJoinWorkerThread
-}
import scala.util.control.NonFatal
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
@@ -172,15 +168,15 @@
/**
* Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
*/
- def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
+ def newForkJoinPool(prefix: String, maxThreadNumber: Int): ForkJoinPool = {
// Custom factory to set thread names
- val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
- override def newThread(pool: SForkJoinPool): SForkJoinWorkerThread =
- new SForkJoinWorkerThread(pool) {
+ val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
+ override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread =
+ new ForkJoinWorkerThread(pool) {
setName(prefix + "-" + super.getName)
}
}
- new SForkJoinPool(
+ new ForkJoinPool(
maxThreadNumber,
factory,
null, // handler
diff --git a/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
index dbed89c..011122f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
@@ -22,9 +22,10 @@
import org.apache.commons.io.FileUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
-import org.scalatest.{BeforeAndAfterAll, FlatSpec}
+import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
-trait SparkSuiteBase extends FlatSpec with BeforeAndAfterAll {
+trait SparkSuiteBase extends AnyFlatSpec with BeforeAndAfterAll {
@transient var spark: SparkSession = _
@transient var sc: SparkContext = _
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
index efaa91f..8f191bc 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
@@ -17,7 +17,9 @@
package org.apache.griffin.measure.configuration.dqdefinition.reader
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest._
+import flatspec.AnyFlatSpec
+import matchers.should._
import org.apache.griffin.measure.configuration.dqdefinition.{
DQConfig,
@@ -26,7 +28,7 @@
RuleParam
}
-class ParamEnumReaderSpec extends FlatSpec with Matchers {
+class ParamEnumReaderSpec extends AnyFlatSpec with Matchers {
import org.apache.griffin.measure.configuration.enums.DslType._
"dsltype" should "be parsed to predefined set of values" in {
val validDslSparkSqlValues =
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
index a05df90..94c7e07 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
@@ -22,8 +22,10 @@
import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
-
-class ParamFileReaderSpec extends FlatSpec with Matchers {
+import org.scalatest._
+import flatspec.AnyFlatSpec
+import matchers.should._
+class ParamFileReaderSpec extends AnyFlatSpec with Matchers {
"params " should "be parsed from a valid file" in {
val reader: ParamReader =
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
index 86d68b5..d8bf125 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
@@ -18,14 +18,15 @@
package org.apache.griffin.measure.configuration.dqdefinition.reader
import scala.io.Source
-
-import org.scalatest.{FlatSpec, Matchers}
import scala.util.{Failure, Success}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+
import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
-class ParamJsonReaderSpec extends FlatSpec with Matchers {
+class ParamJsonReaderSpec extends AnyFlatSpec with Matchers {
"params " should "be parsed from a valid file" in {
val bufferedSource =
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
index 898c8fd..6f5717a 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
@@ -19,11 +19,12 @@
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.SparkSuiteBase
-class DataFrameCacheTest extends FlatSpec with Matchers with SparkSuiteBase {
+class DataFrameCacheTest extends AnyFlatSpec with Matchers with SparkSuiteBase {
def createDataFrame(arr: Seq[Int]): DataFrame = {
val schema = StructType(
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
index 7d25cb1..9a63abd 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
@@ -17,9 +17,10 @@
package org.apache.griffin.measure.context
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
-class MetricWrapperTest extends FlatSpec with Matchers {
+class MetricWrapperTest extends AnyFlatSpec with Matchers {
"metric wrapper" should "flush empty if no metric inserted" in {
val metricWrapper = MetricWrapper("name", "appId")
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala
index 28c195b..9648d96 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala
@@ -17,9 +17,10 @@
package org.apache.griffin.measure.context
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
-class TimeRangeTest extends FlatSpec with Matchers {
+class TimeRangeTest extends AnyFlatSpec with Matchers {
"time range" should "be able to merge another time range" in {
val tr1 = TimeRange(1, 10, Set(2, 5, 8))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala
index a90768e..c60b439 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala
@@ -17,9 +17,10 @@
package org.apache.griffin.measure.datasource
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
-class TimestampStorageTest extends FlatSpec with Matchers {
+class TimestampStorageTest extends AnyFlatSpec with Matchers {
"timestamp storage" should "be able to insert a timestamp" in {
val timestampStorage = TimestampStorage()
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
index 466196b..fd4e6a5 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
@@ -23,7 +23,7 @@
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
-import org.scalatest.FlatSpec
+import org.scalatest.flatspec.AnyFlatSpec
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
@@ -75,7 +75,7 @@
override def data(ms: Long): (Option[DataFrame], TimeRange) = null
}
-class DataConnectorFactorySpec extends FlatSpec {
+class DataConnectorFactorySpec extends AnyFlatSpec {
"DataConnectorFactory" should "be able to create custom batch connector" in {
val param = DataConnectorParam(
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala
index 60c50ca..37c63fc 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala
@@ -18,13 +18,15 @@
package org.apache.griffin.measure.datasource.connector.batch
import org.apache.spark.sql.types.StructType
-import org.scalatest.Matchers
+import org.scalatest.matchers.should._
+import org.scalatest.Ignore
import org.testcontainers.elasticsearch.ElasticsearchContainer
import org.apache.griffin.measure.SparkSuiteBase
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
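+// NOTE: ignored by default; the suite appears to require a Docker-backed Elasticsearch testcontainer (inferred from the ElasticsearchContainer import)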
+@Ignore
class ElasticSearchDataConnectorTest extends SparkSuiteBase with Matchers {
// flag that can be used to skip test cases
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
index 2493df3..2ae65be 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
@@ -18,7 +18,7 @@
package org.apache.griffin.measure.datasource.connector.batch
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
-import org.scalatest._
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.SparkSuiteBase
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
@@ -65,13 +65,12 @@
// valid schema
val result1 = FileBasedDataConnector(
spark,
- dcParam.copy(
- config = configs + (
- (
- "schema",
- Seq(
- Map("name" -> "name", "type" -> "string"),
- Map("name" -> "age", "type" -> "int", "nullable" -> "true"))))),
+ dcParam.copy(config = configs + (
+ (
+ "schema",
+ Seq(
+ Map("name" -> "name", "type" -> "string"),
+ Map("name" -> "age", "type" -> "int", "nullable" -> "true"))))),
timestampStorage)
.data(1L)
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
index 77040e3..8a755b6 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
@@ -19,7 +19,7 @@
import java.sql.DriverManager
import java.util.Properties
-import org.scalatest.Matchers
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.SparkSuiteBase
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
index a557dda..9fc9883 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -19,7 +19,7 @@
import scala.util.{Failure, Success}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
import org.apache.griffin.measure.Application._
@@ -30,12 +30,7 @@
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp
-class DQAppTest
- extends FlatSpec
- with SparkSuiteBase
- with BeforeAndAfterAll
- with Matchers
- with Loggable {
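+// SparkSuiteBase already mixes in AnyFlatSpec and BeforeAndAfterAll, so only Matchers and Loggable are added here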
+class DQAppTest extends SparkSuiteBase with Matchers with Loggable {
var envParam: EnvConfig = _
var sparkParam: SparkParam = _
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
index e4754e0..8bf81b1 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
@@ -57,7 +57,7 @@
}
sinks.headOption match {
case Some(sink: CustomSink) => sink.allMetrics
- case _ => mutable.ListBuffer[String]()
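+ // fall back to an empty result when the head sink is not a CustomSink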
+ case _ => Map.empty
}
})
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
index 919183b..b13858f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
@@ -19,14 +19,15 @@
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
import org.apache.griffin.measure.context.{ContextId, DQContext}
-trait SinkTestBase extends FlatSpec with Matchers with SparkSuiteBase with Loggable {
+trait SinkTestBase extends AnyFlatSpec with Matchers with SparkSuiteBase with Loggable {
var sinkParams: Seq[SinkParam]
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
index 834d8e0..5892c11 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -17,15 +17,17 @@
package org.apache.griffin.measure.step
-import org.scalatest._
import scala.util.Try
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+
import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
import org.apache.griffin.measure.context.{ContextId, DQContext}
import org.apache.griffin.measure.step.transform.TransformStep
-class TransformStepTest extends FlatSpec with Matchers with SparkSuiteBase with Loggable {
+class TransformStepTest extends AnyFlatSpec with Matchers with SparkSuiteBase with Loggable {
case class DualTransformStep(
name: String,
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala
index 67e5236..f80fb13 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala
@@ -17,15 +17,15 @@
package org.apache.griffin.measure.step.builder.dsl.transform
-import org.scalatest._
-import org.scalatest.mockito.MockitoSugar
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+import org.scalatestplus.mockito.MockitoSugar
-import org.apache.griffin.measure.configuration.dqdefinition.RuleErrorConfParam
-import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.dqdefinition.{RuleErrorConfParam, RuleParam}
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.builder.dsl.expr.Expr
-class CompletenessExpr2DQStepsTest extends FlatSpec with Matchers with MockitoSugar {
+class CompletenessExpr2DQStepsTest extends AnyFlatSpec with Matchers with MockitoSugar {
"CompletenessExpr2DQSteps" should "get correct where clause" in {
val completeness = CompletenessExpr2DQSteps(mock[DQContext], mock[Expr], mock[RuleParam])
diff --git a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
index 2f06ad3..5b019a5 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
@@ -18,7 +18,8 @@
package org.apache.griffin.measure.transformations
import org.apache.spark.sql.DataFrame
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.SparkSuiteBase
import org.apache.griffin.measure.configuration.dqdefinition._
@@ -29,7 +30,10 @@
case class AccuracyResult(total: Long, miss: Long, matched: Long, matchedFraction: Double)
-class AccuracyTransformationsIntegrationTest extends FlatSpec with Matchers with SparkSuiteBase {
+class AccuracyTransformationsIntegrationTest
+ extends AnyFlatSpec
+ with Matchers
+ with SparkSuiteBase {
private val EMPTY_PERSON_TABLE = "empty_person"
private val PERSON_TABLE = "person"
@@ -80,10 +84,9 @@
sourceName: String,
targetName: String,
expectedResult: AccuracyResult) = {
- val dqContext: DQContext = getDqContext(
- dataSourcesParam = List(
- DataSourceParam(name = "source", connector = dataConnectorParam(tableName = sourceName)),
- DataSourceParam(name = "target", connector = dataConnectorParam(tableName = targetName))))
+ val dqContext: DQContext = getDqContext(dataSourcesParam = List(
+ DataSourceParam(name = "source", connector = dataConnectorParam(tableName = sourceName)),
+ DataSourceParam(name = "target", connector = dataConnectorParam(tableName = targetName))))
val accuracyRule = RuleParam(
dslType = "griffin-dsl",
diff --git a/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala b/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala
index 720f9b2..9f204a8 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala
@@ -17,11 +17,13 @@
package org.apache.griffin.measure.utils
-import org.scalatest._
+import org.scalatest.BeforeAndAfter
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.utils.ParamUtil._
-class ParamUtilTest extends FlatSpec with Matchers with BeforeAndAfter {
+class ParamUtilTest extends AnyFlatSpec with Matchers with BeforeAndAfter {
val fruits: Map[String, Any] =
Map[String, Any]("A" -> "apple", "B" -> "banana", "O" -> "orange")
diff --git a/pom.xml b/pom.xml
index 7a68625..e51c94e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
<groupId>org.apache.griffin</groupId>
<artifactId>griffin</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache Griffin ${project.version}</name>
<url>http://griffin.apache.org</url>
@@ -40,13 +40,23 @@
</prerequisites>
<properties>
+ <encoding>UTF-8</encoding>
+ <project.build.sourceEncoding>${encoding}</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>${encoding}</project.reporting.outputEncoding>
+
<java.version>1.8</java.version>
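+ <!-- Scala cross-build defaults; 2.11 is the default binary version -->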
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala211.binary.version>2.11</scala211.binary.version>
+ <scala.version>${scala.binary.version}.0</scala.version>
+
+ <maven.compiler.source>${java.version}</maven.compiler.source>
+ <maven.compiler.target>${java.version}</maven.compiler.target>
+
<maven-apache-rat.version>0.11</maven-apache-rat.version>
<exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
+
+ <compile.scope>compile</compile.scope>
+ <provided.scope>provided</provided.scope>
</properties>
<licenses>
@@ -109,7 +119,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>3.6.1</version>
+ <version>3.8.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
@@ -119,10 +129,16 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.22.0</version>
+ <version>3.0.0-M3</version>
<configuration>
+ <testFailureIgnore>false</testFailureIgnore>
+ <useSystemClassLoader>true</useSystemClassLoader>
+ <forkMode>once</forkMode>
+ <failIfNoTests>false</failIfNoTests>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <argLine>-ea -Xmx1g -Xss4m -XX:ReservedCodeCacheSize=128m</argLine>
+ <systemProperties>
+ <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
+ </systemProperties>
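+ <!-- pointing java.io.tmpdir into target/ keeps test scratch files inside the build directory -->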
</configuration>
<executions>
<execution>
@@ -133,24 +149,7 @@
</execution>
</executions>
</plugin>
- <!-- Scalatest runs all Scala tests -->
- <!-- enable scalatest -->
- <plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
- <version>1.0</version>
- <configuration>
- <argLine>-ea -Xmx1g -Xss4m -XX:ReservedCodeCacheSize=128m</argLine>
- </configuration>
- <executions>
- <execution>
- <id>test</id>
- <goals>
- <goal>test</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
+
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
@@ -196,6 +195,7 @@
<exclude>**/src/main/resources/public/**</exclude>
<exclude>**/pom.xml.releaseBackup</exclude>
<exclude>**/pom.xml.tag</exclude>
+ <exclude>**/_SUCCESS</exclude>
</excludes>
</configuration>
<executions>
diff --git a/service/pom.xml b/service/pom.xml
index dd2466f..8121998 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.griffin</groupId>
<artifactId>griffin</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
</parent>
<artifactId>service</artifactId>
@@ -34,7 +34,6 @@
<properties>
<hadoop.version>2.7.1</hadoop.version>
<hive.version>2.2.0</hive.version>
- <scala.version>2.10</scala.version>
<spring.boot.version>2.1.7.RELEASE</spring.boot.version>
<spring.security.kerberos.version>1.0.1.RELEASE</spring.security.kerberos.version>
<confluent.version>3.2.0</confluent.version>
@@ -258,7 +257,7 @@
<!--livy-core-->
<dependency>
<groupId>com.cloudera.livy</groupId>
- <artifactId>livy-core_${scala.version}</artifactId>
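+ <!-- livy-core is pinned to the 2.10 suffix, decoupled from the project-wide Scala version -->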
+ <artifactId>livy-core_2.10</artifactId>
<version>${livy.core.version}</version>
</dependency>
diff --git a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
index b46ea8b..9eb60ad 100644
--- a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
@@ -403,7 +403,7 @@
Set<String> sets = new HashSet<>();
List<DataSource> sources = measure.getDataSources();
for (DataSource source : sources) {
- source.getConnectors().forEach(dc -> sets.add(dc.getName()));
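+ // with the single-connector model the name is read directly instead of iterating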
+ sets.add(source.getConnector().getName());
}
if (sets.size() < sources.size()) {
LOGGER.warn("Connector names cannot be repeated.");
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
index 3899147..344cf75 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
@@ -146,8 +146,7 @@
jobStartTime = triggerTime.getTime();
}
- private void setSourcesPartitionsAndPredicates(List<DataSource> sources)
- throws Exception {
+ private void setSourcesPartitionsAndPredicates(List<DataSource> sources) {
boolean isFirstBaseline = true;
for (JobDataSegment jds : job.getSegments()) {
if (jds.isAsTsBaseline() && isFirstBaseline) {
@@ -157,22 +156,14 @@
isFirstBaseline = false;
}
for (DataSource ds : sources) {
- setDataSourcePartitions(jds, ds);
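+ // partitions are now set on the source's single connector directly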
+ setDataConnectorPartitions(jds, ds.getConnector());
}
}
}
- private void setDataSourcePartitions(JobDataSegment jds, DataSource ds)
- throws Exception {
- List<DataConnector> connectors = ds.getConnectors();
- for (DataConnector dc : connectors) {
- setDataConnectorPartitions(jds, dc);
- }
- }
-
private void setDataConnectorPartitions(
JobDataSegment jds,
- DataConnector dc) throws Exception {
+ DataConnector dc) {
String dcName = jds.getDataConnectorName();
if (dcName.equals(dc.getName())) {
Long[] sampleTs = genSampleTs(jds.getSegmentRange(), dc);
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
index 970977b..378b397 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
@@ -28,21 +28,10 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
-import javax.persistence.CascadeType;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.FetchType;
-import javax.persistence.JoinColumn;
-import javax.persistence.OneToMany;
-import javax.persistence.PostLoad;
-import javax.persistence.PrePersist;
-import javax.persistence.PreUpdate;
-import javax.persistence.Transient;
+import javax.persistence.*;
import org.apache.griffin.core.util.JsonUtil;
-import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@Entity
@@ -51,10 +40,10 @@
private String name;
- @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
+ @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
CascadeType.REMOVE, CascadeType.MERGE})
@JoinColumn(name = "data_source_id")
- private List<DataConnector> connectors = new ArrayList<>();
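+ // each data source now owns exactly one connector (one-to-one mapping)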
+ private DataConnector connector = new DataConnector();
private boolean baseline = false;
@@ -75,15 +64,12 @@
this.name = name;
}
- public List<DataConnector> getConnectors() {
- return connectors;
+ public DataConnector getConnector() {
+ return connector;
}
- public void setConnectors(List<DataConnector> connectors) {
- if (CollectionUtils.isEmpty(connectors)) {
- throw new NullPointerException("Data connector can not be empty.");
- }
- this.connectors = connectors;
+ public void setConnector(DataConnector connector) {
+ this.connector = connector;
}
public boolean isBaseline() {
@@ -132,18 +118,18 @@
public DataSource() {
}
- public DataSource(String name, List<DataConnector> connectors) {
+ public DataSource(String name, DataConnector connector) {
this.name = name;
- this.connectors = connectors;
+ this.connector = connector;
}
public DataSource(String name, boolean baseline,
Map<String, Object> checkpointMap,
- List<DataConnector> connectors) {
+ DataConnector connector) {
this.name = name;
this.baseline = baseline;
this.checkpointMap = checkpointMap;
- this.connectors = connectors;
+ this.connector = connector;
}
}
diff --git a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
index d855183..4b83907 100644
--- a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
@@ -49,7 +49,7 @@
private static final Logger LOGGER = LoggerFactory
.getLogger(HiveMetaStoreService.class);
- @Autowired
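+ // required = false lets the application context start even when no IMetaStoreClient bean is available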
+ @Autowired(required = false)
private IMetaStoreClient client = null;
@Value("${hive.metastore.dbname}")
diff --git a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
index 7e57d87..59526bd 100644
--- a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
@@ -42,7 +42,7 @@
public class MeasureUtil {
private static final Logger LOGGER = LoggerFactory
- .getLogger(MeasureUtil.class);
+ .getLogger(MeasureUtil.class);
public static void validateMeasure(Measure measure) {
if (measure instanceof GriffinMeasure) {
@@ -56,7 +56,7 @@
private static void validateGriffinMeasure(GriffinMeasure measure) {
if (getConnectorNamesIfValid(measure) == null) {
throw new GriffinException.BadRequestException
- (INVALID_CONNECTOR_NAME);
+ (INVALID_CONNECTOR_NAME);
}
if (!validatePredicates(measure)) {
throw new GriffinException.BadRequestException(INVALID_MEASURE_PREDICATE);
@@ -65,13 +65,11 @@
private static boolean validatePredicates(GriffinMeasure measure) {
for (DataSource dataSource : measure.getDataSources()) {
- for (DataConnector dataConnector : dataSource.getConnectors()) {
- for (SegmentPredicate segmentPredicate : dataConnector.getPredicates()) {
- try {
- PredicatorFactory.newPredicateInstance(segmentPredicate);
- } catch (Exception e) {
- return false;
- }
+ for (SegmentPredicate segmentPredicate : dataSource.getConnector().getPredicates()) {
+ try {
+ PredicatorFactory.newPredicateInstance(segmentPredicate);
+ } catch (Exception e) {
+ return false;
}
}
}
@@ -81,7 +79,7 @@
private static void validateExternalMeasure(ExternalMeasure measure) {
if (StringUtils.isBlank(measure.getMetricName())) {
LOGGER.warn("Failed to create external measure {}. " +
- "Its metric name is blank.", measure.getName());
+ "Its metric name is blank.", measure.getName());
throw new GriffinException.BadRequestException(MISSING_METRIC_NAME);
}
}
@@ -90,8 +88,9 @@
Set<String> sets = new HashSet<>();
List<DataSource> sources = measure.getDataSources();
for (DataSource source : sources) {
- source.getConnectors().stream().filter(dc -> dc.getName() != null)
- .forEach(dc -> sets.add(dc.getName()));
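+ // guard against a null connector or connector name before collecting it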
+ if (source.getConnector() != null && source.getConnector().getName() != null) {
+ sets.add(source.getConnector().getName());
+ }
}
if (sets.size() == 0 || sets.size() < sources.size()) {
LOGGER.warn("Connector names cannot be repeated or empty.");
diff --git a/service/src/main/resources/env/env_batch.json b/service/src/main/resources/env/env_batch.json
index 72a3839..9ed9ef7 100644
--- a/service/src/main/resources/env/env_batch.json
+++ b/service/src/main/resources/env/env_batch.json
@@ -4,20 +4,23 @@
},
"sinks": [
{
+ "name": "console",
"type": "CONSOLE",
"config": {
"max.log.lines": 10
}
},
{
+ "name": "hdfs",
"type": "HDFS",
"config": {
- "path": "hdfs:///griffin/persist",
+ "path": "hdfs://localhost/griffin/persist",
"max.persist.lines": 10000,
"max.lines.per.file": 10000
}
},
{
+ "name": "elasticsearch",
"type": "ELASTICSEARCH",
"config": {
"method": "post",
diff --git a/service/src/main/resources/log4j2-spring.xml b/service/src/main/resources/log4j2-spring.xml
index c021b4a..9cee7a5 100644
--- a/service/src/main/resources/log4j2-spring.xml
+++ b/service/src/main/resources/log4j2-spring.xml
@@ -1,4 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
<Configuration status="WARN">
<Properties>
<Property name="PID">????</Property>
diff --git a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
index 00ce1fa..534527f 100644
--- a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
+++ b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
@@ -66,7 +66,7 @@
GriffinMeasure m = (GriffinMeasure) measures.get(0);
List<DataSource> sources = m.getDataSources();
- DataConnector connector = sources.get(0).getConnectors().get(0);
+ DataConnector connector = sources.get(0).getConnector();
Rule rule = m.getEvaluateRule().getRules().get(0);
assertEquals(m.getSinksList().size(), 2);
assertEquals(sources.get(0).isBaseline(), true);
diff --git a/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java b/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
index 563210d..6d9f053 100644
--- a/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
+++ b/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
@@ -82,9 +82,9 @@
DataConnector dcTarget)
throws Exception {
DataSource dataSource = new DataSource(
- "source", true, createCheckpointMap(), Arrays.asList(dcSource));
+ "source", true, createCheckpointMap(), dcSource);
DataSource targetSource = new DataSource(
- "target", false, createCheckpointMap(), Arrays.asList(dcTarget));
+ "target", false, createCheckpointMap(), dcTarget);
List<DataSource> dataSources = new ArrayList<>();
dataSources.add(dataSource);
dataSources.add(targetSource);
diff --git a/ui/angular/package.json b/ui/angular/package.json
index d4fae3d..39c6327 100644
--- a/ui/angular/package.json
+++ b/ui/angular/package.json
@@ -39,7 +39,7 @@
"zone.js": "^0.8.14"
},
"devDependencies": {
- "@angular/cli": "1.3.0",
+ "@angular/cli": "1.7.4",
"@angular/compiler-cli": "4.4.4",
"@angular/language-service": "4.4.4",
"@types/jasmine": "~2.5.53",
diff --git a/ui/angular/src/app/job/create-job/batch/batch.component.ts b/ui/angular/src/app/job/create-job/batch/batch.component.ts
index 150f307..9049761 100644
--- a/ui/angular/src/app/job/create-job/batch/batch.component.ts
+++ b/ui/angular/src/app/job/create-job/batch/batch.component.ts
@@ -268,23 +268,21 @@
if (measure == map.name) {
var source = map["data.sources"];
for (let i = 0; i < source.length; i++) {
- var details = source[i].connectors;
- for (let j = 0; j < details.length; j++) {
- if (details[j]["data.unit"] != undefined) {
- var table =
- details[j].config.database +
- "." +
- details[j].config["table.name"];
- var size = details[j]["data.unit"];
- var connectorname = details[j]["name"];
- var detail = {
- id: i + 1,
- name: table,
- size: size,
- connectorname: connectorname
- };
- this.dropdownList.push(detail);
- }
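+ // each data source now exposes one connector object instead of a connectors array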
+ var connector = source[i].connector;
+ if (connector["data.unit"] != undefined) {
+ var table =
+ connector.config.database +
+ "." +
+ connector.config["table.name"];
+ var size = connector["data.unit"];
+ var connectorname = connector["name"];
+ var detail = {
+ id: i + 1,
+ name: table,
+ size: size,
+ connectorname: connectorname
+ };
+ this.dropdownList.push(detail);
}
}
}
diff --git a/ui/angular/src/app/job/create-job/streaming/streaming.component.ts b/ui/angular/src/app/job/create-job/streaming/streaming.component.ts
index 200d788..808631b 100644
--- a/ui/angular/src/app/job/create-job/streaming/streaming.component.ts
+++ b/ui/angular/src/app/job/create-job/streaming/streaming.component.ts
@@ -234,23 +234,21 @@
if (measure == map.name) {
var source = map["data.sources"];
for (let i = 0; i < source.length; i++) {
- var details = source[i].connectors;
- for (let j = 0; j < details.length; j++) {
- if (details[j]["data.unit"] != undefined) {
- var table =
- details[j].config.database +
- "." +
- details[j].config["table.name"];
- var size = details[j]["data.unit"];
- var connectorname = details[j]["name"];
- var detail = {
- id: i + 1,
- name: table,
- size: size,
- connectorname: connectorname
- };
- this.dropdownList.push(detail);
- }
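+ // same single-connector traversal as in batch.component.ts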
+ var connector = source[i].connector;
+ if (connector["data.unit"] != undefined) {
+ var table =
+ connector.config.database +
+ "." +
+ connector.config["table.name"];
+ var size = connector["data.unit"];
+ var connectorname = connector["name"];
+ var detail = {
+ id: i + 1,
+ name: table,
+ size: size,
+ connectorname: connectorname
+ };
+ this.dropdownList.push(detail);
}
}
}
diff --git a/ui/angular/src/app/job/job-detail/job-detail.component.ts b/ui/angular/src/app/job/job-detail/job-detail.component.ts
index 7c4a10b..d857b26 100644
--- a/ui/angular/src/app/job/job-detail/job-detail.component.ts
+++ b/ui/angular/src/app/job/job-detail/job-detail.component.ts
@@ -57,7 +57,7 @@
this.measureType = this.measureData["dq.type"].toLowerCase();
this.processType = this.measureData["process.type"].toLowerCase();
for (let item of this.measureData["data.sources"]) {
- let config = item.connectors[0].config;
+ let config = item.connector.config;
let tableName = config.database + "." + config["table.name"];
this.tableInfo.push(tableName);
}
diff --git a/ui/angular/src/app/measure/create-measure/ac/ac.component.ts b/ui/angular/src/app/measure/create-measure/ac/ac.component.ts
index 72d0138..8b7bbec 100644
--- a/ui/angular/src/app/measure/create-measure/ac/ac.component.ts
+++ b/ui/angular/src/app/measure/create-measure/ac/ac.component.ts
@@ -148,7 +148,7 @@
"data.sources": [
{
name: "source",
- connectors: [
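+ // the measure payload now carries a single connector object per data source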
+ connector:
{
name: "",
type: "HIVE",
@@ -170,11 +170,10 @@
}
]
}
- ]
},
{
name: "target",
- connectors: [
+ connector:
{
name: "",
type: "HIVE",
@@ -196,7 +195,6 @@
}
]
}
- ]
}
],
@@ -393,7 +391,7 @@
"data.sources": [
{
name: "source",
- connectors: [
+ connector:
{
name: this.src_name,
type: "HIVE",
@@ -415,11 +413,10 @@
}
]
}
- ]
},
{
name: "target",
- connectors: [
+ connector:
{
name: this.tgt_name,
type: "HIVE",
@@ -441,7 +438,6 @@
}
]
}
- ]
}
],
"evaluate.rule": {
@@ -499,11 +495,11 @@
}
deleteUnit(index) {
- delete this.newMeasure["data.sources"][index]["connectors"][0]["data.unit"];
+ delete this.newMeasure["data.sources"][index]["connector"]["data.unit"];
}
deletePredicates(index) {
- delete this.newMeasure["data.sources"][index]["connectors"][0]["predicates"];
+ delete this.newMeasure["data.sources"][index]["connector"]["predicates"];
}
save() {
diff --git a/ui/angular/src/app/measure/create-measure/pr/pr.component.ts b/ui/angular/src/app/measure/create-measure/pr/pr.component.ts
index a1dae5e..57be58f 100644
--- a/ui/angular/src/app/measure/create-measure/pr/pr.component.ts
+++ b/ui/angular/src/app/measure/create-measure/pr/pr.component.ts
@@ -360,7 +360,7 @@
"data.sources": [
{
name: "source",
- connectors: [
+ connector:
{
name: this.step1.srcname,
type: "HIVE",
@@ -382,7 +382,6 @@
}
]
}
- ]
}
],
"evaluate.rule": {
@@ -393,10 +392,10 @@
this.getGrouprule();
if (this.step3.size.indexOf("0") == 0) {
- delete this.newMeasure["data.sources"][0]["connectors"][0]["data.unit"];
+ delete this.newMeasure["data.sources"][0]["connector"]["data.unit"];
}
if (!this.step3.needpath || this.step3.path == "") {
- delete this.newMeasure["data.sources"][0]["connectors"][0]["predicates"];
+ delete this.newMeasure["data.sources"][0]["connector"]["predicates"];
}
this.visible = true;
setTimeout(() => (this.visibleAnimate = true), 100);
diff --git a/ui/angular/src/app/measure/measure-detail/measure-detail.component.spec.ts b/ui/angular/src/app/measure/measure-detail/measure-detail.component.spec.ts
index 6fd2f25..08d9dc4 100644
--- a/ui/angular/src/app/measure/measure-detail/measure-detail.component.spec.ts
+++ b/ui/angular/src/app/measure/measure-detail/measure-detail.component.spec.ts
@@ -43,7 +43,7 @@
});
it(
- 'should be created',
+ 'should be created',
inject(
[HttpTestingController, ServiceService],
(httpMock: HttpTestingController, serviceService: ServiceService) => {
@@ -54,13 +54,13 @@
"dq.type": "",
"evaluate.rule": "",
"data.sources": [{
- "connectors": [{
+ "connector": {
"data.unit": "",
config: {
where: ""
},
predicates: []
- }]
+ }
}]
});
diff --git a/ui/angular/src/app/measure/measure-detail/measure-detail.component.ts b/ui/angular/src/app/measure/measure-detail/measure-detail.component.ts
index 5ad5fae..7413af1 100644
--- a/ui/angular/src/app/measure/measure-detail/measure-detail.component.ts
+++ b/ui/angular/src/app/measure/measure-detail/measure-detail.component.ts
@@ -64,7 +64,7 @@
currentrule: string;
fetchData(value, index) {
- var data = this.ruleData["data.sources"][index].connectors[0];
+ var data = this.ruleData["data.sources"][index].connector;
var size = value + "size";
var zone = value + "zone";
var where = value + "where";
diff --git a/ui/angular/src/app/measure/measure.component.ts b/ui/angular/src/app/measure/measure.component.ts
index 23f7a15..d94dc3c 100644
--- a/ui/angular/src/app/measure/measure.component.ts
+++ b/ui/angular/src/app/measure/measure.component.ts
@@ -72,11 +72,11 @@
this.deletedRow = row;
$("#save").removeAttr("disabled");
if (this.deletedRow["measure.type"] !== "external") {
- var sourcedata = this.deletedRow["data.sources"][0].connectors[0].config;
+ var sourcedata = this.deletedRow["data.sources"][0].connector.config;
this.sourceTable = sourcedata["table.name"];
}
if (this.deletedRow["dq.type"] === "accuracy") {
- var targetdata = this.deletedRow["data.sources"][1].connectors[0].config;
+ var targetdata = this.deletedRow["data.sources"][1].connector.config;
this.targetTable = targetdata["table.name"];
} else {
this.targetTable = "";
diff --git a/ui/pom.xml b/ui/pom.xml
index d42cb6a..465da85 100644
--- a/ui/pom.xml
+++ b/ui/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.griffin</groupId>
<artifactId>griffin</artifactId>
- <version>0.6.0-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
</parent>
<artifactId>ui</artifactId>
<packaging>pom</packaging>