blob: ea82d9b27dca2334dc331c5341d1e41d6759eb4c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.streaming;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.collect.Iterables;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.view.View;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Refs;
/**
* Task that manages receiving files for the session for certain ColumnFamily.
*/
public class StreamReceiveTask extends StreamTask
{
private static final Logger logger = LoggerFactory.getLogger(StreamReceiveTask.class);
private static final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("StreamReceiveTask"));
// number of files to receive
private final int totalFiles;
// total size of files to receive
private final long totalSize;
// Transaction tracking new files received
private final LifecycleTransaction txn;
// true if task is done (either completed or aborted)
private volatile boolean done = false;
// holds references to SSTables received
protected Collection<SSTableReader> sstables;
private int remoteSSTablesReceived = 0;
public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
{
super(session, cfId);
this.totalFiles = totalFiles;
this.totalSize = totalSize;
// this is an "offline" transaction, as we currently manually expose the sstables once done;
// this should be revisited at a later date, so that LifecycleTransaction manages all sstable state changes
this.txn = LifecycleTransaction.offline(OperationType.STREAM);
this.sstables = new ArrayList<>(totalFiles);
}
/**
* Process received file.
*
* @param sstable SSTable file received.
*/
public synchronized void received(SSTableMultiWriter sstable)
{
if (done)
{
logger.warn("[{}] Received sstable {} on already finished stream received task. Aborting sstable.", session.planId(),
sstable.getFilename());
Throwables.maybeFail(sstable.abort(null));
return;
}
remoteSSTablesReceived++;
assert cfId.equals(sstable.getCfId());
Collection<SSTableReader> finished = null;
try
{
finished = sstable.finish(true);
}
catch (Throwable t)
{
Throwables.maybeFail(sstable.abort(t));
}
txn.update(finished, false);
sstables.addAll(finished);
if (remoteSSTablesReceived == totalFiles)
{
done = true;
executor.submit(new OnCompletionRunnable(this));
}
}
public int getTotalNumberOfFiles()
{
return totalFiles;
}
public long getTotalSize()
{
return totalSize;
}
/**
* @return a LifecycleNewTracker whose operations are synchronised on this StreamReceiveTask.
*/
public synchronized LifecycleNewTracker createLifecycleNewTracker()
{
if (done)
throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), cfId));
return new LifecycleNewTracker()
{
@Override
public void trackNew(SSTable table)
{
synchronized (StreamReceiveTask.this)
{
txn.trackNew(table);
}
}
@Override
public void untrackNew(SSTable table)
{
synchronized (StreamReceiveTask.this)
{
txn.untrackNew(table);
}
}
public OperationType opType()
{
return txn.opType();
}
};
}
private static class OnCompletionRunnable implements Runnable
{
private final StreamReceiveTask task;
public OnCompletionRunnable(StreamReceiveTask task)
{
this.task = task;
}
public void run()
{
boolean hasViews = false;
ColumnFamilyStore cfs = null;
try
{
Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
if (kscf == null)
{
// schema was dropped during streaming
task.sstables.clear();
task.abortTransaction();
task.session.taskCompleted(task);
return;
}
cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
Collection<SSTableReader> readers = task.sstables;
try (Refs<SSTableReader> refs = Refs.ref(readers))
{
//We have a special path for views.
//Since the view requires cleaning up any pre-existing state, we must put
//all partitions through the same write path as normal mutations.
//This also ensures any 2is are also updated
if (hasViews)
{
for (SSTableReader reader : readers)
{
Keyspace ks = Keyspace.open(reader.getKeyspaceName());
try (ISSTableScanner scanner = reader.getScanner())
{
while (scanner.hasNext())
{
try (UnfilteredRowIterator rowIterator = scanner.next())
{
// MV *can* be applied unsafe as we flush below before transaction is done.
ks.apply(new Mutation(PartitionUpdate.fromIterator(rowIterator)), false, true, false);
}
}
}
}
}
else
{
task.finishTransaction();
logger.debug("[Stream #{}] Received {} sstables from {} ({})", task.session.planId(), readers.size(), task.session.peer, readers);
// add sstables and build secondary indexes
cfs.addSSTables(readers);
cfs.indexManager.buildAllIndexesBlocking(readers);
//invalidate row and counter cache
if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
{
List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
if (cfs.isRowCacheEnabled())
{
int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
if (invalidatedKeys > 0)
logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
"receive task completed.", task.session.planId(), invalidatedKeys,
cfs.keyspace.getName(), cfs.getTableName());
}
if (cfs.metadata.isCounter())
{
int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
if (invalidatedKeys > 0)
logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
"receive task completed.", task.session.planId(), invalidatedKeys,
cfs.keyspace.getName(), cfs.getTableName());
}
}
}
}
task.session.taskCompleted(task);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
task.session.onError(t);
}
finally
{
//We don't keep the streamed sstables since we've applied them manually
//So we abort the txn and delete the streamed sstables
if (hasViews)
{
if (cfs != null)
cfs.forceBlockingFlush();
task.abortTransaction();
}
}
}
}
/**
* Abort this task.
* If the task already received all files and
* {@link org.apache.cassandra.streaming.StreamReceiveTask.OnCompletionRunnable} task is submitted,
* then task cannot be aborted.
*/
public synchronized void abort()
{
if (done)
return;
done = true;
abortTransaction();
sstables.clear();
}
private synchronized void abortTransaction()
{
txn.abort();
}
private synchronized void finishTransaction()
{
txn.finish();
}
}