blob: 00834bae58a6c5a425d7113b8151ad3e65cd4b73 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.tools.BulkLoader;
import org.apache.cassandra.tools.ToolRunner;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.NativeSSTableLoaderClient;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static com.google.common.collect.Lists.transform;
import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
public class SSTableLoaderEncryptionOptionsTest extends AbstractEncryptionOptionsImpl
{
static Cluster CLUSTER;
static String NODES;
static int NATIVE_PORT;
static int STORAGE_PORT;
static int SSL_STORAGE_PORT;
@BeforeClass
public static void setupCluster() throws IOException
{
CLUSTER = Cluster.build().withNodes(1).withConfig(c -> {
c.with(Feature.NATIVE_PROTOCOL, Feature.NETWORK, Feature.GOSSIP); // need gossip to get hostid for java driver
c.set("server_encryption_options",
ImmutableMap.builder().putAll(validKeystore)
.put("internode_encryption", "all")
.put("optional", false)
.build());
c.set("client_encryption_options",
ImmutableMap.builder().putAll(validKeystore)
.put("enabled", true)
.put("optional", false)
.put("accepted_protocols", Collections.singletonList("TLSv1.2"))
.build());
}).start();
NODES = CLUSTER.get(1).config().broadcastAddress().getHostString();
NATIVE_PORT = CLUSTER.get(1).callOnInstance(DatabaseDescriptor::getNativeTransportPort);
STORAGE_PORT = CLUSTER.get(1).callOnInstance(DatabaseDescriptor::getStoragePort);
SSL_STORAGE_PORT = CLUSTER.get(1).callOnInstance(DatabaseDescriptor::getSSLStoragePort);
}
@AfterClass
public static void tearDownCluster()
{
if (CLUSTER != null)
CLUSTER.close();
}
@Test
public void bulkLoaderSuccessfullyStreamsOverSsl() throws Throwable
{
File sstables_to_upload = prepareSstablesForUpload();
ToolRunner.ToolResult tool = ToolRunner.invokeClass(BulkLoader.class,
"--nodes", NODES,
"--port", Integer.toString(NATIVE_PORT),
"--storage-port", Integer.toString(STORAGE_PORT),
"--keystore", validKeyStorePath,
"--keystore-password", validKeyStorePassword,
"--truststore", validTrustStorePath,
"--truststore-password", validTrustStorePassword,
"--conf-path", "test/conf/sstableloader_with_encryption.yaml",
sstables_to_upload.absolutePath());
tool.assertOnCleanExit();
assertTrue(tool.getStdout().contains("Summary statistics"));
assertRows(CLUSTER.get(1).executeInternal("SELECT count(*) FROM ssl_upload_tables.test"), row(42L));
}
@Test
public void bulkLoaderSuccessfullyStreamsOverSslWithDeprecatedSslStoragePort() throws Throwable
{
File sstables_to_upload = prepareSstablesForUpload();
ToolRunner.ToolResult tool = ToolRunner.invokeClass(BulkLoader.class,
"--nodes", NODES,
"--port", Integer.toString(NATIVE_PORT),
"--storage-port", Integer.toString(STORAGE_PORT),
"--ssl-storage-port", Integer.toString(SSL_STORAGE_PORT),
"--keystore", validKeyStorePath,
"--keystore-password", validKeyStorePassword,
"--truststore", validTrustStorePath,
"--truststore-password", validTrustStorePassword,
"--conf-path", "test/conf/sstableloader_with_encryption.yaml",
sstables_to_upload.absolutePath());
tool.assertOnCleanExit();
assertTrue(tool.getStdout().contains("Summary statistics"));
assertTrue(tool.getStdout().contains("ssl storage port is deprecated and not used"));
assertRows(CLUSTER.get(1).executeInternal("SELECT count(*) FROM ssl_upload_tables.test"), row(42L));
}
@Test
public void bulkLoaderCannotAgreeOnClientTLSProtocol()
{
ToolRunner.ToolResult tool = ToolRunner.invokeClass(BulkLoader.class,
"--ssl-protocol", "TLSv1",
"--nodes", NODES,
"--port", Integer.toString(NATIVE_PORT),
"--storage-port", Integer.toString(STORAGE_PORT),
"--keystore", validKeyStorePath,
"--keystore-password", validKeyStorePassword,
"--truststore", validTrustStorePath,
"--truststore-password", validTrustStorePassword,
"test/data/legacy-sstables/na/legacy_tables/legacy_na_clust");
assertNotEquals(0, tool.getExitCode());
assertTrue(tool.getStderr().contains("Unable to initialise " + NativeSSTableLoaderClient.class.getName()));
}
private static File prepareSstablesForUpload() throws IOException
{
generateSstables();
File sstable_dir = copySstablesFromDataDir("test");
truncateGeneratedTables();
return sstable_dir;
}
private static void generateSstables() throws IOException
{
CLUSTER.schemaChange("CREATE KEYSPACE IF NOT EXISTS ssl_upload_tables WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
CLUSTER.schemaChange("CREATE TABLE IF NOT EXISTS ssl_upload_tables.test (pk int, val text, PRIMARY KEY (pk))");
for (int i = 0; i < 42; i++)
{
CLUSTER.get(1).executeInternal(String.format("INSERT INTO ssl_upload_tables.test (pk, val) VALUES (%s, '%s')", i, i));
}
CLUSTER.get(1).runOnInstance(rethrow(() -> StorageService.instance.forceKeyspaceFlush("ssl_upload_tables",
ColumnFamilyStore.FlushReason.UNIT_TESTS)));
}
private static void truncateGeneratedTables() throws IOException
{
CLUSTER.get(1).executeInternal("TRUNCATE ssl_upload_tables.test");
}
private static File copySstablesFromDataDir(String table) throws IOException
{
File cfDir = new File("build/test/cassandra/ssl_upload_tables", table);
cfDir.tryCreateDirectories();
// Get paths as Strings, because org.apache.cassandra.io.util.File in the dtest
// node is loaded by org.apache.cassandra.distributed.shared.InstanceClassLoader.
List<String> keyspace_dir_paths = CLUSTER.get(1).callOnInstance(() -> {
List<File> cfDirs = Keyspace.open("ssl_upload_tables").getColumnFamilyStore(table).getDirectories().getCFDirectories();
return transform(cfDirs, (d) -> d.absolutePath());
});
for (File srcDir : transform(keyspace_dir_paths, (p) -> new File(p)))
{
for (File file : srcDir.tryList((file) -> file.isFile()))
{
FileUtils.copyFileToDirectory(file.toJavaIOFile(), cfDir.toJavaIOFile());
}
}
return cfDir;
}
}