[NEMO-364] Upgrade Beam version (#204)
JIRA: [NEMO-364: Upgrade Beam](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-364)
**Major changes:**
- Upgrades the Beam version to 2.11.0, the latest release at the time of this change, to fix various errors.
**Minor changes to note:**
- Updates deprecated packages and syntax to their current replacements (e.g. `HadoopInputFormatIO` → `HadoopFormatIO`).
**Tests for the changes:**
- Existing tests confirm the changes
**Other comments:**
- None
Closes #204
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index 8d1f6a2..07f3a0e 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -19,10 +19,17 @@
package org.apache.nemo.compiler.frontend.beam.source;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.nemo.common.exception.MetricException;
import org.apache.nemo.common.ir.Readable;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.apache.nemo.common.test.EmptyComponents;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -30,14 +37,6 @@
import java.util.Arrays;
import java.util.List;
-import org.apache.nemo.common.ir.vertex.SourceVertex;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.nemo.common.test.EmptyComponents;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* SourceVertex implementation for BoundedSource.
* @param <O> output type.
@@ -181,10 +180,10 @@
@Override
public List<String> getLocations() throws Exception {
- if (boundedSource instanceof HadoopInputFormatIO.HadoopInputFormatBoundedSource) {
+ if (boundedSource instanceof HadoopFormatIO.HadoopInputFormatBoundedSource) {
final Field inputSplitField = boundedSource.getClass().getDeclaredField("inputSplit");
inputSplitField.setAccessible(true);
- final InputSplit inputSplit = ((HadoopInputFormatIO.SerializableSplit) inputSplitField
+ final InputSplit inputSplit = ((HadoopFormatIO.SerializableSplit) inputSplitField
.get(boundedSource)).getSplit();
return Arrays.asList(inputSplit.getLocations());
} else {
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
index 2ab09a7..b9f3a15 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
@@ -19,25 +19,31 @@
package org.apache.nemo.examples.beam;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.*;
-import org.apache.beam.sdk.transforms.*;
-import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
-import org.apache.beam.sdk.values.*;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.fs.FileSystem;
-
-import java.io.*;
-import java.util.*;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.UUID;
+
/**
* Helper class for handling source/sink in a generic way.
* Assumes String-type PCollections.
@@ -65,7 +71,7 @@
hadoopConf.setClass("value.class", Text.class, Object.class);
// Without translations, Beam internally does some weird cloning
- final HadoopInputFormatIO.Read<Long, String> read = HadoopInputFormatIO.<Long, String>read()
+ final HadoopFormatIO.Read<Long, String> read = HadoopFormatIO.<Long, String>read()
.withConfiguration(hadoopConf)
.withKeyTranslation(new SimpleFunction<LongWritable, Long>() {
@Override
@@ -87,6 +93,7 @@
/**
* Write data.
+ * NEMO-365: This method could later be replaced using the HadoopFormatIO class.
* @param dataToWrite data to write
* @param path path to write data
* @return returns {@link PDone}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
index 84e20fd..8be20f3 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
@@ -19,10 +19,13 @@
package org.apache.nemo.examples.beam;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.*;
import java.util.List;
@@ -62,7 +65,9 @@
.collect(Collectors.toList());
// Create a source PCollection
- final PCollection<Row> inputTable = PBegin.in(p).apply(Create.of(rows).withCoder(schema.getRowCoder()));
+ final PCollection<Row> inputTable = PBegin.in(p)
+ .apply(Create.of(rows).withCoder(RowCoder.of(schema)))
+ .setRowSchema(schema);
// Run 2 SQL queries
// ==> Sum of ints larger than 1
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
index 53ea982..8a38b78 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
@@ -30,9 +30,10 @@
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import javax.annotation.Nullable;
-import static org.apache.beam.repackaged.beam_runners_core_java.com.google.common.base.MoreObjects.firstNonNull;
- /**
+import static com.google.common.base.MoreObjects.firstNonNull;
+
+/**
* This class is brought from beam/examples/common/WriteOneFilePerWindow.java.
*
*/
diff --git a/pom.xml b/pom.xml
index 2909e82..3e05b66 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,7 +44,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<auto-service.version>1.0-rc2</auto-service.version>
- <beam.version>2.6.0</beam.version>
+ <beam.version>2.11.0</beam.version>
<spark.version>2.2.0</spark.version>
<scala.version>2.11.8</scala.version>
<kryo.version>4.0.1</kryo.version>
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
index 845d795..23fdef7 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
@@ -36,7 +36,6 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.*;
-import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;
@@ -80,7 +79,6 @@
*/
private static Pair<HashBiMap<Integer, Class<? extends ExecutionProperty>>,
HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>>> loadMetaData() {
- deregisterBeamDriver();
try (final Connection c = DriverManager.getConnection(MetricUtils.POSTGRESQL_METADATA_DB_NAME,
"postgres", "fake_password")) {
try (final Statement statement = c.createStatement()) {
@@ -145,7 +143,6 @@
}
LOG.info("Saving Metadata..");
- deregisterBeamDriver();
try (final Connection c = DriverManager.getConnection(MetricUtils.POSTGRESQL_METADATA_DB_NAME,
"postgres", "fake_password")) {
try (final Statement statement = c.createStatement()) {
@@ -294,23 +291,4 @@
throw new MetricException(e);
}
}
-
- /**
- * De-register Beam JDBC driver, which produces inconsistent results.
- */
- public static void deregisterBeamDriver() {
- final String beamDriver = "org.apache.beam.sdk.extensions.sql.impl.JdbcDriver";
- final Enumeration<Driver> drivers = DriverManager.getDrivers();
- while (drivers.hasMoreElements()) {
- final Driver d = drivers.nextElement();
- if (d.getClass().getName().equals(beamDriver)) {
- try {
- DriverManager.deregisterDriver(d);
- } catch (SQLException e) {
- throw new MetricException(e);
- }
- break;
- }
- }
- }
}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
index 76a8f52..160e8bf 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
@@ -247,7 +247,6 @@
try (final Connection c = DriverManager.getConnection(address, id, passwd)) {
LOG.info("Opened database successfully at {}", MetricUtils.POSTGRESQL_METADATA_DB_NAME);
- MetricUtils.deregisterBeamDriver();
saveOptimizationMetrics(c, syntax);
} catch (SQLException e) {
LOG.error("Error while saving optimization metrics to PostgreSQL: {}", e);