/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.coprocessor;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.Sequence;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.SequenceUtil;
import org.apache.phoenix.util.ServerUtil;
import com.google.common.collect.Lists;
/**
*
* Region observer coprocessor for sequence operations:
* 1) For creating a sequence, as checkAndPut does not allow us to scope the
* Get done for the check with a TimeRange.
* 2) For incrementing a sequence, as increment does not a) allow us to set the
* timestamp of the key value being incremented and b) recognize when the key
* value being incremented does not exist.
* 3) For deleting a sequence, as checkAndDelete does not allow us to scope
* the Get done for the check with a TimeRange.
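*
* For example, the following Phoenix SQL statements exercise these code paths (names are
* illustrative):
* <pre>
* CREATE SEQUENCE my_schema.my_seq START WITH 1 INCREMENT BY 1 CACHE 100; -- preAppend (CREATE_SEQUENCE)
* UPSERT INTO my_table(id) VALUES (NEXT VALUE FOR my_schema.my_seq);      -- preIncrement
* UPSERT INTO my_table(id) VALUES (NEXT 5 VALUES FOR my_schema.my_seq);   -- preIncrement (bulk allocation)
* DROP SEQUENCE my_schema.my_seq;                                         -- preAppend (DROP_SEQUENCE)
* </pre>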
*
* @since 3.0.0
*/
public class SequenceRegionObserver extends BaseRegionObserver {
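// Attribute names set by the Phoenix client on Increment/Append requests to drive the
// sequence operations handled in preIncrement and preAppend below.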
public static final String OPERATION_ATTRIB = "SEQUENCE_OPERATION";
public static final String MAX_TIMERANGE_ATTRIB = "MAX_TIMERANGE";
public static final String CURRENT_VALUE_ATTRIB = "CURRENT_VALUE";
public static final String NUM_TO_ALLOCATE = "NUM_TO_ALLOCATE";
private static final byte[] SUCCESS_VALUE = PInteger.INSTANCE.toBytes(Integer.valueOf(Sequence.SUCCESS));
private static Result getErrorResult(byte[] row, long timestamp, int errorCode) {
byte[] errorCodeBuf = new byte[PInteger.INSTANCE.getByteSize()];
PInteger.INSTANCE.getCodec().encodeInt(errorCode, errorCodeBuf, 0);
return Result.create(Collections.singletonList(
(Cell)KeyValueUtil.newKeyValue(row,
PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES,
QueryConstants.EMPTY_COLUMN_BYTES, timestamp, errorCodeBuf)));
}
private static void acquireLock(Region region, byte[] key, List<RowLock> locks)
throws IOException {
RowLock rowLock = region.getRowLock(key, false);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
}
locks.add(rowLock);
}
/**
* Use PreIncrement hook of BaseRegionObserver to overcome deficiencies in Increment
* implementation (HBASE-10254):
* 1) Lack of recognition of when the key value to increment doesn't exist
* 2) Lack of the ability to set the timestamp of the updated key value.
* Works the same as the existing region.increment(), except it assumes there is a single
* column to increment and uses the Phoenix LONG encoding.
*
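* <p>
* A minimal sketch of the kind of Increment this hook expects (illustrative only; the real
* request is built by the client-side Sequence code, and INCREMENT_SEQUENCE below is assumed
* as the counterpart of the VALIDATE_SEQUENCE op checked in this method):
* <pre>
* Increment inc = new Increment(sequenceRowKey);
* inc.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimestamp);
* // the cell "amount" encodes a Sequence.ValueOp ordinal, not a numeric delta;
* // any op other than VALIDATE_SEQUENCE results in an actual increment
* inc.addColumn(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES,
* PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, Sequence.ValueOp.INCREMENT_SEQUENCE.ordinal());
* // optional: bulk allocation for NEXT &lt;n&gt; VALUES FOR
* inc.setAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE, Bytes.toBytes(5L));
* </pre>
*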
* @since 3.0.0
*/
@Override
public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
final Increment increment) throws IOException {
RegionCoprocessorEnvironment env = e.getEnvironment();
// We need to set this to prevent region.increment from being called
e.bypass();
e.complete();
Region region = env.getRegion();
byte[] row = increment.getRow();
List<RowLock> locks = Lists.newArrayList();
TimeRange tr = increment.getTimeRange();
region.startRegionOperation();
try {
acquireLock(region, row, locks);
try {
long maxTimestamp = tr.getMax();
boolean validateOnly = true;
Get get = new Get(row);
get.setTimeRange(tr.getMin(), tr.getMax());
for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) {
byte[] cf = entry.getKey();
for (Cell cq : entry.getValue()) {
long value = Bytes.toLong(cq.getValueArray(), cq.getValueOffset());
get.addColumn(cf, CellUtil.cloneQualifier(cq));
long cellTimestamp = cq.getTimestamp();
// Workaround HBASE-15698 by using the lowest of the timestamps found
// on the Increment or any of its Cells.
if (cellTimestamp > 0 && cellTimestamp < maxTimestamp) {
maxTimestamp = cellTimestamp;
get.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, maxTimestamp);
}
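// A cell value equal to the VALIDATE_SEQUENCE ordinal means the client only wants to
// verify its cached sequence state without consuming values; in that case we return the
// current row unchanged (see the validateOnly check below).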
validateOnly &= (Sequence.ValueOp.VALIDATE_SEQUENCE.ordinal() == value);
}
}
Result result = region.get(get);
if (result.isEmpty()) {
return getErrorResult(row, maxTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode());
}
KeyValue currentValueKV = Sequence.getCurrentValueKV(result);
KeyValue incrementByKV = Sequence.getIncrementByKV(result);
KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result);
long currentValue = PLong.INSTANCE.getCodec().decodeLong(currentValueKV.getValueArray(), currentValueKV.getValueOffset(), SortOrder.getDefault());
long incrementBy = PLong.INSTANCE.getCodec().decodeLong(incrementByKV.getValueArray(), incrementByKV.getValueOffset(), SortOrder.getDefault());
long cacheSize = PLong.INSTANCE.getCodec().decodeLong(cacheSizeKV.getValueArray(), cacheSizeKV.getValueOffset(), SortOrder.getDefault());
// Hold timestamp constant for sequences, so that clients always only see the latest
// value regardless of when they connect.
long timestamp = currentValueKV.getTimestamp();
Put put = new Put(row, timestamp);
int numIncrementKVs = increment.getFamilyCellMap().get(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES).size();
// creates the list of KeyValues used for the Result that will be returned
List<Cell> cells = Sequence.getCells(result, numIncrementKVs);
// If the client is 3.0/4.0, preserve the old behavior (older clients won't have the newer columns present in the increment)
if (numIncrementKVs != Sequence.NUM_SEQUENCE_KEY_VALUES) {
currentValue += incrementBy * cacheSize;
// Hold timestamp constant for sequences, so that clients always only see the latest value
// regardless of when they connect.
KeyValue newCurrentValueKV = createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp);
put.add(newCurrentValueKV);
Sequence.replaceCurrentValueKV(cells, newCurrentValueKV);
}
else {
KeyValue cycleKV = Sequence.getCycleKV(result);
KeyValue limitReachedKV = Sequence.getLimitReachedKV(result);
KeyValue minValueKV = Sequence.getMinValueKV(result);
KeyValue maxValueKV = Sequence.getMaxValueKV(result);
boolean increasingSeq = incrementBy > 0;
// If minValue, maxValue, cycle and limitReached are null, this sequence has been upgraded from
// a lower version. Set minValue, maxValue, cycle and limitReached to Long.MIN_VALUE, Long.MAX_VALUE,
// false and false respectively, in order to maintain existing behavior and also update the KeyValues on the server
boolean limitReached;
if (limitReachedKV == null) {
limitReached = false;
KeyValue newLimitReachedKV = createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, limitReached, timestamp);
put.add(newLimitReachedKV);
Sequence.replaceLimitReachedKV(cells, newLimitReachedKV);
}
else {
limitReached = (Boolean) PBoolean.INSTANCE.toObject(limitReachedKV.getValueArray(),
limitReachedKV.getValueOffset(), limitReachedKV.getValueLength());
}
long minValue;
if (minValueKV == null) {
minValue = Long.MIN_VALUE;
KeyValue newMinValueKV = createKeyValue(row, PhoenixDatabaseMetaData.MIN_VALUE_BYTES, minValue, timestamp);
put.add(newMinValueKV);
Sequence.replaceMinValueKV(cells, newMinValueKV);
}
else {
minValue = PLong.INSTANCE.getCodec().decodeLong(minValueKV.getValueArray(),
minValueKV.getValueOffset(), SortOrder.getDefault());
}
long maxValue;
if (maxValueKV == null) {
maxValue = Long.MAX_VALUE;
KeyValue newMaxValueKV = createKeyValue(row, PhoenixDatabaseMetaData.MAX_VALUE_BYTES, maxValue, timestamp);
put.add(newMaxValueKV);
Sequence.replaceMaxValueKV(cells, newMaxValueKV);
}
else {
maxValue = PLong.INSTANCE.getCodec().decodeLong(maxValueKV.getValueArray(),
maxValueKV.getValueOffset(), SortOrder.getDefault());
}
boolean cycle;
if (cycleKV == null) {
cycle = false;
KeyValue newCycleKV = createKeyValue(row, PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, cycle, timestamp);
put.add(newCycleKV);
Sequence.replaceCycleValueKV(cells, newCycleKV);
}
else {
cycle = (Boolean) PBoolean.INSTANCE.toObject(cycleKV.getValueArray(),
cycleKV.getValueOffset(), cycleKV.getValueLength());
}
long numSlotsToAllocate = calculateNumSlotsToAllocate(increment);
// We don't support Bulk Allocations on sequences that have the CYCLE flag set to true
if (cycle && !SequenceUtil.isCycleAllowed(numSlotsToAllocate)) {
return getErrorResult(row, maxTimestamp, SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED.getErrorCode());
}
// Bulk Allocations are expressed by NEXT <n> VALUES FOR
if (SequenceUtil.isBulkAllocation(numSlotsToAllocate)) {
if (SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize, numSlotsToAllocate)) {
// If we try to allocate more slots than the limit we return an error.
// Allocating sequence values in bulk should be an all or nothing operation.
// If the operation succeeds clients are guaranteed that they have reserved
// all the slots requested.
return getErrorResult(row, maxTimestamp, SequenceUtil.getLimitReachedErrorCode(increasingSeq).getErrorCode());
}
}
if (validateOnly) {
return result;
}
// if we have run out of sequence values, either cycle back or return an error
if (limitReached) {
if (cycle) {
// reset currentValue of the Sequence row to minValue/maxValue
currentValue = increasingSeq ? minValue : maxValue;
}
else {
return getErrorResult(row, maxTimestamp, SequenceUtil.getLimitReachedErrorCode(increasingSeq).getErrorCode());
}
}
// check whether this allocation reaches the limit
limitReached = SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize, numSlotsToAllocate);
// update currentValue
currentValue += incrementBy * (SequenceUtil.isBulkAllocation(numSlotsToAllocate) ? numSlotsToAllocate : cacheSize);
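// e.g. (illustrative): with incrementBy=1, cacheSize=100 and currentValue=1, the stored
// current value becomes 101 and the client hands out 1..100 from its local cache; with
// NEXT 5 VALUES FOR, numSlotsToAllocate=5 and the stored value advances by exactly 5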
// update the currentValue of the Result row
KeyValue newCurrentValueKV = createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp);
Sequence.replaceCurrentValueKV(cells, newCurrentValueKV);
put.add(newCurrentValueKV);
// update the LIMIT_REACHED column, so that once the limit is hit no new values can be used
KeyValue newLimitReachedKV = createKeyValue(row, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, limitReached, timestamp);
put.add(newLimitReachedKV);
}
// update the KeyValues on the server
Mutation[] mutations = new Mutation[]{put};
region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
// return a Result with the updated KeyValues
return Result.create(cells);
} finally {
region.releaseRowLocks(locks);
}
} catch (Throwable t) {
ServerUtil.throwIOException("Increment of sequence " + Bytes.toStringBinary(row), t);
return null; // Impossible
} finally {
region.closeRegionOperation();
}
}
/**
* Creates a new KeyValue for a long value
*
* @param key
* key used while creating KeyValue
* @param cqBytes
* column qualifier of KeyValue
* @param value
* long value encoded into the KeyValue using the Phoenix LONG codec
* @param timestamp
* timestamp of the KeyValue
* @return the KeyValue that was created
*/
KeyValue createKeyValue(byte[] key, byte[] cqBytes, long value, long timestamp) {
byte[] valueBuffer = new byte[PLong.INSTANCE.getByteSize()];
PLong.INSTANCE.getCodec().encodeLong(value, valueBuffer, 0);
return KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, cqBytes, timestamp, valueBuffer);
}
/**
* Creates a new KeyValue for a boolean value
*
* @param key
* key used while creating KeyValue
* @param cqBytes
* column qualifier of KeyValue
* @param value
* boolean value encoded as PDataType.TRUE_BYTES or PDataType.FALSE_BYTES
* @param timestamp
* timestamp of the KeyValue
* @return the KeyValue that was created
*/
private KeyValue createKeyValue(byte[] key, byte[] cqBytes, boolean value, long timestamp) throws IOException {
// create new key value for put
return KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, cqBytes,
timestamp, value ? PDataType.TRUE_BYTES : PDataType.FALSE_BYTES);
}
/**
* Override preAppend to implement checkAndPut and checkAndDelete semantics for sequences, as
* we need the ability to:
* a) set the TimeRange for the Get being done and
* b) return something back to the client to indicate success/failure
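* <p>
* A minimal sketch of the Append a client sends for these meta operations (illustrative only;
* the real request is built by the client-side Sequence code):
* <pre>
* Append append = new Append(sequenceRowKey);
* append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB,
* new byte[] { (byte) Sequence.MetaOp.CREATE_SEQUENCE.ordinal() });
* append.setAttribute(SequenceRegionObserver.MAX_TIMERANGE_ATTRIB, Bytes.toBytes(clientTimestamp));
* // the first cell determines the family/qualifier used for the server-side existence Get
* append.add(sequenceCell);
* </pre>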
*/
@SuppressWarnings("deprecation")
@Override
public Result preAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
final Append append) throws IOException {
byte[] opBuf = append.getAttribute(OPERATION_ATTRIB);
if (opBuf == null) {
return null;
}
Sequence.MetaOp op = Sequence.MetaOp.values()[opBuf[0]];
Cell keyValue = append.getFamilyCellMap().values().iterator().next().iterator().next();
long clientTimestamp = HConstants.LATEST_TIMESTAMP;
long minGetTimestamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
long maxGetTimestamp = HConstants.LATEST_TIMESTAMP;
boolean hadClientTimestamp;
byte[] clientTimestampBuf = null;
if (op == Sequence.MetaOp.RETURN_SEQUENCE) {
// When returning sequences, this allows us to send the expected timestamp
// of the sequence to make sure we don't reset any other sequence
hadClientTimestamp = true;
clientTimestamp = minGetTimestamp = keyValue.getTimestamp();
maxGetTimestamp = minGetTimestamp + 1;
} else {
clientTimestampBuf = append.getAttribute(MAX_TIMERANGE_ATTRIB);
if (clientTimestampBuf != null) {
clientTimestamp = maxGetTimestamp = Bytes.toLong(clientTimestampBuf);
}
hadClientTimestamp = (clientTimestamp != HConstants.LATEST_TIMESTAMP);
if (hadClientTimestamp) {
// Prevent race condition of creating two sequences at the same timestamp
// by looking for a sequence at or after the timestamp at which it'll be
// created.
if (op == Sequence.MetaOp.CREATE_SEQUENCE) {
maxGetTimestamp = clientTimestamp + 1;
}
} else {
clientTimestamp = EnvironmentEdgeManager.currentTimeMillis();
maxGetTimestamp = clientTimestamp + 1;
clientTimestampBuf = Bytes.toBytes(clientTimestamp);
}
}
RegionCoprocessorEnvironment env = e.getEnvironment();
// We need to set this to prevent region.append from being called
e.bypass();
e.complete();
Region region = env.getRegion();
byte[] row = append.getRow();
List<RowLock> locks = Lists.newArrayList();
region.startRegionOperation();
try {
acquireLock(region, row, locks);
try {
byte[] family = CellUtil.cloneFamily(keyValue);
byte[] qualifier = CellUtil.cloneQualifier(keyValue);
Get get = new Get(row);
get.setTimeRange(minGetTimestamp, maxGetTimestamp);
get.addColumn(family, qualifier);
Result result = region.get(get);
if (result.isEmpty()) {
if (op == Sequence.MetaOp.DROP_SEQUENCE || op == Sequence.MetaOp.RETURN_SEQUENCE) {
return getErrorResult(row, clientTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode());
}
} else {
if (op == Sequence.MetaOp.CREATE_SEQUENCE) {
return getErrorResult(row, clientTimestamp, SQLExceptionCode.SEQUENCE_ALREADY_EXIST.getErrorCode());
}
}
Mutation m = null;
switch (op) {
case RETURN_SEQUENCE:
KeyValue currentValueKV = result.raw()[0];
long expectedValue = PLong.INSTANCE.getCodec().decodeLong(append.getAttribute(CURRENT_VALUE_ATTRIB), 0, SortOrder.getDefault());
long value = PLong.INSTANCE.getCodec().decodeLong(currentValueKV.getValueArray(),
currentValueKV.getValueOffset(), SortOrder.getDefault());
// Timestamp should match exactly, or we may have the wrong sequence
if (expectedValue != value || currentValueKV.getTimestamp() != clientTimestamp) {
return Result.create(Collections.singletonList(
(Cell)KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES,
QueryConstants.EMPTY_COLUMN_BYTES, currentValueKV.getTimestamp(), ByteUtil.EMPTY_BYTE_ARRAY)));
}
m = new Put(row, currentValueKV.getTimestamp());
m.getFamilyCellMap().putAll(append.getFamilyCellMap());
break;
case DROP_SEQUENCE:
m = new Delete(row, clientTimestamp);
break;
case CREATE_SEQUENCE:
m = new Put(row, clientTimestamp);
m.getFamilyCellMap().putAll(append.getFamilyCellMap());
break;
}
if (!hadClientTimestamp) {
for (List<Cell> kvs : m.getFamilyCellMap().values()) {
for (Cell kv : kvs) {
((KeyValue)kv).updateLatestStamp(clientTimestampBuf);
}
}
}
Mutation[] mutations = new Mutation[]{m};
region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
long serverTimestamp = MetaDataUtil.getClientTimeStamp(m);
// Return result with single KeyValue. The only piece of information
// the client cares about is the timestamp, which is the timestamp of
// when the mutation was actually performed.
return Result.create(Collections.singletonList(
(Cell)KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, serverTimestamp, SUCCESS_VALUE)));
} finally {
region.releaseRowLocks(locks);
}
} catch (Throwable t) {
ServerUtil.throwIOException("Append of sequence " + Bytes.toStringBinary(row), t);
return null; // Impossible
} finally {
region.closeRegionOperation();
}
}
/**
* Determines whether a request for incrementing the sequence was a bulk allocation and, if so,
* what the number of slots to allocate is. This is triggered by the {@code NEXT <n> VALUES FOR}
* expression. For backwards compatibility with older clients, we default the value to 1, which
* preserves the existing behavior when invoking {@code NEXT VALUE FOR}.
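* <p>
* For example (illustrative): {@code NEXT 100 VALUES FOR my_schema.my_seq} causes the client to
* attach the NUM_TO_ALLOCATE attribute encoded as {@code Bytes.toBytes(100L)}, so this method
* returns 100, while a plain {@code NEXT VALUE FOR my_schema.my_seq} sends no attribute and this
* method returns 1.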
*/
private long calculateNumSlotsToAllocate(final Increment increment) {
long numToAllocate = 1;
byte[] numToAllocateBytes = increment.getAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE);
if (numToAllocateBytes != null) {
numToAllocate = Bytes.toLong(numToAllocateBytes);
}
return numToAllocate;
}
}