PHOENIX-2477 ClassCastException in IndexedWALEditCodec after HBASE-14501 (possible data loss)
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
index c29f77d..2534b34 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -80,9 +82,35 @@
   }
 
   /**
+   * Returns {@code is} itself if it is already a DataInput, else wraps it in a DataInputStream.
+   */
+  private static DataInput getDataInput(InputStream is) {
+    return is instanceof DataInput
+        ? (DataInput) is // already a DataInput: reuse it as-is
+        : new DataInputStream(is); // not a DataInput: wrap it
+  }
+
+  /**
+   * Returns {@code os} itself if it is already a DataOutput, else wraps it in a DataOutputStream.
+   */
+  private static DataOutput getDataOutput(OutputStream os) {
+    return os instanceof DataOutput
+        ? (DataOutput) os // already a DataOutput: reuse it as-is
+        : new DataOutputStream(os); // not a DataOutput: wrap it
+  }
+
+  private static abstract class PhoenixBaseDecoder extends BaseDecoder { // decoder base exposing a DataInput
+    protected DataInput dataInput; // DataInput view of this.in, assigned in the constructor
+    public PhoenixBaseDecoder(InputStream in) {
+      super(in);
+      dataInput = getDataInput(this.in); // subclasses read via this field instead of casting this.in
+    }
+  }
+
+  /**
    * Custom Decoder that can handle a stream of regular and indexed {@link KeyValue}s.
    */
-  public class IndexKeyValueDecoder extends BaseDecoder {
+  public static class IndexKeyValueDecoder extends PhoenixBaseDecoder {
 
     /**
      * Create a Decoder on the given input stream with the given Decoder to parse
@@ -95,11 +123,11 @@
 
     @Override
     protected KeyValue parseCell() throws IOException{
-      return KeyValueCodec.readKeyValue((DataInput) this.in);
+      return KeyValueCodec.readKeyValue(this.dataInput);
     }
   }
 
-  public class CompressedIndexKeyValueDecoder extends BaseDecoder {
+  public static class CompressedIndexKeyValueDecoder extends PhoenixBaseDecoder {
 
     private Decoder decoder;
 
@@ -133,7 +161,15 @@
       }
 
       // its an indexedKeyValue, so parse it out specially
-      return KeyValueCodec.readKeyValue((DataInput) this.in);
+      return KeyValueCodec.readKeyValue(this.dataInput);
+    }
+  }
+
+  private static abstract class PhoenixBaseEncoder extends BaseEncoder { // encoder base exposing a DataOutput
+    protected DataOutput dataOutput; // DataOutput view of this.out, assigned in the constructor
+    public PhoenixBaseEncoder(OutputStream out) {
+      super(out);
+      dataOutput = getDataOutput(this.out); // subclasses write via this field instead of casting this.out
     }
   }
 
@@ -141,7 +177,7 @@
    * Encode {@link IndexedKeyValue}s via the {@link KeyValueCodec}. Does <b>not</b> support
    * compression.
    */
-  private static class IndexKeyValueEncoder extends BaseEncoder {
+  private static class IndexKeyValueEncoder extends PhoenixBaseEncoder {
     public IndexKeyValueEncoder(OutputStream os) {
       super(os);
     }
@@ -157,7 +193,7 @@
       checkFlushed();
 
       // use the standard encoding mechanism
-      KeyValueCodec.write((DataOutput) this.out, KeyValueUtil.ensureKeyValue(cell));
+      KeyValueCodec.write(this.dataOutput, KeyValueUtil.ensureKeyValue(cell));
     }
   }
 
@@ -166,7 +202,7 @@
    * <b>not</b> compatible with the {@link IndexKeyValueDecoder} - one cannot intermingle compressed
    * and uncompressed WALs that contain index entries.
    */
-  private static class CompressedIndexKeyValueEncoder extends BaseEncoder {
+  private static class CompressedIndexKeyValueEncoder extends PhoenixBaseEncoder {
     private Encoder compressedKvEncoder;
 
     public CompressedIndexKeyValueEncoder(OutputStream os, Encoder compressedKvEncoder) {
@@ -184,20 +220,20 @@
     public void write(Cell cell) throws IOException {
       //make sure we are open
       checkFlushed();
-      
+
       //write the special marker so we can figure out which kind of kv is it
       int marker = IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER;
       if (cell instanceof IndexedKeyValue) {
         marker = KeyValueCodec.INDEX_TYPE_LENGTH_MARKER;
       }
       out.write(marker);
-      
+
       //then serialize based on the marker
       if (marker == IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER) {
         this.compressedKvEncoder.write(cell);
       }
       else{
-        KeyValueCodec.write((DataOutput) out, KeyValueUtil.ensureKeyValue(cell));
+        KeyValueCodec.write(this.dataOutput, KeyValueUtil.ensureKeyValue(cell));
       }
     }
   }