PHOENIX-2477 ClassCastException in IndexedWALEditCodec after HBASE-14501 (possible dataloss)
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
index c29f77d..2534b34 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
@@ -19,7 +19,9 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -80,9 +82,35 @@
}
/**
+ * Returns a DataInput given an InputStream
+ */
+ private static DataInput getDataInput(InputStream is) {
+ return is instanceof DataInput
+ ? (DataInput) is
+ : new DataInputStream(is);
+ }
+
+ /**
+ * Returns a DataOutput given an OutputStream
+ */
+ private static DataOutput getDataOutput(OutputStream os) {
+ return os instanceof DataOutput
+ ? (DataOutput) os
+ : new DataOutputStream(os);
+ }
+
+ private static abstract class PhoenixBaseDecoder extends BaseDecoder {
+ protected DataInput dataInput;
+ public PhoenixBaseDecoder(InputStream in) {
+ super(in);
+ dataInput = getDataInput(this.in);
+ }
+ }
+
+ /**
* Custom Decoder that can handle a stream of regular and indexed {@link KeyValue}s.
*/
- public class IndexKeyValueDecoder extends BaseDecoder {
+ public static class IndexKeyValueDecoder extends PhoenixBaseDecoder {
/**
* Create a Decoder on the given input stream with the given Decoder to parse
@@ -95,11 +123,11 @@
@Override
protected KeyValue parseCell() throws IOException{
- return KeyValueCodec.readKeyValue((DataInput) this.in);
+ return KeyValueCodec.readKeyValue(this.dataInput);
}
}
- public class CompressedIndexKeyValueDecoder extends BaseDecoder {
+ public static class CompressedIndexKeyValueDecoder extends PhoenixBaseDecoder {
private Decoder decoder;
@@ -133,7 +161,15 @@
}
// its an indexedKeyValue, so parse it out specially
- return KeyValueCodec.readKeyValue((DataInput) this.in);
+ return KeyValueCodec.readKeyValue(this.dataInput);
+ }
+ }
+
+ private static abstract class PhoenixBaseEncoder extends BaseEncoder {
+ protected DataOutput dataOutput;
+ public PhoenixBaseEncoder(OutputStream out) {
+ super(out);
+ dataOutput = getDataOutput(this.out);
}
}
@@ -141,7 +177,7 @@
* Encode {@link IndexedKeyValue}s via the {@link KeyValueCodec}. Does <b>not</b> support
* compression.
*/
- private static class IndexKeyValueEncoder extends BaseEncoder {
+ private static class IndexKeyValueEncoder extends PhoenixBaseEncoder {
public IndexKeyValueEncoder(OutputStream os) {
super(os);
}
@@ -157,7 +193,7 @@
checkFlushed();
// use the standard encoding mechanism
- KeyValueCodec.write((DataOutput) this.out, KeyValueUtil.ensureKeyValue(cell));
+ KeyValueCodec.write(this.dataOutput, KeyValueUtil.ensureKeyValue(cell));
}
}
@@ -166,7 +202,7 @@
* <b>not</b> compatible with the {@link IndexKeyValueDecoder} - one cannot intermingle compressed
* and uncompressed WALs that contain index entries.
*/
- private static class CompressedIndexKeyValueEncoder extends BaseEncoder {
+ private static class CompressedIndexKeyValueEncoder extends PhoenixBaseEncoder {
private Encoder compressedKvEncoder;
public CompressedIndexKeyValueEncoder(OutputStream os, Encoder compressedKvEncoder) {
@@ -184,20 +220,20 @@
public void write(Cell cell) throws IOException {
//make sure we are open
checkFlushed();
-
+
//write the special marker so we can figure out which kind of kv is it
int marker = IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER;
if (cell instanceof IndexedKeyValue) {
marker = KeyValueCodec.INDEX_TYPE_LENGTH_MARKER;
}
out.write(marker);
-
+
//then serialize based on the marker
if (marker == IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER) {
this.compressedKvEncoder.write(cell);
}
else{
- KeyValueCodec.write((DataOutput) out, KeyValueUtil.ensureKeyValue(cell));
+ KeyValueCodec.write(this.dataOutput, KeyValueUtil.ensureKeyValue(cell));
}
}
}