/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db.view;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.util.concurrent.Futures;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.ReadQuery;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionInfo.Unit;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Refs;
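
/**
 * A {@link Callable} that builds a materialized view for a single token range of its base table.
 * Each base partition in the range is read locally and converted into view mutations, and
 * progress is checkpointed in the system keyspace so that an interrupted build can be resumed
 * from the last fully processed token.
 */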
public class ViewBuilderTask extends CompactionInfo.Holder implements Callable<Long>
{
private static final Logger logger = LoggerFactory.getLogger(ViewBuilderTask.class);
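
// Number of keys to process between checkpoints of the build progress in the system keyspace.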
private static final int ROWS_BETWEEN_CHECKPOINTS = 1000;
private final ColumnFamilyStore baseCfs;
private final View view;
private final Range<Token> range;
private final UUID compactionId;
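
// Last token fully built; lets an interrupted build resume and is used to report progress.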
private volatile Token prevToken;
private volatile long keysBuilt = 0;
private volatile boolean isStopped = false;
private volatile boolean isCompactionInterrupted = false;
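
/**
 * @param baseCfs   the base table
 * @param view      the materialized view to build
 * @param range     the token range of the base table to build the view for
 * @param lastToken the last token processed by a previous, interrupted build, or {@code null}
 *                  to build the range from scratch
 * @param keysBuilt the number of keys built by a previous, interrupted build, if any
 */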
@VisibleForTesting
public ViewBuilderTask(ColumnFamilyStore baseCfs, View view, Range<Token> range, Token lastToken, long keysBuilt)
{
this.baseCfs = baseCfs;
this.view = view;
this.range = range;
this.compactionId = UUIDGen.getTimeUUID();
this.prevToken = lastToken;
this.keysBuilt = keysBuilt;
}
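
/**
 * Reads the base table partition for the given key and applies the resulting view mutations,
 * treating everything read from disk as new updates with no pre-existing view data.
 */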
@SuppressWarnings("resource")
private void buildKey(DecoratedKey key)
{
ReadQuery selectQuery = view.getReadQuery();
if (!selectQuery.selectsKey(key))
{
logger.trace("Skipping {}, view query filters", key);
return;
}
int nowInSec = FBUtilities.nowInSeconds();
SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);
// We're rebuilding everything from what's on disk, so we read everything, treat it all as new
// updates and pretend that there is nothing pre-existing.
UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata(), key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false);
try (ReadExecutionController orderGroup = command.executionController();
UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command))
{
Iterator<Collection<Mutation>> mutations = baseCfs.keyspace.viewManager
.forTable(baseCfs.metadata.id)
.generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true);
AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
mutations.forEachRemaining(m -> StorageProxy.mutateMV(key.getKey(), m, true, noBase, System.nanoTime()));
}
}
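
/**
 * Builds the view for every base partition key in this task's token range, checkpointing
 * progress periodically, and returns the total number of keys built by this task.
 */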
public Long call()
{
String ksName = baseCfs.metadata.keyspace;
if (prevToken == null)
logger.debug("Starting new view build for range {}", range);
else
logger.debug("Resuming view build for range {} from token {} with {} covered keys", range, prevToken, keysBuilt);
/*
 * It's possible for view building to start before the MV creation has been propagated to other
 * nodes. For this reason we should wait for the schema to converge before attempting to send
 * any view mutations to other nodes, or else face an UnknownTableException upon Mutation
 * deserialization on the nodes that haven't yet processed the schema change.
 */
boolean schemaConverged = Gossiper.instance.waitForSchemaAgreement(10, TimeUnit.SECONDS, () -> this.isStopped);
if (!schemaConverged)
logger.warn("Failed to get schema to converge before building view {}.{}", baseCfs.keyspace.getName(), view.name);
Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL, s -> range.intersects(s.getBounds()));
try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(function);
Refs<SSTableReader> sstables = viewFragment.refs;
ReducingKeyIterator keyIter = new ReducingKeyIterator(sstables))
{
PeekingIterator<DecoratedKey> iter = Iterators.peekingIterator(keyIter);
while (!isStopped && iter.hasNext())
{
DecoratedKey key = iter.next();
Token token = key.getToken();
// Skip tokens already built or not present in the range
if (range.contains(token) && (prevToken == null || token.compareTo(prevToken) > 0))
{
buildKey(key);
++keysBuilt;
// Build other keys sharing the same token
while (iter.hasNext() && iter.peek().getToken().equals(token))
{
key = iter.next();
buildKey(key);
++keysBuilt;
}
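// Checkpoint after the first built key and then once every ROWS_BETWEEN_CHECKPOINTS keys.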
if (keysBuilt % ROWS_BETWEEN_CHECKPOINTS == 1)
SystemKeyspace.updateViewBuildStatus(ksName, view.name, range, token, keysBuilt);
prevToken = token;
}
}
}
finish();
return keysBuilt;
}
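
/**
 * Saves the build status for this range if the task ran to completion, or throws a
 * {@link StoppedException} if it was stopped by a compaction interruption.
 */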
private void finish()
{
String ksName = baseCfs.keyspace.getName();
if (!isStopped)
{
// Save the completed status using the end of the range as the last token. This way future view
// build attempts won't even create a task for this range.
SystemKeyspace.updateViewBuildStatus(ksName, view.name, range, range.right, keysBuilt);
logger.debug("Completed build of view({}.{}) for range {} after covering {} keys ", ksName, view.name, range, keysBuilt);
}
else
{
logger.debug("Stopped build for view({}.{}) for range {} after covering {} keys", ksName, view.name, range, keysBuilt);
// If it's stopped due to a compaction interruption we should throw that exception.
// Otherwise we assume that the task has been stopped due to a schema update and we can finish successfully.
if (isCompactionInterrupted)
throw new StoppedException(ksName, view.name, getCompactionInfo());
}
}

@Override
public CompactionInfo getCompactionInfo()
{
// We don't know the sstables at construction of ViewBuilderTask and we could change this to
// return them once we know the sstables, but since we basically only cancel view builds on
// truncation, where we cancel all compactions anyway, this seems reasonable.
// If there's a splitter, calculate progress based on the last token position.
if (range.left.getPartitioner().splitter().isPresent())
{
long progress = prevToken == null ? 0 : Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken, range) * 1000);
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, Unit.RANGES, compactionId);
}
// When there is no splitter, estimate based on number of total keys but
// take the max with keysBuilt + 1 to avoid having more completed than total
long keysTotal = Math.max(keysBuilt + 1, baseCfs.estimatedKeysForRange(range));
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, Unit.KEYS, compactionId);
}

@Override
public void stop()
{
stop(true);
}

public boolean isGlobal()
{
return false;
}
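
/**
 * Stops this task. If {@code isCompactionInterrupted} is set, the eventual {@link StoppedException}
 * is thrown from {@link #finish()}; otherwise the task is assumed to have been stopped by a
 * schema update and finishes successfully.
 */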
synchronized void stop(boolean isCompactionInterrupted)
{
isStopped = true;
this.isCompactionInterrupted = isCompactionInterrupted;
}
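
/**
 * @return the number of keys built so far by this task
 */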
long keysBuilt()
{
return keysBuilt;
}

/**
* {@link CompactionInterruptedException} with {@link Object#equals(Object)} and {@link Object#hashCode()}
* implementations that consider equal all the exceptions produced by the same view build, independently of their
* token range.
* <p>
* This is used to avoid Guava's {@link Futures#allAsList(Iterable)} log spamming when multiple build tasks fail
* due to compaction interruption.
*/
static class StoppedException extends CompactionInterruptedException
{
private final String ksName, viewName;
private StoppedException(String ksName, String viewName, CompactionInfo info)
{
super(info);
this.ksName = ksName;
this.viewName = viewName;
}
@Override
public boolean equals(Object o)
{
if (!(o instanceof StoppedException))
return false;
StoppedException that = (StoppedException) o;
return Objects.equal(this.ksName, that.ksName) && Objects.equal(this.viewName, that.viewName);
}
@Override
public int hashCode()
{
return 31 * ksName.hashCode() + viewName.hashCode();
}
}
}