/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.cassandra;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.flink.types.Row;
import com.datastax.driver.core.Cluster;
import scala.Product;

/**
* This class wraps different Cassandra sink implementations to provide a common interface for all of them.
*
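* <p>Example usage (a minimal sketch; the stream, query, keyspace/table and host are illustrative placeholders):
* <pre>{@code
* DataStream<Tuple2<String, Long>> stream = ...;
* CassandraSink.addSink(stream)
*     .setQuery("INSERT INTO example.values (id, counter) VALUES (?, ?);")
*     .setHost("127.0.0.1")
*     .build();
* }</pre>
*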
* @param <IN> input type
*/
public class CassandraSink<IN> {
private final boolean useDataStreamSink;
private DataStreamSink<IN> sink1;
private SingleOutputStreamOperator<IN> sink2;
private CassandraSink(DataStreamSink<IN> sink) {
sink1 = sink;
useDataStreamSink = true;
}
private CassandraSink(SingleOutputStreamOperator<IN> sink) {
sink2 = sink;
useDataStreamSink = false;
}
public DataStreamSink<IN> getSink1() {
return sink1;
}
private SinkTransformation<IN> getSinkTransformation() {
return sink1.getTransformation();
}
private StreamTransformation<IN> getStreamTransformation() {
return sink2.getTransformation();
}
/**
* Sets the name of this sink. This name is
* used by the visualization and logging during runtime.
*
* @param name The sink name used in visualization and logs.
* @return The named sink.
*/
public CassandraSink<IN> name(String name) {
if (useDataStreamSink) {
getSinkTransformation().setName(name);
} else {
getStreamTransformation().setName(name);
}
return this;
}
/**
* Sets an ID for this operator.
*
* <p>The specified ID is used to assign the same operator ID across job
* submissions (for example when starting a job from a savepoint).
*
* <p><strong>Important</strong>: this ID needs to be unique per
* transformation and job. Otherwise, job submission will fail.
*
* @param uid The unique user-specified ID of this transformation.
* @return The operator with the specified ID.
*/
@PublicEvolving
public CassandraSink<IN> uid(String uid) {
if (useDataStreamSink) {
getSinkTransformation().setUid(uid);
} else {
getStreamTransformation().setUid(uid);
}
return this;
}
/**
* Sets a user-provided hash for this operator. This will be used AS IS to create the JobVertexID.
*
* <p>The user-provided hash is an alternative to the generated hashes, and is considered when identifying an
* operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
*
* <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
* needs to be unique per transformation and job; otherwise, job submission will fail. Furthermore, you cannot
* assign a user-specified hash to intermediate nodes in an operator chain, and trying to do so will make your job fail.
*
* <p>A use case for this is migration between Flink versions or changing a job in a way that changes the
* automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
* obtained from old logs) can help to re-establish a lost mapping from states to their target operator.
*
* @param uidHash The user-provided hash for this operator. This will become the JobVertexID, which is shown in the
* logs and web UI.
* @return The operator with the user provided hash.
*/
@PublicEvolving
public CassandraSink<IN> setUidHash(String uidHash) {
if (useDataStreamSink) {
getSinkTransformation().setUidHash(uidHash);
} else {
getStreamTransformation().setUidHash(uidHash);
}
return this;
}
/**
* Sets the parallelism for this sink. The parallelism must be greater than zero.
*
* @param parallelism The parallelism for this sink.
* @return The sink with set parallelism.
*/
public CassandraSink<IN> setParallelism(int parallelism) {
if (useDataStreamSink) {
getSinkTransformation().setParallelism(parallelism);
} else {
getStreamTransformation().setParallelism(parallelism);
}
return this;
}
/**
* Turns off chaining for this operator so thread co-location will not be
* used as an optimization.
*
* <p>Chaining can be turned off for the whole job via
* {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()},
* however this is not advised for performance reasons.
*
* @return The sink with chaining disabled
*/
public CassandraSink<IN> disableChaining() {
if (useDataStreamSink) {
getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER);
} else {
getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER);
}
return this;
}
/**
* Sets the slot sharing group of this operation. Parallel instances of
* operations that are in the same slot sharing group will be co-located in the same
* TaskManager slot, if possible.
*
* <p>Operations inherit the slot sharing group of input operations if all input operations
* are in the same slot sharing group and no slot sharing group was explicitly specified.
*
* <p>Initially an operation is in the default slot sharing group. An operation can be put into
* the default group explicitly by setting the slot sharing group to {@code "default"}.
*
* @param slotSharingGroup The slot sharing group name.
* @return The sink with the slot sharing group set.
*/
public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
if (useDataStreamSink) {
getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
} else {
getStreamTransformation().setSlotSharingGroup(slotSharingGroup);
}
return this;
}
/**
* Writes a DataStream into a Cassandra database.
*
* @param input input DataStream
* @param <IN> input type
* @return CassandraSinkBuilder, to further configure the sink
*/
public static <IN> CassandraSinkBuilder<IN> addSink(org.apache.flink.streaming.api.scala.DataStream<IN> input) {
return addSink(input.javaStream());
}
/**
* Writes a DataStream into a Cassandra database.
*
* @param input input DataStream
* @param <IN> input type
* @return CassandraSinkBuilder, to further configure the sink
*/
public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
TypeInformation<IN> typeInfo = input.getType();
if (typeInfo instanceof TupleTypeInfo) {
DataStream<Tuple> tupleInput = (DataStream<Tuple>) input;
return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
}
if (typeInfo instanceof RowTypeInfo) {
DataStream<Row> rowInput = (DataStream<Row>) input;
return (CassandraSinkBuilder<IN>) new CassandraRowSinkBuilder(rowInput, rowInput.getType(), rowInput.getType().createSerializer(rowInput.getExecutionEnvironment().getConfig()));
}
if (typeInfo instanceof PojoTypeInfo) {
return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
}
if (typeInfo instanceof CaseClassTypeInfo) {
DataStream<Product> productInput = (DataStream<Product>) input;
return (CassandraSinkBuilder<IN>) new CassandraScalaProductSinkBuilder<>(productInput, productInput.getType(), productInput.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
}
throw new IllegalArgumentException("No support for the type of the given DataStream: " + input.getType());
}
/**
* Builder for a {@link CassandraSink}.
* @param <IN> input type
*/
public abstract static class CassandraSinkBuilder<IN> {
protected final DataStream<IN> input;
protected final TypeSerializer<IN> serializer;
protected final TypeInformation<IN> typeInfo;
protected ClusterBuilder builder;
protected MapperOptions mapperOptions;
protected String query;
protected CheckpointCommitter committer;
protected boolean isWriteAheadLogEnabled;
public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
this.input = input;
this.typeInfo = typeInfo;
this.serializer = serializer;
}
/**
* Sets the query that is to be executed for every record.
*
* @param query query to use
* @return this builder
*/
public CassandraSinkBuilder<IN> setQuery(String query) {
this.query = query;
return this;
}
/**
* Sets the Cassandra host to connect to, using the default port 9042.
*
* @param host host to connect to
* @return this builder
*/
public CassandraSinkBuilder<IN> setHost(String host) {
return setHost(host, 9042);
}
/**
* Sets the Cassandra host and port to connect to.
*
* @param host host to connect to
* @param port port to connect to
* @return this builder
*/
public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
if (this.builder != null) {
throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
}
this.builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint(host).withPort(port).build();
}
};
return this;
}
/**
* Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to Cassandra.
*
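* <p>Example (a minimal sketch; the contact point and port are illustrative placeholders):
* <pre>{@code
* sinkBuilder.setClusterBuilder(new ClusterBuilder() {
*     protected Cluster buildCluster(Cluster.Builder builder) {
*         return builder.addContactPoint("127.0.0.1").withPort(9042).build();
*     }
* });
* }</pre>
*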
* @param builder ClusterBuilder used to configure the connection to Cassandra
* @return this builder
*/
public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
if (this.builder != null) {
throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
}
this.builder = builder;
return this;
}
/**
* Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
* idempotent updates.
*
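* <p>Example (a minimal sketch; the query and host are illustrative placeholders):
* <pre>{@code
* CassandraSink.addSink(tupleStream)
*     .setQuery("INSERT INTO example.values (id, counter) VALUES (?, ?);")
*     .setHost("127.0.0.1")
*     .enableWriteAheadLog()
*     .build();
* }</pre>
*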
* @return this builder
*/
public CassandraSinkBuilder<IN> enableWriteAheadLog() {
this.isWriteAheadLogEnabled = true;
return this;
}
/**
* Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use
* idempotent updates.
*
* @param committer CheckpointCommitter that stores information about completed checkpoints in an external
* resource. By default, this information is stored in a separate table within Cassandra.
* @return this builder
*/
public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer) {
this.isWriteAheadLogEnabled = true;
this.committer = committer;
return this;
}
/**
* Sets the mapper options for this sink. The mapper options are used to configure the DataStax
* {@link com.datastax.driver.mapping.Mapper} when writing POJOs.
*
* <p>This call has no effect if the input {@link DataStream} for this sink does not contain POJOs.
*
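* <p>Example (a minimal sketch, assuming the options are supplied as a lambda returning DataStax {@code Mapper.Option}s):
* <pre>{@code
* sinkBuilder.setMapperOptions(() -> new Mapper.Option[] {
*     Mapper.Option.saveNullFields(true)
* });
* }</pre>
*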
* @param options MapperOptions that return an array of options used to configure the DataStax mapper.
*
* @return this builder
*/
public CassandraSinkBuilder<IN> setMapperOptions(MapperOptions options) {
this.mapperOptions = options;
return this;
}
/**
* Finalizes the configuration of this sink.
*
* @return finalized sink
* @throws Exception if the sink cannot be created
*/
public CassandraSink<IN> build() throws Exception {
sanityCheck();
return isWriteAheadLogEnabled
? createWriteAheadSink()
: createSink();
}
protected abstract CassandraSink<IN> createSink() throws Exception;
protected abstract CassandraSink<IN> createWriteAheadSink() throws Exception;
protected void sanityCheck() {
if (builder == null) {
throw new IllegalArgumentException("Cassandra host information must be supplied using either setHost() or setClusterBuilder().");
}
}
}
/**
* Builder for a {@link CassandraTupleSink}.
* @param <IN> input type
*/
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
super(input, typeInfo, serializer);
}
@Override
protected void sanityCheck() {
super.sanityCheck();
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
}
@Override
public CassandraSink<IN> createSink() throws Exception {
return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
}
@Override
protected CassandraSink<IN> createWriteAheadSink() throws Exception {
return committer == null
? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, new CassandraCommitter(builder))))
: new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, committer)));
}
}
/**
* Builder for a {@link CassandraRowSink}.
*/
public static class CassandraRowSinkBuilder extends CassandraSinkBuilder<Row> {
public CassandraRowSinkBuilder(DataStream<Row> input, TypeInformation<Row> typeInfo, TypeSerializer<Row> serializer) {
super(input, typeInfo, serializer);
}
@Override
protected void sanityCheck() {
super.sanityCheck();
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
}
@Override
protected CassandraSink<Row> createSink() throws Exception {
return new CassandraSink<>(input.addSink(new CassandraRowSink(typeInfo.getArity(), query, builder)).name("Cassandra Sink"));
}
@Override
protected CassandraSink<Row> createWriteAheadSink() throws Exception {
return committer == null
? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraRowWriteAheadSink(query, serializer, builder, new CassandraCommitter(builder))))
: new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraRowWriteAheadSink(query, serializer, builder, committer)));
}
}
/**
* Builder for a {@link CassandraPojoSink}.
* @param <IN> input type
*/
public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
super(input, typeInfo, serializer);
}
@Override
protected void sanityCheck() {
super.sanityCheck();
if (query != null) {
throw new IllegalArgumentException("Specifying a query is not allowed when using a Pojo-Stream as input.");
}
}
@Override
public CassandraSink<IN> createSink() throws Exception {
return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder, mapperOptions)).name("Cassandra Sink"));
}
@Override
protected CassandraSink<IN> createWriteAheadSink() throws Exception {
throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
}
}
/**
* Builder for a {@link CassandraScalaProductSink}.
* @param <IN> input type
*/
public static class CassandraScalaProductSinkBuilder<IN extends Product> extends CassandraSinkBuilder<IN> {
public CassandraScalaProductSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
super(input, typeInfo, serializer);
}
@Override
protected void sanityCheck() {
super.sanityCheck();
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
}
@Override
public CassandraSink<IN> createSink() throws Exception {
return new CassandraSink<>(input.addSink(new CassandraScalaProductSink<IN>(query, builder)).name("Cassandra Sink"));
}
@Override
protected CassandraSink<IN> createWriteAheadSink() throws Exception {
throw new IllegalArgumentException("Exactly-once guarantees can only be provided for flink tuple types.");
}
}
}