/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.sparksql;

import org.apache.cassandra.spark.bulkwriter.BulkWriterContext;
import org.apache.cassandra.spark.bulkwriter.CassandraBulkSourceRelation;
import org.apache.cassandra.spark.bulkwriter.CassandraBulkWriterContext;
import org.apache.cassandra.spark.bulkwriter.JobInfo;
import org.apache.cassandra.spark.bulkwriter.LoadNotSupportedException;
import org.apache.cassandra.spark.utils.ScalaConversionUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.CreatableRelationProvider;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
import scala.collection.immutable.Map;
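
/**
 * A Spark SQL data sink that bulk-writes a {@link Dataset} into a Cassandra table,
 * registered under the short name {@code cassandraBulkWrite}.
 *
 * <p>A minimal usage sketch; the option keys below are illustrative placeholders only,
 * the actual keys are whatever {@link CassandraBulkWriterContext#fromOptions} expects:
 *
 * <pre>{@code
 * df.write()
 *   .format("cassandraBulkWrite")
 *   .option("keyspace", "my_keyspace")  // assumed option name
 *   .option("table", "my_table")        // assumed option name
 *   .mode(SaveMode.Append)              // the only supported mode
 *   .save();
 * }</pre>
 */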
public class CassandraDataSink implements DataSourceRegister, CreatableRelationProvider
{
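    /**
     * @return the short alias under which this sink is registered for
     *         Spark's {@code DataFrameWriter.format()} lookup
     */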
    @Override
    @NotNull
    public String shortName()
    {
        return "cassandraBulkWrite";
    }

    /**
     * @param sqlContext the SQLContext instance
     * @param saveMode   must be {@link SaveMode#Append}
     * @param parameters the writer options
     * @param data       the data to persist into the Cassandra table
     * @return the {@link BaseRelation} used to perform the bulk write
     * @throws LoadNotSupportedException if the <code>saveMode</code> is not supported, namely {@link SaveMode#Overwrite},
     *                                   {@link SaveMode#ErrorIfExists}, or {@link SaveMode#Ignore}
     */
    @Override
    @NotNull
    public BaseRelation createRelation(@NotNull SQLContext sqlContext,
                                       @NotNull SaveMode saveMode,
                                       @NotNull Map<String, String> parameters,
                                       @NotNull Dataset<Row> data)
    {
        switch (saveMode)
        {
            case Append:
                // Initialize the job group ID for later use if we need to cancel the job
                // TODO: Can we get a more descriptive "description" in here from the end user somehow?
                BulkWriterContext writerContext = createBulkWriterContext(
                        sqlContext.sparkContext(),
                        ScalaConversionUtils.<String, String>mapAsJavaMap(parameters),
                        data.schema());
                try
                {
                    JobInfo jobInfo = writerContext.job();
                    String description = "Cassandra Bulk Load for table " + jobInfo.getFullTableName();
                    CassandraBulkSourceRelation relation = new CassandraBulkSourceRelation(writerContext, sqlContext);
                    // Tag the Spark jobs spawned by this write with a job group so they can be cancelled as a unit
                    sqlContext.sparkContext().setJobGroup(jobInfo.getId().toString(), description, false);
                    relation.insert(data, false);
                    return relation;
                }
                catch (Exception exception)
                {
                    throw new RuntimeException(exception);
                }
                finally
                {
                    // Release writer resources and clear the job group whether the write succeeded or failed
                    writerContext.shutdown();
                    sqlContext.sparkContext().clearJobGroup();
                }
            case Overwrite:
                throw new LoadNotSupportedException("SaveMode.Overwrite is not supported on Cassandra as it needs privileged TRUNCATE operation");
            default:
                throw new LoadNotSupportedException("SaveMode." + saveMode + " is not supported");
        }
    }
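
    /**
     * Creates the {@link BulkWriterContext} that backs the write; protected so that
     * subclasses (for example, in tests) can substitute an alternative context.
     */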
    @NotNull
    protected BulkWriterContext createBulkWriterContext(@NotNull SparkContext sparkContext,
                                                        @NotNull java.util.Map<String, String> options,
                                                        @NotNull StructType schema)
    {
        return CassandraBulkWriterContext.fromOptions(sparkContext, options, schema);
    }
}