HADOOP-15059. Undoing the switch of Credentials to PB format as default - done via HADOOP-12563 for supporting 2.x to 3.x upgrades.

(cherry picked from commit f19638333b11da6dcab9a964e73a49947b8390fd)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java
index 4d58981..3e51249 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java
@@ -27,7 +27,6 @@
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -60,6 +59,28 @@
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class Credentials implements Writable {
+  public enum SerializedFormat {
+    WRITABLE((byte) 0x00),
+    PROTOBUF((byte) 0x01);
+
+    // Caching to avoid reconstructing the array each time.
+    private static final SerializedFormat[] FORMATS = values();
+
+    final byte value;
+
+    SerializedFormat(byte val) {
+      this.value = val;
+    }
+
+    public static SerializedFormat valueOf(int val) {
+      try {
+        return FORMATS[val];
+      } catch (ArrayIndexOutOfBoundsException e) {
+        throw new IllegalArgumentException("Unknown credential format: " + val);
+      }
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(Credentials.class);
 
   private  Map<Text, byte[]> secretKeysMap = new HashMap<Text, byte[]>();
@@ -224,63 +245,74 @@
     if (!Arrays.equals(magic, TOKEN_STORAGE_MAGIC)) {
       throw new IOException("Bad header found in token storage.");
     }
-    byte version = in.readByte();
-    if (version != TOKEN_STORAGE_VERSION &&
-        version != OLD_TOKEN_STORAGE_VERSION) {
-      throw new IOException("Unknown version " + version +
-                            " in token storage.");
+    SerializedFormat format;
+    try {
+      format = SerializedFormat.valueOf(in.readByte());
+    } catch (IllegalArgumentException e) {
+      throw new IOException(e);
     }
-    if (version == OLD_TOKEN_STORAGE_VERSION) {
+    switch (format) {
+    case WRITABLE:
       readFields(in);
-    } else if (version == TOKEN_STORAGE_VERSION) {
+      break;
+    case PROTOBUF:
       readProto(in);
+      break;
+    default:
+      throw new IOException("Unsupported format " + format);
     }
   }
 
   private static final byte[] TOKEN_STORAGE_MAGIC =
       "HDTS".getBytes(StandardCharsets.UTF_8);
-  private static final byte TOKEN_STORAGE_VERSION = 1;
-
-  /**
-   *  For backward compatibility.
-   */
-  private static final byte OLD_TOKEN_STORAGE_VERSION = 0;
-
 
   public void writeTokenStorageToStream(DataOutputStream os)
       throws IOException {
+    // by default store in the oldest supported format for compatibility
+    writeTokenStorageToStream(os, SerializedFormat.WRITABLE);
+  }
+
+  public void writeTokenStorageToStream(DataOutputStream os,
+      SerializedFormat format) throws IOException {
+    switch (format) {
+    case WRITABLE:
+      writeWritableOutputStream(os);
+      break;
+    case PROTOBUF:
+      writeProtobufOutputStream(os);
+      break;
+    default:
+      throw new IllegalArgumentException("Unsupported serialized format: "
+          + format);
+    }
+  }
+
+  private void writeWritableOutputStream(DataOutputStream os)
+      throws IOException {
     os.write(TOKEN_STORAGE_MAGIC);
-    os.write(TOKEN_STORAGE_VERSION);
+    os.write(SerializedFormat.WRITABLE.value);
+    write(os);
+  }
+
+  private void writeProtobufOutputStream(DataOutputStream os)
+      throws IOException {
+    os.write(TOKEN_STORAGE_MAGIC);
+    os.write(SerializedFormat.PROTOBUF.value);
     writeProto(os);
   }
 
   public void writeTokenStorageFile(Path filename,
                                     Configuration conf) throws IOException {
-    FSDataOutputStream os = filename.getFileSystem(conf).create(filename);
-    writeTokenStorageToStream(os);
-    os.close();
+    // by default store in the oldest supported format for compatibility
+    writeTokenStorageFile(filename, conf, SerializedFormat.WRITABLE);
   }
 
-  /**
-   *  For backward compatibility.
-   */
-  public void writeLegacyTokenStorageLocalFile(File f) throws IOException {
-    writeLegacyOutputStream(new DataOutputStream(new FileOutputStream(f)));
-  }
-
-  /**
-   *  For backward compatibility.
-   */
-  public void writeLegacyTokenStorageFile(Path filename, Configuration conf)
-      throws IOException {
-    writeLegacyOutputStream(filename.getFileSystem(conf).create(filename));
-  }
-
-  private void writeLegacyOutputStream(DataOutputStream os) throws IOException {
-    os.write(TOKEN_STORAGE_MAGIC);
-    os.write(OLD_TOKEN_STORAGE_VERSION);
-    write(os);
-    os.close();
+  public void writeTokenStorageFile(Path filename, Configuration conf,
+      SerializedFormat format) throws IOException {
+    try (FSDataOutputStream os =
+             filename.getFileSystem(conf).create(filename)) {
+      writeTokenStorageToStream(os, format);
+    }
   }
 
   /**
@@ -312,7 +344,7 @@
    * @param out
    * @throws IOException
    */
-  public void writeProto(DataOutput out) throws IOException {
+  void writeProto(DataOutput out) throws IOException {
     CredentialsProto.Builder storage = CredentialsProto.newBuilder();
     for (Map.Entry<Text, Token<? extends TokenIdentifier>> e :
                                                          tokenMap.entrySet()) {
@@ -337,7 +369,7 @@
    * Populates keys/values from proto buffer storage.
    * @param in - stream ready to read a serialized proto buffer message
    */
-  public void readProto(DataInput in) throws IOException {
+  void readProto(DataInput in) throws IOException {
     CredentialsProto storage = CredentialsProto.parseDelimitedFrom((DataInputStream)in);
     for (CredentialsKVProto kv : storage.getTokensList()) {
       addToken(new Text(kv.getAliasBytes().toByteArray()),
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/DtFileOperations.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/DtFileOperations.java
index d128cc9..d36ad9b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/DtFileOperations.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/DtFileOperations.java
@@ -102,11 +102,13 @@
   public static void doFormattedWrite(
       File f, String format, Credentials creds, Configuration conf)
       throws IOException {
-    if (format == null || format.equals(FORMAT_PB)) {
-      creds.writeTokenStorageFile(fileToPath(f), conf);
-    } else { // if (format != null && format.equals(FORMAT_JAVA)) {
-      creds.writeLegacyTokenStorageLocalFile(f);
+    // default to oldest supported format for compatibility
+    Credentials.SerializedFormat credsFormat =
+        Credentials.SerializedFormat.WRITABLE;
+    if (format.equals(FORMAT_PB)) {
+      credsFormat = Credentials.SerializedFormat.PROTOBUF;
     }
+    creds.writeTokenStorageFile(fileToPath(f), conf, credsFormat);
   }
 
   /** Print out a Credentials file from the local filesystem.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/TestDtUtilShell.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/TestDtUtilShell.java
index 1c25912..53b0d3d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/TestDtUtilShell.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/TestDtUtilShell.java
@@ -20,7 +20,6 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.FileInputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 
@@ -29,14 +28,12 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.security.token.DtFetcher;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -54,7 +51,6 @@
   private static Configuration defaultConf = new Configuration();
   private static FileSystem localFs = null;
   private final String alias = "proxy_ip:1234";
-  private final String renewer = "yarn";
   private final String getUrl = SERVICE_GET.toString() + "://localhost:9000/";
   private final String getUrl2 = "http://localhost:9000/";
   public static Text SERVICE_GET = new Text("testTokenServiceGet");
@@ -111,11 +107,12 @@
     Token<? extends TokenIdentifier> tok = (Token<? extends TokenIdentifier>)
         new Token(IDENTIFIER, PASSWORD, KIND, service);
     creds.addToken(tok.getService(), tok);
+    Credentials.SerializedFormat format =
+        Credentials.SerializedFormat.PROTOBUF;
     if (legacy) {
-      creds.writeLegacyTokenStorageLocalFile(new File(tokenPath.toString()));
-    } else {
-      creds.writeTokenStorageFile(tokenPath, defaultConf);
+      format = Credentials.SerializedFormat.WRITABLE;
     }
+    creds.writeTokenStorageFile(tokenPath, defaultConf, format);
   }
 
   @Test
@@ -284,6 +281,6 @@
     DataInputStream in = new DataInputStream(
         new FileInputStream(tokenFilenameGet));
     spyCreds.readTokenStorageStream(in);
-    Mockito.verify(spyCreds).readProto(in);
+    Mockito.verify(spyCreds, Mockito.never()).readFields(in);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
index cf77804..c6ea91c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
@@ -182,7 +182,8 @@
       Credentials cred = new Credentials();
       cred.addToken(token.getService(), token);
       // dtutil is replacing this tool; preserve legacy functionality
-      cred.writeLegacyTokenStorageFile(tokenFile, conf);
+      cred.writeTokenStorageFile(tokenFile, conf,
+          Credentials.SerializedFormat.WRITABLE);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("Fetched token " + fs.getUri() + " for " +