blob: bf592ba70c74e86840606b2f0e21443e73ddb3b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
* applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.covered;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.data.IndexMemStore;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
/**
* Manage the state of the HRegion's view of the table, for the single row.
* <p>
* Currently, this is a single-use object - you need to create a new one for each row that you need to manage. In the
* future, we could make this object reusable, but for the moment its easier to manage as a throw-away object.
* <p>
* This class is <b>not</b> thread-safe - it requires external synchronization is access concurrently.
*/
public class LocalTableState implements TableState {
private long ts;
private KeyValueStore memstore;
private LocalHBaseState table;
private Mutation update;
private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
private ScannerBuilder scannerBuilder;
private List<Cell> kvs = new ArrayList<Cell>();
private List<? extends IndexedColumnGroup> hints;
private CoveredColumns columnSet;
public LocalTableState(LocalHBaseState table, Mutation update) {
this.table = table;
this.update = update;
this.memstore = new IndexMemStore();
this.scannerBuilder = new ScannerBuilder(memstore, update);
this.columnSet = new CoveredColumns();
}
public void addPendingUpdates(Cell... kvs) {
if (kvs == null) return;
addPendingUpdates(Arrays.asList(kvs));
}
public void addPendingUpdates(List<Cell> kvs) {
if (kvs == null) return;
setPendingUpdates(kvs);
addUpdate(kvs);
}
private void addUpdate(List<Cell> list) {
addUpdate(list, true);
}
private void addUpdate(List<Cell> list, boolean overwrite) {
if (list == null) return;
for (Cell kv : list) {
this.memstore.add(kv, overwrite);
}
}
private void addUpdateCells(List<Cell> list, boolean overwrite) {
if (list == null) return;
// Avoid a copy of the Cell into a KeyValue if it's already a KeyValue
for (Cell c : list) {
this.memstore.add(c, overwrite);
}
}
@Override
public long getCurrentTimestamp() {
return this.ts;
}
/**
* Set the current timestamp up to which the table should allow access to the underlying table.
* This overrides the timestamp view provided by the indexer - use with care!
* @param timestamp timestamp up to which the table should allow access.
*/
public void setCurrentTimestamp(long timestamp) {
this.ts = timestamp;
}
public void resetTrackedColumns() {
this.trackedColumns.clear();
}
public Set<ColumnTracker> getTrackedColumns() {
return this.trackedColumns;
}
/**
* Get a scanner on the columns that are needed by the index.
* <p>
* The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given
* columns with a timestamp earlier than the timestamp to which the table is currently set (the
* current state of the table for which we need to build an update).
* <p>
* If none of the passed columns matches any of the columns in the pending update (as determined
* by {@link ColumnReference#matchesFamily(byte[])} and
* {@link ColumnReference#matchesQualifier(byte[])}, then an empty scanner will be returned. This
* is because it doesn't make sense to build index updates when there is no change in the table
* state for any of the columns you are indexing.
* <p>
* <i>NOTE:</i> This method should <b>not</b> be used during
* {@link IndexCodec#getIndexDeletes(TableState, BatchState, byte[], byte[])} as the pending update will not yet have been
* applied - you are merely attempting to cleanup the current state and therefore do <i>not</i>
* need to track the indexed columns.
* <p>
* As a side-effect, we update a timestamp for the next-most-recent timestamp for the columns you
* request - you will never see a column with the timestamp we are tracking, but the next oldest
* timestamp for that column.
* @param indexedColumns the columns to that will be indexed
* @param ignoreNewerMutations ignore mutations newer than m when determining current state. Useful
* when replaying mutation state for partial index rebuild where writes succeeded to the data
* table, but not to the index table.
* @param indexMetaData TODO
* @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
* the builder. Even if no update is necessary for the requested columns, you still need
* to return the {@link IndexUpdate}, just don't set the update for the
* {@link IndexUpdate}.
* @throws IOException
*/
public Pair<CoveredDeleteScanner, IndexUpdate> getIndexedColumnsTableState(
Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean isStateForDeletes, IndexMetaData indexMetaData) throws IOException {
// check to see if we haven't initialized any columns yet
Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(indexedColumns);
// add the covered columns to the set
for (ColumnReference ref : toCover) {
this.columnSet.addColumn(ref);
}
boolean requiresPriorRowState = indexMetaData.requiresPriorRowState(update);
if (!toCover.isEmpty()) {
// no need to perform scan to find prior row values when the indexed columns are immutable, as
// by definition, there won't be any. If we have indexed non row key columns, then we need to
// look up the row so that we can formulate the delete of the index row correctly. We'll always
// have our "empty" key value column, so we check if we have more than that as a basis for
// needing to lookup the prior row values.
if (requiresPriorRowState) {
// add the current state of the row. Uses listCells() to avoid a new array creation.
this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations), false);
}
}
// filter out things with a newer timestamp and track the column references to which it applies
ColumnTracker tracker = new ColumnTracker(indexedColumns);
synchronized (this.trackedColumns) {
// we haven't seen this set of columns before, so we need to create a new tracker
if (!this.trackedColumns.contains(tracker)) {
this.trackedColumns.add(tracker);
}
}
CoveredDeleteScanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts,
// If we're determining the index state for deletes and either
// a) we've looked up the prior row state or
// b) we're inserting immutable data
// then allow a null scanner to be returned.
// FIXME: this is crappy code - we need to simplify the global mutable secondary index implementation
// TODO: use mutable transactional secondary index implementation instead (PhoenixTransactionalIndexer)
isStateForDeletes && (requiresPriorRowState || insertingData(update)));
return new Pair<CoveredDeleteScanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
}
private static boolean insertingData(Mutation m) {
for (Collection<Cell> cells : m.getFamilyCellMap().values()) {
for (Cell cell : cells) {
if (KeyValue.Type.codeToType(cell.getTypeByte()) != KeyValue.Type.Put) {
return false;
}
}
}
return true;
}
@Override
public byte[] getCurrentRowKey() {
return this.update.getRow();
}
/**
* @param hints
*/
public void setHints(List<? extends IndexedColumnGroup> hints) {
this.hints = hints;
}
@Override
public List<? extends IndexedColumnGroup> getIndexColumnHints() {
return this.hints;
}
@Override
public Collection<Cell> getPendingUpdate() {
return this.kvs;
}
/**
* Set the {@link KeyValue}s in the update for which we are currently building an index update, but don't actually
* apply them.
*
* @param update
* pending {@link KeyValue}s
*/
public void setPendingUpdates(Collection<Cell> update) {
this.kvs.clear();
this.kvs.addAll(update);
}
/**
* Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}.
*/
public void applyPendingUpdates() {
this.addUpdate(kvs);
}
/**
* Rollback all the given values from the underlying state.
*
* @param values
*/
public void rollback(Collection<KeyValue> values) {
for (KeyValue kv : values) {
this.memstore.rollback(kv);
}
}
@Override
public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean isStateForDeletes, IndexMetaData indexMetaData)
throws IOException {
Pair<CoveredDeleteScanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns, ignoreNewerMutations, isStateForDeletes, indexMetaData);
ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
}
}