YARN-4757. Add the ability to split reverse zone subnets. Contributed by Shane Kumpf.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java
index 7115a4c..f4fecfd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java
@@ -144,6 +144,26 @@
   String KEY_DNS_ZONES_DIR = DNS_PREFIX + "zones-dir";
 
   /**
+   * Split Reverse Zone.
+   * It may be necessary to spit large reverse zone subnets
+   * into multiple zones to handle existing hosts collocated
+   * with containers.
+   */
+  String KEY_DNS_SPLIT_REVERSE_ZONE = DNS_PREFIX + "split-reverse-zone";
+
+  /**
+   * Default value for splitting the reverse zone.
+   */
+  boolean DEFAULT_DNS_SPLIT_REVERSE_ZONE = false;
+
+  /**
+   * Split Reverse Zone IP Range.
+   * How many IPs should be part of each reverse zone split
+   */
+  String KEY_DNS_SPLIT_REVERSE_ZONE_RANGE = DNS_PREFIX +
+      "split-reverse-zone-range";
+
+  /**
    * Key to set if the registry is secure: {@value}.
    * Turning it on changes the permissions policy from "open access"
    * to restrictions on kerberos with the option of
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java
index 52b3c37..126795a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.registry.server.dns;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.IOFileFilter;
 import org.apache.commons.net.util.Base64;
@@ -269,17 +270,61 @@
   }
 
   /**
+   * Return the number of zones in the map.
+   *
+   * @return number of zones in the map
+   */
+  @VisibleForTesting
+  protected int getZoneCount() {
+    return zones.size();
+  }
+
+  /**
    * Initializes the reverse lookup zone (mapping IP to name).
    *
    * @param conf the Hadoop configuration.
-   * @throws IOException
+   * @throws IOException if the DNSSEC key can not be read.
    */
   private void initializeReverseLookupZone(Configuration conf)
       throws IOException {
-    Name reverseLookupZoneName = getReverseZoneName(conf);
-    Zone reverseLookupZone =
-        configureZone(reverseLookupZoneName, conf);
-    zones.put(reverseLookupZone.getOrigin(), reverseLookupZone);
+    // Determine if the subnet should be split into
+    // multiple reverse zones, this can be necessary in
+    // network configurations where the hosts and containers
+    // are part of the same subnet (i.e. the containers only use
+    // part of the subnet).
+    Boolean shouldSplitReverseZone = conf.getBoolean(KEY_DNS_SPLIT_REVERSE_ZONE,
+        DEFAULT_DNS_SPLIT_REVERSE_ZONE);
+    if (shouldSplitReverseZone) {
+      int subnetCount = ReverseZoneUtils.getSubnetCountForReverseZones(conf);
+      addSplitReverseZones(conf, subnetCount);
+      // Single reverse zone
+    } else {
+      Name reverseLookupZoneName = getReverseZoneName(conf);
+      Zone reverseLookupZone = configureZone(reverseLookupZoneName, conf);
+      zones.put(reverseLookupZone.getOrigin(), reverseLookupZone);
+    }
+  }
+
+  /**
+   * Create the zones based on the zone count.
+   *
+   * @param conf        the Hadoop configuration.
+   * @param subnetCount number of subnets to create reverse zones for.
+   * @throws IOException if the DNSSEC key can not be read.
+   */
+  @VisibleForTesting
+  protected void addSplitReverseZones(Configuration conf, int subnetCount)
+      throws IOException {
+    String subnet = conf.get(KEY_DNS_ZONE_SUBNET);
+    String range = conf.get(KEY_DNS_SPLIT_REVERSE_ZONE_RANGE);
+
+    // Add the split reverse zones
+    for (int idx = 0; idx < subnetCount; idx++) {
+      Name reverseLookupZoneName = getReverseZoneName(ReverseZoneUtils
+          .getReverseZoneNetworkAddress(subnet, Integer.parseInt(range), idx));
+      Zone reverseLookupZone = configureZone(reverseLookupZoneName, conf);
+      zones.put(reverseLookupZone.getOrigin(), reverseLookupZone);
+    }
   }
 
   /**
@@ -427,7 +472,8 @@
    *
    * @param conf the Hadoop configuration.
    */
-  private void setDNSSECEnabled(Configuration conf) {
+  @VisibleForTesting
+  protected void setDNSSECEnabled(Configuration conf) {
     dnssecEnabled = conf.getBoolean(KEY_DNSSEC_ENABLED, false);
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ReverseZoneUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ReverseZoneUtils.java
new file mode 100644
index 0000000..cb04f9e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/ReverseZoneUtils.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.net.util.SubnetUtils;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_DNS_SPLIT_REVERSE_ZONE_RANGE;
+import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_DNS_ZONE_MASK;
+import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_DNS_ZONE_SUBNET;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for configuring reverse zones.
+ */
+public final class ReverseZoneUtils {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReverseZoneUtils.class);
+
+  private static final long POW3 = (long) Math.pow(256, 3);
+  private static final long POW2 = (long) Math.pow(256, 2);
+  private static final long POW1 = (long) Math.pow(256, 1);
+
+  private ReverseZoneUtils() {
+  }
+
+  /**
+   * Given a baseIp, range and index, return the network address for the
+   * reverse zone.
+   *
+   * @param baseIp base ip address to perform calculations against.
+   * @param range  number of ip addresses per subnet.
+   * @param index  the index of the subnet to calculate.
+   * @return the calculated ip address.
+   * @throws UnknownHostException if an invalid ip is provided.
+   */
+  protected static String getReverseZoneNetworkAddress(String baseIp, int range,
+      int index) throws UnknownHostException {
+    if (index < 0) {
+      throw new IllegalArgumentException(
+          String.format("Invalid index provided, must be positive: %d", index));
+    }
+    if (range < 0) {
+      throw new IllegalArgumentException(
+          String.format("Invalid range provided, cannot be negative: %d",
+              range));
+    }
+    return calculateIp(baseIp, range, index);
+  }
+
+  /**
+   * When splitting the reverse zone, return the number of subnets needed,
+   * given the range and netmask.
+   *
+   * @param conf the Hadoop configuration.
+   * @return The number of subnets given the range and netmask.
+   */
+  protected static int getSubnetCountForReverseZones(Configuration conf) {
+    String subnet = conf.get(KEY_DNS_ZONE_SUBNET);
+    String mask = conf.get(KEY_DNS_ZONE_MASK);
+    String range = conf.get(KEY_DNS_SPLIT_REVERSE_ZONE_RANGE);
+
+    int parsedRange;
+    try {
+      parsedRange = Integer.parseInt(range);
+    } catch (NumberFormatException e) {
+      LOG.error("The supplied range is not a valid integer: Supplied range: ",
+          range);
+      throw e;
+    }
+    if (parsedRange < 0) {
+      String msg = String
+          .format("Range cannot be negative: Supplied range: %d", parsedRange);
+      LOG.error(msg);
+      throw new IllegalArgumentException(msg);
+    }
+
+    int ipCount;
+    try {
+      SubnetUtils subnetUtils = new SubnetUtils(subnet, mask);
+      subnetUtils.setInclusiveHostCount(true);
+      ipCount = subnetUtils.getInfo().getAddressCount();
+
+    } catch (IllegalArgumentException e) {
+      LOG.error("The subnet or mask is invalid: Subnet: {} Mask: {}", subnet,
+          mask);
+      throw e;
+    }
+
+    if (parsedRange == 0) {
+      return ipCount;
+    }
+    return ipCount / parsedRange;
+  }
+
+  private static String calculateIp(String baseIp, int range, int index)
+      throws UnknownHostException {
+    long[] ipParts = splitIp(baseIp);
+
+    long ipNum1 = POW3 * ipParts[0];
+    long ipNum2 = POW2 * ipParts[1];
+    long ipNum3 = POW1 * ipParts[2];
+    long ipNum4 = ipParts[3];
+    long ipNum = ipNum1 + ipNum2 + ipNum3 + ipNum4;
+
+    ArrayList<Long> ipPartsOut = new ArrayList<>();
+    // First octet
+    long temp = ipNum + range * (long) index;
+    ipPartsOut.add(0, temp / POW3);
+
+    // Second octet
+    temp = temp - ipPartsOut.get(0) * POW3;
+    ipPartsOut.add(1, temp / POW2);
+
+    // Third octet
+    temp = temp - ipPartsOut.get(1) * POW2;
+    ipPartsOut.add(2, temp / POW1);
+
+    // Fourth octet
+    temp = temp - ipPartsOut.get(2) * POW1;
+    ipPartsOut.add(3, temp);
+
+    return StringUtils.join(ipPartsOut, '.');
+  }
+
+  @VisibleForTesting
+  protected static long[] splitIp(String baseIp) throws UnknownHostException {
+    InetAddress inetAddress;
+    try {
+      inetAddress = InetAddress.getByName(baseIp);
+    } catch (UnknownHostException e) {
+      LOG.error("Base IP address is invalid");
+      throw e;
+    }
+    if (inetAddress instanceof Inet6Address) {
+      throw new IllegalArgumentException(
+          "IPv6 is not yet supported for " + "reverse zones");
+    }
+    byte[] octets = inetAddress.getAddress();
+    if (octets.length != 4) {
+      throw new IllegalArgumentException("Base IP address is invalid");
+    }
+    long[] results = new long[4];
+    for (int i = 0; i < octets.length; i++) {
+      results[i] = octets[i] & 0xff;
+    }
+    return results;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestRegistryDNS.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestRegistryDNS.java
index 37f0d23..d58b1c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestRegistryDNS.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestRegistryDNS.java
@@ -55,8 +55,7 @@
 import java.util.Date;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_DNS_ZONE_MASK;
-import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_DNS_ZONE_SUBNET;
+import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
 
 /**
  *
@@ -541,6 +540,34 @@
     assertEquals("wrong name", "26.172.in-addr.arpa.", name.toString());
   }
 
+  @Test
+  public void testSplitReverseZoneNames() throws Exception {
+    Configuration conf = new Configuration();
+    registryDNS = new RegistryDNS("TestRegistry");
+    conf.set(RegistryConstants.KEY_DNS_DOMAIN, "example.com");
+    conf.set(KEY_DNS_SPLIT_REVERSE_ZONE, "true");
+    conf.set(KEY_DNS_SPLIT_REVERSE_ZONE_RANGE, "256");
+    conf.set(KEY_DNS_ZONE_SUBNET, "172.26.32.0");
+    conf.set(KEY_DNS_ZONE_MASK, "255.255.224.0");
+    conf.setTimeDuration(RegistryConstants.KEY_DNS_TTL, 30L, TimeUnit.SECONDS);
+    conf.set(RegistryConstants.KEY_DNS_ZONES_DIR,
+        getClass().getResource("/").getFile());
+    if (isSecure()) {
+      conf.setBoolean(RegistryConstants.KEY_DNSSEC_ENABLED, true);
+      conf.set(RegistryConstants.KEY_DNSSEC_PUBLIC_KEY,
+          "AwEAAe1Jev0Az1khlQCvf0nud1/CNHQwwPEu8BNchZthdDxKPVn29yrD "
+              + "CHoAWjwiGsOSw3SzIPrawSbHzyJsjn0oLBhGrH6QedFGnydoxjNsw3m/ "
+              + "SCmOjR/a7LGBAMDFKqFioi4gOyuN66svBeY+/5uw72+0ei9AQ20gqf6q "
+              + "l9Ozs5bV");
+      conf.set(RegistryConstants.KEY_DNSSEC_PRIVATE_KEY_FILE,
+          getClass().getResource("/test.private").getFile());
+    }
+    registryDNS.setDomainName(conf);
+    registryDNS.setDNSSECEnabled(conf);
+    registryDNS.addSplitReverseZones(conf, 4);
+    assertEquals(4, registryDNS.getZoneCount());
+  }
+
   public RegistryDNS getRegistryDNS() {
     return registryDNS;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestReverseZoneUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestReverseZoneUtils.java
new file mode 100644
index 0000000..1331f75
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/server/dns/TestReverseZoneUtils.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.registry.server.dns;
+
+import java.net.UnknownHostException;
+import static org.junit.Assert.assertEquals;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests for the reverse zone utilities.
+ */
+public class TestReverseZoneUtils {
+  private static final String NET = "172.17.4.0";
+  private static final int RANGE = 256;
+  private static final int INDEX = 0;
+
+  @Rule public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testGetReverseZoneNetworkAddress() throws Exception {
+    assertEquals("172.17.4.0",
+        ReverseZoneUtils.getReverseZoneNetworkAddress(NET, RANGE, INDEX));
+  }
+
+  @Test
+  public void testSplitIp() throws Exception {
+    long[] splitIp = ReverseZoneUtils.splitIp(NET);
+    assertEquals(172, splitIp[0]);
+    assertEquals(17, splitIp[1]);
+    assertEquals(4, splitIp[2]);
+    assertEquals(0, splitIp[3]);
+  }
+
+  @Test
+  public void testThrowIllegalArgumentExceptionIfIndexIsNegative()
+      throws Exception {
+    exception.expect(IllegalArgumentException.class);
+    ReverseZoneUtils.getReverseZoneNetworkAddress(NET, RANGE, -1);
+  }
+
+  @Test
+  public void testThrowUnknownHostExceptionIfIpIsInvalid() throws Exception {
+    exception.expect(UnknownHostException.class);
+    ReverseZoneUtils
+        .getReverseZoneNetworkAddress("213124.21231.14123.13", RANGE, INDEX);
+  }
+
+  @Test
+  public void testThrowIllegalArgumentExceptionIfRangeIsNegative()
+      throws Exception {
+    exception.expect(IllegalArgumentException.class);
+    ReverseZoneUtils.getReverseZoneNetworkAddress(NET, -1, INDEX);
+  }
+
+  @Test
+  public void testVariousRangeAndIndexValues() throws Exception {
+    // Given the base address of 172.17.4.0, step 256 IP addresses, 5 times.
+    assertEquals("172.17.9.0",
+        ReverseZoneUtils.getReverseZoneNetworkAddress(NET, 256, 5));
+    assertEquals("172.17.4.128",
+        ReverseZoneUtils.getReverseZoneNetworkAddress(NET, 128, 1));
+    assertEquals("172.18.0.0",
+        ReverseZoneUtils.getReverseZoneNetworkAddress(NET, 256, 252));
+    assertEquals("172.17.12.0",
+        ReverseZoneUtils.getReverseZoneNetworkAddress(NET, 1024, 2));
+    assertEquals("172.17.4.0",
+        ReverseZoneUtils.getReverseZoneNetworkAddress(NET, 0, 1));
+    assertEquals("172.17.4.0",
+        ReverseZoneUtils.getReverseZoneNetworkAddress(NET, 1, 0));
+    assertEquals("172.17.4.1",
+        ReverseZoneUtils.getReverseZoneNetworkAddress(NET, 1, 1));
+  }
+}
\ No newline at end of file