DRILL-8409: Support the configuration of bind addresses for network services (#2777)

* Add support for setting RPC and HTTP bind addresses using boot options.
* Safely log Parquet batch read timing.
* Safely close page write store and column write store in ParquetRecordWriter.
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 1408935..b2f4e1f 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -936,7 +936,7 @@
           </execution>
         </executions>
       </plugin>
-      <plugin> <!-- generate the parser (Parser.jj is itself generated wit fmpp above) -->
+      <plugin> <!-- generate the parser (Parser.jj is itself generated with fmpp above) -->
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>javacc-maven-plugin</artifactId>
         <version>2.6</version>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 9b32005..8fd01be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -59,6 +59,7 @@
   public static final String BIT_RETRY_DELAY = "drill.exec.rpc.bit.server.retry.delay";
   public static final String BIT_TIMEOUT = "drill.exec.bit.timeout";
   public static final String SERVICE_NAME = "drill.exec.cluster-id";
+  public static final String RPC_BIND_ADDR = "drill.exec.rpc.bind_addr";
   public static final String INITIAL_BIT_PORT = "drill.exec.rpc.bit.server.port";
   public static final String INITIAL_DATA_PORT = "drill.exec.rpc.bit.server.dataport";
   public static final String BIT_RPC_TIMEOUT = "drill.exec.rpc.bit.timeout";
@@ -222,6 +223,7 @@
   public static final String HTTP_ENABLE = "drill.exec.http.enabled";
   public static final String HTTP_MAX_PROFILES = "drill.exec.http.max_profiles";
   public static final String HTTP_PROFILES_PER_PAGE = "drill.exec.http.profiles_per_page";
+  public static final String HTTP_BIND_ADDR = "drill.exec.http.bind_addr";
   public static final String HTTP_PORT = "drill.exec.http.port";
   public static final String HTTP_PORT_HUNT = "drill.exec.http.porthunt";
   public static final String HTTP_JETTY_SERVER_DUMP_AFTER_START = "drill.exec.http.jetty.server.dumpAfterStart";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
index 0721557..d6a8108 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
@@ -20,6 +20,7 @@
 import java.util.List;
 
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -56,8 +57,10 @@
   @Override
   public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint, final boolean allowPortHunting) {
     server = new ControlServer(config, connectionRegistry);
-    int port = config.getBootstrapContext().getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
-    port = server.bind(port, allowPortHunting);
+    DrillConfig drillConfig = config.getBootstrapContext().getConfig();
+    String bindAddr = drillConfig.getString(ExecConstants.RPC_BIND_ADDR);
+    int port = drillConfig.getInt(ExecConstants.INITIAL_BIT_PORT);
+    port = server.bind(bindAddr, port, allowPortHunting);
     DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setControlPort(port).build();
     connectionRegistry.setLocalEndpoint(completeEndpoint);
     handlerRegistry.setEndpoint(completeEndpoint);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
index 6265012..995535d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
@@ -20,6 +20,7 @@
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -52,10 +53,13 @@
   public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint, boolean allowPortHunting) {
     server = new DataServer(config);
     int port = partialEndpoint.getControlPort() + 1;
-    if (config.getBootstrapContext().getConfig().hasPath(ExecConstants.INITIAL_DATA_PORT)) {
-      port = config.getBootstrapContext().getConfig().getInt(ExecConstants.INITIAL_DATA_PORT);
+    DrillConfig drillConfig = config.getBootstrapContext().getConfig();
+
+    String bindAddr = drillConfig.getString(ExecConstants.RPC_BIND_ADDR);
+    if (drillConfig.hasPath(ExecConstants.INITIAL_DATA_PORT)) {
+      port = drillConfig.getInt(ExecConstants.INITIAL_DATA_PORT);
     }
-    port = server.bind(port, allowPortHunting);
+    port = server.bind(bindAddr, port, allowPortHunting);
     return partialEndpoint.toBuilder().setDataPort(port).build();
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
index 673976d..e6f1499 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
@@ -156,8 +156,9 @@
 
     final int acceptors = config.getInt(ExecConstants.HTTP_JETTY_SERVER_ACCEPTORS);
     final int selectors = config.getInt(ExecConstants.HTTP_JETTY_SERVER_SELECTORS);
+    String bindAddr = config.getString(ExecConstants.HTTP_BIND_ADDR);
     int port = config.getInt(ExecConstants.HTTP_PORT);
-    ServerConnector connector = createConnector(port, acceptors, selectors);
+    ServerConnector connector = createConnector(bindAddr, port, acceptors, selectors);
 
     final int handlers = config.getInt(ExecConstants.HTTP_JETTY_SERVER_HANDLERS);
     threadPool.setMaxThreads(handlers + connector.getAcceptors() + connector.getSelectorManager().getSelectorCount());
@@ -310,7 +311,7 @@
 
   public int getPort() {
     if (!isRunning()) {
-      throw new UnsupportedOperationException("Http is not enabled");
+      throw new UnsupportedOperationException("HTTP server is not enabled");
     }
     return ((ServerConnector)embeddedJetty.getConnectors()[0]).getPort();
   }
@@ -319,16 +320,21 @@
     return (embeddedJetty != null && embeddedJetty.getConnectors().length == 1);
   }
 
-  private ServerConnector createConnector(int port, int acceptors, int selectors) throws Exception {
+  private ServerConnector createConnector(
+      String bindAddr,
+      int port,
+      int acceptors,
+      int selectors
+  ) throws Exception {
     final ServerConnector serverConnector;
     if (config.getBoolean(ExecConstants.HTTP_ENABLE_SSL)) {
       try {
-        serverConnector = createHttpsConnector(port, acceptors, selectors);
+        serverConnector = createHttpsConnector(bindAddr, port, acceptors, selectors);
       } catch (DrillException e) {
         throw new DrillbitStartupException(e.getMessage(), e);
       }
     } else {
-      serverConnector = createHttpConnector(port, acceptors, selectors);
+      serverConnector = createHttpConnector(bindAddr, port, acceptors, selectors);
     }
 
     return serverConnector;
@@ -341,8 +347,13 @@
    *
    * @return Initialized {@link ServerConnector} for HTTPS connections.
    */
-  private ServerConnector createHttpsConnector(int port, int acceptors, int selectors) throws Exception {
-    logger.info("Setting up HTTPS connector for web server");
+  private ServerConnector createHttpsConnector(
+      String bindAddr,
+      int port,
+      int acceptors,
+      int selectors
+  ) throws Exception {
+    logger.info("Setting up HTTPS connector for web server at {}:{}", bindAddr, port);
     SslContextFactory sslContextFactory = new SslContextFactoryConfigurator(config,
         workManager.getContext().getEndpoint().getAddress())
         .configureNewSslContextFactory();
@@ -354,6 +365,7 @@
         null, null, null, acceptors, selectors,
         new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.asString()),
         new HttpConnectionFactory(httpsConfig));
+    sslConnector.setHost(bindAddr);
     sslConnector.setPort(port);
 
     return sslConnector;
@@ -364,10 +376,11 @@
    *
    * @return Initialized {@link ServerConnector} instance for HTTP connections.
    */
-  private ServerConnector createHttpConnector(int port, int acceptors, int selectors) {
-    logger.info("Setting up HTTP connector for web server");
+  private ServerConnector createHttpConnector(String bindAddr, int port, int acceptors, int selectors) {
+    logger.info("Setting up HTTP connector for web server at {}:{}", bindAddr, port);
     final ServerConnector httpConnector =
         new ServerConnector(embeddedJetty, null, null, null, acceptors, selectors, new HttpConnectionFactory(baseHttpConfig()));
+    httpConnector.setHost(bindAddr);
     httpConnector.setPort(port);
 
     return httpConnector;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index 6ae6f44..06daf6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
@@ -53,6 +54,7 @@
   private final DataConnectionCreator dataPool;
 
   private final String hostName;
+  private final String bindAddr;
   private final int intialUserPort;
   private final boolean allowPortHunting;
   private final boolean isDistributedMode;
@@ -73,14 +75,16 @@
     dataAllocator = newAllocator(context, "rpc:bit-data",
         "drill.exec.rpc.bit.server.memory.data.reservation", "drill.exec.rpc.bit.server.memory.data.maximum");
 
+    DrillConfig drillConfig = context.getConfig();
     final EventLoopGroup eventLoopGroup = TransportCheck.createEventLoopGroup(
-        context.getConfig().getInt(ExecConstants.USER_SERVER_RPC_THREADS), "UserServer-");
+        drillConfig.getInt(ExecConstants.USER_SERVER_RPC_THREADS), "UserServer-");
     userServer = new UserServer(context, userAllocator, eventLoopGroup, manager.getUserWorker());
     controller = new ControllerImpl(context, controlAllocator, manager.getControlMessageHandler());
     dataPool = new DataConnectionCreator(context, dataAllocator, manager.getWorkBus(), manager.getBee());
 
     hostName = context.getHostName();
-    intialUserPort = context.getConfig().getInt(ExecConstants.INITIAL_USER_PORT);
+    bindAddr = drillConfig.getString(ExecConstants.RPC_BIND_ADDR);
+    intialUserPort = drillConfig.getInt(ExecConstants.INITIAL_USER_PORT);
     this.allowPortHunting = allowPortHunting;
     this.isDistributedMode = isDistributedMode;
   }
@@ -93,11 +97,13 @@
 
   public DrillbitEndpoint start() throws DrillbitStartupException, UnknownHostException {
     // loopback address check
-    if (isDistributedMode && InetAddress.getByName(hostName).isLoopbackAddress()) {
-      throw new DrillbitStartupException("Drillbit is disallowed to bind to loopback address in distributed mode.");
+    if (isDistributedMode && InetAddress.getByName(bindAddr).isLoopbackAddress()) {
+      throw new DrillbitStartupException(
+        "Drillbit may not bind to a loopback address in distributed mode."
+      );
     }
 
-    userPort = userServer.bind(intialUserPort, allowPortHunting);
+    userPort = userServer.bind(bindAddr, intialUserPort, allowPortHunting);
     DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder()
         .setAddress(hostName)
         .setUserPort(userPort)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index d58747c..c57a7c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -30,6 +30,7 @@
 import java.util.stream.Collectors;
 
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
@@ -491,8 +492,16 @@
         flushParquetFileWriter();
       }
     } finally {
-      store.close();
-      pageStore.close();
+      AutoCloseables.closeSilently(pageStore);
+
+      // ColumnWriteStore doesn't implement AutoCloseable so a manual safe
+      // closure must be written.
+      try {
+        store.close();
+      } catch (Exception e) {
+        logger.warn("Error closing {}", store, e);
+      }
+
       codecFactory.release();
 
       store = null;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index b9c365c..1a8e6f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -231,7 +231,14 @@
         "\nRow group index: " + rowGroupIndex +
         "\nRecords to read: " + numRecordsToRead, e);
     } finally {
-      parquetReaderStats.timeProcess.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
+      if (parquetReaderStats != null) {
+        parquetReaderStats.timeProcess.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
+      } else {
+        logger.warn(
+          "Cannot log batch read timing because no Parquet reader stats tracker " +
+          "is available (probably due to an earlier error during query execution)."
+        );
+      }
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
index 31a304e..28dfc27 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -57,7 +57,8 @@
 import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.Private
-public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore, Closeable {
+public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore,
+AutoCloseable {
   private static final Logger LOG = LoggerFactory.getLogger(ParquetColumnChunkPageWriteStore.class);
 
   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 749bd11..488d8af 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -104,7 +104,8 @@
         }
       }
     },
-    use.ip: false
+    use.ip: false,
+    bind_addr: "0.0.0.0"
   },
   optimizer: {
     implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
@@ -154,6 +155,7 @@
     },
     ssl_enabled: false,
     porthunt: false,
+    bind_addr: "0.0.0.0",
     port: 8047,
     jetty : {
       server : {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
index 65b0351..a00401c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
@@ -81,6 +81,7 @@
 @Category(SecurityTest.class)
 public class TestBitBitKerberos extends ClusterTest {
   private static KerberosHelper krbHelper;
+  private static final String BIND_ADDR = "127.0.0.1";
 
   private int port = 1234;
 
@@ -172,7 +173,7 @@
         new DataServerRequestHandler(workBus, bee));
     DataServer server = new DataServer(config);
 
-    port = server.bind(port, true);
+    port = server.bind(BIND_ADDR, port, true);
     DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
     DataConnectionManager connectionManager = new DataConnectionManager(ep, config);
     DataTunnel tunnel = new DataTunnel(connectionManager);
@@ -218,7 +219,7 @@
         new DataConnectionConfig(c2.getAllocator(), c2, new DataServerRequestHandler(workBus, bee));
       final DataServer server = new DataServer(config);
 
-      port = server.bind(port, true);
+      port = server.bind(BIND_ADDR, port, true);
       DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
       final DataConnectionManager connectionManager = new DataConnectionManager(ep, config);
       final DataTunnel tunnel = new DataTunnel(connectionManager);
@@ -265,7 +266,7 @@
         new DataServerRequestHandler(workBus, bee));
       final DataServer server = new DataServer(config);
 
-      port = server.bind(port, true);
+      port = server.bind(BIND_ADDR, port, true);
       final DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
       final DataConnectionManager connectionManager = new DataConnectionManager(ep, config);
       final DataTunnel tunnel = new DataTunnel(connectionManager);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
index 20a7340..4b145df 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
@@ -61,6 +61,9 @@
 import static org.mockito.Mockito.when;
 
 public class TestBitRpc extends ExecTest {
+
+  private static final String BIND_ADDR = "127.0.0.1";
+
   @Test
   public void testConnectionBackpressure() throws Exception {
     final WorkerBee bee = mock(WorkerBee.class);
@@ -80,7 +83,7 @@
         new DataServerRequestHandler(workBus, bee));
     DataServer server = new DataServer(config);
 
-    port = server.bind(port, true);
+    port = server.bind(BIND_ADDR, port, true);
     DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
     DataConnectionManager manager = new DataConnectionManager(ep, config);
     DataTunnel tunnel = new DataTunnel(manager);
@@ -113,7 +116,7 @@
             new DataServerRequestHandler(workBus, bee));
     DataServer server = new DataServer(config);
 
-    port = server.bind(port, true);
+    port = server.bind(BIND_ADDR, port, true);
     DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
     DataConnectionManager manager = new DataConnectionManager(ep, config);
     DataTunnel tunnel = new DataTunnel(manager);
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 5a6bed8..2870d66 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -189,11 +189,11 @@
     return null;
   }
 
-  public int bind(final int initialPort, boolean allowPortHunting) {
+  public int bind(String bindAddr, final int initialPort, boolean allowPortHunting) {
     int port = initialPort - 1;
     while (true) {
       try {
-        b.bind(++port).sync();
+        b.bind(bindAddr, ++port).sync();
         break;
       } catch (Exception e) {
         // TODO(DRILL-3026):  Revisit:  Exception is not (always) BindException.