Merge branch 'trunk' into HDFS-6584

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index a1dca66..0ca2953 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -530,6 +530,9 @@
 
     HADOOP-10922. User documentation for CredentialShell. (Larry McCay via wang)
 
+    HADOOP-11016. KMS should support signing cookies with zookeeper secret
+    manager. (tucu)
+
   OPTIMIZATIONS
 
     HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
@@ -721,8 +724,8 @@
     HADOOP-11056. OsSecureRandom.setConf() might leak file descriptors (yzhang
     via cmccabe)
 
-    HDFS-6912. SharedFileDescriptorFactory should not allocate sparse files
-    (cmccabe)
+    HADOOP-11040. Return value of read(ByteBuffer buf) in CryptoInputStream is
+    incorrect in some cases. (Yi Liu via wang)
 
     BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
   
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
index e8964ed..68e9697 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -471,7 +471,16 @@
         streamOffset += n; // Read n bytes
         decrypt(buf, n, pos);
       }
-      return n;
+      
+      if (n >= 0) {
+        return unread + n;
+      } else {
+        if (unread == 0) {
+          return -1;
+        } else {
+          return unread;
+        }
+      }
     }
 
     throw new UnsupportedOperationException("ByteBuffer read unsupported " +
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java
index 6ca0425b..ce99d79 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java
@@ -46,7 +46,8 @@
                                              ) throws IOException;
 
   private static final ServiceLoader<KeyProviderFactory> serviceLoader =
-      ServiceLoader.load(KeyProviderFactory.class);
+      ServiceLoader.load(KeyProviderFactory.class,
+          KeyProviderFactory.class.getClassLoader());
 
   // Iterate through the serviceLoader to avoid lazy loading.
   // Lazy loading would require synchronization in concurrent use cases.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java
index b261f7f..da3807d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java
@@ -200,6 +200,15 @@
   public FsPermission getPermission() {
     return permission;
   }
+
+  /**
+   * Tell whether the underlying file or directory is encrypted or not.
+   *
+   * @return true if the underlying file is encrypted.
+   */
+  public boolean isEncrypted() {
+    return permission.getEncryptedBit();
+  }
   
   /**
    * Get the owner of the file.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java
index ee84437..264a095 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java
@@ -294,6 +294,13 @@
     return false;
   }
 
+  /**
+   * Returns true if the file is encrypted or directory is in an encryption zone
+   */
+  public boolean getEncryptedBit() {
+    return false;
+  }
+
   /** Set the user file creation mask (umask) */
   public static void setUMask(Configuration conf, FsPermission umask) {
     conf.set(UMASK_LABEL, String.format("%1$03o", umask.toShort()));
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index 70796cc..e59fa1b 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -64,6 +64,33 @@
 
     def isSymlink(FS, p) = p in symlinks(FS)
 
+### `boolean inEncryptionZone(Path p)`
+
+Return True if the data for p is encrypted. The nature of the encryption and the
+mechanism for creating an encryption zone are implementation details not covered
+in this specification. No guarantees are made about the quality of the
+encryption. The metadata is not encrypted.
+
+#### Preconditions
+
+    if not exists(FS, p) : raise FileNotFoundException
+
+#### Postconditions
+
+#### Invariants
+
+All files and directories under a directory in an encryption zone are also in an
+encryption zone
+
+    forall d in directories(FS): inEncryptionZone(FS, d) implies
+      forall c in children(FS, d) where (isFile(FS, c) or isDir(FS, c)) :
+        inEncryptionZone(FS, c)
+
+For all files in an encrypted zone, the data is encrypted, but the encryption
+type and specification are not defined.
+
+      forall f in files(FS) where inEncryptionZone(FS, f):
+        isEncrypted(data(f))
 
 ### `FileStatus getFileStatus(Path p)`
 
@@ -88,6 +115,10 @@
             stat.length = 0
             stat.isdir = False
             stat.symlink = FS.Symlinks[p]
+        if inEncryptionZone(FS, p) :
+            stat.isEncrypted = True
+        else
+            stat.isEncrypted = False
 
 ### `Path getHomeDirectory()`
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
index f5acc73..86bb64d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
@@ -469,6 +469,7 @@
       int bufPos) throws Exception {
     buf.position(bufPos);
     int n = ((ByteBufferReadable) in).read(buf);
+    Assert.assertEquals(bufPos + n, buf.position());
     byte[] readData = new byte[n];
     buf.rewind();
     buf.position(bufPos);
@@ -568,6 +569,7 @@
     // Read forward len1
     ByteBuffer buf = ByteBuffer.allocate(len1);
     int nRead = ((ByteBufferReadable) in).read(buf);
+    Assert.assertEquals(nRead, buf.position());
     readData = new byte[nRead];
     buf.rewind();
     buf.get(readData);
@@ -575,9 +577,10 @@
     System.arraycopy(data, (int)pos, expectedData, 0, nRead);
     Assert.assertArrayEquals(readData, expectedData);
     
-    // Pos should be len1 + 2 * len2 + nRead
+    long lastPos = pos;
+    // Pos should be lastPos + nRead
     pos = ((Seekable) in).getPos();
-    Assert.assertEquals(len1 + 2 * len2 + nRead, pos);
+    Assert.assertEquals(lastPos + nRead, pos);
     
     // Pos: 1/3 dataLen
     positionedReadCheck(in , dataLen / 3);
@@ -589,13 +592,15 @@
     System.arraycopy(data, (int)pos, expectedData, 0, len1);
     Assert.assertArrayEquals(readData, expectedData);
     
-    // Pos should be 2 * len1 + 2 * len2 + nRead
+    lastPos = pos;
+    // Pos should be lastPos + len1
     pos = ((Seekable) in).getPos();
-    Assert.assertEquals(2 * len1 + 2 * len2 + nRead, pos);
+    Assert.assertEquals(lastPos + len1, pos);
     
     // Read forward len1
     buf = ByteBuffer.allocate(len1);
     nRead = ((ByteBufferReadable) in).read(buf);
+    Assert.assertEquals(nRead, buf.position());
     readData = new byte[nRead];
     buf.rewind();
     buf.get(readData);
@@ -603,6 +608,11 @@
     System.arraycopy(data, (int)pos, expectedData, 0, nRead);
     Assert.assertArrayEquals(readData, expectedData);
     
+    lastPos = pos;
+    // Pos should be lastPos + nRead
+    pos = ((Seekable) in).getPos();
+    Assert.assertEquals(lastPos + nRead, pos);
+    
     // ByteBuffer read after EOF
     ((Seekable) in).seek(dataLen);
     buf.clear();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
index 65ebfb1..cbbb27e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Test;
@@ -30,6 +31,7 @@
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 
 /**
@@ -66,6 +68,16 @@
   }
 
   @Test
+  public void testFsIsEncrypted() throws Exception {
+      describe("create an empty file and call FileStatus.isEncrypted()");
+      final Path path = path("file");
+      createFile(getFileSystem(), path, false, new byte[0]);
+      final FileStatus stat = getFileSystem().getFileStatus(path);
+      assertFalse("Expecting false for stat.isEncrypted()",
+          stat.isEncrypted());
+  }
+
+  @Test
   public void testOpenReadDir() throws Throwable {
     describe("create & read a directory");
     Path path = path("zero.dir");
diff --git a/hadoop-common-project/hadoop-kms/pom.xml b/hadoop-common-project/hadoop-kms/pom.xml
index 2c225cb..e6b21aa 100644
--- a/hadoop-common-project/hadoop-kms/pom.xml
+++ b/hadoop-common-project/hadoop-kms/pom.xml
@@ -187,6 +187,11 @@
       <artifactId>metrics-core</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-common-project/hadoop-kms/src/main/conf/kms-site.xml b/hadoop-common-project/hadoop-kms/src/main/conf/kms-site.xml
index 20896fc..4f4694c 100644
--- a/hadoop-common-project/hadoop-kms/src/main/conf/kms-site.xml
+++ b/hadoop-common-project/hadoop-kms/src/main/conf/kms-site.xml
@@ -16,7 +16,7 @@
 
   <!-- KMS Backend KeyProvider -->
   <property>
-    <name>hadoop.security.key.provider.path</name>
+    <name>hadoop.kms.key.provider.uri</name>
     <value>jceks://file@/${user.home}/kms.keystore</value>
     <description>
     </description>
@@ -68,4 +68,61 @@
     </description>
   </property>
 
+  <!-- Authentication cookie signature source -->
+
+  <property>
+    <name>hadoop.kms.authentication.signer.secret.provider</name>
+    <value>random</value>
+    <description>
+      Indicates how the secret to sign the authentication cookies will be
+      stored. Options are 'random' (default), 'string' and 'zookeeper'.
+      If using a setup with multiple KMS instances, 'zookeeper' should be used.
+    </description>
+  </property>
+
+  <!-- Configuration for 'zookeeper' authentication cookie signature source -->
+
+  <property>
+    <name>hadoop.kms.authentication.signer.secret.provider.zookeeper.path</name>
+    <value>/hadoop-kms/hadoop-auth-signature-secret</value>
+    <description>
+      The Zookeeper ZNode path where the KMS instances will store and retrieve
+      the secret from.
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.kms.authentication.signer.secret.provider.zookeeper.connection.string</name>
+    <value>#HOSTNAME#:#PORT#,...</value>
+    <description>
+      The Zookeeper connection string, a comma-separated list of
+      hostname:port pairs.
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.kms.authentication.signer.secret.provider.zookeeper.auth.type</name>
+    <value>kerberos</value>
+    <description>
+      The Zookeeper authentication type, 'none' or 'sasl' (Kerberos).
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.keytab</name>
+    <value>/etc/hadoop/conf/kms.keytab</value>
+    <description>
+      The absolute path for the Kerberos keytab with the credentials to
+      connect to Zookeeper.
+    </description>
+  </property>
+
+  <property>
+    <name>hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.principal</name>
+    <value>kms/#HOSTNAME#</value>
+    <description>
+      The Kerberos service principal used to connect to Zookeeper.
+    </description>
+  </property>
+
 </configuration>
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSAuthenticationFilter.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSAuthenticationFilter.java
index 4df6db5..79652f3 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSAuthenticationFilter.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSAuthenticationFilter.java
@@ -46,7 +46,8 @@
 @InterfaceAudience.Private
 public class KMSAuthenticationFilter
     extends DelegationTokenAuthenticationFilter {
-  private static final String CONF_PREFIX = KMSConfiguration.CONFIG_PREFIX +
+
+  public static final String CONFIG_PREFIX = KMSConfiguration.CONFIG_PREFIX +
       "authentication.";
 
   @Override
@@ -56,9 +57,9 @@
     Configuration conf = KMSWebApp.getConfiguration();
     for (Map.Entry<String, String> entry : conf) {
       String name = entry.getKey();
-      if (name.startsWith(CONF_PREFIX)) {
+      if (name.startsWith(CONFIG_PREFIX)) {
         String value = conf.get(name);
-        name = name.substring(CONF_PREFIX.length());
+        name = name.substring(CONFIG_PREFIX.length());
         props.setProperty(name, value);
       }
     }
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
index f028119..c9b0491 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
@@ -40,6 +40,10 @@
   public static final String KEY_ACL_PREFIX = "key.acl.";
   public static final String DEFAULT_KEY_ACL_PREFIX = "default.key.acl.";
 
+  // Property to set the backing KeyProvider
+  public static final String KEY_PROVIDER_URI = CONFIG_PREFIX +
+      "key.provider.uri";
+
   // Property to Enable/Disable Caching
   public static final String KEY_CACHE_ENABLE = CONFIG_PREFIX +
       "cache.enable";
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
index 0827b78..c9eeb1d 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
@@ -39,6 +39,7 @@
 import javax.servlet.ServletContextListener;
 
 import java.io.File;
+import java.net.URI;
 import java.net.URL;
 import java.util.List;
 
@@ -159,17 +160,12 @@
           new AccessControlList(AccessControlList.WILDCARD_ACL_VALUE));
 
       // intializing the KeyProvider
-
-      List<KeyProvider> providers = KeyProviderFactory.getProviders(kmsConf);
-      if (providers.isEmpty()) {
+      String providerString = kmsConf.get(KMSConfiguration.KEY_PROVIDER_URI);
+      if (providerString == null) {
         throw new IllegalStateException("No KeyProvider has been defined");
       }
-      if (providers.size() > 1) {
-        LOG.warn("There is more than one KeyProvider configured '{}', using " +
-            "the first provider",
-            kmsConf.get(KeyProviderFactory.KEY_PROVIDER_PATH));
-      }
-      KeyProvider keyProvider = providers.get(0);
+      KeyProvider keyProvider =
+          KeyProviderFactory.get(new URI(providerString), kmsConf);
       if (kmsConf.getBoolean(KMSConfiguration.KEY_CACHE_ENABLE,
           KMSConfiguration.KEY_CACHE_ENABLE_DEFAULT)) {
         long keyTimeOutMillis =
diff --git a/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm b/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
index d70f2a6..b2755a1 100644
--- a/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
+++ b/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
@@ -51,7 +51,7 @@
 
 +---+
   <property>
-    <name>hadoop.security.key.provider.path</name>
+    <name>hadoop.kms.key.provider.uri</name>
     <value>jceks://file@/${user.home}/kms.keystore</value>
   </property>
 
@@ -448,16 +448,16 @@
   KMS supports access control for all non-read operations at the Key level.
   All Key Access operations are classified as :
 
-  * MANAGEMENT - createKey, deleteKey, rolloverNewVersion
+    * MANAGEMENT - createKey, deleteKey, rolloverNewVersion
 
-  * GENERATE_EEK - generateEncryptedKey, warmUpEncryptedKeys
+    * GENERATE_EEK - generateEncryptedKey, warmUpEncryptedKeys
 
-  * DECRYPT_EEK - decryptEncryptedKey;
+    * DECRYPT_EEK - decryptEncryptedKey
 
-  * READ - getKeyVersion, getKeyVersions, getMetadata, getKeysMetadata,
-           getCurrentKey;
+    * READ - getKeyVersion, getKeyVersions, getMetadata, getKeysMetadata,
+             getCurrentKey
 
-  * ALL - all of the above;
+    * ALL - all of the above
 
   These can be defined in the KMS <<<etc/hadoop/kms-acls.xml>>> as follows
 
@@ -554,41 +554,124 @@
   KMS delegation token secret manager can be configured with the following
   properties:
 
-  +---+
-    <property>
-      <name>hadoop.kms.authentication.delegation-token.update-interval.sec</name>
-      <value>86400</value>
-      <description>
-        How often the master key is rotated, in seconds. Default value 1 day.
-      </description>
-    </property>
++---+
+  <property>
+    <name>hadoop.kms.authentication.delegation-token.update-interval.sec</name>
+    <value>86400</value>
+    <description>
+      How often the master key is rotated, in seconds. Default value 1 day.
+    </description>
+  </property>
 
-    <property>
-      <name>hadoop.kms.authentication.delegation-token.max-lifetime.sec</name>
-      <value>604800</value>
-      <description>
-        Maximum lifetime of a delagation token, in seconds. Default value 7 days.
-      </description>
-    </property>
+  <property>
+    <name>hadoop.kms.authentication.delegation-token.max-lifetime.sec</name>
+    <value>604800</value>
+    <description>
+      Maximum lifetime of a delegation token, in seconds. Default value 7 days.
+    </description>
+  </property>
 
-    <property>
-      <name>hadoop.kms.authentication.delegation-token.renew-interval.sec</name>
-      <value>86400</value>
-      <description>
-        Renewal interval of a delagation token, in seconds. Default value 1 day.
-      </description>
-    </property>
+  <property>
+    <name>hadoop.kms.authentication.delegation-token.renew-interval.sec</name>
+    <value>86400</value>
+    <description>
+      Renewal interval of a delegation token, in seconds. Default value 1 day.
+    </description>
+  </property>
 
-    <property>
-      <name>hadoop.kms.authentication.delegation-token.removal-scan-interval.sec</name>
-      <value>3600</value>
-      <description>
-        Scan interval to remove expired delegation tokens.
-      </description>
-    </property>
-  +---+
+  <property>
+    <name>hadoop.kms.authentication.delegation-token.removal-scan-interval.sec</name>
+    <value>3600</value>
+    <description>
+      Scan interval to remove expired delegation tokens.
+    </description>
+  </property>
++---+
 
 
+** Using Multiple Instances of KMS Behind a Load-Balancer or VIP
+
+  KMS supports multiple KMS instances behind a load-balancer or VIP for
+  scalability and for HA purposes.
+
+  When using multiple KMS instances behind a load-balancer or VIP, requests from
+  the same user may be handled by different KMS instances.
+
+  KMS instances behind a load-balancer or VIP must be specially configured to
+  work properly as a single logical service.
+
+*** HTTP Kerberos Principals Configuration
+
+  TBD
+
+*** HTTP Authentication Signature
+
+  KMS uses Hadoop Authentication for HTTP authentication. Hadoop Authentication
+  issues a signed HTTP Cookie once the client has authenticated successfully.
+  This HTTP Cookie has an expiration time, after which it will trigger a new
+  authentication sequence. This is done to avoid triggering the authentication
+  on every HTTP request of a client.
+
+  A KMS instance must verify the HTTP Cookie signatures signed by other KMS
+  instances. To do this all KMS instances must share the signing secret.
+
+  This secret sharing can be done using a Zookeeper service which is configured
+  in KMS with the following properties in the <<<kms-site.xml>>>:
+
++---+
+  <property>
+    <name>hadoop.kms.authentication.signer.secret.provider</name>
+    <value>zookeeper</value>
+    <description>
+      Indicates how the secret to sign the authentication cookies will be
+      stored. Options are 'random' (default), 'string' and 'zookeeper'.
+      If using a setup with multiple KMS instances, 'zookeeper' should be used.
+    </description>
+  </property>
+  <property>
+    <name>hadoop.kms.authentication.signer.secret.provider.zookeeper.path</name>
+    <value>/hadoop-kms/hadoop-auth-signature-secret</value>
+    <description>
+      The Zookeeper ZNode path where the KMS instances will store and retrieve
+      the secret from.
+    </description>
+  </property>
+  <property>
+    <name>hadoop.kms.authentication.signer.secret.provider.zookeeper.connection.string</name>
+    <value>#HOSTNAME#:#PORT#,...</value>
+    <description>
+      The Zookeeper connection string, a comma-separated list of
+      hostname:port pairs.
+    </description>
+  </property>
+  <property>
+    <name>hadoop.kms.authentication.signer.secret.provider.zookeeper.auth.type</name>
+    <value>sasl</value>
+    <description>
+      The Zookeeper authentication type, 'none' or 'sasl' (Kerberos).
+    </description>
+  </property>
+  <property>
+    <name>hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.keytab</name>
+    <value>/etc/hadoop/conf/kms.keytab</value>
+    <description>
+      The absolute path for the Kerberos keytab with the credentials to
+      connect to Zookeeper.
+    </description>
+  </property>
+  <property>
+    <name>hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.principal</name>
+    <value>kms/#HOSTNAME#</value>
+    <description>
+      The Kerberos service principal used to connect to Zookeeper.
+    </description>
+  </property>
++---+
+
+*** Delegation Tokens
+
+  TBD
+
 ** KMS HTTP REST API
 
 *** Create a Key
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/MiniKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/MiniKMS.java
index f64dcf0..16e78ce 100644
--- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/MiniKMS.java
+++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/MiniKMS.java
@@ -166,7 +166,7 @@
     File kmsFile = new File(kmsConfDir, "kms-site.xml");
     if (!kmsFile.exists()) {
       Configuration kms = new Configuration(false);
-      kms.set("hadoop.security.key.provider.path",
+      kms.set(KMSConfiguration.KEY_PROVIDER_URI,
           "jceks://file@" + new Path(kmsConfDir, "kms.keystore").toUri());
       kms.set("hadoop.kms.authentication.type", "simple");
       kms.setBoolean(KMSConfiguration.KEY_AUTHORIZATION_ENABLE, false);
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
index f4f9fea..9211417 100644
--- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
+++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
@@ -117,13 +117,14 @@
 
   protected Configuration createBaseKMSConf(File keyStoreDir) throws Exception {
     Configuration conf = new Configuration(false);
-    conf.set("hadoop.security.key.provider.path",
+    conf.set(KMSConfiguration.KEY_PROVIDER_URI,
         "jceks://file@" + new Path(keyStoreDir.getAbsolutePath(), "kms.keystore").toUri());
     conf.set("hadoop.kms.authentication.type", "simple");
     return conf;
   }
 
-  protected void writeConf(File confDir, Configuration conf) throws Exception {
+  public static void writeConf(File confDir, Configuration conf)
+      throws Exception {
     Writer writer = new FileWriter(new File(confDir,
         KMSConfiguration.KMS_SITE_XML));
     conf.writeXml(writer);
@@ -139,7 +140,7 @@
     writer.close();
   }
 
-  protected URI createKMSUri(URL kmsUrl) throws Exception {
+  public static URI createKMSUri(URL kmsUrl) throws Exception {
     String str = kmsUrl.toString();
     str = str.replaceFirst("://", "@");
     return new URI("kms://" + str);
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSWithZK.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSWithZK.java
new file mode 100644
index 0000000..59b0002
--- /dev/null
+++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSWithZK.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms.server;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.crypto.key.KeyProvider.Options;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
+import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
+import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
+import org.apache.hadoop.crypto.key.kms.KMSRESTConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
+import org.apache.hadoop.security.authentication.util.ZKSignerSecretProvider;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginContext;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URL;
+import java.security.Principal;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+public class TestKMSWithZK {
+
+  // Base KMS conf. Uses KEY_PROVIDER_URI, the key KMSWebApp reads at startup.
+  protected Configuration createBaseKMSConf(File keyStoreDir) throws Exception {
+    Configuration conf = new Configuration(false);
+    conf.set(KMSConfiguration.KEY_PROVIDER_URI, "jceks://file@"
+        + new Path(keyStoreDir.getAbsolutePath(), "kms.keystore").toUri());
+    conf.set("hadoop.kms.authentication.type", "simple");
+    conf.setBoolean(KMSConfiguration.KEY_AUTHORIZATION_ENABLE, false);
+    // Set the GET_KEYS ACL to user "foo".
+    conf.set(KMSACLs.Type.GET_KEYS.getAclConfigKey(), "foo");
+    return conf;
+  }
+
+  @Test
+  public void testMultipleKMSInstancesWithZKSigner() throws Exception {
+    final File testDir = TestKMS.getTestDir();
+    Configuration conf = createBaseKMSConf(testDir);
+
+    TestingServer zkServer = new TestingServer();
+    zkServer.start();
+
+    MiniKMS kms1 = null;
+    MiniKMS kms2 = null;
+
+    conf.set(KMSAuthenticationFilter.CONFIG_PREFIX +
+        AuthenticationFilter.SIGNER_SECRET_PROVIDER, "zookeeper");
+    conf.set(KMSAuthenticationFilter.CONFIG_PREFIX +
+            ZKSignerSecretProvider.ZOOKEEPER_CONNECTION_STRING,
+        zkServer.getConnectString());
+    conf.set(KMSAuthenticationFilter.CONFIG_PREFIX +
+            ZKSignerSecretProvider.ZOOKEEPER_PATH, "/secret");
+    TestKMS.writeConf(testDir, conf);
+
+    try {
+      kms1 = new MiniKMS.Builder()
+          .setKmsConfDir(testDir).setLog4jConfFile("log4j.properties").build();
+      kms1.start();
+
+      kms2 = new MiniKMS.Builder()
+          .setKmsConfDir(testDir).setLog4jConfFile("log4j.properties").build();
+      kms2.start();
+
+      final URL url1 = new URL(kms1.getKMSUrl().toExternalForm() +
+          KMSRESTConstants.SERVICE_VERSION +  "/" +
+          KMSRESTConstants.KEYS_NAMES_RESOURCE);
+      final URL url2 = new URL(kms2.getKMSUrl().toExternalForm() +
+          KMSRESTConstants.SERVICE_VERSION + "/" +
+          KMSRESTConstants.KEYS_NAMES_RESOURCE);
+
+      final DelegationTokenAuthenticatedURL.Token token =
+          new DelegationTokenAuthenticatedURL.Token();
+      final DelegationTokenAuthenticatedURL aUrl =
+          new DelegationTokenAuthenticatedURL();
+
+      UserGroupInformation ugiFoo = UserGroupInformation.createUserForTesting(
+          "foo", new String[]{"gfoo"});
+      UserGroupInformation ugiBar = UserGroupInformation.createUserForTesting(
+          "bar", new String[]{"gBar"});
+
+      ugiFoo.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          HttpURLConnection conn = aUrl.openConnection(url1, token);
+          Assert.assertEquals(HttpURLConnection.HTTP_OK,
+              conn.getResponseCode());
+          return null;
+        }
+      });
+
+      ugiBar.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          HttpURLConnection conn = aUrl.openConnection(url2, token);
+          Assert.assertEquals(HttpURLConnection.HTTP_OK,
+              conn.getResponseCode());
+          return null;
+        }
+      });
+
+      ugiBar.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          final DelegationTokenAuthenticatedURL.Token emptyToken =
+              new DelegationTokenAuthenticatedURL.Token();
+          HttpURLConnection conn = aUrl.openConnection(url2, emptyToken);
+          Assert.assertEquals(HttpURLConnection.HTTP_FORBIDDEN,
+              conn.getResponseCode());
+          return null;
+        }
+      });
+
+    } finally {
+      if (kms2 != null) {
+        kms2.stop();
+      }
+      if (kms1 != null) {
+        kms1.stop();
+      }
+      zkServer.stop();
+    }
+
+  }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 55ffa31..a888da2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -544,6 +544,10 @@
     HDFS-6705. Create an XAttr that disallows the HDFS admin from accessing a
     file. (clamb via wang)
 
+    HDFS-6843. Create FileStatus isEncrypted() method (clamb via cmccabe)
+
+    HDFS-7004. Update KeyProvider instantiation to create by URI. (wang)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
@@ -743,6 +747,14 @@
     and TestDFSClientFailover.testDoesntDnsResolveLogicalURI failing on jdk7.
     (Akira Ajisaka via wang)
 
+    HDFS-6912. SharedFileDescriptorFactory should not allocate sparse files
+    (cmccabe)
+
+    HDFS-7075. hadoop-fuse-dfs fails because it cannot find
+    JavaKeyStoreProvider$Factory (cmccabe)
+
+    HDFS-7078. Fix listEZs to work correctly with snapshots. (wang)
+
     BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
   
       HDFS-6387. HDFS CLI admin tool for creating & deleting an
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 641fb52..7f04fc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -595,6 +595,7 @@
   public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
   public static final int    DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
   public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
+  public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri";
 
   // Journal-node related configs. These are read on the JN side.
   public static final String  DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 021890b..aba86d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -1794,34 +1794,37 @@
    * Creates a new KeyProviderCryptoExtension by wrapping the
    * KeyProvider specified in the given Configuration.
    *
-   * @param conf Configuration specifying a single, non-transient KeyProvider.
+   * @param conf Configuration read for dfs.encryption.key.provider.uri
    * @return new KeyProviderCryptoExtension, or null if no provider was found.
    * @throws IOException if the KeyProvider is improperly specified in
    *                             the Configuration
    */
   public static KeyProviderCryptoExtension createKeyProviderCryptoExtension(
       final Configuration conf) throws IOException {
-    final List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
-    if (providers == null || providers.size() == 0) {
+    final String providerUriStr =
+        conf.get(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, null);
+    // No provider set in conf
+    if (providerUriStr == null) {
       return null;
     }
-    if (providers.size() > 1) {
-      StringBuilder builder = new StringBuilder();
-      builder.append("Found multiple KeyProviders but only one is permitted [");
-      String prefix = " ";
-      for (KeyProvider kp: providers) {
-        builder.append(prefix + kp.toString());
-        prefix = ", ";
-      }
-      builder.append("]");
-      throw new IOException(builder.toString());
+    final URI providerUri;
+    try {
+      providerUri = new URI(providerUriStr);
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
     }
-    KeyProviderCryptoExtension provider = KeyProviderCryptoExtension
-        .createKeyProviderCryptoExtension(providers.get(0));
-    if (provider.isTransient()) {
-      throw new IOException("KeyProvider " + provider.toString()
+    KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf);
+    if (keyProvider == null) {
+      throw new IOException("Could not instantiate KeyProvider from " + 
+          DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI + " setting of '" + 
+          providerUriStr +"'");
+    }
+    if (keyProvider.isTransient()) {
+      throw new IOException("KeyProvider " + keyProvider.toString()
           + " was found but it is a transient provider.");
     }
-    return provider;
+    KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
+        .createKeyProviderCryptoExtension(keyProvider);
+    return cryptoProvider;
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/FsAclPermission.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/FsPermissionExtension.java
similarity index 67%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/FsAclPermission.java
rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/FsPermissionExtension.java
index de2762d..f74472d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/FsAclPermission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/FsPermissionExtension.java
@@ -21,39 +21,46 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 
 /**
- * HDFS permission subclass used to indicate an ACL is present.  The ACL bit is
- * not visible directly to users of {@link FsPermission} serialization.  This is
+ * HDFS permission subclass used to indicate an ACL is present and/or that the
+ * underlying file/dir is encrypted. The ACL/encrypted bits are not visible
+ * directly to users of {@link FsPermission} serialization.  This is
  * done for backwards compatibility in case any existing clients assume the
  * value of FsPermission is in a particular range.
  */
 @InterfaceAudience.Private
-public class FsAclPermission extends FsPermission {
+public class FsPermissionExtension extends FsPermission {
   private final static short ACL_BIT = 1 << 12;
+  private final static short ENCRYPTED_BIT = 1 << 13;
   private final boolean aclBit;
+  private final boolean encryptedBit;
 
   /**
-   * Constructs a new FsAclPermission based on the given FsPermission.
+   * Constructs a new FsPermissionExtension based on the given FsPermission.
    *
    * @param perm FsPermission containing permission bits
    */
-  public FsAclPermission(FsPermission perm) {
+  public FsPermissionExtension(FsPermission perm, boolean hasAcl,
+      boolean isEncrypted) {
     super(perm.toShort());
-    aclBit = true;
+    aclBit = hasAcl;
+    encryptedBit = isEncrypted;
   }
 
   /**
-   * Creates a new FsAclPermission by calling the base class constructor.
+   * Creates a new FsPermissionExtension by calling the base class constructor.
    *
    * @param perm short containing permission bits
    */
-  public FsAclPermission(short perm) {
+  public FsPermissionExtension(short perm) {
     super(perm);
     aclBit = (perm & ACL_BIT) != 0;
+    encryptedBit = (perm & ENCRYPTED_BIT) != 0;
   }
 
   @Override
   public short toExtendedShort() {
-    return (short)(toShort() | (aclBit ? ACL_BIT : 0));
+    return (short)(toShort() |
+        (aclBit ? ACL_BIT : 0) | (encryptedBit ? ENCRYPTED_BIT : 0));
   }
 
   @Override
@@ -62,6 +69,11 @@
   }
 
   @Override
+  public boolean getEncryptedBit() {
+    return encryptedBit;
+  }
+
+  @Override
   public boolean equals(Object o) {
     // This intentionally delegates to the base class.  This is only overridden
     // to suppress a FindBugs warning.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 8460859..862a803 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -67,7 +67,7 @@
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.fs.FileEncryptionInfo;
-import org.apache.hadoop.hdfs.protocol.FsAclPermission;
+import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
@@ -1282,7 +1282,7 @@
   }
   
   public static FsPermission convert(FsPermissionProto p) {
-    return new FsAclPermission((short)p.getPerm());
+    return new FsPermissionExtension((short)p.getPerm());
   }
   
   
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
index e72ae12..c428690 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
@@ -312,7 +312,22 @@
 
     int count = 0;
     for (EncryptionZoneInt ezi : tailMap.values()) {
-      zones.add(new EncryptionZone(getFullPathName(ezi),
+      /*
+       Skip EZs that are only present in snapshots. Re-resolve the path to
+       see if the path's current inode ID matches the EZ map's INode ID.
+
+       INode#getFullPathName simply calls getParent recursively, so it returns
+       the INode's parents at the time it was snapshotted. It will not
+       contain a reference INode.
+      */
+      final String pathName = getFullPathName(ezi);
+      INodesInPath iip = dir.getINodesInPath(pathName, false);
+      INode lastINode = iip.getLastINode();
+      if (lastINode == null || lastINode.getId() != ezi.getINodeId()) {
+        continue;
+      }
+      // Add the EZ to the result list
+      zones.add(new EncryptionZone(pathName,
           ezi.getKeyName(), ezi.getINodeId()));
       count++;
       if (count >= numResponses) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 3426bf2..9346ea5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -64,7 +64,7 @@
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
-import org.apache.hadoop.hdfs.protocol.FsAclPermission;
+import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
@@ -2372,16 +2372,24 @@
      long size = 0;     // length is zero for directories
      short replication = 0;
      long blocksize = 0;
+     final boolean isEncrypted;
+
+     final FileEncryptionInfo feInfo = isRawPath ? null :
+         getFileEncryptionInfo(node, snapshot);
+
      if (node.isFile()) {
        final INodeFile fileNode = node.asFile();
        size = fileNode.computeFileSize(snapshot);
        replication = fileNode.getFileReplication(snapshot);
        blocksize = fileNode.getPreferredBlockSize();
+       isEncrypted = (feInfo != null) ||
+           (isRawPath && isInAnEZ(INodesInPath.fromINode(node)));
+     } else {
+       isEncrypted = isInAnEZ(INodesInPath.fromINode(node));
      }
+
      int childrenNum = node.isDirectory() ? 
          node.asDirectory().getChildrenNum(snapshot) : 0;
-     FileEncryptionInfo feInfo = isRawPath ? null :
-         getFileEncryptionInfo(node, snapshot);
 
      return new HdfsFileStatus(
         size, 
@@ -2390,7 +2398,7 @@
         blocksize,
         node.getModificationTime(snapshot),
         node.getAccessTime(snapshot),
-        getPermissionForFileStatus(node, snapshot),
+        getPermissionForFileStatus(node, snapshot, isEncrypted),
         node.getUserName(snapshot),
         node.getGroupName(snapshot),
         node.isSymlink() ? node.asSymlink().getSymlink() : null,
@@ -2411,6 +2419,7 @@
     short replication = 0;
     long blocksize = 0;
     LocatedBlocks loc = null;
+    final boolean isEncrypted;
     final FileEncryptionInfo feInfo = isRawPath ? null :
         getFileEncryptionInfo(node, snapshot);
     if (node.isFile()) {
@@ -2430,6 +2439,10 @@
       if (loc == null) {
         loc = new LocatedBlocks();
       }
+      isEncrypted = (feInfo != null) ||
+          (isRawPath && isInAnEZ(INodesInPath.fromINode(node)));
+    } else {
+      isEncrypted = isInAnEZ(INodesInPath.fromINode(node));
     }
     int childrenNum = node.isDirectory() ? 
         node.asDirectory().getChildrenNum(snapshot) : 0;
@@ -2438,7 +2451,7 @@
         new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
           blocksize, node.getModificationTime(snapshot),
           node.getAccessTime(snapshot),
-          getPermissionForFileStatus(node, snapshot),
+          getPermissionForFileStatus(node, snapshot, isEncrypted),
           node.getUserName(snapshot), node.getGroupName(snapshot),
           node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
           node.getId(), loc, childrenNum, feInfo, storagePolicy);
@@ -2454,17 +2467,21 @@
 
   /**
    * Returns an inode's FsPermission for use in an outbound FileStatus.  If the
-   * inode has an ACL, then this method will convert to a FsAclPermission.
+   * inode has an ACL or is for an encrypted file/dir, then this method will
+   * return an FsPermissionExtension.
    *
    * @param node INode to check
    * @param snapshot int snapshot ID
+   * @param isEncrypted boolean true if the file/dir is encrypted
    * @return FsPermission from inode, with ACL bit on if the inode has an ACL
+   * and encrypted bit on if it represents an encrypted file/dir.
    */
   private static FsPermission getPermissionForFileStatus(INode node,
-      int snapshot) {
+      int snapshot, boolean isEncrypted) {
     FsPermission perm = node.getFsPermission(snapshot);
-    if (node.getAclFeature(snapshot) != null) {
-      perm = new FsAclPermission(perm);
+    boolean hasAcl = node.getAclFeature(snapshot) != null;
+    if (hasAcl || isEncrypted) {
+      perm = new FsPermissionExtension(perm, hasAcl, isEncrypted);
     }
     return perm;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
index 735191b..2b3d7e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
@@ -181,9 +181,16 @@
   }
 
   /** Convert a string to a FsPermission object. */
-  private static FsPermission toFsPermission(final String s, Boolean aclBit) {
+  private static FsPermission toFsPermission(final String s, Boolean aclBit,
+      Boolean encBit) {
     FsPermission perm = new FsPermission(Short.parseShort(s, 8));
-    return (aclBit != null && aclBit) ? new FsAclPermission(perm) : perm;
+    final boolean aBit = (aclBit != null) ? aclBit : false;
+    final boolean eBit = (encBit != null) ? encBit : false;
+    if (aBit || eBit) {
+      return new FsPermissionExtension(perm, aBit, eBit);
+    } else {
+      return perm;
+    }
   }
 
   static enum PathType {
@@ -215,6 +222,9 @@
     if (perm.getAclBit()) {
       m.put("aclBit", true);
     }
+    if (perm.getEncryptedBit()) {
+      m.put("encBit", true);
+    }
     m.put("accessTime", status.getAccessTime());
     m.put("modificationTime", status.getModificationTime());
     m.put("blockSize", status.getBlockSize());
@@ -242,7 +252,7 @@
     final String owner = (String) m.get("owner");
     final String group = (String) m.get("group");
     final FsPermission permission = toFsPermission((String) m.get("permission"),
-      (Boolean)m.get("aclBit"));
+      (Boolean)m.get("aclBit"), (Boolean)m.get("encBit"));
     final long aTime = (Long) m.get("accessTime");
     final long mTime = (Long) m.get("modificationTime");
     final long blockSize = (Long) m.get("blockSize");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1a21405..b28b216 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2138,4 +2138,12 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.encryption.key.provider.uri</name>
+  <description>
+    The URI of the KeyProvider to use when interacting with encryption keys
+    used when reading and writing to an encryption zone.
+  </description>
+</property>
+
 </configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/TransparentEncryption.apt.vm b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/TransparentEncryption.apt.vm
index 3689a77..0e2cb78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/TransparentEncryption.apt.vm
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/TransparentEncryption.apt.vm
@@ -85,6 +85,12 @@
   A necessary prerequisite is an instance of the KMS, as well as a backing key store for the KMS.
   See the {{{../../hadoop-kms/index.html}KMS documentation}} for more information.
 
+** Configuring the cluster KeyProvider
+
+*** dfs.encryption.key.provider.uri
+
+  The URI of the KeyProvider to use when interacting with encryption keys used when reading and writing to an encryption zone.
+
 ** Selecting an encryption algorithm and codec
 
 *** hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestCryptoAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestCryptoAdminCLI.java
index adeabfe..1c870a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestCryptoAdminCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestCryptoAdminCLI.java
@@ -66,7 +66,7 @@
     tmpDir = new File(System.getProperty("test.build.data", "target"),
         UUID.randomUUID().toString()).getAbsoluteFile();
     final Path jksPath = new Path(tmpDir.toString(), "test.jks");
-    conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
+    conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
         JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
 
     dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
index 96f5fce..ff28200 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
@@ -25,7 +25,9 @@
 import java.io.RandomAccessFile;
 import java.io.StringReader;
 import java.io.StringWriter;
+import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -47,6 +49,7 @@
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileContextTestWrapper;
 import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.FileSystemTestWrapper;
@@ -124,7 +127,7 @@
     // Set up java key store
     String testRoot = fsHelper.getTestRootDir();
     testRootDir = new File(testRoot).getAbsoluteFile();
-    conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, getKeyProviderURI());
+    conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, getKeyProviderURI());
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
     // Lower the batch size for testing
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES,
@@ -669,7 +672,8 @@
     // Check KeyProvider state
     // Flushing the KP on the NN, since it caches, and init a test one
     cluster.getNamesystem().getProvider().flush();
-    KeyProvider provider = KeyProviderFactory.getProviders(conf).get(0);
+    KeyProvider provider = KeyProviderFactory
+        .get(new URI(conf.get(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI)), conf);
     List<String> keys = provider.getKeys();
     assertEquals("Expected NN to have created one key per zone", 1,
         keys.size());
@@ -693,7 +697,7 @@
   public void testCreateEZWithNoProvider() throws Exception {
     // Unset the key provider and make sure EZ ops don't work
     final Configuration clusterConf = cluster.getConfiguration(0);
-    clusterConf.set(KeyProviderFactory.KEY_PROVIDER_PATH, "");
+    clusterConf.unset(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI);
     cluster.restartNameNode(true);
     cluster.waitActive();
     final Path zone1 = new Path("/zone1");
@@ -705,13 +709,100 @@
       assertExceptionContains("since no key provider is available", e);
     }
     final Path jksPath = new Path(testRootDir.toString(), "test.jks");
-    clusterConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
+    clusterConf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
         JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
     );
     // Try listing EZs as well
     assertNumZones(0);
   }
 
+  @Test(timeout = 120000)
+  public void testIsEncryptedMethod() throws Exception {
+    doTestIsEncryptedMethod(new Path("/"));
+    doTestIsEncryptedMethod(new Path("/.reserved/raw"));
+  }
+
+  private void doTestIsEncryptedMethod(Path prefix) throws Exception {
+    try {
+      dTIEM(prefix);
+    } finally {
+      for (FileStatus s : fsWrapper.listStatus(prefix)) {
+        fsWrapper.delete(s.getPath(), true);
+      }
+    }
+  }
+
+  private void dTIEM(Path prefix) throws Exception {
+    final HdfsAdmin dfsAdmin =
+      new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    // Create an unencrypted file to check isEncrypted returns false
+    final Path baseFile = new Path(prefix, "base");
+    fsWrapper.createFile(baseFile);
+    FileStatus stat = fsWrapper.getFileStatus(baseFile);
+    assertFalse("Expected isEncrypted to return false for " + baseFile,
+        stat.isEncrypted());
+
+    // Create an encrypted file to check isEncrypted returns true
+    final Path zone = new Path(prefix, "zone");
+    fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY);
+    final Path encFile = new Path(zone, "encfile");
+    fsWrapper.createFile(encFile);
+    stat = fsWrapper.getFileStatus(encFile);
+    assertTrue("Expected isEncrypted to return true for enc file" + encFile,
+        stat.isEncrypted());
+
+    // check that it returns true for an ez root
+    stat = fsWrapper.getFileStatus(zone);
+    assertTrue("Expected isEncrypted to return true for ezroot",
+        stat.isEncrypted());
+
+    // check that it returns true for a dir in the ez
+    final Path zoneSubdir = new Path(zone, "subdir");
+    fsWrapper.mkdir(zoneSubdir, FsPermission.getDirDefault(), true);
+    stat = fsWrapper.getFileStatus(zoneSubdir);
+    assertTrue(
+        "Expected isEncrypted to return true for ez subdir " + zoneSubdir,
+        stat.isEncrypted());
+
+    // check that it returns false for a non ez dir
+    final Path nonEzDirPath = new Path(prefix, "nonzone");
+    fsWrapper.mkdir(nonEzDirPath, FsPermission.getDirDefault(), true);
+    stat = fsWrapper.getFileStatus(nonEzDirPath);
+    assertFalse(
+        "Expected isEncrypted to return false for directory " + nonEzDirPath,
+        stat.isEncrypted());
+
+    // check that it returns true for listings within an ez
+    FileStatus[] statuses = fsWrapper.listStatus(zone);
+    for (FileStatus s : statuses) {
+      assertTrue("Expected isEncrypted to return true for ez stat " + zone,
+          s.isEncrypted());
+    }
+
+    statuses = fsWrapper.listStatus(encFile);
+    for (FileStatus s : statuses) {
+      assertTrue(
+          "Expected isEncrypted to return true for ez file stat " + encFile,
+          s.isEncrypted());
+    }
+
+    // check that it returns false for listings outside an ez
+    statuses = fsWrapper.listStatus(nonEzDirPath);
+    for (FileStatus s : statuses) {
+      assertFalse(
+          "Expected isEncrypted to return false for nonez stat " + nonEzDirPath,
+          s.isEncrypted());
+    }
+
+    statuses = fsWrapper.listStatus(baseFile);
+    for (FileStatus s : statuses) {
+      assertFalse(
+          "Expected isEncrypted to return false for non ez stat " + baseFile,
+          s.isEncrypted());
+    }
+  }
+
   private class MyInjector extends EncryptionFaultInjector {
     int generateCount;
     CountDownLatch ready;
@@ -940,6 +1031,9 @@
    */
   @Test(timeout = 60000)
   public void testSnapshotsOnEncryptionZones() throws Exception {
+    final String TEST_KEY2 = "testkey2";
+    DFSTestUtil.createKey(TEST_KEY2, cluster, conf);
+
     final int len = 8196;
     final Path zoneParent = new Path("/zones");
     final Path zone = new Path(zoneParent, "zone");
@@ -954,7 +1048,8 @@
     assertEquals("Got unexpected ez path", zone.toString(),
         dfsAdmin.getEncryptionZoneForPath(snap1Zone).getPath().toString());
 
-    // Now delete the encryption zone, recreate the dir, and take another snapshot
+    // Now delete the encryption zone, recreate the dir, and take another
+    // snapshot
     fsWrapper.delete(zone, true);
     fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
     final Path snap2 = fs.createSnapshot(zoneParent);
@@ -963,11 +1058,35 @@
         dfsAdmin.getEncryptionZoneForPath(snap2Zone));
 
     // Create the encryption zone again
-    dfsAdmin.createEncryptionZone(zone, TEST_KEY);
+    dfsAdmin.createEncryptionZone(zone, TEST_KEY2);
     final Path snap3 = fs.createSnapshot(zoneParent);
     final Path snap3Zone = new Path(snap3, zone.getName());
+    // Check that snap3's EZ has the correct settings
+    EncryptionZone ezSnap3 = dfsAdmin.getEncryptionZoneForPath(snap3Zone);
     assertEquals("Got unexpected ez path", zone.toString(),
-        dfsAdmin.getEncryptionZoneForPath(snap3Zone).getPath().toString());
+        ezSnap3.getPath().toString());
+    assertEquals("Unexpected ez key", TEST_KEY2, ezSnap3.getKeyName());
+    // Check that older snapshots still have the old EZ settings
+    EncryptionZone ezSnap1 = dfsAdmin.getEncryptionZoneForPath(snap1Zone);
+    assertEquals("Got unexpected ez path", zone.toString(),
+        ezSnap1.getPath().toString());
+    assertEquals("Unexpected ez key", TEST_KEY, ezSnap1.getKeyName());
+
+    // Check that listEZs only shows the current filesystem state
+    ArrayList<EncryptionZone> listZones = Lists.newArrayList();
+    RemoteIterator<EncryptionZone> it = dfsAdmin.listEncryptionZones();
+    while (it.hasNext()) {
+      listZones.add(it.next());
+    }
+    for (EncryptionZone z: listZones) {
+      System.out.println(z);
+    }
+    assertEquals("Did not expect additional encryption zones!", 1,
+        listZones.size());
+    EncryptionZone listZone = listZones.get(0);
+    assertEquals("Got unexpected ez path", zone.toString(),
+        listZone.getPath().toString());
+    assertEquals("Unexpected ez key", TEST_KEY2, listZone.getKeyName());
 
     // Verify contents of the snapshotted file
     final Path snapshottedZoneFile = new Path(
@@ -975,7 +1094,8 @@
     assertEquals("Contents of snapshotted file have changed unexpectedly",
         contents, DFSTestUtil.readFile(fs, snapshottedZoneFile));
 
-    // Now delete the snapshots out of order and verify the zones are still correct
+    // Now delete the snapshots out of order and verify the zones are still
+    // correct
     fs.deleteSnapshot(zoneParent, snap2.getName());
     assertEquals("Got unexpected ez path", zone.toString(),
         dfsAdmin.getEncryptionZoneForPath(snap1Zone).getPath().toString());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java
index b604004..c74f990 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithHA.java
@@ -20,7 +20,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
-import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
@@ -60,7 +59,7 @@
     fsHelper = new FileSystemTestHelper();
     String testRoot = fsHelper.getTestRootDir();
     testRootDir = new File(testRoot).getAbsoluteFile();
-    conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
+    conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
         JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir + "/test.jks"
     );
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java
index 20e4f4e..cc497ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReservedRawPaths.java
@@ -24,7 +24,6 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
-import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileContextTestWrapper;
 import org.apache.hadoop.fs.FileStatus;
@@ -70,7 +69,7 @@
     String testRoot = fsHelper.getTestRootDir();
     File testRootDir = new File(testRoot).getAbsoluteFile();
     final Path jksPath = new Path(testRootDir.toString(), "test.jks");
-    conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
+    conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
         JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
     );
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
index 1ddc774..adca0aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
@@ -39,7 +39,7 @@
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.AclException;
-import org.apache.hadoop.hdfs.protocol.FsAclPermission;
+import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -822,7 +822,8 @@
     fs.setPermission(path, FsPermission.createImmutable((short)0700));
     assertPermission((short)0700);
     fs.setPermission(path,
-      new FsAclPermission(FsPermission.createImmutable((short)0755)));
+      new FsPermissionExtension(FsPermission.
+          createImmutable((short)0755), true, true));
     INode inode = cluster.getNamesystem().getFSDirectory().getNode(
       path.toUri().getPath(), false);
     assertNotNull(inode);
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 51fe3cc..5a23814 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -377,6 +377,12 @@
     YARN-2529. Generic history service RPC interface doesn't work when service
     authorization is enabled. (Zhijie Shen via jianhe)
 
+    YARN-2558. Updated ContainerTokenIdentifier#read/write to use
+    ContainerId#getContainerId. (Tsuyoshi OZAWA via jianhe)
+
+    YARN-2559. Fixed NPE in SystemMetricsPublisher when retrieving
+    FinalApplicationStatus. (Zhijie Shen via jianhe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
index 8b8177a..ca847e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
@@ -128,7 +128,7 @@
     out.writeLong(applicationId.getClusterTimestamp());
     out.writeInt(applicationId.getId());
     out.writeInt(applicationAttemptId.getAttemptId());
-    out.writeInt(this.containerId.getId());
+    out.writeLong(this.containerId.getContainerId());
     out.writeUTF(this.nmHostAddr);
     out.writeUTF(this.appSubmitter);
     out.writeInt(this.resource.getMemory());
@@ -147,7 +147,7 @@
     ApplicationAttemptId applicationAttemptId =
         ApplicationAttemptId.newInstance(applicationId, in.readInt());
     this.containerId =
-        ContainerId.newInstance(applicationAttemptId, in.readInt());
+        ContainerId.newInstance(applicationAttemptId, in.readLong());
     this.nmHostAddr = in.readUTF();
     this.appSubmitter = in.readUTF();
     int memory = in.readInt();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
index ecf37b0..5da006c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
@@ -160,7 +160,7 @@
 
   @SuppressWarnings("unchecked")
   public void appAttemptFinished(RMAppAttempt appAttempt,
-      RMAppAttemptState state, long finishedTime) {
+      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
     if (publishSystemMetrics) {
       dispatcher.getEventHandler().handle(
           new AppAttemptFinishedEvent(
@@ -168,8 +168,10 @@
               appAttempt.getTrackingUrl(),
               appAttempt.getOriginalTrackingUrl(),
               appAttempt.getDiagnostics(),
-              appAttempt.getFinalApplicationStatus(),
-              RMServerUtils.createApplicationAttemptState(state),
+              // app will get the final status from app attempt, or create one
+              // based on app state if it doesn't exist
+              app.getFinalApplicationStatus(),
+              RMServerUtils.createApplicationAttemptState(appAttemtpState),
               finishedTime));
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 863130f..7ca57ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -1159,8 +1159,10 @@
       appAttempt.rmContext.getRMApplicationHistoryWriter()
           .applicationAttemptFinished(appAttempt, finalAttemptState);
       appAttempt.rmContext.getSystemMetricsPublisher()
-          .appAttemptFinished(
-              appAttempt, finalAttemptState, System.currentTimeMillis());
+          .appAttemptFinished(appAttempt, finalAttemptState,
+              appAttempt.rmContext.getRMApps().get(
+                  appAttempt.applicationAttemptId.getApplicationId()),
+              System.currentTimeMillis());
     }
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
index a97ae7b..63343e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
@@ -174,7 +174,9 @@
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
     RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId);
     metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L);
-    metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED,
+    RMApp app = mock(RMApp.class);
+    when(app.getFinalApplicationStatus()).thenReturn(FinalApplicationStatus.UNDEFINED);
+    metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, app,
         Integer.MAX_VALUE + 2L);
     TimelineEntity entity = null;
     do {
@@ -222,7 +224,7 @@
             event.getEventInfo().get(
                 AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO));
         Assert.assertEquals(
-            appAttempt.getFinalApplicationStatus().toString(),
+            FinalApplicationStatus.UNDEFINED.toString(),
             event.getEventInfo().get(
                 AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO));
         Assert.assertEquals(
@@ -340,8 +342,6 @@
     when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
     when(appAttempt.getOriginalTrackingUrl()).thenReturn(
         "test original tracking url");
-    when(appAttempt.getFinalApplicationStatus()).thenReturn(
-        FinalApplicationStatus.UNDEFINED);
     return appAttempt;
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 6608ccd..b8e6f43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -76,6 +76,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
@@ -92,7 +93,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -289,7 +289,6 @@
     Mockito.doReturn(resourceScheduler).when(spyRMContext).getScheduler();
 
 
-    final String user = MockApps.newUserName();
     final String queue = MockApps.newQueue();
     submissionContext = mock(ApplicationSubmissionContext.class);
     when(submissionContext.getQueue()).thenReturn(queue);
@@ -1385,7 +1384,7 @@
     finalState =
         ArgumentCaptor.forClass(RMAppAttemptState.class);
     verify(publisher).appAttemptFinished(any(RMAppAttempt.class), finalState.capture(),
-        anyLong());
+        any(RMApp.class), anyLong());
     Assert.assertEquals(state, finalState.getValue());
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
index 6797165..9bb44ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
@@ -28,6 +28,9 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.LinkedList;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteStreams;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -158,6 +161,25 @@
       }
     }
   }
+
+  @Test (timeout = 500000)
+  public void testContainerManagerWithEpoch() throws Exception {
+    try {
+      yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class
+          .getName(), 1, 1, 1);
+      yarnCluster.init(conf);
+      yarnCluster.start();
+
+      // Testing container token serialization of a containerId with epoch
+      testContainerTokenWithEpoch(conf);
+
+    } finally {
+      if (yarnCluster != null) {
+        yarnCluster.stop();
+        yarnCluster = null;
+      }
+    }
+  }
   
   private void testNMTokens(Configuration conf) throws Exception {
     NMTokenSecretManagerInRM nmTokenSecretManagerRM =
@@ -603,4 +625,74 @@
     Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
         containerToken, nmToken, true).contains(sb.toString()));
   }
+
+  /**
+   * Tests whether a containerId with epoch is serialized/deserialized intact.
+   * @param conf configuration used to create the YarnRPC instance
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws YarnException
+   */
+  private void testContainerTokenWithEpoch(Configuration conf)
+      throws IOException, InterruptedException, YarnException {
+
+    LOG.info("Running test for serializing/deserializing containerIds");
+
+    NMTokenSecretManagerInRM nmTokenSecretManagerInRM =
+        yarnCluster.getResourceManager().getRMContext()
+            .getNMTokenSecretManager();
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 0);
+    ContainerId cId = ContainerId.newInstance(appAttemptId, (5L << 40) | 3L);
+    NodeManager nm = yarnCluster.getNodeManager(0);
+    NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
+        nm.getNMContext().getNMTokenSecretManager();
+    String user = "test";
+
+    waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM, nm);
+
+    NodeId nodeId = nm.getNMContext().getNodeId();
+
+    // The NM and the RM must hold the same current NM token key id.
+    Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
+        nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
+
+    // Creating a normal Container Token
+    RMContainerTokenSecretManager containerTokenSecretManager =
+        yarnCluster.getResourceManager().getRMContext().
+            getContainerTokenSecretManager();
+    Resource r = Resource.newInstance(1230, 2);
+    Token containerToken =
+        containerTokenSecretManager.createContainerToken(cId, nodeId, user, r,
+            Priority.newInstance(0), 0);
+    // Deserialize the token identifier; the containerId must round-trip.
+    ByteArrayDataInput input = ByteStreams.newDataInput(
+        containerToken.getIdentifier().array());
+    ContainerTokenIdentifier containerTokenIdentifier =
+        new ContainerTokenIdentifier();
+    containerTokenIdentifier.readFields(input);
+    Assert.assertEquals(cId, containerTokenIdentifier.getContainerID());
+    Assert.assertEquals(
+        cId.toString(), containerTokenIdentifier.getContainerID().toString());
+
+    Token nmToken =
+        nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
+
+    YarnRPC rpc = YarnRPC.create(conf);
+    testStartContainer(rpc, appAttemptId, nodeId, containerToken, nmToken,
+        false);
+    // The containerId reported back through the proxy must match as well.
+    List<ContainerId> containerIds = new LinkedList<ContainerId>();
+    containerIds.add(cId);
+    ContainerManagementProtocol proxy
+        = getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
+    GetContainerStatusesResponse res = proxy.getContainerStatuses(
+        GetContainerStatusesRequest.newInstance(containerIds));
+    Assert.assertNotNull(res.getContainerStatuses().get(0));
+    Assert.assertEquals(
+        cId, res.getContainerStatuses().get(0).getContainerId());
+    Assert.assertEquals(cId.toString(),
+        res.getContainerStatuses().get(0).getContainerId().toString());
+  }
 }