/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.MultiMutation;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.TableState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.TransactionUtil;
import org.apache.tephra.Transaction;
import org.apache.tephra.Transaction.VisibilityLevel;
import org.apache.tephra.TxConstants;
import org.apache.tephra.hbase.TransactionAwareHTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
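/*
* A minimal sketch of how a region observer like this one is typically attached to a
* table in HBase 1.x. It is illustrative only (Phoenix wires this coprocessor up itself
* when a transactional table is created); the table name, null jar path, and null
* properties below are assumptions:
*
*   HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("MY_TABLE"));
*   desc.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null,
*       Coprocessor.PRIORITY_USER, null);
*/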
/**
* Do all the work of managing index updates for a transactional table from a single coprocessor. Since the
* transaction manager essentially time-orders writes through conflict detection, the logic to maintain a secondary
* index is quite a bit simpler than in the non-transactional case. For example, there's no need to muck with the
* WAL, as failure scenarios are handled by aborting the transaction.
*/
public class PhoenixTransactionalIndexer extends BaseRegionObserver {
private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class);
private PhoenixIndexCodec codec;
private IndexWriter writer;
private boolean stopped;
@Override
public void start(CoprocessorEnvironment e) throws IOException {
final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e;
String serverName = env.getRegionServerServices().getServerName().getServerName();
codec = new PhoenixIndexCodec();
codec.initialize(env);
// setup the actual index writer
this.writer = new IndexWriter(env, serverName + "-tx-index-writer");
}
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
if (this.stopped) { return; }
this.stopped = true;
String msg = "TxIndexer is being stopped";
this.writer.stop(msg);
}
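// Wraps the mini-batch as an Iterator<Mutation> so callers can stream over the batch
// without copying it into a separate collection.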
private static Iterator<Mutation> getMutationIterator(final MiniBatchOperationInProgress<Mutation> miniBatchOp) {
return new Iterator<Mutation>() {
private int i = 0;
@Override
public boolean hasNext() {
return i < miniBatchOp.size();
}
@Override
public Mutation next() {
return miniBatchOp.getOperation(i++);
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
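// The batch is assumed to carry the same index metadata attributes on every mutation,
// so the first mutation determines whether indexing is enabled for the whole batch.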
Mutation m = miniBatchOp.getOperation(0);
if (!codec.isEnabled(m)) {
super.preBatchMutate(c, miniBatchOp);
return;
}
Map<String,byte[]> updateAttributes = m.getAttributesMap();
PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(), updateAttributes);
byte[] txRollbackAttribute = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY);
Collection<Pair<Mutation, byte[]>> indexUpdates = null;
// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
Span current = scope.getSpan();
if (current == null) {
current = NullSpan.INSTANCE;
}
// get the index updates for all elements in this batch
indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute);
current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
// if there are no index updates, we're done; otherwise write them out
if (!indexUpdates.isEmpty()) {
this.writer.write(indexUpdates, true);
}
} catch (Throwable t) {
String msg = "Failed to update index with entries:" + indexUpdates;
LOG.error(msg, t);
ServerUtil.throwIOException(msg, t);
}
}
public static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
MultiMutation stored = mutations.get(row);
// we haven't seen this row before, so add it
if (stored == null) {
stored = new MultiMutation(row);
mutations.put(row, stored);
}
stored.addAll(m);
}
private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException {
Transaction tx = indexMetaData.getTransaction();
if (tx == null) {
throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegionInfo().getTable().getNameAsString());
}
boolean isRollback = txRollbackAttribute != null;
boolean isImmutable = indexMetaData.isImmutableRows();
ResultScanner currentScanner = null;
TransactionAwareHTable txTable = null;
// Collect all mutations in the batch, keyed by row key
Map<ImmutableBytesPtr, MultiMutation> mutations =
new HashMap<ImmutableBytesPtr, MultiMutation>();
// For immutable rows outside of a rollback, only delete mutations require a lookup of
// the prior row state, so those are tracked in a separate map; otherwise every mutation
// needs the lookup and the same map serves both purposes.
Map<ImmutableBytesPtr, MultiMutation> findPriorValueMutations;
if (isImmutable && !isRollback) {
findPriorValueMutations = new HashMap<ImmutableBytesPtr, MultiMutation>();
} else {
findPriorValueMutations = mutations;
}
while (mutationIterator.hasNext()) {
Mutation m = mutationIterator.next();
// add the mutation to the batch set
ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
if (mutations != findPriorValueMutations && isDeleteMutation(m)) {
addMutation(findPriorValueMutations, row, m);
}
addMutation(mutations, row, m);
}
// Collect the set of mutable ColumnReferences so that we can first
// run a scan to get the current state. We'll need this to delete
// the existing index rows.
List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
int estimatedSize = indexMaintainers.size() * 10; // rough guess of ~10 indexed columns per index
Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(estimatedSize);
for (IndexMaintainer indexMaintainer : indexMaintainers) {
// For transactional tables, we use an index maintainer
// to aid in rollback if there's a KeyValue column in the index. The alternative would be
// to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
// client side.
Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
mutableColumns.addAll(allColumns);
}
Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
try {
// Track whether any row keys have Delete mutations (or Puts that are
// Tephra's delete markers). If there are none, we don't need to scan for prior
// versions; if there are, we do. Since rollbacks always have delete mutations,
// this logic covers them too.
if (!findPriorValueMutations.isEmpty()) {
List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
for (ImmutableBytesPtr ptr : findPriorValueMutations.keySet()) {
keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
}
Scan scan = new Scan();
// Project all mutable columns
for (ColumnReference ref : mutableColumns) {
scan.addColumn(ref.getFamily(), ref.getQualifier());
}
/*
* Indexes inherit the storage scheme of the data table, which means all the indexes have the same
* storage scheme and empty key value qualifier. Note that this assumption would break if we ever
* allow new indexes on existing data tables to use a different storage scheme than the data
* table.
*/
byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier();
// Project empty key value column
scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
scanRanges.initializeScan(scan);
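// The skip scan visits only the row keys collected above rather than scanning the
// entire region for prior row state.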
TableName tableName = env.getRegion().getRegionInfo().getTable();
HTableInterface htable = env.getTable(tableName);
txTable = new TransactionAwareHTable(htable);
txTable.startTx(tx);
// For rollback, we need to see all versions, including
// the last committed version as there may be multiple
// checkpointed versions.
SkipScanFilter filter = scanRanges.getSkipScanFilter();
if (isRollback) {
filter = new SkipScanFilter(filter, true);
tx.setVisibility(VisibilityLevel.SNAPSHOT_ALL);
}
scan.setFilter(filter);
currentScanner = txTable.getScanner(scan);
}
if (isRollback) {
processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations);
} else {
processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations, findPriorValueMutations);
}
} finally {
if (txTable != null) txTable.close();
}
return indexUpdates;
}
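// A mutation counts as a delete if any of its cells is an HBase delete marker (a
// non-Put type) or a Put carrying Tephra's transactional delete marker.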
private static boolean isDeleteMutation(Mutation m) {
for (Map.Entry<byte[],List<Cell>> cellMap : m.getFamilyCellMap().entrySet()) {
for (Cell cell : cellMap.getValue()) {
if (cell.getTypeByte() != KeyValue.Type.Put.getCode() || TransactionUtil.isDelete(cell)) {
return true;
}
}
}
return false;
}
private void processMutation(RegionCoprocessorEnvironment env,
PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
ResultScanner scanner,
Transaction tx,
Set<ColumnReference> upsertColumns,
Collection<Pair<Mutation, byte[]>> indexUpdates,
Map<ImmutableBytesPtr, MultiMutation> mutations,
Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException {
if (scanner != null) {
Result result;
ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0)
.getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
// Process existing data table rows by removing the old index row and adding the new index row
while ((result = scanner.next()) != null) {
Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result);
generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
generatePuts(indexMetaData, indexUpdates, state);
}
}
// Process new data table by adding new index rows
for (Mutation m : mutations.values()) {
TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m);
generatePuts(indexMetaData, indexUpdates, state);
}
}
private void processRollback(RegionCoprocessorEnvironment env,
PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
ResultScanner scanner,
Transaction tx, Set<ColumnReference> mutableColumns,
Collection<Pair<Mutation, byte[]>> indexUpdates,
Map<ImmutableBytesPtr, MultiMutation> mutations) throws IOException {
if (scanner != null) {
Result result;
// Loop through the last committed row state plus all new rows associated with the current
// transaction to generate point delete markers for all index rows that were added. We don't
// have Tephra manage index rows in change sets because we don't want the additional memory
// overhead and do not need conflict detection on index rows.
ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
while ((result = scanner.next()) != null) {
Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
// Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest
// (as if we're "replaying" them in time order).
List<Cell> cells = result.listCells();
Collections.sort(cells, new Comparator<Cell>() {
@Override
public int compare(Cell o1, Cell o2) {
int c = Longs.compare(o1.getTimestamp(), o2.getTimestamp());
if (c != 0) return c;
c = o1.getTypeByte() - o2.getTypeByte();
if (c != 0) return c;
c = Bytes.compareTo(o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength(), o2.getFamilyArray(), o2.getFamilyOffset(), o2.getFamilyLength());
if (c != 0) return c;
return Bytes.compareTo(o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength(), o2.getQualifierArray(), o2.getQualifierOffset(), o2.getQualifierLength());
}
});
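// For example, with KeyValue type codes Put=4, Delete=8, DeleteColumn=12, cells with
// (ts=10, Put), (ts=10, DeleteColumn), (ts=5, Put) sort to (5, Put), (10, Put),
// (10, DeleteColumn): oldest timestamp first, and Puts before delete markers within a
// timestamp.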
int i = 0;
int nCells = cells.size();
Result oldResult = null, newResult;
long readPtr = tx.getReadPointer();
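// The first pass below groups all cells visible at the read pointer (the committed row
// state, possibly spanning several checkpointed timestamps) into one batch; each later
// pass batches the cells of a single uncommitted write pointer.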
do {
boolean hasPuts = false;
LinkedList<Cell> singleTimeCells = Lists.newLinkedList();
long writePtr;
Cell cell = cells.get(i);
do {
hasPuts |= cell.getTypeByte() == KeyValue.Type.Put.getCode();
writePtr = cell.getTimestamp();
ListIterator<Cell> it = singleTimeCells.listIterator();
do {
// Add at the beginning of the list to match the expected HBase
// newest to oldest sort order (which TxTableState relies on
// with the Result.getLatestColumnValue() calls). However, we
// still want to add Cells in the expected order for each time
// bound as otherwise we won't find it in our old state.
it.add(cell);
} while (++i < nCells && (cell=cells.get(i)).getTimestamp() == writePtr);
} while (i < nCells && cell.getTimestamp() <= readPtr);
// Generate point delete markers that remove the old index value from the prior row state.
// The write timestamp is the next timestamp, not the current timestamp,
// as the earliest cells are the current values for the row (and we don't
// want to delete the current row).
if (oldResult != null) {
TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), writePtr, m, emptyColRef, oldResult);
generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
}
// Generate point delete markers for the new index value.
// If this time batch has no Puts (i.e. only Deletes), skip generating deletes here:
// the delete was already generated above from the previous row state, and delete
// markers alone don't carry the state needed to build the index delete.
if (hasPuts) {
newResult = Result.create(singleTimeCells);
// First row may represent the current state which we don't want to delete
if (writePtr > readPtr) {
TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), writePtr, m, emptyColRef, newResult);
generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
}
oldResult = newResult;
} else {
oldResult = null;
}
} while (i < nCells);
}
}
}
private void generateDeletes(PhoenixIndexMetaData indexMetaData,
Collection<Pair<Mutation, byte[]>> indexUpdates,
byte[] attribValue, TxTableState state) throws IOException {
Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
for (IndexUpdate delete : deletes) {
if (delete.isValid()) {
delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(), delete.getTableName()));
}
}
}
private boolean generatePuts(
PhoenixIndexMetaData indexMetaData,
Collection<Pair<Mutation, byte[]>> indexUpdates,
TxTableState state)
throws IOException {
state.applyMutation();
Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
boolean validPut = false;
for (IndexUpdate put : puts) {
if (put.isValid()) {
indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(), put.getTableName()));
validPut = true;
}
}
return validPut;
}
private static class TxTableState implements TableState {
private final Mutation mutation;
private final long currentTimestamp;
private final RegionCoprocessorEnvironment env;
private final Map<String, byte[]> attributes;
private final List<KeyValue> pendingUpdates;
private final Set<ColumnReference> indexedColumns;
private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
private TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation mutation) {
this.env = env;
this.currentTimestamp = currentTimestamp;
this.indexedColumns = indexedColumns;
this.attributes = attributes;
this.mutation = mutation;
int estimatedSize = indexedColumns.size();
this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize);
try {
CellScanner scanner = mutation.cellScanner();
while (scanner.advance()) {
Cell cell = scanner.current();
pendingUpdates.add(KeyValueUtil.ensureKeyValue(cell));
}
} catch (IOException e) {
throw new RuntimeException(e); // impossible: scanning an in-memory mutation does no I/O
}
}
public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) {
this(env, indexedColumns, attributes, currentTimestamp, m);
for (ColumnReference ref : indexedColumns) {
Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
if (cell != null) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
valueMap.put(ref, ptr);
}
}
}
@Override
public RegionCoprocessorEnvironment getEnvironment() {
return env;
}
@Override
public long getCurrentTimestamp() {
return currentTimestamp;
}
@Override
public Map<String, byte[]> getUpdateAttributes() {
return attributes;
}
@Override
public byte[] getCurrentRowKey() {
return mutation.getRow();
}
@Override
public List<? extends IndexedColumnGroup> getIndexColumnHints() {
return Collections.emptyList();
}
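// Replays the mutation's pending cells against valueMap so that it reflects the row
// state after the mutation is applied (invoked before generating index puts).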
private void applyMutation() {
for (Cell cell : pendingUpdates) {
if (cell.getTypeByte() == KeyValue.Type.Delete.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
valueMap.remove(ref);
} else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
for (ColumnReference ref : indexedColumns) {
if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
valueMap.remove(ref);
}
}
} else if (cell.getTypeByte() == KeyValue.Type.Put.getCode()) {
ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
if (indexedColumns.contains(ref)) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
valueMap.put(ref, ptr);
}
} else {
throw new IllegalStateException("Unexpected mutation type for " + cell);
}
}
}
@Override
public Collection<KeyValue> getPendingUpdate() {
return pendingUpdates;
}
@Override
public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound)
throws IOException {
// TODO: creating these objects over and over again is wasteful
ColumnTracker tracker = new ColumnTracker(indexedColumns);
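// Back the ValueGetter with the in-memory valueMap so the codec reads the row state
// tracked by this TxTableState instead of re-fetching it from HBase.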
ValueGetter getter = new ValueGetter() {
@Override
public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
return valueMap.get(ref);
}
@Override
public byte[] getRowKey() {
return mutation.getRow();
}
};
return new Pair<ValueGetter, IndexUpdate>(getter, new IndexUpdate(tracker));
}
}
}