| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.db.view; |
| |
| import java.nio.ByteBuffer; |
| import java.util.*; |
| |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.PeekingIterator; |
| |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.config.ColumnDefinition; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.rows.*; |
| import org.apache.cassandra.db.partitions.*; |
| import org.apache.cassandra.db.marshal.AbstractType; |
| import org.apache.cassandra.db.marshal.CompositeType; |
| |
| /** |
| * Creates the updates to apply to a view given the existing rows in the base |
| * table and the updates that we're applying to them (this handles updates |
| * on a single partition only). |
| * |
| * This class is used by passing the updates made to the base table to |
| * {@link #addBaseTableUpdate} and calling {@link #generateViewUpdates} once all updates have |
| * been handled to get the resulting view mutations. |
| */ |
| public class ViewUpdateGenerator |
| { |
| private final View view; |
| private final int nowInSec; |
| |
| private final CFMetaData baseMetadata; |
| private final DecoratedKey baseDecoratedKey; |
| private final ByteBuffer[] basePartitionKey; |
| |
| private final CFMetaData viewMetadata; |
| private final boolean baseEnforceStrictLiveness; |
| |
| private final Map<DecoratedKey, PartitionUpdate> updates = new HashMap<>(); |
| |
| // Reused internally to build a new entry |
| private final ByteBuffer[] currentViewEntryPartitionKey; |
| private final Row.Builder currentViewEntryBuilder; |
| |
| /** |
| * The type of type update action to perform to the view for a given base table |
| * update. |
| */ |
| private enum UpdateAction |
| { |
| NONE, // There was no view entry and none should be added |
| NEW_ENTRY, // There was no entry but there is one post-update |
| DELETE_OLD, // There was an entry but there is nothing after update |
| UPDATE_EXISTING, // There was an entry and the update modifies it |
| SWITCH_ENTRY // There was an entry and there is still one after update, |
| // but they are not the same one. |
| } |
| |
| /** |
| * Creates a new {@code ViewUpdateBuilder}. |
| * |
| * @param view the view for which this will be building updates for. |
| * @param basePartitionKey the partition key for the base table partition for which |
| * we'll handle updates for. |
| * @param nowInSec the current time in seconds. Used to decide if data are live or not |
| * and as base reference for new deletions. |
| */ |
| public ViewUpdateGenerator(View view, DecoratedKey basePartitionKey, int nowInSec) |
| { |
| this.view = view; |
| this.nowInSec = nowInSec; |
| |
| this.baseMetadata = view.getDefinition().baseTableMetadata(); |
| this.baseEnforceStrictLiveness = baseMetadata.enforceStrictLiveness(); |
| this.baseDecoratedKey = basePartitionKey; |
| this.basePartitionKey = extractKeyComponents(basePartitionKey, baseMetadata.getKeyValidator()); |
| |
| this.viewMetadata = view.getDefinition().metadata; |
| |
| this.currentViewEntryPartitionKey = new ByteBuffer[viewMetadata.partitionKeyColumns().size()]; |
| this.currentViewEntryBuilder = BTreeRow.sortedBuilder(); |
| } |
| |
| private static ByteBuffer[] extractKeyComponents(DecoratedKey partitionKey, AbstractType<?> type) |
| { |
| return type instanceof CompositeType |
| ? ((CompositeType)type).split(partitionKey.getKey()) |
| : new ByteBuffer[]{ partitionKey.getKey() }; |
| } |
| |
| /** |
| * Adds to this generator the updates to be made to the view given a base table row |
| * before and after an update. |
| * |
| * @param existingBaseRow the base table row as it is before an update. |
| * @param mergedBaseRow the base table row after the update is applied (note that |
| * this is not just the new update, but rather the resulting row). |
| */ |
| public void addBaseTableUpdate(Row existingBaseRow, Row mergedBaseRow) |
| { |
| switch (updateAction(existingBaseRow, mergedBaseRow)) |
| { |
| case NONE: |
| return; |
| case NEW_ENTRY: |
| createEntry(mergedBaseRow); |
| return; |
| case DELETE_OLD: |
| deleteOldEntry(existingBaseRow, mergedBaseRow); |
| return; |
| case UPDATE_EXISTING: |
| updateEntry(existingBaseRow, mergedBaseRow); |
| return; |
| case SWITCH_ENTRY: |
| createEntry(mergedBaseRow); |
| deleteOldEntry(existingBaseRow, mergedBaseRow); |
| return; |
| } |
| } |
| |
| /** |
| * Returns the updates that needs to be done to the view given the base table updates |
| * passed to {@link #generateViewMutations}. |
| * |
| * @return the updates to do to the view. |
| */ |
| public Collection<PartitionUpdate> generateViewUpdates() |
| { |
| return updates.values(); |
| } |
| |
| /** |
| * Clears the current state so that the generator may be reused. |
| */ |
| public void clear() |
| { |
| updates.clear(); |
| } |
| |
| /** |
| * Compute which type of action needs to be performed to the view for a base table row |
| * before and after an update. |
| */ |
| private UpdateAction updateAction(Row existingBaseRow, Row mergedBaseRow) |
| { |
| // Having existing empty is useful, it just means we'll insert a brand new entry for mergedBaseRow, |
| // but if we have no update at all, we shouldn't get there. |
| assert !mergedBaseRow.isEmpty(); |
| |
| // Note that none of the base PK columns will differ since we're intrinsically dealing |
| // with the same base row. So we have to check 3 things: |
| // 1) that the clustering doesn't have a null, which can happen for compact tables. If that's the case, |
| // there is no corresponding entries. |
| // 2) if there is a column not part of the base PK in the view PK, whether it is changed by the update. |
| // 3) whether mergedBaseRow actually match the view SELECT filter |
| |
| if (baseMetadata.isCompactTable()) |
| { |
| Clustering clustering = mergedBaseRow.clustering(); |
| for (int i = 0; i < clustering.size(); i++) |
| { |
| if (clustering.get(i) == null) |
| return UpdateAction.NONE; |
| } |
| } |
| |
| assert view.baseNonPKColumnsInViewPK.size() <= 1 : "We currently only support one base non-PK column in the view PK"; |
| |
| if (view.baseNonPKColumnsInViewPK.isEmpty()) |
| { |
| // The view entry is necessarily the same pre and post update. |
| |
| // Note that we allow existingBaseRow to be null and treat it as empty (see MultiViewUpdateBuilder.generateViewsMutations). |
| boolean existingHasLiveData = existingBaseRow != null && existingBaseRow.hasLiveData(nowInSec, baseEnforceStrictLiveness); |
| boolean mergedHasLiveData = mergedBaseRow.hasLiveData(nowInSec, baseEnforceStrictLiveness); |
| return existingHasLiveData |
| ? (mergedHasLiveData ? UpdateAction.UPDATE_EXISTING : UpdateAction.DELETE_OLD) |
| : (mergedHasLiveData ? UpdateAction.NEW_ENTRY : UpdateAction.NONE); |
| } |
| |
| ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0); |
| assert !baseColumn.isComplex() : "A complex column couldn't be part of the view PK"; |
| Cell before = existingBaseRow == null ? null : existingBaseRow.getCell(baseColumn); |
| Cell after = mergedBaseRow.getCell(baseColumn); |
| |
| // If the update didn't modified this column, the cells will be the same object so it's worth checking |
| if (before == after) |
| return isLive(before) ? UpdateAction.UPDATE_EXISTING : UpdateAction.NONE; |
| |
| if (!isLive(before)) |
| return isLive(after) ? UpdateAction.NEW_ENTRY : UpdateAction.NONE; |
| if (!isLive(after)) |
| { |
| return UpdateAction.DELETE_OLD; |
| } |
| |
| return baseColumn.cellValueType().compare(before.value(), after.value()) == 0 |
| ? UpdateAction.UPDATE_EXISTING |
| : UpdateAction.SWITCH_ENTRY; |
| } |
| |
| private boolean matchesViewFilter(Row baseRow) |
| { |
| return view.matchesViewFilter(baseDecoratedKey, baseRow, nowInSec); |
| } |
| |
| private boolean isLive(Cell cell) |
| { |
| return cell != null && cell.isLive(nowInSec); |
| } |
| |
| /** |
| * Creates a view entry corresponding to the provided base row. |
| * <p> |
| * This method checks that the base row does match the view filter before applying it. |
| */ |
| private void createEntry(Row baseRow) |
| { |
| // Before create a new entry, make sure it matches the view filter |
| if (!matchesViewFilter(baseRow)) |
| return; |
| |
| startNewUpdate(baseRow); |
| currentViewEntryBuilder.addPrimaryKeyLivenessInfo(computeLivenessInfoForEntry(baseRow)); |
| currentViewEntryBuilder.addRowDeletion(baseRow.deletion()); |
| |
| for (ColumnData data : baseRow) |
| { |
| ColumnDefinition viewColumn = view.getViewColumn(data.column()); |
| // If that base table column is not denormalized in the view, we had nothing to do. |
| // Alose, if it's part of the view PK it's already been taken into account in the clustering. |
| if (viewColumn == null || viewColumn.isPrimaryKeyColumn()) |
| continue; |
| |
| addColumnData(viewColumn, data); |
| } |
| |
| submitUpdate(); |
| } |
| |
| /** |
| * Creates the updates to apply to the existing view entry given the base table row before |
| * and after the update, assuming that the update hasn't changed to which view entry the |
| * row correspond (that is, we know the columns composing the view PK haven't changed). |
| * <p> |
| * This method checks that the base row (before and after) does match the view filter before |
| * applying anything. |
| */ |
| private void updateEntry(Row existingBaseRow, Row mergedBaseRow) |
| { |
| // While we know existingBaseRow and mergedBaseRow are corresponding to the same view entry, |
| // they may not match the view filter. |
| if (!matchesViewFilter(existingBaseRow)) |
| { |
| createEntry(mergedBaseRow); |
| return; |
| } |
| if (!matchesViewFilter(mergedBaseRow)) |
| { |
| deleteOldEntryInternal(existingBaseRow, mergedBaseRow); |
| return; |
| } |
| |
| startNewUpdate(mergedBaseRow); |
| |
| // In theory, it may be the PK liveness and row deletion hasn't been change by the update |
| // and we could condition the 2 additions below. In practice though, it's as fast (if not |
| // faster) to compute those info than to check if they have changed so we keep it simple. |
| currentViewEntryBuilder.addPrimaryKeyLivenessInfo(computeLivenessInfoForEntry(mergedBaseRow)); |
| currentViewEntryBuilder.addRowDeletion(mergedBaseRow.deletion()); |
| |
| addDifferentCells(existingBaseRow, mergedBaseRow); |
| submitUpdate(); |
| } |
| |
| private void addDifferentCells(Row existingBaseRow, Row mergedBaseRow) |
| { |
| // We only add to the view update the cells from mergedBaseRow that differs from |
| // existingBaseRow. For that and for speed we can just cell pointer equality: if the update |
| // hasn't touched a cell, we know it will be the same object in existingBaseRow and |
| // mergedBaseRow (note that including more cells than we strictly should isn't a problem |
| // for correction, so even if the code change and pointer equality don't work anymore, it'll |
| // only a slightly inefficiency which we can fix then). |
| // Note: we could alternatively use Rows.diff() for this, but because it is a bit more generic |
| // than what we need here, it's also a bit less efficient (it allocates more in particular), |
| // and this might be called a lot of time for view updates. So, given that this is not a whole |
| // lot of code anyway, it's probably doing the diff manually. |
| PeekingIterator<ColumnData> existingIter = Iterators.peekingIterator(existingBaseRow.iterator()); |
| for (ColumnData mergedData : mergedBaseRow) |
| { |
| ColumnDefinition baseColumn = mergedData.column(); |
| ColumnDefinition viewColumn = view.getViewColumn(baseColumn); |
| // If that base table column is not denormalized in the view, we had nothing to do. |
| // Alose, if it's part of the view PK it's already been taken into account in the clustering. |
| if (viewColumn == null || viewColumn.isPrimaryKeyColumn()) |
| continue; |
| |
| ColumnData existingData = null; |
| // Find if there is data for that column in the existing row |
| while (existingIter.hasNext()) |
| { |
| int cmp = baseColumn.compareTo(existingIter.peek().column()); |
| if (cmp < 0) |
| break; |
| |
| ColumnData next = existingIter.next(); |
| if (cmp == 0) |
| { |
| existingData = next; |
| break; |
| } |
| } |
| |
| if (existingData == null) |
| { |
| addColumnData(viewColumn, mergedData); |
| continue; |
| } |
| |
| if (mergedData == existingData) |
| continue; |
| |
| if (baseColumn.isComplex()) |
| { |
| ComplexColumnData mergedComplexData = (ComplexColumnData)mergedData; |
| ComplexColumnData existingComplexData = (ComplexColumnData)existingData; |
| if (mergedComplexData.complexDeletion().supersedes(existingComplexData.complexDeletion())) |
| currentViewEntryBuilder.addComplexDeletion(viewColumn, mergedComplexData.complexDeletion()); |
| |
| PeekingIterator<Cell> existingCells = Iterators.peekingIterator(existingComplexData.iterator()); |
| for (Cell mergedCell : mergedComplexData) |
| { |
| Cell existingCell = null; |
| // Find if there is corresponding cell in the existing row |
| while (existingCells.hasNext()) |
| { |
| int cmp = baseColumn.cellPathComparator().compare(mergedCell.path(), existingCells.peek().path()); |
| if (cmp > 0) |
| break; |
| |
| Cell next = existingCells.next(); |
| if (cmp == 0) |
| { |
| existingCell = next; |
| break; |
| } |
| } |
| |
| if (mergedCell != existingCell) |
| addCell(viewColumn, mergedCell); |
| } |
| } |
| else |
| { |
| // Note that we've already eliminated the case where merged == existing |
| addCell(viewColumn, (Cell)mergedData); |
| } |
| } |
| } |
| |
| /** |
| * Deletes the view entry corresponding to the provided base row. |
| * <p> |
| * This method checks that the base row does match the view filter before bothering. |
| */ |
| private void deleteOldEntry(Row existingBaseRow, Row mergedBaseRow) |
| { |
| // Before deleting an old entry, make sure it was matching the view filter (otherwise there is nothing to delete) |
| if (!matchesViewFilter(existingBaseRow)) |
| return; |
| |
| deleteOldEntryInternal(existingBaseRow, mergedBaseRow); |
| } |
| |
| private void deleteOldEntryInternal(Row existingBaseRow, Row mergedBaseRow) |
| { |
| startNewUpdate(existingBaseRow); |
| long timestamp = computeTimestampForEntryDeletion(existingBaseRow, mergedBaseRow); |
| long rowDeletion = mergedBaseRow.deletion().time().markedForDeleteAt(); |
| assert timestamp >= rowDeletion; |
| |
| // If computed deletion timestamp greater than row deletion, it must be coming from |
| // 1. non-pk base column used in view pk, or |
| // 2. unselected base column |
| // any case, we need to use it as expired livenessInfo |
| // If computed deletion timestamp is from row deletion, we only need row deletion itself |
| if (timestamp > rowDeletion) |
| { |
| /** |
| * We use an expired liveness instead of a row tombstone to allow a shadowed MV |
| * entry to co-exist with a row tombstone, see ViewComplexTest#testCommutativeRowDeletion. |
| * |
| * TODO This is a dirty overload of LivenessInfo and we should modify |
| * the storage engine to properly support this on CASSANDRA-13826. |
| */ |
| LivenessInfo info = LivenessInfo.create(timestamp, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSec); |
| currentViewEntryBuilder.addPrimaryKeyLivenessInfo(info); |
| } |
| currentViewEntryBuilder.addRowDeletion(mergedBaseRow.deletion()); |
| |
| addDifferentCells(existingBaseRow, mergedBaseRow); |
| submitUpdate(); |
| } |
| |
| /** |
| * Computes the partition key and clustering for a new view entry, and setup the internal |
| * row builder for the new row. |
| * |
| * This assumes that there is corresponding entry, i.e. no values for the partition key and |
| * clustering are null (since we have eliminated that case through updateAction). |
| */ |
| private void startNewUpdate(Row baseRow) |
| { |
| ByteBuffer[] clusteringValues = new ByteBuffer[viewMetadata.clusteringColumns().size()]; |
| for (ColumnDefinition viewColumn : viewMetadata.primaryKeyColumns()) |
| { |
| ColumnDefinition baseColumn = view.getBaseColumn(viewColumn); |
| ByteBuffer value = getValueForPK(baseColumn, baseRow); |
| if (viewColumn.isPartitionKey()) |
| currentViewEntryPartitionKey[viewColumn.position()] = value; |
| else |
| clusteringValues[viewColumn.position()] = value; |
| } |
| |
| currentViewEntryBuilder.newRow(new Clustering(clusteringValues)); |
| } |
| |
| private LivenessInfo computeLivenessInfoForEntry(Row baseRow) |
| { |
| /** |
| * There 3 cases: |
| * 1. No extra primary key in view and all base columns are selected in MV. all base row's components(livenessInfo, |
| * deletion, cells) are same as view row. Simply map base components to view row. |
| * 2. There is a base non-key column used in view pk. This base non-key column determines the liveness of view row. view's row level |
| * info should based on this column. |
| * 3. Most tricky case is no extra primary key in view and some base columns are not selected in MV. We cannot use 1 livenessInfo or |
| * row deletion to represent the liveness of unselected column properly, see CASSANDRA-11500. |
| * We could make some simplification: the unselected columns will be used only when it affects view row liveness. eg. if view row |
| * already exists and not expiring, there is no need to use unselected columns. |
| * Note: if the view row is removed due to unselected column removal(ttl or cell tombstone), we will have problem keeping view |
| * row alive with a smaller or equal timestamp than the max unselected column timestamp. |
| * |
| */ |
| assert view.baseNonPKColumnsInViewPK.size() <= 1; // This may change, but is currently an enforced limitation |
| |
| LivenessInfo baseLiveness = baseRow.primaryKeyLivenessInfo(); |
| |
| if (view.hasSamePrimaryKeyColumnsAsBaseTable()) |
| { |
| if (view.getDefinition().includeAllColumns) |
| return baseLiveness; |
| |
| long timestamp = baseLiveness.timestamp(); |
| boolean hasNonExpiringLiveCell = false; |
| Cell biggestExpirationCell = null; |
| for (Cell cell : baseRow.cells()) |
| { |
| if (view.getViewColumn(cell.column()) != null) |
| continue; |
| if (!isLive(cell)) |
| continue; |
| timestamp = Math.max(timestamp, cell.maxTimestamp()); |
| if (!cell.isExpiring()) |
| hasNonExpiringLiveCell = true; |
| else |
| { |
| if (biggestExpirationCell == null) |
| biggestExpirationCell = cell; |
| else if (cell.localDeletionTime() > biggestExpirationCell.localDeletionTime()) |
| biggestExpirationCell = cell; |
| } |
| } |
| if (baseLiveness.isLive(nowInSec) && !baseLiveness.isExpiring()) |
| return LivenessInfo.create(viewMetadata, timestamp, nowInSec); |
| if (hasNonExpiringLiveCell) |
| return LivenessInfo.create(viewMetadata, timestamp, nowInSec); |
| if (biggestExpirationCell == null) |
| return baseLiveness; |
| if (biggestExpirationCell.localDeletionTime() > baseLiveness.localExpirationTime() |
| || !baseLiveness.isLive(nowInSec)) |
| return LivenessInfo.create(timestamp, |
| biggestExpirationCell.ttl(), |
| biggestExpirationCell.localDeletionTime()); |
| return baseLiveness; |
| } |
| |
| Cell cell = baseRow.getCell(view.baseNonPKColumnsInViewPK.get(0)); |
| assert isLive(cell) : "We shouldn't have got there if the base row had no associated entry"; |
| |
| return LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()); |
| } |
| |
| private long computeTimestampForEntryDeletion(Row existingBaseRow, Row mergedBaseRow) |
| { |
| DeletionTime deletion = mergedBaseRow.deletion().time(); |
| if (view.hasSamePrimaryKeyColumnsAsBaseTable()) |
| { |
| long timestamp = Math.max(deletion.markedForDeleteAt(), existingBaseRow.primaryKeyLivenessInfo().timestamp()); |
| if (view.getDefinition().includeAllColumns) |
| return timestamp; |
| |
| for (Cell cell : existingBaseRow.cells()) |
| { |
| // selected column should not contribute to view deletion, itself is already included in view row |
| if (view.getViewColumn(cell.column()) != null) |
| continue; |
| // unselected column is used regardless live or dead, because we don't know if it was used for liveness. |
| timestamp = Math.max(timestamp, cell.maxTimestamp()); |
| } |
| return timestamp; |
| } |
| // has base non-pk column in view pk |
| Cell before = existingBaseRow.getCell(view.baseNonPKColumnsInViewPK.get(0)); |
| assert isLive(before) : "We shouldn't have got there if the base row had no associated entry"; |
| return deletion.deletes(before) ? deletion.markedForDeleteAt() : before.timestamp(); |
| } |
| |
| private void addColumnData(ColumnDefinition viewColumn, ColumnData baseTableData) |
| { |
| assert viewColumn.isComplex() == baseTableData.column().isComplex(); |
| if (!viewColumn.isComplex()) |
| { |
| addCell(viewColumn, (Cell)baseTableData); |
| return; |
| } |
| |
| ComplexColumnData complexData = (ComplexColumnData)baseTableData; |
| currentViewEntryBuilder.addComplexDeletion(viewColumn, complexData.complexDeletion()); |
| for (Cell cell : complexData) |
| addCell(viewColumn, cell); |
| } |
| |
| private void addCell(ColumnDefinition viewColumn, Cell baseTableCell) |
| { |
| assert !viewColumn.isPrimaryKeyColumn(); |
| currentViewEntryBuilder.addCell(baseTableCell.withUpdatedColumn(viewColumn)); |
| } |
| |
| /** |
| * Finish building the currently updated view entry and add it to the other built |
| * updates. |
| */ |
| private void submitUpdate() |
| { |
| Row row = currentViewEntryBuilder.build(); |
| // I'm not sure we can reach there is there is nothing is updated, but adding an empty row breaks things |
| // and it costs us nothing to be prudent here. |
| if (row.isEmpty()) |
| return; |
| |
| DecoratedKey partitionKey = makeCurrentPartitionKey(); |
| PartitionUpdate update = updates.get(partitionKey); |
| if (update == null) |
| { |
| // We can't really know which columns of the view will be updated nor how many row will be updated for this key |
| // so we rely on hopefully sane defaults. |
| update = new PartitionUpdate(viewMetadata, partitionKey, viewMetadata.partitionColumns(), 4); |
| updates.put(partitionKey, update); |
| } |
| update.add(row); |
| } |
| |
| private DecoratedKey makeCurrentPartitionKey() |
| { |
| ByteBuffer rawKey = viewMetadata.partitionKeyColumns().size() == 1 |
| ? currentViewEntryPartitionKey[0] |
| : CompositeType.build(currentViewEntryPartitionKey); |
| |
| return viewMetadata.decorateKey(rawKey); |
| } |
| |
| private ByteBuffer getValueForPK(ColumnDefinition column, Row row) |
| { |
| switch (column.kind) |
| { |
| case PARTITION_KEY: |
| return basePartitionKey[column.position()]; |
| case CLUSTERING: |
| return row.clustering().get(column.position()); |
| default: |
| // This shouldn't NPE as we shouldn't get there if the value can be null (or there is a bug in updateAction()) |
| return row.getCell(column).value(); |
| } |
| } |
| } |