blob: f9f5b4a4a5330d9eaa08a8d61664b59f234752ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.coprocessor;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.phoenix.coprocessor.generated.CDCInfoProtos;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.SingleCellColumnExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.CDCTableInfo;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.CDCChangeBuilder;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.IndexUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Types;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_DEF;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScanner {
private static final Logger LOGGER =
LoggerFactory.getLogger(CDCGlobalIndexRegionScanner.class);
private CDCTableInfo cdcDataTableInfo;
private CDCChangeBuilder changeBuilder;
public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner,
final Region region,
final Scan scan,
final RegionCoprocessorEnvironment env,
final Scan dataTableScan,
final TupleProjector tupleProjector,
final IndexMaintainer indexMaintainer,
final byte[][] viewConstants,
final ImmutableBytesWritable ptr,
final long pageSizeMs,
final long queryLimit) throws IOException {
super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer,
viewConstants, ptr, pageSizeMs, queryLimit);
CDCUtil.initForRawScan(dataTableScan);
cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
.parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
changeBuilder = new CDCChangeBuilder(cdcDataTableInfo);
}
@Override
protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
//TODO: Get Timerange from the start row and end row of the index scan object
// and set it in the datatable scan object.
// if (scan.getStartRow().length == 8) {
// startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
// scan.getStartRow(), 0, SortOrder.getDefault());
// }
// if (scan.getStopRow().length == 8) {
// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
// scan.getStopRow(), 0, SortOrder.getDefault());
// }
return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
}
protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
if (indexRowIterator.hasNext()) {
List<Cell> indexRow = indexRowIterator.next();
// firstCell: Picking the earliest cell in the index row so that
// timestamp of the cell and the row will be same.
Cell firstIndexCell = indexRow.get(indexRow.size() - 1);
byte[] indexRowKey = ImmutableBytesPtr.cloneCellRowIfNecessary(firstIndexCell);
ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
indexToDataRowKeyMap.get(indexRowKey));
Result dataRow = dataRows.get(dataRowKey);
Long changeTS = firstIndexCell.getTimestamp();
TupleProjector dataTableProjector = cdcDataTableInfo.getDataTableProjector();
Expression[] expressions = dataTableProjector != null ?
dataTableProjector.getExpressions() : null;
boolean isSingleCell = dataTableProjector != null;
byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(
cdcDataTableInfo.getQualifierEncodingScheme()).getFirst();
changeBuilder.initChange(changeTS);
try {
if (dataRow != null) {
int curColumnNum = 0;
List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
this.cdcDataTableInfo.getColumnInfoList();
cellLoop:
for (Cell cell : dataRow.rawCells()) {
if (! changeBuilder.isChangeRelevant(cell)) {
continue;
}
byte[] cellFam = ImmutableBytesPtr.cloneCellFamilyIfNecessary(cell);
byte[] cellQual = ImmutableBytesPtr.cloneCellQualifierIfNecessary(cell);
if (cell.getType() == Cell.Type.DeleteFamily) {
if (changeTS == cell.getTimestamp()) {
changeBuilder.markAsDeletionEvent();
} else if (changeTS > cell.getTimestamp()
&& changeBuilder.getLastDeletedTimestamp() == 0L) {
// Cells with timestamp less than the lowerBoundTsForPreImage
// can not be part of the PreImage as there is a Delete Family
// marker after that.
changeBuilder.setLastDeletedTimestamp(cell.getTimestamp());
}
} else if ((cell.getType() == Cell.Type.DeleteColumn
|| cell.getType() == Cell.Type.Put)
&& !Arrays.equals(cellQual, emptyCQ)) {
if (! changeBuilder.isChangeRelevant(cell)) {
// We don't need to build the change image, just skip it.
continue;
}
// In this case, cell is the row, meaning we loop over rows..
if (isSingleCell) {
while (curColumnNum < cdcColumnInfoList.size()) {
boolean hasValue = dataTableProjector.getSchema().extractValue(
cell, (SingleCellColumnExpression)
expressions[curColumnNum], ptr);
if (hasValue) {
Object cellValue = getColumnValue(ptr.get(),
ptr.getOffset(), ptr.getLength(),
cdcColumnInfoList.get(curColumnNum).getColumnType());
changeBuilder.registerChange(cell, curColumnNum, cellValue);
}
++curColumnNum;
}
break cellLoop;
}
while (true) {
CDCTableInfo.CDCColumnInfo currentColumnInfo =
cdcColumnInfoList.get(curColumnNum);
int columnComparisonResult = CDCUtil.compareCellFamilyAndQualifier(
cellFam, cellQual,
currentColumnInfo.getColumnFamily(),
currentColumnInfo.getColumnQualifier());
if (columnComparisonResult > 0) {
if (++curColumnNum >= cdcColumnInfoList.size()) {
// Have no more column definitions, so the rest of the cells
// must be for dropped columns and so can be ignored.
break cellLoop;
}
// Continue looking for the right column definition
// for this cell.
continue;
} else if (columnComparisonResult < 0) {
// We didn't find a column definition for this cell, ignore the
// current cell but continue working on the rest of the cells.
continue cellLoop;
}
// else, found the column definition.
Object cellValue = cell.getType() == Cell.Type.DeleteColumn ? null
: getColumnValue(cell, cdcColumnInfoList.get(curColumnNum)
.getColumnType());
changeBuilder.registerChange(cell, curColumnNum, cellValue);
// Done processing the current cell, check the next cell.
break;
}
}
}
if (changeBuilder.isNonEmptyEvent()) {
Result cdcRow = getCDCImage(indexRowKey, firstIndexCell);
if (cdcRow != null && tupleProjector != null) {
if (firstIndexCell.getType() == Cell.Type.DeleteFamily) {
// result is of type EncodedColumnQualiferCellsList for queries with
// Order by clause. It fails when Delete Family cell is added to it
// as it expects column qualifier bytes which is not available.
// Adding empty PUT cell as a placeholder.
result.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(indexRowKey)
.setFamily(ImmutableBytesPtr.cloneCellFamilyIfNecessary(
firstIndexCell))
.setQualifier(indexMaintainer.getEmptyKeyValueQualifier())
.setTimestamp(firstIndexCell.getTimestamp())
.setType(Cell.Type.Put)
.setValue(EMPTY_BYTE_ARRAY).build());
} else {
result.add(firstIndexCell);
}
IndexUtil.addTupleAsOneCell(result, new ResultTuple(cdcRow),
tupleProjector, ptr);
} else {
result.clear();
}
} else {
result.clear();
}
} else {
result.clear();
}
return true;
} catch (Throwable e) {
LOGGER.error("Exception in UncoveredIndexRegionScanner for region "
+ region.getRegionInfo().getRegionNameAsString(), e);
throw e;
}
}
return false;
}
private Result getCDCImage(byte[] indexRowKey, Cell firstCell) {
Gson gson = new GsonBuilder().serializeNulls().create();
byte[] value = gson.toJson(changeBuilder.buildCDCEvent()).getBytes(StandardCharsets.UTF_8);
CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
Result cdcRow = Result.create(Arrays.asList(builder
.setRow(indexRowKey)
.setFamily(ImmutableBytesPtr.cloneCellFamilyIfNecessary(firstCell))
.setQualifier(cdcDataTableInfo.getCdcJsonColQualBytes())
.setTimestamp(changeBuilder.getChangeTimestamp())
.setValue(value)
.setType(Cell.Type.Put)
.build()));
return cdcRow;
}
private Object getColumnValue(Cell cell, PDataType dataType) {
return getColumnValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
dataType);
}
private Object getColumnValue(byte[] cellValue, int offset, int length, PDataType dataType) {
if (dataType.getSqlType() == Types.BINARY) {
// Unfortunately, Base64.Encoder has no option to specify offset and length so can't
// avoid copying bytes.
return Base64.getEncoder().encodeToString(
ImmutableBytesPtr.copyBytesIfNecessary(cellValue, offset, length));
} else {
Object value = dataType.toObject(cellValue, offset, length);
if (dataType.getSqlType() == Types.DATE
|| dataType.getSqlType() == Types.TIMESTAMP
|| dataType.getSqlType() == Types.TIME
|| dataType.getSqlType() == Types.TIME_WITH_TIMEZONE
|| dataType.getSqlType() == Types.TIMESTAMP_WITH_TIMEZONE) {
value = value.toString();
}
return value;
}
}
}