APEXMALHAR-2516 Fix contrib dependency issues: replace hive-exec with parquet-hadoop 1.8.1 and migrate parquet.* imports to org.apache.parquet.*
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 83305cb..3c5797c 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -554,16 +554,10 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>0.13.1</version>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>1.8.1</version>
<optional>true</optional>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java b/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
index 2f8fb19..41c56e3 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
@@ -86,9 +86,9 @@
@VisibleForTesting
int fieldErrorCount = 0;
- public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<GenericRecord>();
+ public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<>();
- public final transient DefaultOutputPort<Object> errorPort = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<Object> errorPort = new DefaultOutputPort<>();
private void parseSchema() throws IOException
{
@@ -172,7 +172,7 @@
{
setColumnNames(schema.getFields());
- keyMethodMap = new ArrayList<Getter>();
+ keyMethodMap = new ArrayList<>();
for (int i = 0; i < getColumnNames().size(); i++) {
try {
keyMethodMap.add(generateGettersForField(cls, getColumnNames().get(i).name()));
@@ -217,7 +217,7 @@
try {
record = getGenericRecord(tuple);
} catch (Exception e) {
- LOG.error("Exception in parsing record");
+ LOG.error("Exception in creating record");
errorCount++;
}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java b/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
index c4eefff..14ab918 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
@@ -21,17 +21,17 @@
import java.io.IOException;
import java.io.InputStream;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
-import parquet.example.data.Group;
-import parquet.hadoop.ParquetReader;
-import parquet.hadoop.example.GroupReadSupport;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
/**
* Base implementation of ParquetFileReader. Reads Parquet files from input
* directory using GroupReadSupport. Derived classes need to implement
@@ -41,6 +41,7 @@
*
* @since 3.4.0
*/
+@InterfaceStability.Evolving
public abstract class AbstractParquetFileReader<T> extends AbstractFileInputOperator<T>
{
private transient ParquetReader<Group> reader;
@@ -70,7 +71,7 @@
InputStream is = super.openFile(path);
GroupReadSupport readSupport = new GroupReadSupport();
readSupport.init(configuration, null, schema);
- reader = new ParquetReader<Group>(path, readSupport);
+ reader = new ParquetReader<>(path, readSupport);
return is;
}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java b/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
index 37bd60b..36c5b55 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
@@ -25,6 +25,11 @@
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.InvalidRecordException;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
@@ -32,11 +37,6 @@
import com.datatorrent.lib.util.FieldInfo.SupportType;
import com.datatorrent.lib.util.PojoUtils;
-import parquet.example.data.Group;
-import parquet.io.InvalidRecordException;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
/**
* <p>
* ParquetFilePOJOReader
@@ -57,6 +57,7 @@
*
* @since 3.4.0
*/
+@InterfaceStability.Evolving
public class ParquetFilePOJOReader extends AbstractParquetFileReader<Object>
{
@@ -175,7 +176,7 @@
private void initialiseActiveFieldInfo(String fieldMapping)
{
String[] fields = fieldMapping.split(RECORD_SEPARATOR);
- activeFieldInfos = new ArrayList<ActiveFieldInfo>(fields.length);
+ activeFieldInfos = new ArrayList<>(fields.length);
for (String field : fields) {
String[] token = field.split(FIELD_SEPARATOR);
try {
diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java
index 772a057..787e684 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java
@@ -18,6 +18,7 @@
*/
package com.datatorrent.contrib.avro;
+import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
@@ -26,7 +27,6 @@
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
-import org.python.google.common.collect.Lists;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
@@ -41,7 +41,7 @@
+ "\"type\": \"long\"}," + "{\"name\":\"customerId\",\"type\": \"int\"},"
+ "{\"name\":\"total\",\"type\": \"double\"}," + "{\"name\":\"customerName\",\"type\": \"string\"}]}";
- CollectorTestSink<Object> outputSink = new CollectorTestSink<Object>();
+ CollectorTestSink<Object> outputSink = new CollectorTestSink<>();
PojoToAvro avroWriter = new PojoToAvro();
public class TestMeta extends TestWatcher
@@ -74,7 +74,7 @@
public void testWriting() throws Exception
{
- List<SimpleOrder> orderList = Lists.newArrayList();
+ List<SimpleOrder> orderList = new ArrayList<>();
orderList.add(new SimpleOrder(1, 11, 100.25, "customerOne"));
orderList.add(new SimpleOrder(2, 22, 200.25, "customerTwo"));
orderList.add(new SimpleOrder(3, 33, 300.25, "customerThree"));
@@ -101,7 +101,7 @@
public void testWriteFailure() throws Exception
{
- List<Order> orderList = Lists.newArrayList();
+ List<Order> orderList = new ArrayList<>();
orderList.add(new Order(11));
orderList.add(new Order(22));
orderList.add(new Order(33));
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReaderTest.java b/contrib/src/test/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReaderTest.java
index 89a9839..862c96a 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReaderTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReaderTest.java
@@ -55,15 +55,15 @@
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.lib.util.PojoUtils.Getter;
-import parquet.column.ColumnDescriptor;
-import parquet.hadoop.ParquetWriter;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.io.ParquetEncodingException;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
@@ -74,7 +74,7 @@
+ "required INT32 event_id;" + "required BINARY org_id (UTF8);" + "required INT64 long_id;"
+ "optional BOOLEAN css_file_loaded;" + "optional FLOAT float_val;" + "optional DOUBLE double_val;}";
- CollectorTestSink<Object> outputSink = new CollectorTestSink<Object>();
+ CollectorTestSink<Object> outputSink = new CollectorTestSink<>();
ParquetFilePOJOReader parquetFilePOJOReader = new ParquetFilePOJOReader();
public static class TestMeta extends TestWatcher
@@ -506,7 +506,7 @@
public ParquetPOJOWriter(Path file, MessageType schema, Class klass, CompressionCodecName codecName,
boolean enableDictionary) throws IOException
{
- super(file, (WriteSupport<Object>)new POJOWriteSupport(schema, klass), codecName, DEFAULT_BLOCK_SIZE,
+ super(file, new POJOWriteSupport(schema, klass), codecName, DEFAULT_BLOCK_SIZE,
DEFAULT_PAGE_SIZE, enableDictionary, false);
}
@@ -530,7 +530,7 @@
private void init()
{
- keyMethodMap = new ArrayList<Getter>();
+ keyMethodMap = new ArrayList<>();
for (int i = 0; i < cols.size(); i++) {
try {
keyMethodMap.add(generateGettersForField(klass, cols.get(i).getPath()[0]));
@@ -542,7 +542,7 @@
}
@Override
- public parquet.hadoop.api.WriteSupport.WriteContext init(Configuration configuration)
+ public WriteContext init(Configuration configuration)
{
return new WriteContext(schema, new HashMap<String, String>());
}
diff --git a/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplication.java b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplication.java
index 4f351ef..2a5ff29 100644
--- a/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplication.java
+++ b/examples/jdbc/src/main/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplication.java
@@ -36,6 +36,7 @@
@ApplicationAnnotation(name = "PollJdbcToHDFSApp")
public class JdbcPollerApplication implements StreamingApplication
{
+ @Override
public void populateDAG(DAG dag, Configuration conf)
{
JdbcPOJOPollInputOperator poller = dag.addOperator("JdbcPoller", new JdbcPOJOPollInputOperator());
@@ -53,7 +54,7 @@
}
/**
- * This method can be modified to have field mappings based on used defined
+ * This method can be modified to have field mappings based on user defined
* class
*/
private List<FieldInfo> addFieldInfos()
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
index f96c6e1..8442d55 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
@@ -84,7 +84,6 @@
{
super.setup(context);
try {
- // closing the query statement in super class as it is not needed
if (getColumnsExpression() == null) {
StringBuilder columns = new StringBuilder();
for (int i = 0; i < fieldInfos.size(); i++) {
diff --git a/pom.xml b/pom.xml
index dc9ed8b..d6bdfdd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,7 @@
<excludes>
<exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude>
<exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude>
+ <exclude>com.datatorrent.contrib.parquet</exclude>
</excludes>
</parameter>
<skip>${semver.plugin.skip}</skip>
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java b/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java
index 5365e20..cc2eff8 100644
--- a/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java
@@ -104,7 +104,6 @@
* @throws JSONException
* @throws IOException
*/
- @SuppressWarnings("unchecked")
public static byte[] createAndWriteBeanClass(String fqcn, List<TupleSchemaRegistry.SQLFieldInfo> fieldList,
FSDataOutputStream outputStream) throws JSONException, IOException
{
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java b/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java
index 460fafb..7742503 100644
--- a/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java
@@ -129,9 +129,7 @@
case DOUBLE:
return Double.class;
case DATE:
- return Date.class;
case TIME:
- return Date.class;
case TIMESTAMP:
return Date.class;
case CHAR: