blob: e9d43cb540b7fceb91a1b740716918d17b7fff23 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.streaming;
import java.net.InetAddress;
import java.util.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.UUIDGen;
/**
* {@link StreamPlan} is a helper class that builds StreamOperation of given configuration.
*
* This is the class you want to use for building streaming plan and starting streaming.
*/
public class StreamPlan
{
public static final String[] EMPTY_COLUMN_FAMILIES = new String[0];
private final UUID planId = UUIDGen.getTimeUUID();
private final String description;
private final List<StreamEventHandler> handlers = new ArrayList<>();
private final long repairedAt;
private final StreamCoordinator coordinator;
private boolean flushBeforeTransfer = true;
/**
* Start building stream plan.
*
* @param description Stream type that describes this StreamPlan
*/
public StreamPlan(String description)
{
this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false);
}
public StreamPlan(String description, boolean keepSSTableLevels, boolean connectSequentially)
{
this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially);
}
public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels,
boolean isIncremental, boolean connectSequentially)
{
this.description = description;
this.repairedAt = repairedAt;
this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(),
connectSequentially);
}
/**
* Request data in {@code keyspace} and {@code ranges} from specific node.
*
* @param from endpoint address to fetch data from.
* @param connecting Actual connecting address for the endpoint
* @param keyspace name of keyspace
* @param ranges ranges to fetch
* @return this object for chaining
*/
public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges)
{
return requestRanges(from, connecting, keyspace, ranges, EMPTY_COLUMN_FAMILIES);
}
/**
* Request data in {@code columnFamilies} under {@code keyspace} and {@code ranges} from specific node.
*
* @param from endpoint address to fetch data from.
* @param connecting Actual connecting address for the endpoint
* @param keyspace name of keyspace
* @param ranges ranges to fetch
* @param columnFamilies specific column families
* @return this object for chaining
*/
public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
StreamSession session = coordinator.getOrCreateNextSession(from, connecting);
session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies), repairedAt);
return this;
}
/**
* Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}.
*
* @see #transferRanges(java.net.InetAddress, java.net.InetAddress, String, java.util.Collection, String...)
*/
public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
return transferRanges(to, to, keyspace, ranges, columnFamilies);
}
/**
* Add transfer task to send data of specific keyspace and ranges.
*
* @param to endpoint address of receiver
* @param connecting Actual connecting address of the endpoint
* @param keyspace name of keyspace
* @param ranges ranges to send
* @return this object for chaining
*/
public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges)
{
return transferRanges(to, connecting, keyspace, ranges, EMPTY_COLUMN_FAMILIES);
}
/**
* Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}.
*
* @param to endpoint address of receiver
* @param connecting Actual connecting address of the endpoint
* @param keyspace name of keyspace
* @param ranges ranges to send
* @param columnFamilies specific column families
* @return this object for chaining
*/
public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
StreamSession session = coordinator.getOrCreateNextSession(to, connecting);
session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer, repairedAt);
return this;
}
/**
* Add transfer task to send given SSTable files.
*
* @param to endpoint address of receiver
* @param sstableDetails sstables with file positions and estimated key count.
* this collection will be modified to remove those files that are successfully handed off
* @return this object for chaining
*/
public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
{
coordinator.transferFiles(to, sstableDetails);
return this;
}
public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers)
{
this.handlers.add(handler);
if (handlers != null)
Collections.addAll(this.handlers, handlers);
return this;
}
/**
* Set custom StreamConnectionFactory to be used for establishing connection
*
* @param factory StreamConnectionFactory to use
* @return self
*/
public StreamPlan connectionFactory(StreamConnectionFactory factory)
{
this.coordinator.setConnectionFactory(factory);
return this;
}
/**
* @return true if this plan has no plan to execute
*/
public boolean isEmpty()
{
return !coordinator.hasActiveSessions();
}
/**
* Execute this {@link StreamPlan} asynchronously.
*
* @return Future {@link StreamState} that you can use to listen on progress of streaming.
*/
public StreamResultFuture execute()
{
return StreamResultFuture.init(planId, description, handlers, coordinator);
}
/**
* Set flushBeforeTransfer option.
* When it's true, will flush before streaming ranges. (Default: true)
*
* @param flushBeforeTransfer set to true when the node should flush before transfer
* @return this object for chaining
*/
public StreamPlan flushBeforeTransfer(boolean flushBeforeTransfer)
{
this.flushBeforeTransfer = flushBeforeTransfer;
return this;
}
}