/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
* applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.covered;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Build covered indexes for Phoenix updates.
* <p>
* Before any call to prePut/preDelete, the row has already been locked. This ensures that we don't need to do any extra
* synchronization in the IndexBuilder.
* <p>
 * NOTE: This implementation doesn't clean up the index when a key-value is removed by a compaction or flush, leading
 * to a bloated index that needs to be cleaned up by a background process.
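 * <p>
 * The actual construction of the index rows is delegated to the {@link IndexCodec} configured on the parent
 * {@link org.apache.phoenix.hbase.index.builder.BaseIndexBuilder}; this class handles batching, timestamp
 * management, and cleanup of the prior index state.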
*/
public class NonTxIndexBuilder extends BaseIndexBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(NonTxIndexBuilder.class);
@Override
public void setup(RegionCoprocessorEnvironment env) throws IOException {
super.setup(env);
}
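    /**
     * Build the index updates for a single, pre-batched mutation. Each returned pair is
     * (index update mutation, index table name), as assembled by the {@link IndexUpdateManager}.
     */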
@Override
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData, LocalHBaseState localHBaseState) throws IOException {
// create a state manager, so we can manage each batch
LocalTableState state = new LocalTableState(localHBaseState, mutation);
// build the index updates for each group
IndexUpdateManager manager = new IndexUpdateManager(indexMetaData);
batchMutationAndAddUpdates(manager, state, mutation, indexMetaData);
if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Found index updates for Mutation: {}\n{}", mutation, manager);
}
return manager.toMap();
}
/**
     * Batch the key-values in the mutation by timestamp and add the index updates for the batch. All cells in a
     * batch must share a single timestamp; the split into per-timestamp batches happens upstream (in Indexer), so
     * this method verifies that invariant and throws an {@link IllegalStateException} if the cells' timestamps differ.
* <p>
* Adds all the updates in the {@link Mutation} to the state, as a side-effect.
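     * <p>
     * For illustration (names and values here are invented for this sketch, not taken from Phoenix itself): a client
     * Put carrying cells at two different timestamps would violate the single-timestamp invariant this method checks:
     *
     * <pre>
     * Put p = new Put(row);
     * p.addColumn(FAMILY, QUAL_A, 10L, valueA); // would belong to the ts=10 batch
     * p.addColumn(FAMILY, QUAL_B, 20L, valueB); // would belong to the ts=20 batch
     * // such a mutation must be split upstream before reaching this method
     * </pre>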
     * @param manager
     *            index updates into which to add new updates. Modified as a side-effect.
     * @param state
     *            current state of the row for the mutation.
     * @param m
     *            mutation to batch
     * @param indexMetaData
     *            metadata for the indexes being maintained
*
* @throws IOException
*/
private void batchMutationAndAddUpdates(IndexUpdateManager manager, LocalTableState state, Mutation m, IndexMetaData indexMetaData) throws IOException {
// The cells of a mutation are broken up into time stamp batches prior to this call (in Indexer).
long ts = m.getFamilyCellMap().values().iterator().next().iterator().next().getTimestamp();
Batch batch = new Batch(ts);
for (List<Cell> family : m.getFamilyCellMap().values()) {
            for (Cell kv : family) {
                if (ts != kv.getTimestamp()) {
                    throw new IllegalStateException("Time stamps must match for all cells in a batch");
                }
                batch.add(kv);
            }
}
addMutationsForBatch(manager, batch, state, indexMetaData);
}
/**
* For a single batch, get all the index updates and add them to the updateMap
* <p>
     * This method was designed to clean up the entire history of the row from the given timestamp forward for
     * out-of-order (e.g. 'back in time') updates. Note that the back-in-time fixup described below is currently
     * disabled (see PHOENIX-4057): out-of-order index updates are skipped rather than repaired.
     * <p>
     * If things arrive out of order (the client is using custom timestamps), we should still see the index in the
     * correct order (assuming we scan after the out-of-order update is finished). Therefore, when we aren't the most
     * recent update to the index, we need to delete the state at the current timestamp (similar to above), but also
     * issue a delete for the added index updates at the next newest timestamp of any of the columns in the update;
     * we need to clean up the insert so it looks like it was also deleted at that next newest timestamp. However,
     * it's not enough to just update the one entry in front of us - that column has likely been applied to index
     * entries across the entire history in front of us, which also need to be fixed up.
     * <p>
     * However, the current update will usually be the most recent thing to be added. In that case, all we need to do
     * is issue a delete for the previous index row (the state of the row without the update applied) at the current
     * timestamp. This gets rid of anything currently in the index for the current state of the row (at that
     * timestamp). Then we can simply apply the pending update and build the index update based on the new row state.
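     * <p>
     * For illustration (values invented for this example): if an indexed column currently holds 'a' and an incoming
     * Put writes 'b' at ts=100, we emit a Delete of the index row derived from 'a' at ts=100, followed by a Put of
     * the index row derived from 'b' at ts=100.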
*
* @param updateMap
* map to update with new index elements
* @param batch
* timestamp-based batch of edits
* @param state
* local state to update and pass to the codec
     * @param indexMetaData
     *            metadata for the indexes being maintained
     * @return always <tt>false</tt>: back-in-time cleanup is disabled (see PHOENIX-4057), so we never report having
     *         cleaned up the current state forward
* @throws IOException
*/
private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch, LocalTableState state,
IndexMetaData indexMetaData) throws IOException {
        // The IndexUpdateManager resolves conflicts within the current batch. Essentially, we can get
        // the case where a batch doesn't change the current state of the index (all Puts are covered
        // by deletes), in which case we don't want to add anything
// A. Get the correct values for the pending state in the batch
        // A.1 start by cleaning up the current state - as long as there are key-values in the batch
        // that are indexed, we need to change the current state of the index. It's up to the codec to
        // determine whether any cleanup is needed given the pending update.
long batchTs = batch.getTimestamp();
state.setPendingUpdates(batch.getKvs());
addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData);
// A.2 do a single pass first for the updates to the current state
state.applyPendingUpdates();
addUpdateForGivenTimestamp(batchTs, state, updateMap, indexMetaData);
// FIXME: PHOENIX-4057 do not attempt to issue index updates
// for out-of-order mutations since it corrupts the index.
return false;
}
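    /** Point the local state at the given timestamp and collect the index updates for the current batch. */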
private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap, IndexMetaData indexMetaData)
throws IOException {
state.setCurrentTimestamp(ts);
ts = addCurrentStateMutationsForBatch(updateMap, state, indexMetaData);
return ts;
}
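    /** Emit the deletes that clean up any existing index rows for the row state as of batchTs. */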
private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData)
throws IOException {
// get the cleanup for the current state
state.setCurrentTimestamp(batchTs);
addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData);
// ignore any index tracking from the delete
state.resetTrackedColumns();
}
/**
     * Add the necessary mutations for the pending batch on the local state. This was designed to handle rolling up
     * through history to determine the index changes after applying the batch (for the case where the batch is back
     * in time); per PHOENIX-4057, such back-in-time entries are now skipped instead.
     *
     * @param updateMap
     *            to update with index mutations
     * @param state
     *            current state of the table
     * @param indexMetaData
     *            metadata for the indexes being maintained
* @return the minimum timestamp across all index columns requested. If {@link ColumnTracker#isNewestTime(long)}
* returns <tt>true</tt> on the returned timestamp, we know that this <i>was not a back-in-time update</i>.
* @throws IOException
*/
private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state, IndexMetaData indexMetaData)
throws IOException {
// get the index updates for this current batch
Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey());
state.resetTrackedColumns();
/*
         * go through all the pending updates. If we are sure that all the entries are at the latest timestamp, we
         * can just add the index updates and move on. However, if there are columns that we skip past (based on the
         * timestamp of the batch), we would need to roll up the history. Regardless of whether or not they are at
         * the latest timestamp, the entries here are going to be correct for the current batch timestamp, so we add
         * them to the updates. The only thing we really care about is whether we need to roll up the history and
         * fix it as we go - which, per PHOENIX-4057 below, we no longer attempt.
*/
// timestamp of the next update we need to track
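        // start from the 'no newer primary-table entry' sentinel; any tracked column with a real
        // timestamp lowers the minimum below it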
long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
for (IndexUpdate update : upserts) {
// this is the one bit where we check the timestamps
final ColumnTracker tracker = update.getIndexedColumns();
long trackerTs = tracker.getTS();
            // update the next min TS we need to track
            if (trackerTs < minTs) {
                minTs = trackerTs;
            }
// FIXME: PHOENIX-4057 do not attempt to issue index updates
// for out-of-order mutations since it corrupts the index.
if (tracker.hasNewerTimestamps()) {
continue;
}
// only make the put if the index update has been setup
if (update.isValid()) {
byte[] table = update.getTableName();
Mutation mutation = update.getUpdate();
updateMap.addIndexUpdate(table, mutation);
}
}
return minTs;
}
    /**
     * Get the index deletes from the codec ({@link IndexCodec#getIndexDeletes(TableState, IndexMetaData, byte[], byte[])})
     * and add them to the update map. Subclasses may override this method to customize the cleanup behavior.
     * <p>
     * Expects the {@link LocalTableState} to already be correctly set up (correct timestamp, updates applied, etc).
     *
     * @param updateMap
     *            to update with the index deletes
     * @param state
     *            current state of the table
     * @param ts
     *            timestamp to apply to the generated deletes
     * @param indexMetaData
     *            metadata for the indexes being maintained
     * @throws IOException
     */
protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts, IndexMetaData indexMetaData)
throws IOException {
Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey());
if (cleanup != null) {
for (IndexUpdate d : cleanup) {
if (!d.isValid()) {
continue;
}
// FIXME: PHOENIX-4057 do not attempt to issue index updates
// for out-of-order mutations since it corrupts the index.
final ColumnTracker tracker = d.getIndexedColumns();
if (tracker.hasNewerTimestamps()) {
continue;
}
// override the timestamps in the delete to match the current batch.
Delete remove = (Delete)d.getUpdate();
remove.setTimestamp(ts);
updateMap.addIndexUpdate(d.getTableName(), remove);
}
}
}
@Override
public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<Cell> filtered, IndexMetaData indexMetaData)
throws IOException {
// TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
return null;
}
}