/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.example;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import static org.apache.cassandra.spark.example.LocalStorageTransportExtension.BUCKET_NAME;
/**
* A sample Cassandra Spark job that writes to a local S3-compatible store first, imports the data into Cassandra via Sidecar, and then reads it back
*/
public class LocalS3WriteAndReadJob extends AbstractCassandraJob
{
private String dataCenter = "datacenter1";
private String sidecarInstances = "localhost,localhost2,localhost3";
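// Both defaults can be overridden by passing <dataCenter> <sidecarInstances> as command-line arguments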
LocalS3WriteAndReadJob(String[] args)
{
if (args.length == 2)
{
dataCenter = args[0];
sidecarInstances = args[1];
}
}
public static void main(String[] args)
{
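// Skip startup validations; this example runs against a local mock environment rather than a real cluster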
System.setProperty("SKIP_STARTUP_VALIDATIONS", "true");
// This example expects a mock S3 server (mocks3) to be listening locally on port 9090; create the target bucket before writing
ProcessBuilder pb = new ProcessBuilder();
pb.command("curl", "-X", "PUT", "localhost:9090/" + BUCKET_NAME);
try
{
pb.start().waitFor();
}
catch (Exception e)
{
// best-effort bucket creation; ignore failures, e.g. when the bucket already exists
}
new LocalS3WriteAndReadJob(args).start(args);
}
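/**
 * Builds the write options for the bulk writer and the read options for the bulk reader used by this sample job.
 */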
protected JobConfiguration configureJob(SparkContext sc, SparkConf sparkConf)
{
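// Options passed to the Cassandra Spark Bulk Writer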
Map<String, String> writeOptions = new HashMap<>();
writeOptions.put("sidecar_instances", sidecarInstances);
writeOptions.put("keyspace", "spark_test");
writeOptions.put("table", "test");
writeOptions.put("local_dc", dataCenter);
writeOptions.put("bulk_writer_cl", "LOCAL_QUORUM");
writeOptions.put("number_splits", "-1");
// ---- The write options below apply only to the S3_COMPAT data transport ----
// Set the data transport mode to "S3_COMPAT" to use an AWS S3-compatible
// storage service to move data from Spark to Sidecar
writeOptions.put("data_transport", "S3_COMPAT");
writeOptions.put("data_transport_extension_class", LocalStorageTransportExtension.class.getCanonicalName());
// The endpoint override is only needed to talk to the local mock S3 server. Do not set this option in other scenarios.
writeOptions.put("storage_client_endpoint_override", "http://localhost:9090");
// Chunk size for S3 multipart uploads. 5 MiB for testing; the default is 100 MiB.
writeOptions.put("storage_client_max_chunk_size_in_bytes", "5242880");
// Maximum size of each SSTable bundle (object) uploaded to S3. 10 MiB for testing; the default is 5 GiB.
writeOptions.put("max_size_per_sstable_bundle_in_bytes_s3_transport", "10485760");
writeOptions.put("max_job_duration_minutes", "10");
writeOptions.put("job_id", "a_unique_id_made_of_arbitrary_string");
int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
int numExecutors = sparkConf.getInt("spark.dynamicAllocation.maxExecutors",
sparkConf.getInt("spark.executor.instances", 1));
int numCores = coresPerExecutor * numExecutors;
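// Options passed to the Cassandra Spark Bulk Reader, which reads the data back after the write completes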
Map<String, String> readerOptions = new HashMap<>();
readerOptions.put("sidecar_instances", "localhost,localhost2,localhost3");
readerOptions.put("keyspace", "spark_test");
readerOptions.put("table", "test");
readerOptions.put("DC", "datacenter1");
readerOptions.put("snapshotName", UUID.randomUUID().toString());
readerOptions.put("createSnapshot", "true");
readerOptions.put("defaultParallelism", String.valueOf(sc.defaultParallelism()));
readerOptions.put("numCores", String.valueOf(numCores));
readerOptions.put("sizing", "default");
JobConfiguration config = new JobConfiguration(writeOptions, readerOptions);
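// Total number of rows for the sample job to write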
config.rowCount = 2_000_000L;
return config;
}
}