/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.bulkwriter;

import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Range;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.bridge.CassandraVersionFeatures;
import org.apache.cassandra.spark.common.MD5Hash;
import org.apache.cassandra.spark.common.SSTables;
import org.apache.cassandra.spark.data.DataLayer;
import org.apache.cassandra.spark.data.LocalDataLayer;
import org.apache.cassandra.spark.reader.StreamScanner;
import org.jetbrains.annotations.NotNull;

@SuppressWarnings("WeakerAccess")
public class SSTableWriter
{
private static final Logger LOGGER = LoggerFactory.getLogger(SSTableWriter.class);
public static final String CASSANDRA_VERSION_PREFIX = "cassandra-";

private final Path outDir;
private final org.apache.cassandra.bridge.SSTableWriter cqlSSTableWriter;
private BigInteger minToken = null;
private BigInteger maxToken = null;
private final Map<Path, MD5Hash> fileHashes = new HashMap<>();
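
/**
 * Wraps an existing bridge-level SSTable writer that writes into {@code outDir}.
 */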
public SSTableWriter(org.apache.cassandra.bridge.SSTableWriter tableWriter, Path outDir)
{
cqlSSTableWriter = tableWriter;
this.outDir = outDir;
}
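
/**
 * Builds a bridge-level SSTable writer matching the lowest Cassandra version in the target cluster,
 * using the job's table schema, partitioner, row buffering mode, and SSTable data size settings.
 */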
public SSTableWriter(BulkWriterContext writerContext, Path outDir)
{
this.outDir = outDir;
String lowestCassandraVersion = writerContext.cluster().getLowestCassandraVersion();
String packageVersion = getPackageVersion(lowestCassandraVersion);
LOGGER.info("Running with version {}", packageVersion);
TableSchema tableSchema = writerContext.schema().getTableSchema();
// UNBUFFERRED mode maps to the bridge writer's sorted mode, which expects rows to arrive in token order
boolean sorted = writerContext.job().getRowBufferMode() == RowBufferMode.UNBUFFERRED;
this.cqlSSTableWriter = SSTableWriterFactory.getSSTableWriter(
CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(packageVersion),
this.outDir.toString(),
writerContext.cluster().getPartitioner().toString(),
tableSchema.createStatement,
tableSchema.modificationStatement,
sorted,
writerContext.job().getSstableDataSizeInMB());
}
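
/**
 * Maps a Cassandra version string onto the corresponding bridge package name,
 * for example "3.0" becomes "cassandra-3.0".
 */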
@NotNull
public String getPackageVersion(String lowestCassandraVersion)
{
return CASSANDRA_VERSION_PREFIX + lowestCassandraVersion;
}
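
/**
 * Writes a row and tracks the minimum and maximum tokens seen so far. Because {@code maxToken}
 * is simply the most recent token, rows are expected to arrive in token order for
 * {@link #getTokenRange()} to be accurate.
 */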
public void addRow(BigInteger token, Object[] values) throws IOException
{
if (minToken == null)
{
minToken = token;
}
maxToken = token;
cqlSSTableWriter.addRow(values);
}
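
/**
 * Closes the underlying writer, computes an MD5 hash for every SSTable component produced,
 * and then validates the SSTables by reading them back.
 */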
public void close(BulkWriterContext writerContext, int partitionId) throws IOException
{
cqlSSTableWriter.close();
for (Path dataFile : getDataFileStream())
{
// NOTE: We calculate file hashes before re-reading so that we know what we hashed
// is what we validated. Then we send these along with the files and the
// receiving end re-hashes the files to make sure they still match.
fileHashes.putAll(calculateFileHashes(dataFile));
}
validateSSTables(writerContext, partitionId);
}
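
/**
 * Validates the freshly written SSTables by opening them through a {@link LocalDataLayer}
 * compaction scanner and reading every column; any corruption surfaces as an exception.
 */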
@VisibleForTesting
public void validateSSTables(@NotNull BulkWriterContext writerContext, int partitionId)
{
// NOTE: If the current implementation of SSTable validation proves to be a performance issue,
// we will need to modify LocalDataLayer to allow scanning and compacting a single data file,
// and then validate all of the data files in parallel threads
try
{
CassandraVersion version = CassandraBridgeFactory.getCassandraVersion(writerContext.cluster().getLowestCassandraVersion());
String keyspace = writerContext.cluster().getRing(true).getKeyspace();
String schema = writerContext.schema().getTableSchema().createStatement;
String directory = getOutDir().toString();
DataLayer layer = new LocalDataLayer(version, keyspace, schema, directory);
try (StreamScanner scanner = layer.openCompactionScanner(partitionId, Collections.emptyList(), null))
{
while (scanner.hasNext())
{
scanner.advanceToNextColumn();
}
}
}
catch (IOException exception)
{
LOGGER.error("[{}]: Unexpected exception while validating SSTables {}", partitionId, getOutDir(), exception);
throw new RuntimeException(exception);
}
}
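
/**
 * Returns a stream over the "*Data.db" files written to the output directory.
 */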
private DirectoryStream<Path> getDataFileStream() throws IOException
{
return Files.newDirectoryStream(getOutDir(), "*Data.db");
}
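
/**
 * Hashes every SSTable component that shares the given data file's base name
 * (Data, Index, Summary, and so on).
 */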
private Map<Path, MD5Hash> calculateFileHashes(Path dataFile) throws IOException
{
Map<Path, MD5Hash> fileHashes = new HashMap<>();
try (DirectoryStream<Path> filesToHash =
Files.newDirectoryStream(dataFile.getParent(), SSTables.getSSTableBaseName(dataFile) + "*"))
{
for (Path path : filesToHash)
{
fileHashes.put(path, calculateFileHash(path));
}
}
return fileHashes;
}
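
/**
 * Streams the file through an MD5 digest without loading it fully into memory.
 */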
private MD5Hash calculateFileHash(Path path) throws IOException
{
try (InputStream is = Files.newInputStream(path))
{
MessageDigest computedMd5 = DigestUtils.updateDigest(DigestUtils.getMd5Digest(), is);
return MD5Hash.fromDigest(computedMd5);
}
}
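
/**
 * Returns the closed token range covered by the rows written so far;
 * only meaningful after at least one row has been added.
 */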
public Range<BigInteger> getTokenRange()
{
return Range.closed(minToken, maxToken);
}

public Path getOutDir()
{
return outDir;
}

public Map<Path, MD5Hash> getFileHashes()
{
return fileHashes;
}
}