| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hbase.regionserver.wal; |
| |
| import java.io.DataInput; |
| import java.io.DataInputStream; |
| import java.io.DataOutput; |
| import java.io.DataOutputStream; |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.KeyValueUtil; |
| import org.apache.hadoop.hbase.codec.BaseDecoder; |
| import org.apache.hadoop.hbase.codec.BaseEncoder; |
| import org.apache.phoenix.hbase.index.wal.IndexedKeyValue; |
| import org.apache.phoenix.hbase.index.wal.KeyValueCodec; |
| |
| |
| /** |
| * Support custom indexing {@link KeyValue}s when written to the WAL. |
| * <p> |
| * Currently, we don't support reading older WAL files - only new WAL files. Therefore, this should |
| * not be installed on a running cluster, but rather one that has been cleanly shutdown and requires |
| * no WAL replay on startup. |
| */ |
| public class IndexedWALEditCodec extends WALCellCodec { |
| |
| // can't have negative values because reading off a stream returns a negative if its the end of |
| // the stream |
| private static final int REGULAR_KEY_VALUE_MARKER = 0; |
| private CompressionContext compression; |
| |
| public IndexedWALEditCodec(Configuration conf, CompressionContext compression) { |
| super(conf, compression); |
| this.compression = compression; |
| } |
| |
| @Override |
| public Decoder getDecoder(InputStream is) { |
| // compression isn't enabled |
| if (this.compression == null) { |
| return new IndexKeyValueDecoder(is); |
| } |
| |
| // there is compression, so we get the standard decoder to handle reading those kvs |
| Decoder decoder = super.getDecoder(is); |
| // compression is on, reqturn our custom decoder |
| return new CompressedIndexKeyValueDecoder(is, decoder); |
| } |
| |
| @Override |
| public Encoder getEncoder(OutputStream os) { |
| // compression isn't on, do the default thing |
| if (this.compression == null) { |
| return new IndexKeyValueEncoder(os); |
| } |
| |
| // compression is on, return our one that will handle putting in the correct markers |
| Encoder encoder = super.getEncoder(os); |
| return new CompressedIndexKeyValueEncoder(os, encoder); |
| } |
| |
| /** |
| * Returns a DataInput given an InputStream |
| */ |
| private static DataInput getDataInput(InputStream is) { |
| return is instanceof DataInput |
| ? (DataInput) is |
| : new DataInputStream(is); |
| } |
| |
| /** |
| * Returns a DataOutput given an OutputStream |
| */ |
| private static DataOutput getDataOutput(OutputStream os) { |
| return os instanceof DataOutput |
| ? (DataOutput) os |
| : new DataOutputStream(os); |
| } |
| |
| private static abstract class PhoenixBaseDecoder extends BaseDecoder { |
| protected DataInput dataInput; |
| public PhoenixBaseDecoder(InputStream in) { |
| super(in); |
| dataInput = getDataInput(this.in); |
| } |
| } |
| |
| /** |
| * Custom Decoder that can handle a stream of regular and indexed {@link KeyValue}s. |
| */ |
| public static class IndexKeyValueDecoder extends PhoenixBaseDecoder { |
| |
| /** |
| * Create a Decoder on the given input stream with the given Decoder to parse |
| * generic {@link KeyValue}s. |
| * @param is stream to read from |
| */ |
| public IndexKeyValueDecoder(InputStream is){ |
| super(is); |
| } |
| |
| @Override |
| protected KeyValue parseCell() throws IOException{ |
| return KeyValueCodec.readKeyValue(this.dataInput); |
| } |
| } |
| |
| public static class CompressedIndexKeyValueDecoder extends PhoenixBaseDecoder { |
| |
| private Decoder decoder; |
| |
| /** |
| * Create a Decoder on the given input stream with the given Decoder to parse |
| * generic {@link KeyValue}s. |
| * @param is stream to read from |
| * @param compressedDecoder decoder for generic {@link KeyValue}s. Should support the expected |
| * compression. |
| */ |
| public CompressedIndexKeyValueDecoder(InputStream is, Decoder compressedDecoder) { |
| super(is); |
| this.decoder = compressedDecoder; |
| } |
| |
| @Override |
| protected Cell parseCell() throws IOException { |
| // reader the marker |
| int marker = this.in.read(); |
| if (marker < 0) { |
| throw new EOFException( |
| "Unexepcted end of stream found while reading next (Indexed) KeyValue"); |
| } |
| |
| // do the normal thing, if its a regular kv |
| if (marker == REGULAR_KEY_VALUE_MARKER) { |
| if (!this.decoder.advance()) { |
| throw new IOException("Could not read next key-value from generic KeyValue Decoder!"); |
| } |
| return this.decoder.current(); |
| } |
| |
| // its an indexedKeyValue, so parse it out specially |
| return KeyValueCodec.readKeyValue(this.dataInput); |
| } |
| } |
| |
| private static abstract class PhoenixBaseEncoder extends BaseEncoder { |
| protected DataOutput dataOutput; |
| public PhoenixBaseEncoder(OutputStream out) { |
| super(out); |
| dataOutput = getDataOutput(this.out); |
| } |
| } |
| |
| /** |
| * Encode {@link IndexedKeyValue}s via the {@link KeyValueCodec}. Does <b>not</b> support |
| * compression. |
| */ |
| private static class IndexKeyValueEncoder extends PhoenixBaseEncoder { |
| public IndexKeyValueEncoder(OutputStream os) { |
| super(os); |
| } |
| |
| @Override |
| public void flush() throws IOException { |
| super.flush(); |
| } |
| |
| @Override |
| public void write(Cell cell) throws IOException { |
| // make sure we are open |
| checkFlushed(); |
| |
| // use the standard encoding mechanism |
| KeyValueCodec.write(this.dataOutput, KeyValueUtil.ensureKeyValue(cell)); |
| } |
| } |
| |
| /** |
| * Write {@link IndexedKeyValue}s along side compressed {@link KeyValue}s. This Encoder is |
| * <b>not</b> compatible with the {@link IndexKeyValueDecoder} - one cannot intermingle compressed |
| * and uncompressed WALs that contain index entries. |
| */ |
| private static class CompressedIndexKeyValueEncoder extends PhoenixBaseEncoder { |
| private Encoder compressedKvEncoder; |
| |
| public CompressedIndexKeyValueEncoder(OutputStream os, Encoder compressedKvEncoder) { |
| super(os); |
| this.compressedKvEncoder = compressedKvEncoder; |
| } |
| |
| @Override |
| public void flush() throws IOException { |
| this.compressedKvEncoder.flush(); |
| super.flush(); |
| } |
| |
| @Override |
| public void write(Cell cell) throws IOException { |
| //make sure we are open |
| checkFlushed(); |
| |
| //write the special marker so we can figure out which kind of kv is it |
| int marker = IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER; |
| if (cell instanceof IndexedKeyValue) { |
| marker = KeyValueCodec.INDEX_TYPE_LENGTH_MARKER; |
| } |
| out.write(marker); |
| |
| //then serialize based on the marker |
| if (marker == IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER) { |
| this.compressedKvEncoder.write(cell); |
| } |
| else{ |
| KeyValueCodec.write(this.dataOutput, KeyValueUtil.ensureKeyValue(cell)); |
| } |
| } |
| } |
| } |