blob: 5e3b5404e8125073e4550ebc1bd2b19391274ce6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.paxos.uncommitted;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Callables;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.index.transactions.IndexTransaction;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.Indexes;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.utils.CloseableIterator;
import static java.util.Collections.*;
import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME;
import static org.apache.cassandra.service.paxos.PaxosState.ballotTracker;
import static org.apache.cassandra.service.paxos.PaxosState.uncommittedTracker;
/**
 * A secondary index (2i) implementation made specifically for system.paxos that listens for changes to paxos state
 * by interpreting mutations against system.paxos. Row writes are forwarded to the ballot tracker, and when a paxos
 * memtable is flushed its contents are handed to the uncommitted tracker. The index also acts as the
 * {@link PaxosUncommittedTracker.UpdateSupplier}, exposing the paxos updates currently held in memtables.
 *
 * No read expressions are supported by the index.
 *
 * This is implemented as a 2i so it can piggyback off the commit log and paxos table flushes, and avoid worrying
 * about implementing a parallel log/flush system for the tracker and potential bugs there. It also means we don't
 * have to worry about cases where the tracker can become out of sync with the paxos table due to failure/edge cases
 * outside of the PaxosTableState class itself.
 */
public class PaxosUncommittedIndex implements Index, PaxosUncommittedTracker.UpdateSupplier
{
    /** The base table this index is attached to; always system.paxos (checked in the constructor). */
    public final ColumnFamilyStore baseCfs;

    /** Schema metadata describing this index. */
    protected IndexMetadata metadata;

    /** Data range covering all data for the configured partitioner; used to read a whole flushing memtable. */
    private static final DataRange FULL_RANGE = DataRange.allData(DatabaseDescriptor.getPartitioner());

    /** Column filter selecting every column of the base table; used for all memtable reads. */
    private final ColumnFilter memtableColumnFilter;

    /**
     * Creates the index and registers it as the update supplier of {@link PaxosUncommittedTracker}.
     *
     * @param baseTable the table being indexed; must be {@code system.paxos}
     * @param metadata  the metadata of this index
     * @throws IllegalStateException if {@code baseTable} is not {@code system.paxos}
     */
    public PaxosUncommittedIndex(ColumnFamilyStore baseTable, IndexMetadata metadata)
    {
        Preconditions.checkState(baseTable.metadata.keyspace.equals(SYSTEM_KEYSPACE_NAME));
        Preconditions.checkState(baseTable.metadata.name.equals(SystemKeyspace.PAXOS));
        this.baseCfs = baseTable;
        this.metadata = metadata;
        this.memtableColumnFilter = ColumnFilter.all(baseTable.metadata.get());
        // 'this' is published from the constructor; every field is assigned by this point
        PaxosUncommittedTracker.unsafSetUpdateSupplier(this);
    }

    /**
     * Builds the schema metadata for this custom index.
     *
     * @return CUSTOM index metadata named "PaxosUncommittedIndex" whose class_name option is this class
     */
    public static IndexMetadata indexMetadata()
    {
        Map<String, String> options = new HashMap<>();
        options.put("class_name", PaxosUncommittedIndex.class.getName());
        options.put("target", "");
        return IndexMetadata.fromSchemaMetadata("PaxosUncommittedIndex", IndexMetadata.Kind.CUSTOM, options);
    }

    /**
     * @return an {@link Indexes} collection containing only {@link #indexMetadata()}
     */
    public static Indexes indexes()
    {
        return Indexes.builder().add(indexMetadata()).build();
    }

    /** @return a task that does nothing and returns null */
    public Callable<?> getInitializationTask()
    {
        return Callables.returning(null);
    }

    /** @return the metadata this index was created with */
    public IndexMetadata getIndexMetadata()
    {
        return metadata;
    }

    /** @return a task that does nothing and returns null; {@code indexMetadata} is ignored */
    public Callable<?> getMetadataReloadTask(IndexMetadata indexMetadata)
    {
        return Callables.returning(null);
    }

    /** Registers this index with {@code registry}. */
    public void register(IndexRegistry registry)
    {
        registry.registerIndex(this);
    }

    /** @return always empty: this index has no backing table of its own */
    public Optional<ColumnFamilyStore> getBackingTable()
    {
        return Optional.empty();
    }

    /**
     * Merges the given partition iterators and converts the result into paxos key states.
     *
     * @param iterators         the iterators to merge; must not be empty
     * @param filterByTableId   the table to restrict results to, or {@code null} for no restriction
     * @param materializeLazily must be {@code true} exactly when {@code filterByTableId} is {@code null}
     * @throws IllegalArgumentException if the two previous arguments are inconsistent
     */
    private CloseableIterator<PaxosKeyState> getPaxosUpdates(List<UnfilteredPartitionIterator> iterators, TableId filterByTableId, boolean materializeLazily)
    {
        Preconditions.checkArgument((filterByTableId == null) == materializeLazily);
        return PaxosRows.toIterator(UnfilteredPartitionIterators.merge(iterators, UnfilteredPartitionIterators.MergeListener.NOOP), filterByTableId, materializeLazily);
    }

    /**
     * Returns the paxos updates for {@code tableId} that are currently held in the flushing and live memtables
     * of system.paxos and fall within {@code ranges}.
     *
     * @param tableId the table whose paxos state is wanted; must not be null
     * @param ranges  the token ranges to read
     * @return the matching paxos key states; empty when {@code ranges} is empty
     * @throws NullPointerException if {@code tableId} is null
     */
    public CloseableIterator<PaxosKeyState> repairIterator(TableId tableId, Collection<Range<Token>> ranges)
    {
        Preconditions.checkNotNull(tableId);

        View view = baseCfs.getTracker().getView();
        List<Memtable> memtables = view.flushingMemtables.isEmpty()
                                   ? view.liveMemtables
                                   : ImmutableList.<Memtable>builder().addAll(view.flushingMemtables).addAll(view.liveMemtables).build();

        // With no ranges (or no memtables) no input iterators would be produced at all. Short-circuit rather than
        // pass an empty list to the merge. NOTE(review): UnfilteredPartitionIterators.merge appears to assume a
        // non-empty input list.
        if (ranges.isEmpty() || memtables.isEmpty())
            return noPaxosUpdates();

        List<DataRange> dataRanges = ranges.stream().map(DataRange::forTokenRange).collect(Collectors.toList());
        List<UnfilteredPartitionIterator> iters = new ArrayList<>(memtables.size() * dataRanges.size());
        for (int j = 0, jsize = dataRanges.size(); j < jsize; j++)
        {
            for (int i = 0, isize = memtables.size(); i < isize; i++)
                iters.add(memtables.get(i).partitionIterator(memtableColumnFilter, dataRanges.get(j), SSTableReadsListener.NOOP_LISTENER));
        }
        return getPaxosUpdates(iters, tableId, false);
    }

    /**
     * Returns every paxos update held in {@code flushing}, across all tables and the full data range,
     * materialized lazily.
     */
    public CloseableIterator<PaxosKeyState> flushIterator(Memtable flushing)
    {
        List<UnfilteredPartitionIterator> iters = singletonList(flushing.partitionIterator(memtableColumnFilter, FULL_RANGE, SSTableReadsListener.NOOP_LISTENER));
        return getPaxosUpdates(iters, null, true);
    }

    /** @return an iterator with no elements whose {@code close()} does nothing */
    private static CloseableIterator<PaxosKeyState> noPaxosUpdates()
    {
        return new CloseableIterator<PaxosKeyState>()
        {
            public boolean hasNext()
            {
                return false;
            }

            public PaxosKeyState next()
            {
                throw new NoSuchElementException();
            }

            public void close()
            {
            }
        };
    }

    /** @return a task that flushes the ballot tracker */
    public Callable<?> getBlockingFlushTask()
    {
        return (Callable<Object>) () -> {
            ballotTracker().flush();
            return null;
        };
    }

    /**
     * @param paxos the system.paxos memtable being flushed
     * @return a task that passes {@code paxos} to the uncommitted tracker's {@code flushUpdates}, then flushes
     *         the ballot tracker
     */
    public Callable<?> getBlockingFlushTask(Memtable paxos)
    {
        return (Callable<Object>) () -> {
            uncommittedTracker().flushUpdates(paxos);
            ballotTracker().flush();
            return null;
        };
    }

    /** @return a task that truncates both the uncommitted tracker and the ballot tracker */
    public Callable<?> getInvalidateTask()
    {
        return truncateTrackersTask();
    }

    /**
     * @param truncatedAt unused; both trackers are truncated unconditionally
     * @return a task that truncates both the uncommitted tracker and the ballot tracker
     */
    public Callable<?> getTruncateTask(long truncatedAt)
    {
        return truncateTrackersTask();
    }

    /** Shared body of the invalidate and truncate tasks: truncates the uncommitted tracker, then the ballot tracker. */
    private static Callable<?> truncateTrackersTask()
    {
        return (Callable<Object>) () -> {
            uncommittedTracker().truncate();
            ballotTracker().truncate();
            return null;
        };
    }

    /** @return false */
    public boolean shouldBuildBlocking()
    {
        return false;
    }

    /** @return false for every column */
    public boolean dependsOn(ColumnMetadata column)
    {
        return false;
    }

    /** @return always false, so that the index is never selected to serve a read expression */
    public boolean supportsExpression(ColumnMetadata column, Operator operator)
    {
        // should prevent this from ever being used
        return false;
    }

    /** @return null: no custom expressions are supported */
    public AbstractType<?> customExpressionValueType()
    {
        return null;
    }

    /** @return null */
    public RowFilter getPostIndexQueryFilter(RowFilter filter)
    {
        return null;
    }

    /** @return 0 */
    public long getEstimatedResultRows()
    {
        return 0;
    }

    /** Performs no validation: every update is accepted. */
    public void validate(PartitionUpdate update) throws InvalidRequestException
    {
    }

    /** @return the single shared {@link Indexer} of this index, regardless of the arguments */
    public Indexer indexerFor(DecoratedKey key, RegularAndStaticColumns columns, int nowInSec, WriteContext ctx, IndexTransaction.Type transactionType)
    {
        return indexer;
    }

    /** @return null: no post-processing is applied */
    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
    {
        return null;
    }

    /** @throws UnsupportedOperationException always; the index cannot be searched */
    public Searcher searcherFor(ReadCommand command)
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Forwards inserted rows and the new version of updated rows to the ballot tracker. Partition deletions,
     * range tombstones and row removals are ignored.
     */
    private final Indexer indexer = new Indexer()
    {
        public void begin() {}
        public void partitionDelete(DeletionTime deletionTime) {}
        public void rangeTombstone(RangeTombstone tombstone) {}

        public void insertRow(Row row)
        {
            ballotTracker().onUpdate(row);
        }

        public void updateRow(Row oldRowData, Row newRowData)
        {
            ballotTracker().onUpdate(newRowData);
        }

        public void removeRow(Row row) {}
        public void finish() {}
    };
}