blob: 60845fa022dbb2b8970158ed27c66892b410d88a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.streaming;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.utils.UUIDGen;
import static com.google.common.collect.Iterables.all;
import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
/**
* {@link StreamPlan} is a helper class that builds StreamOperation of given configuration.
*
* This is the class you want to use for building streaming plan and starting streaming.
*/
public class StreamPlan
{
private static final String[] EMPTY_COLUMN_FAMILIES = new String[0];
private final UUID planId = UUIDGen.getTimeUUID();
private final StreamOperation streamOperation;
private final List<StreamEventHandler> handlers = new ArrayList<>();
private final StreamCoordinator coordinator;
private boolean flushBeforeTransfer = true;
/**
* Start building stream plan.
*
* @param streamOperation Stream streamOperation that describes this StreamPlan
*/
public StreamPlan(StreamOperation streamOperation)
{
this(streamOperation, 1, false, NO_PENDING_REPAIR, PreviewKind.NONE);
}
public StreamPlan(StreamOperation streamOperation, boolean connectSequentially)
{
this(streamOperation, 1, connectSequentially, NO_PENDING_REPAIR, PreviewKind.NONE);
}
public StreamPlan(StreamOperation streamOperation, int connectionsPerHost,
boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
{
this.streamOperation = streamOperation;
this.coordinator = new StreamCoordinator(streamOperation, connectionsPerHost, new DefaultConnectionFactory(),
false, connectSequentially, pendingRepair, previewKind);
}
/**
* Request data in {@code keyspace} and {@code ranges} from specific node.
*
* Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
* and _remote_ (source) range transientmess, which is encoded by splitting ranges into full and transient.
*
* At the other end the distinction between full and transient is ignored it just used the transient status
* of the Replica objects we send to determine what to send. The real reason we have this split down to
* StreamRequest is that on completion StreamRequest is used to write to the system table tracking
* what has already been streamed. At that point since we only have the local Replica instances so we don't
* know what we got from the remote. We preserve that here by splitting based on the remotes transient
* status.
*
* @param from endpoint address to fetch data from.
* @param keyspace name of keyspace
* @param fullRanges ranges to fetch that from provides the full version of
* @param transientRanges ranges to fetch that from provides only transient data of
* @return this object for chaining
*/
public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges)
{
return requestRanges(from, keyspace, fullRanges, transientRanges, EMPTY_COLUMN_FAMILIES);
}
/**
* Request data in {@code columnFamilies} under {@code keyspace} and {@code ranges} from specific node.
*
* @param from endpoint address to fetch data from.
* @param keyspace name of keyspace
* @param fullRanges ranges to fetch that from provides the full data for
* @param transientRanges ranges to fetch that from provides only transient data for
* @param columnFamilies specific column families
* @return this object for chaining
*/
public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, String... columnFamilies)
{
//It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
assert all(fullRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
assert all(transientRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
StreamSession session = coordinator.getOrCreateNextSession(from);
session.addStreamRequest(keyspace, fullRanges, transientRanges, Arrays.asList(columnFamilies));
return this;
}
/**
* Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}.
*
* @param to endpoint address of receiver
* @param keyspace name of keyspace
* @param replicas ranges to send
* @param columnFamilies specific column families
* @return this object for chaining
*/
public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, RangesAtEndpoint replicas, String... columnFamilies)
{
StreamSession session = coordinator.getOrCreateNextSession(to);
session.addTransferRanges(keyspace, replicas, Arrays.asList(columnFamilies), flushBeforeTransfer);
return this;
}
/**
* Add transfer task to send given streams
*
* @param to endpoint address of receiver
* @param streams streams to send
* @return this object for chaining
*/
public StreamPlan transferStreams(InetAddressAndPort to, Collection<OutgoingStream> streams)
{
coordinator.transferStreams(to, streams);
return this;
}
public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers)
{
this.handlers.add(handler);
if (handlers != null)
Collections.addAll(this.handlers, handlers);
return this;
}
/**
* Set custom StreamConnectionFactory to be used for establishing connection
*
* @param factory StreamConnectionFactory to use
* @return self
*/
public StreamPlan connectionFactory(StreamConnectionFactory factory)
{
this.coordinator.setConnectionFactory(factory);
return this;
}
/**
* @return true if this plan has no plan to execute
*/
public boolean isEmpty()
{
return !coordinator.hasActiveSessions();
}
/**
* Execute this {@link StreamPlan} asynchronously.
*
* @return Future {@link StreamState} that you can use to listen on progress of streaming.
*/
public StreamResultFuture execute()
{
return StreamResultFuture.createInitiator(planId, streamOperation, handlers, coordinator);
}
/**
* Set flushBeforeTransfer option.
* When it's true, will flush before streaming ranges. (Default: true)
*
* @param flushBeforeTransfer set to true when the node should flush before transfer
* @return this object for chaining
*/
public StreamPlan flushBeforeTransfer(boolean flushBeforeTransfer)
{
this.flushBeforeTransfer = flushBeforeTransfer;
return this;
}
public UUID getPendingRepair()
{
return coordinator.getPendingRepair();
}
public boolean getFlushBeforeTransfer()
{
return flushBeforeTransfer;
}
@VisibleForTesting
public StreamCoordinator getCoordinator()
{
return coordinator;
}
}