[NEMO-364] Upgrade Beam version (#204)

JIRA: [NEMO-364: Upgrade Beam](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-364)

**Major changes:**
- Upgrades the Beam version from 2.6.0 to 2.11.0, the newest version, to fix various compatibility errors.

**Minor changes to note:**
- Updates usages of deprecated packages and APIs (e.g. `HadoopInputFormatIO` → `HadoopFormatIO`, `schema.getRowCoder()` → `RowCoder.of(schema)`).

**Tests for the changes:**
- Existing tests confirm the changes

**Other comments:**
- None

Closes #204 
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index 8d1f6a2..07f3a0e 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -19,10 +19,17 @@
 package org.apache.nemo.compiler.frontend.beam.source;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.nemo.common.exception.MetricException;
 import org.apache.nemo.common.ir.Readable;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.apache.nemo.common.test.EmptyComponents;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -30,14 +37,6 @@
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.nemo.common.ir.vertex.SourceVertex;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.nemo.common.test.EmptyComponents;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * SourceVertex implementation for BoundedSource.
  * @param <O> output type.
@@ -181,10 +180,10 @@
 
     @Override
     public List<String> getLocations() throws Exception {
-      if (boundedSource instanceof HadoopInputFormatIO.HadoopInputFormatBoundedSource) {
+      if (boundedSource instanceof HadoopFormatIO.HadoopInputFormatBoundedSource) {
         final Field inputSplitField = boundedSource.getClass().getDeclaredField("inputSplit");
         inputSplitField.setAccessible(true);
-        final InputSplit inputSplit = ((HadoopInputFormatIO.SerializableSplit) inputSplitField
+        final InputSplit inputSplit = ((HadoopFormatIO.SerializableSplit) inputSplitField
             .get(boundedSource)).getSplit();
         return Arrays.asList(inputSplit.getLocations());
       } else {
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
index 2ab09a7..b9f3a15 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
@@ -19,25 +19,31 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.*;
-import org.apache.beam.sdk.transforms.*;
-import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
-import org.apache.beam.sdk.values.*;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.fs.FileSystem;
-
-import java.io.*;
-import java.util.*;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.UUID;
+
 /**
  * Helper class for handling source/sink in a generic way.
  * Assumes String-type PCollections.
@@ -65,7 +71,7 @@
       hadoopConf.setClass("value.class", Text.class, Object.class);
 
       // Without translations, Beam internally does some weird cloning
-      final HadoopInputFormatIO.Read<Long, String> read = HadoopInputFormatIO.<Long, String>read()
+      final HadoopFormatIO.Read<Long, String> read = HadoopFormatIO.<Long, String>read()
           .withConfiguration(hadoopConf)
           .withKeyTranslation(new SimpleFunction<LongWritable, Long>() {
             @Override
@@ -87,6 +93,7 @@
 
   /**
    * Write data.
+   * NEMO-365: This method could later be replaced using the HadoopFormatIO class.
    * @param dataToWrite data to write
    * @param path        path to write data
    * @return            returns {@link PDone}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
index 84e20fd..8be20f3 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
@@ -19,10 +19,13 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.*;
 
 import java.util.List;
@@ -62,7 +65,9 @@
       .collect(Collectors.toList());
 
     // Create a source PCollection
-    final PCollection<Row> inputTable = PBegin.in(p).apply(Create.of(rows).withCoder(schema.getRowCoder()));
+    final PCollection<Row> inputTable = PBegin.in(p)
+      .apply(Create.of(rows).withCoder(RowCoder.of(schema)))
+      .setRowSchema(schema);
 
     // Run 2 SQL queries
     // ==> Sum of ints larger than 1
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
index 53ea982..8a38b78 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
@@ -30,9 +30,10 @@
 import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
 import javax.annotation.Nullable;
-import static org.apache.beam.repackaged.beam_runners_core_java.com.google.common.base.MoreObjects.firstNonNull;
 
- /**
+import static com.google.common.base.MoreObjects.firstNonNull;
+
+/**
   * This class is brought from beam/examples/common/WriteOneFilePerWindow.java.
   *
   */
diff --git a/pom.xml b/pom.xml
index 2909e82..3e05b66 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,7 +44,7 @@
         <maven.compiler.target>1.8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <auto-service.version>1.0-rc2</auto-service.version>
-        <beam.version>2.6.0</beam.version>
+        <beam.version>2.11.0</beam.version>
         <spark.version>2.2.0</spark.version>
         <scala.version>2.11.8</scala.version>
         <kryo.version>4.0.1</kryo.version>
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
index 845d795..23fdef7 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
@@ -36,7 +36,6 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.*;
-import java.util.Enumeration;
 import java.util.concurrent.CountDownLatch;
 import java.util.stream.Stream;
 
@@ -80,7 +79,6 @@
    */
   private static Pair<HashBiMap<Integer, Class<? extends ExecutionProperty>>,
     HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>>> loadMetaData() {
-    deregisterBeamDriver();
     try (final Connection c = DriverManager.getConnection(MetricUtils.POSTGRESQL_METADATA_DB_NAME,
       "postgres", "fake_password")) {
       try (final Statement statement = c.createStatement()) {
@@ -145,7 +143,6 @@
     }
     LOG.info("Saving Metadata..");
 
-    deregisterBeamDriver();
     try (final Connection c = DriverManager.getConnection(MetricUtils.POSTGRESQL_METADATA_DB_NAME,
       "postgres", "fake_password")) {
       try (final Statement statement = c.createStatement()) {
@@ -294,23 +291,4 @@
       throw new MetricException(e);
     }
   }
-
-  /**
-   * De-register Beam JDBC driver, which produces inconsistent results.
-   */
-  public static void deregisterBeamDriver() {
-    final String beamDriver = "org.apache.beam.sdk.extensions.sql.impl.JdbcDriver";
-    final Enumeration<Driver> drivers = DriverManager.getDrivers();
-    while (drivers.hasMoreElements()) {
-      final Driver d = drivers.nextElement();
-      if (d.getClass().getName().equals(beamDriver)) {
-        try {
-          DriverManager.deregisterDriver(d);
-        } catch (SQLException e) {
-          throw new MetricException(e);
-        }
-        break;
-      }
-    }
-  }
 }
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
index 76a8f52..160e8bf 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/metric/MetricStore.java
@@ -247,7 +247,6 @@
 
     try (final Connection c = DriverManager.getConnection(address, id, passwd)) {
       LOG.info("Opened database successfully at {}", MetricUtils.POSTGRESQL_METADATA_DB_NAME);
-      MetricUtils.deregisterBeamDriver();
       saveOptimizationMetrics(c, syntax);
     } catch (SQLException e) {
       LOG.error("Error while saving optimization metrics to PostgreSQL: {}", e);