// blob: f284ec0910b7b496521c3190727970c063801720 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package harry.reconciler;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import harry.ddl.ColumnSpec;
import harry.ddl.SchemaSpec;
import harry.operations.Query;
import harry.util.BitSet;
import harry.util.Ranges;
import static harry.generators.DataGenerators.NIL_DESCR;
import static harry.generators.DataGenerators.UNSET_DESCR;
import static harry.model.Model.NO_TIMESTAMP;
public class PartitionState implements Iterable<Reconciler.RowState>
private static final Logger logger = LoggerFactory.getLogger(Reconciler.class);
final long pd;
final long debugCd;
final SchemaSpec schema;
// Collected state
Reconciler.RowState staticRow;
final NavigableMap<Long, Reconciler.RowState> rows;
public PartitionState(long pd, long debugCd, SchemaSpec schema)
this.pd = pd;
this.rows = new TreeMap<>();
if (!schema.staticColumns.isEmpty())
staticRow = new Reconciler.RowState(this,
Reconciler.arr(schema.staticColumns.size(), NIL_DESCR),
Reconciler.arr(schema.staticColumns.size(), NO_TIMESTAMP));
this.debugCd = debugCd;
this.schema = schema;
public NavigableMap<Long, Reconciler.RowState> rows()
return rows;
public void writeStaticRow(long[] staticVds, long lts)
if (staticRow != null)
staticRow = updateRowState(staticRow, schema.staticColumns, Reconciler.STATIC_CLUSTERING, staticVds, lts, false);
public void write(long cd, long[] vds, long lts, boolean writePrimaryKeyLiveness)
rows.compute(cd, (cd_, current) -> updateRowState(current, schema.regularColumns, cd, vds, lts, writePrimaryKeyLiveness));
public void delete(Ranges.Range range, long lts)
if (range.minBound > range.maxBound)
Iterator<Map.Entry<Long, Reconciler.RowState>> iter = rows.subMap(range.minBound, range.minInclusive,
range.maxBound, range.maxInclusive)
while (iter.hasNext())
Map.Entry<Long, Reconciler.RowState> e =;
if (debugCd != -1 && e.getKey() == debugCd)"Hiding {} at {} because of range tombstone {}", debugCd, lts, range);
// assert row state doesn't have fresher lts
public void delete(long cd, long lts)
Reconciler.RowState state = rows.remove(cd);
if (state != null)
for (long v : state.lts)
assert lts >= v : String.format("Attempted to remove a row with a tombstone that has older timestamp (%d): %s", lts, state);
public boolean isEmpty()
return rows.isEmpty();
private Reconciler.RowState updateRowState(Reconciler.RowState currentState, List<ColumnSpec<?>> columns, long cd, long[] vds, long lts, boolean writePrimaryKeyLiveness)
if (currentState == null)
long[] ltss = new long[vds.length];
long[] vdsCopy = new long[vds.length];
for (int i = 0; i < vds.length; i++)
if (vds[i] != UNSET_DESCR)
ltss[i] = lts;
vdsCopy[i] = vds[i];
ltss[i] = NO_TIMESTAMP;
vdsCopy[i] = NIL_DESCR;
currentState = new Reconciler.RowState(this, cd, vdsCopy, ltss);
assert currentState.vds.length == vds.length;
for (int i = 0; i < vds.length; i++)
if (vds[i] == UNSET_DESCR)
assert lts >= currentState.lts[i] : String.format("Out-of-order LTS: %d. Max seen: %s", lts, currentState.lts[i]); // sanity check; we're iterating in lts order
if (currentState.lts[i] == lts)
// Timestamp collision case
ColumnSpec<?> column = columns.get(i);
if (column.type.compareLexicographically(vds[i], currentState.vds[i]) > 0)
currentState.vds[i] = vds[i];
currentState.vds[i] = vds[i];
assert lts > currentState.lts[i];
currentState.lts[i] = lts;
if (writePrimaryKeyLiveness)
currentState.hasPrimaryKeyLivenessInfo = true;
return currentState;
public void deleteRegularColumns(long lts, long cd, int columnOffset, harry.util.BitSet columns, harry.util.BitSet mask)
deleteColumns(lts, rows.get(cd), columnOffset, columns, mask);
public void deleteStaticColumns(long lts, int columnOffset, harry.util.BitSet columns, harry.util.BitSet mask)
deleteColumns(lts, staticRow, columnOffset, columns, mask);
public void deleteColumns(long lts, Reconciler.RowState state, int columnOffset, harry.util.BitSet columns, BitSet mask)
if (state == null)
//TODO: optimise by iterating over the columns that were removed by this deletion
//TODO: optimise final decision to fully remove the column by counting a number of set/unset columns
boolean allNil = true;
for (int i = 0; i < state.vds.length; i++)
if (columns.isSet(columnOffset + i, mask))
state.vds[i] = NIL_DESCR;
state.lts[i] = NO_TIMESTAMP;
else if (state.vds[i] != NIL_DESCR)
allNil = false;
if ( != Reconciler.STATIC_CLUSTERING && allNil & !state.hasPrimaryKeyLivenessInfo)
delete(, lts);
public void deletePartition(long lts)
if (debugCd != -1)"Hiding {} at {} because partition deletion", debugCd, lts);
if (!schema.staticColumns.isEmpty())
Arrays.fill(staticRow.vds, NIL_DESCR);
Arrays.fill(staticRow.lts, NO_TIMESTAMP);
public Iterator<Reconciler.RowState> iterator()
return iterator(false);
public Iterator<Reconciler.RowState> iterator(boolean reverse)
if (reverse)
return rows.descendingMap().values().iterator();
return rows.values().iterator();
public Collection<Reconciler.RowState> rows(boolean reverse)
if (reverse)
return rows.descendingMap().values();
return rows.values();
public Reconciler.RowState staticRow()
return staticRow;
public PartitionState apply(Query query)
PartitionState partitionState = new PartitionState(pd, debugCd, schema);
partitionState.staticRow = staticRow;
// TODO: we could improve this if we could get original descriptors
for (Reconciler.RowState rowState : rows.values())
if (query.match(
partitionState.rows.put(, rowState);
return partitionState;
public String toString(SchemaSpec schema)
StringBuilder sb = new StringBuilder();
if (staticRow != null)
sb.append("Static row: " + staticRow.toString(schema)).append("\n");
for (Reconciler.RowState row : rows.values())
return sb.toString();