| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package harry.reconciler; |
| |
| import java.util.*; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import harry.ddl.ColumnSpec; |
| import harry.ddl.SchemaSpec; |
| import harry.operations.Query; |
| import harry.util.BitSet; |
| import harry.util.Ranges; |
| |
| import static harry.generators.DataGenerators.NIL_DESCR; |
| import static harry.generators.DataGenerators.UNSET_DESCR; |
| import static harry.model.Model.NO_TIMESTAMP; |
| |
| public class PartitionState implements Iterable<Reconciler.RowState> |
| { |
| private static final Logger logger = LoggerFactory.getLogger(Reconciler.class); |
| |
| final long pd; |
| final long debugCd; |
| final SchemaSpec schema; |
| |
| // Collected state |
| Reconciler.RowState staticRow; |
| final NavigableMap<Long, Reconciler.RowState> rows; |
| |
| public PartitionState(long pd, long debugCd, SchemaSpec schema) |
| { |
| this.pd = pd; |
| this.rows = new TreeMap<>(); |
| if (!schema.staticColumns.isEmpty()) |
| { |
| staticRow = new Reconciler.RowState(this, |
| Reconciler.STATIC_CLUSTERING, |
| Reconciler.arr(schema.staticColumns.size(), NIL_DESCR), |
| Reconciler.arr(schema.staticColumns.size(), NO_TIMESTAMP)); |
| } |
| this.debugCd = debugCd; |
| this.schema = schema; |
| } |
| |
| public NavigableMap<Long, Reconciler.RowState> rows() |
| { |
| return rows; |
| } |
| |
| public void writeStaticRow(long[] staticVds, long lts) |
| { |
| if (staticRow != null) |
| staticRow = updateRowState(staticRow, schema.staticColumns, Reconciler.STATIC_CLUSTERING, staticVds, lts, false); |
| } |
| |
| public void write(long cd, long[] vds, long lts, boolean writePrimaryKeyLiveness) |
| { |
| rows.compute(cd, (cd_, current) -> updateRowState(current, schema.regularColumns, cd, vds, lts, writePrimaryKeyLiveness)); |
| } |
| |
| public void delete(Ranges.Range range, long lts) |
| { |
| if (range.minBound > range.maxBound) |
| return; |
| |
| Iterator<Map.Entry<Long, Reconciler.RowState>> iter = rows.subMap(range.minBound, range.minInclusive, |
| range.maxBound, range.maxInclusive) |
| .entrySet() |
| .iterator(); |
| while (iter.hasNext()) |
| { |
| Map.Entry<Long, Reconciler.RowState> e = iter.next(); |
| if (debugCd != -1 && e.getKey() == debugCd) |
| logger.info("Hiding {} at {} because of range tombstone {}", debugCd, lts, range); |
| |
| // assert row state doesn't have fresher lts |
| iter.remove(); |
| } |
| } |
| |
| public void delete(long cd, long lts) |
| { |
| Reconciler.RowState state = rows.remove(cd); |
| if (state != null) |
| { |
| for (long v : state.lts) |
| assert lts >= v : String.format("Attempted to remove a row with a tombstone that has older timestamp (%d): %s", lts, state); |
| } |
| } |
| |
| public boolean isEmpty() |
| { |
| return rows.isEmpty(); |
| } |
| |
| private Reconciler.RowState updateRowState(Reconciler.RowState currentState, List<ColumnSpec<?>> columns, long cd, long[] vds, long lts, boolean writePrimaryKeyLiveness) |
| { |
| if (currentState == null) |
| { |
| long[] ltss = new long[vds.length]; |
| long[] vdsCopy = new long[vds.length]; |
| for (int i = 0; i < vds.length; i++) |
| { |
| if (vds[i] != UNSET_DESCR) |
| { |
| ltss[i] = lts; |
| vdsCopy[i] = vds[i]; |
| } |
| else |
| { |
| ltss[i] = NO_TIMESTAMP; |
| vdsCopy[i] = NIL_DESCR; |
| } |
| } |
| |
| currentState = new Reconciler.RowState(this, cd, vdsCopy, ltss); |
| } |
| else |
| { |
| assert currentState.vds.length == vds.length; |
| for (int i = 0; i < vds.length; i++) |
| { |
| if (vds[i] == UNSET_DESCR) |
| continue; |
| |
| assert lts >= currentState.lts[i] : String.format("Out-of-order LTS: %d. Max seen: %s", lts, currentState.lts[i]); // sanity check; we're iterating in lts order |
| |
| if (currentState.lts[i] == lts) |
| { |
| // Timestamp collision case |
| ColumnSpec<?> column = columns.get(i); |
| if (column.type.compareLexicographically(vds[i], currentState.vds[i]) > 0) |
| currentState.vds[i] = vds[i]; |
| } |
| else |
| { |
| currentState.vds[i] = vds[i]; |
| assert lts > currentState.lts[i]; |
| currentState.lts[i] = lts; |
| } |
| } |
| } |
| |
| if (writePrimaryKeyLiveness) |
| currentState.hasPrimaryKeyLivenessInfo = true; |
| |
| return currentState; |
| } |
| |
| public void deleteRegularColumns(long lts, long cd, int columnOffset, harry.util.BitSet columns, harry.util.BitSet mask) |
| { |
| deleteColumns(lts, rows.get(cd), columnOffset, columns, mask); |
| } |
| |
| public void deleteStaticColumns(long lts, int columnOffset, harry.util.BitSet columns, harry.util.BitSet mask) |
| { |
| deleteColumns(lts, staticRow, columnOffset, columns, mask); |
| } |
| |
| public void deleteColumns(long lts, Reconciler.RowState state, int columnOffset, harry.util.BitSet columns, BitSet mask) |
| { |
| if (state == null) |
| return; |
| |
| //TODO: optimise by iterating over the columns that were removed by this deletion |
| //TODO: optimise final decision to fully remove the column by counting a number of set/unset columns |
| boolean allNil = true; |
| for (int i = 0; i < state.vds.length; i++) |
| { |
| if (columns.isSet(columnOffset + i, mask)) |
| { |
| state.vds[i] = NIL_DESCR; |
| state.lts[i] = NO_TIMESTAMP; |
| } |
| else if (state.vds[i] != NIL_DESCR) |
| { |
| allNil = false; |
| } |
| } |
| |
| if (state.cd != Reconciler.STATIC_CLUSTERING && allNil & !state.hasPrimaryKeyLivenessInfo) |
| delete(state.cd, lts); |
| } |
| |
| public void deletePartition(long lts) |
| { |
| if (debugCd != -1) |
| logger.info("Hiding {} at {} because partition deletion", debugCd, lts); |
| |
| rows.clear(); |
| if (!schema.staticColumns.isEmpty()) |
| { |
| Arrays.fill(staticRow.vds, NIL_DESCR); |
| Arrays.fill(staticRow.lts, NO_TIMESTAMP); |
| } |
| } |
| |
| public Iterator<Reconciler.RowState> iterator() |
| { |
| return iterator(false); |
| } |
| |
| public Iterator<Reconciler.RowState> iterator(boolean reverse) |
| { |
| if (reverse) |
| return rows.descendingMap().values().iterator(); |
| |
| return rows.values().iterator(); |
| } |
| |
| public Collection<Reconciler.RowState> rows(boolean reverse) |
| { |
| if (reverse) |
| return rows.descendingMap().values(); |
| |
| return rows.values(); |
| } |
| |
| public Reconciler.RowState staticRow() |
| { |
| return staticRow; |
| } |
| |
| public PartitionState apply(Query query) |
| { |
| PartitionState partitionState = new PartitionState(pd, debugCd, schema); |
| partitionState.staticRow = staticRow; |
| // TODO: we could improve this if we could get original descriptors |
| for (Reconciler.RowState rowState : rows.values()) |
| if (query.match(rowState.cd)) |
| partitionState.rows.put(rowState.cd, rowState); |
| |
| return partitionState; |
| } |
| |
| public String toString(SchemaSpec schema) |
| { |
| StringBuilder sb = new StringBuilder(); |
| |
| if (staticRow != null) |
| sb.append("Static row: " + staticRow.toString(schema)).append("\n"); |
| |
| for (Reconciler.RowState row : rows.values()) |
| sb.append(row.toString(schema)).append("\n"); |
| |
| return sb.toString(); |
| } |
| } |