/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package harry.reconciler;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import harry.ddl.ColumnSpec;
import harry.ddl.SchemaSpec;
import harry.operations.Query;
import harry.util.BitSet;
import harry.util.Ranges;

import static harry.generators.DataGenerators.NIL_DESCR;
import static harry.generators.DataGenerators.UNSET_DESCR;
import static harry.model.Model.NO_TIMESTAMP;
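
// Expected state of a single partition: a static row plus regular rows keyed by their
// clustering descriptor (cd). State is built by applying writes and deletes in logical
// timestamp (lts) order (see the ordering assertions in updateRowState and delete).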
public class PartitionState implements Iterable<Reconciler.RowState>
{
private static final Logger logger = LoggerFactory.getLogger(Reconciler.class);
final long pd;
final long debugCd;
final SchemaSpec schema;
// Collected state
Reconciler.RowState staticRow;
final NavigableMap<Long, Reconciler.RowState> rows;
public PartitionState(long pd, long debugCd, SchemaSpec schema)
{
this.pd = pd;
this.rows = new TreeMap<>();
if (!schema.staticColumns.isEmpty())
{
staticRow = new Reconciler.RowState(this,
Reconciler.STATIC_CLUSTERING,
Reconciler.arr(schema.staticColumns.size(), NIL_DESCR),
Reconciler.arr(schema.staticColumns.size(), NO_TIMESTAMP));
}
this.debugCd = debugCd;
this.schema = schema;
}
public NavigableMap<Long, Reconciler.RowState> rows()
{
return rows;
}
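
    // Applies a write of static column value descriptors (staticVds) at logical timestamp lts.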
public void writeStaticRow(long[] staticVds, long lts)
{
if (staticRow != null)
staticRow = updateRowState(staticRow, schema.staticColumns, Reconciler.STATIC_CLUSTERING, staticVds, lts, false);
}
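
    // Applies a write of regular column value descriptors (vds) to the row identified by
    // clustering descriptor cd, optionally marking the row as having primary key liveness info.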
public void write(long cd, long[] vds, long lts, boolean writePrimaryKeyLiveness)
{
rows.compute(cd, (cd_, current) -> updateRowState(current, schema.regularColumns, cd, vds, lts, writePrimaryKeyLiveness));
}
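
    // Applies a range tombstone at logical timestamp lts: removes every row whose clustering
    // descriptor falls within the given bounds.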
public void delete(Ranges.Range range, long lts)
{
if (range.minBound > range.maxBound)
return;
Iterator<Map.Entry<Long, Reconciler.RowState>> iter = rows.subMap(range.minBound, range.minInclusive,
range.maxBound, range.maxInclusive)
.entrySet()
.iterator();
while (iter.hasNext())
{
Map.Entry<Long, Reconciler.RowState> e = iter.next();
if (debugCd != -1 && e.getKey() == debugCd)
logger.info("Hiding {} at {} because of range tombstone {}", debugCd, lts, range);
// assert row state doesn't have fresher lts
iter.remove();
}
}
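
    // Applies a row deletion at logical timestamp lts for the row with clustering descriptor cd.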
public void delete(long cd, long lts)
{
Reconciler.RowState state = rows.remove(cd);
if (state != null)
{
for (long v : state.lts)
                assert lts >= v : String.format("Attempted to remove a row with a tombstone that has an older timestamp (%d): %s", lts, state);
}
}
public boolean isEmpty()
{
return rows.isEmpty();
}
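
    // Merges incoming value descriptors into the current row state, creating it if absent.
    // UNSET descriptors leave existing cells untouched; on a timestamp collision the
    // lexicographically larger value descriptor wins, otherwise the later lts overwrites the cell.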
private Reconciler.RowState updateRowState(Reconciler.RowState currentState, List<ColumnSpec<?>> columns, long cd, long[] vds, long lts, boolean writePrimaryKeyLiveness)
{
if (currentState == null)
{
long[] ltss = new long[vds.length];
long[] vdsCopy = new long[vds.length];
for (int i = 0; i < vds.length; i++)
{
if (vds[i] != UNSET_DESCR)
{
ltss[i] = lts;
vdsCopy[i] = vds[i];
}
else
{
ltss[i] = NO_TIMESTAMP;
vdsCopy[i] = NIL_DESCR;
}
}
currentState = new Reconciler.RowState(this, cd, vdsCopy, ltss);
}
else
{
assert currentState.vds.length == vds.length;
for (int i = 0; i < vds.length; i++)
{
if (vds[i] == UNSET_DESCR)
continue;
assert lts >= currentState.lts[i] : String.format("Out-of-order LTS: %d. Max seen: %s", lts, currentState.lts[i]); // sanity check; we're iterating in lts order
if (currentState.lts[i] == lts)
{
// Timestamp collision case
ColumnSpec<?> column = columns.get(i);
if (column.type.compareLexicographically(vds[i], currentState.vds[i]) > 0)
currentState.vds[i] = vds[i];
}
else
{
currentState.vds[i] = vds[i];
assert lts > currentState.lts[i];
currentState.lts[i] = lts;
}
}
}
if (writePrimaryKeyLiveness)
currentState.hasPrimaryKeyLivenessInfo = true;
return currentState;
}
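
    // Applies a column-level deletion (at logical timestamp lts) to the regular columns of the row
    // with clustering descriptor cd.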
    public void deleteRegularColumns(long lts, long cd, int columnOffset, BitSet columns, BitSet mask)
{
deleteColumns(lts, rows.get(cd), columnOffset, columns, mask);
}
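
    // Applies a column-level deletion (at logical timestamp lts) to the static row.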
    public void deleteStaticColumns(long lts, int columnOffset, BitSet columns, BitSet mask)
{
deleteColumns(lts, staticRow, columnOffset, columns, mask);
}
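
    // Nullifies the cells selected by the columns bitset (checked under the given mask, starting at
    // columnOffset). If that leaves a non-static row with only NIL cells and no primary key
    // liveness info, the row itself is removed.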
    public void deleteColumns(long lts, Reconciler.RowState state, int columnOffset, BitSet columns, BitSet mask)
{
if (state == null)
return;
        // TODO: optimise by iterating only over the columns that were removed by this deletion
        // TODO: optimise the final decision to fully remove the row by counting the number of set/unset columns
boolean allNil = true;
for (int i = 0; i < state.vds.length; i++)
{
if (columns.isSet(columnOffset + i, mask))
{
state.vds[i] = NIL_DESCR;
state.lts[i] = NO_TIMESTAMP;
}
else if (state.vds[i] != NIL_DESCR)
{
allNil = false;
}
}
        if (state.cd != Reconciler.STATIC_CLUSTERING && allNil && !state.hasPrimaryKeyLivenessInfo)
delete(state.cd, lts);
}
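
    // Applies a partition deletion: drops all regular rows and resets static columns to NIL.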
public void deletePartition(long lts)
{
if (debugCd != -1)
logger.info("Hiding {} at {} because partition deletion", debugCd, lts);
rows.clear();
if (!schema.staticColumns.isEmpty())
{
Arrays.fill(staticRow.vds, NIL_DESCR);
Arrays.fill(staticRow.lts, NO_TIMESTAMP);
}
}
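
    // Iterates rows in clustering descriptor order (ascending by default, descending when reverse is set).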
    @Override
    public Iterator<Reconciler.RowState> iterator()
{
return iterator(false);
}
public Iterator<Reconciler.RowState> iterator(boolean reverse)
{
if (reverse)
return rows.descendingMap().values().iterator();
return rows.values().iterator();
}
public Collection<Reconciler.RowState> rows(boolean reverse)
{
if (reverse)
return rows.descendingMap().values();
return rows.values();
}
public Reconciler.RowState staticRow()
{
return staticRow;
}
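
    // Returns a copy of this partition state containing only the rows matched by the given query;
    // the static row is carried over as-is.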
public PartitionState apply(Query query)
{
PartitionState partitionState = new PartitionState(pd, debugCd, schema);
partitionState.staticRow = staticRow;
// TODO: we could improve this if we could get original descriptors
for (Reconciler.RowState rowState : rows.values())
if (query.match(rowState.cd))
partitionState.rows.put(rowState.cd, rowState);
return partitionState;
}
public String toString(SchemaSpec schema)
{
StringBuilder sb = new StringBuilder();
if (staticRow != null)
sb.append("Static row: " + staticRow.toString(schema)).append("\n");
for (Reconciler.RowState row : rows.values())
sb.append(row.toString(schema)).append("\n");
return sb.toString();
}
}