blob: 2e828e0c155bb1bd9c8452336468ab00ba1ad7a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.Session;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.utils.Pair;
import static java.util.stream.Collectors.toList;
public abstract class AbstractNetstatsStreaming extends TestBaseImpl
{
protected static final Logger logger = LoggerFactory.getLogger(AbstractNetstatsStreaming.class);
protected ExecutorService executorService;
@Before
public void setup()
{
executorService = Executors.newCachedThreadPool();
}
@After
public void teardown() throws Exception
{
try
{
executorService.shutdownNow();
if (!executorService.isShutdown())
{
if (!executorService.awaitTermination(1, TimeUnit.MINUTES))
{
throw new IllegalStateException("Unable to shutdown executor for invoking netstat commands.");
}
}
}
finally
{
executorService = null;
}
}
protected void changeReplicationFactor()
{
try (com.datastax.driver.core.Cluster c = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
Session s = c.connect())
{
s.execute("ALTER KEYSPACE netstats_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };");
}
}
protected void createTable(Cluster cluster, int replicationFactor, boolean compressionEnabled)
{
// replication factor is 1
cluster.schemaChange("CREATE KEYSPACE netstats_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
if (compressionEnabled)
{
cluster.schemaChange("CREATE TABLE netstats_test.test_table (id uuid primary key) WITH compression = {'enabled':'true', 'class': 'LZ4Compressor'};");
}
else
{
cluster.schemaChange("CREATE TABLE netstats_test.test_table (id uuid primary key) WITH compression = {'enabled':'false'};");
}
}
protected void populateData(boolean forCompressedTest)
{
try (com.datastax.driver.core.Cluster c = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
Session s = c.connect("netstats_test"))
{
int records = forCompressedTest ? 100_000 : 70_000;
for (int i = 0; i < records; i++)
{
s.execute("INSERT INTO test_table (id) VALUES (" + UUID.randomUUID() + ')');
}
}
}
protected static class NetstatsOutputParser
{
public static List<Pair<ReceivingStastistics, SendingStatistics>> parse(final NetstatResults results)
{
final Set<String> outputs = new LinkedHashSet<>();
results.netstatOutputs.stream()
.map(NodeToolResult::getStdout)
.filter(output -> !output.contains("Not sending any streams"))
.filter(output -> output.contains("Receiving") || output.contains("Sending"))
.forEach(outputs::add);
final List<Pair<ReceivingStastistics, SendingStatistics>> parsed = new ArrayList<>();
for (final String output : outputs)
{
boolean processingReceiving = false;
boolean processingSending = false;
final ReceivingStastistics receivingStastistics = new ReceivingStastistics();
final SendingStatistics sendingStatistics = new SendingStatistics();
final List<String> sanitisedOutput = Stream.of(output.split("\n"))
.map(String::trim)
.filter(line -> !line.isEmpty())
// sometimes logs are mangled into output
.filter(line -> Stream.of("DEBUG", "INFO", "ERROR", "WARN").noneMatch(line::contains))
.filter(line -> Stream.of("Mode:", "Read", "Attempted", "Mismatch", "Pool", "Large", "Small", "Gossip").noneMatch(line::startsWith))
.collect(toList());
for (final String outputLine : sanitisedOutput)
{
if (outputLine.startsWith("Receiving"))
{
processingReceiving = true;
processingSending = false;
receivingStastistics.parseHeader(outputLine);
}
else if (outputLine.startsWith("Sending"))
{
processingSending = true;
processingReceiving = false;
sendingStatistics.parseHeader(outputLine);
}
else if (processingReceiving)
{
receivingStastistics.parseTable(outputLine);
}
else if (processingSending)
{
sendingStatistics.parseTable(outputLine);
}
}
parsed.add(Pair.create(receivingStastistics, sendingStatistics));
}
return parsed;
}
public static void validate(List<Pair<ReceivingStastistics, SendingStatistics>> result)
{
List<SendingStatistics> sendingStatistics = result.stream().map(pair -> pair.right).collect(toList());
if (sendingStatistics.size() >= 2)
{
for (int i = 0; i < sendingStatistics.size() - 1; i++)
{
SendingStatistics.SendingHeader header1 = sendingStatistics.get(i).sendingHeader;
SendingStatistics.SendingHeader header2 = sendingStatistics.get(i + 1).sendingHeader;
if (header1 != null && header2 != null)
{
Assert.assertTrue(header1.compareTo(header2) <= 0);
}
}
}
for (SendingStatistics sending : sendingStatistics)
{
if (sending.sendingHeader != null)
{
Assert.assertEquals(sending.sendingHeader.bytesTotalSoFar, (long) sending.sendingSSTable.stream().map(table -> table.bytesSent).reduce(Long::sum).orElse(0L));
Assert.assertTrue(sending.sendingHeader.bytesTotal >= sending.sendingSSTable.stream().map(table -> table.bytesInTotal).reduce(Long::sum).orElse(0L));
if (sending.sendingHeader.bytesTotalSoFar != 0)
{
double progress = (double) sending.sendingSSTable.stream().map(table -> table.bytesSent).reduce(Long::sum).orElse(0L) / (double) sending.sendingHeader.bytesTotal;
Assert.assertTrue((int) sending.sendingHeader.progressBytes >= (int) (progress * 100));
Assert.assertTrue((double) sending.sendingHeader.bytesTotal >= (double) sending.sendingSSTable.stream().map(table -> table.bytesInTotal).reduce(Long::sum).orElse(0L));
}
}
}
List<ReceivingStastistics> receivingStastistics = result.stream().map(pair -> pair.left).collect(toList());
for (ReceivingStastistics receiving : receivingStastistics)
{
if (receiving.receivingHeader != null)
{
Assert.assertTrue(receiving.receivingHeader.bytesTotal >= receiving.receivingTables.stream().map(table -> table.receivedSoFar).reduce(Long::sum).orElse(0L));
Assert.assertEquals(receiving.receivingHeader.bytesTotalSoFar, (long) receiving.receivingTables.stream().map(table -> table.receivedSoFar).reduce(Long::sum).orElse(0L));
}
}
}
public static class ReceivingStastistics
{
public ReceivingHeader receivingHeader;
public List<ReceivingTable> receivingTables = new ArrayList<>();
public void parseHeader(String header)
{
receivingHeader = ReceivingHeader.parseHeader(header);
}
public void parseTable(String table)
{
receivingTables.add(ReceivingTable.parseTable(table));
}
public String toString()
{
return "ReceivingStastistics{" +
"receivingHeader=" + receivingHeader +
", receivingTables=" + receivingTables +
'}';
}
public static class ReceivingHeader
{
private static final Pattern receivingHeaderPattern = Pattern.compile(
"Receiving (.*) files, (.*) bytes total. Already received (.*) files \\((.*)%\\), (.*) bytes total \\((.*)%\\)"
);
int totalReceiving = 0;
long bytesTotal = 0;
int alreadyReceived = 0;
double progressFiles = 0.0f;
long bytesTotalSoFar = 0;
double progressBytes = 0.0f;
public static ReceivingHeader parseHeader(String header)
{
final Matcher matcher = receivingHeaderPattern.matcher(header);
if (matcher.matches())
{
final ReceivingHeader receivingHeader = new ReceivingHeader();
receivingHeader.totalReceiving = Integer.parseInt(matcher.group(1));
receivingHeader.bytesTotal = Long.parseLong(matcher.group(2));
receivingHeader.alreadyReceived = Integer.parseInt(matcher.group(3));
receivingHeader.progressFiles = Double.parseDouble(matcher.group(4));
receivingHeader.bytesTotalSoFar = Long.parseLong(matcher.group(5));
receivingHeader.progressBytes = Double.parseDouble(matcher.group(6));
return receivingHeader;
}
throw new IllegalStateException("Header does not match - " + header);
}
public String toString()
{
return "ReceivingHeader{" +
"totalReceiving=" + totalReceiving +
", bytesTotal=" + bytesTotal +
", alreadyReceived=" + alreadyReceived +
", progressFiles=" + progressFiles +
", bytesTotalSoFar=" + bytesTotalSoFar +
", progressBytes=" + progressBytes +
'}';
}
}
public static class ReceivingTable
{
long receivedSoFar = 0;
long toReceive = 0;
double progress = 0.0;
private static final Pattern recievingFilePattern = Pattern.compile("(.*) (.*)/(.*) bytes \\((.*)%\\) received from (.*)");
public static ReceivingTable parseTable(String table)
{
final Matcher matcher = recievingFilePattern.matcher(table);
if (matcher.matches())
{
final ReceivingTable receivingTable = new ReceivingTable();
receivingTable.receivedSoFar = Long.parseLong(matcher.group(2));
receivingTable.toReceive = Long.parseLong(matcher.group(3));
receivingTable.progress = Double.parseDouble(matcher.group(4));
return receivingTable;
}
throw new IllegalStateException("Table line does not match - " + table);
}
public String toString()
{
return "ReceivingTable{" +
"receivedSoFar=" + receivedSoFar +
", toReceive=" + toReceive +
", progress=" + progress +
'}';
}
}
}
public static class SendingStatistics
{
public SendingHeader sendingHeader;
public List<SendingSSTable> sendingSSTable = new ArrayList<>();
public void parseHeader(String outputLine)
{
this.sendingHeader = SendingHeader.parseHeader(outputLine);
}
public void parseTable(String table)
{
sendingSSTable.add(SendingSSTable.parseTable(table));
}
public String toString()
{
return "SendingStatistics{" +
"sendingHeader=" + sendingHeader +
", sendingSSTable=" + sendingSSTable +
'}';
}
public static class SendingHeader implements Comparable<SendingHeader>
{
private static final Pattern sendingHeaderPattern = Pattern.compile(
"Sending (.*) files, (.*) bytes total. Already sent (.*) files \\((.*)%\\), (.*) bytes total \\((.*)%\\)"
);
int totalSending = 0;
long bytesTotal = 0;
int alreadySent = 0;
double progressFiles = 0.0f;
long bytesTotalSoFar = 0;
double progressBytes = 0.0f;
public static SendingHeader parseHeader(String header)
{
final Matcher matcher = sendingHeaderPattern.matcher(header);
if (matcher.matches())
{
final SendingHeader sendingHeader = new SendingHeader();
sendingHeader.totalSending = Integer.parseInt(matcher.group(1));
sendingHeader.bytesTotal = Long.parseLong(matcher.group(2));
sendingHeader.alreadySent = Integer.parseInt(matcher.group(3));
sendingHeader.progressFiles = Double.parseDouble(matcher.group(4));
sendingHeader.bytesTotalSoFar = Long.parseLong(matcher.group(5));
sendingHeader.progressBytes = Double.parseDouble(matcher.group(6));
return sendingHeader;
}
throw new IllegalStateException("Header does not match - " + header);
}
public String toString()
{
return "SendingHeader{" +
"totalSending=" + totalSending +
", bytesTotal=" + bytesTotal +
", alreadySent=" + alreadySent +
", progressFiles=" + progressFiles +
", bytesTotalSoFar=" + bytesTotalSoFar +
", progressBytes=" + progressBytes +
'}';
}
public int compareTo(SendingHeader o)
{
// progress on bytes has to be strictly lower,
// even alreadySent and progressFiles and progressBytes are same,
// bytesTotalSoFar has to be lower, bigger or same
if (alreadySent <= o.alreadySent
&& progressFiles <= o.progressFiles
&& bytesTotalSoFar <= o.bytesTotalSoFar
&& progressBytes <= o.progressBytes)
{
return -1;
}
else if (alreadySent == o.alreadySent
&& progressFiles == o.progressFiles
&& bytesTotalSoFar == o.bytesTotalSoFar
&& progressBytes == o.progressBytes)
{
return 0;
}
else if (alreadySent >= o.alreadySent
&& progressFiles >= o.progressFiles
&& bytesTotalSoFar > o.bytesTotalSoFar
&& progressBytes >= o.progressBytes)
{
return 1;
}
else
{
throw new IllegalStateException(String.format("Could not compare arguments %s and %s", this, o));
}
}
}
public static class SendingSSTable
{
private static final Pattern sendingFilePattern = Pattern.compile("(.*) (.*)/(.*) bytes \\((.*)%\\) sent to (.*)");
long bytesSent = 0;
long bytesInTotal = 0;
double progress = 0.0f;
public static SendingSSTable parseTable(String table)
{
final Matcher matcher = sendingFilePattern.matcher(table);
if (matcher.matches())
{
final SendingSSTable sendingSSTable = new SendingSSTable();
sendingSSTable.bytesSent = Long.parseLong(matcher.group(2));
sendingSSTable.bytesInTotal = Long.parseLong(matcher.group(3));
sendingSSTable.progress = Double.parseDouble(matcher.group(4));
return sendingSSTable;
}
throw new IllegalStateException("Table does not match - " + table);
}
public String toString()
{
return "SendingSSTable{" +
"bytesSent=" + bytesSent +
", bytesInTotal=" + bytesInTotal +
", progress=" + progress +
'}';
}
}
}
}
protected static final class NetstatResults
{
private final List<NodeToolResult> netstatOutputs = new ArrayList<>();
public void add(NodeToolResult result)
{
netstatOutputs.add(result);
}
public void assertSuccessful()
{
for (final NodeToolResult result : netstatOutputs)
{
Assert.assertEquals(result.getRc(), 0);
Assert.assertTrue(result.getStderr().isEmpty());
}
}
}
protected static class NetstatsCallable implements Callable<NetstatResults>
{
private final IInvokableInstance node;
public NetstatsCallable(final IInvokableInstance node)
{
this.node = node;
}
public NetstatResults call() throws Exception
{
final NetstatResults results = new NetstatResults();
boolean sawAnyStreamingOutput = false;
while (true)
{
try
{
final NodeToolResult result = node.nodetoolResult(false, "netstats");
logger.info(node.broadcastAddress().toString() + ' ' + result.getStdout());
if (!sawAnyStreamingOutput)
{
if (result.getStdout().contains("Receiving") || result.getStdout().contains("Sending"))
{
sawAnyStreamingOutput = true;
}
}
if (sawAnyStreamingOutput && (!result.getStdout().contains("Receiving") && !result.getStdout().contains("Sending")))
{
break;
}
results.add(result);
Thread.sleep(500);
}
catch (final Exception ex)
{
logger.error(ex.getMessage());
Thread.sleep(500);
}
}
return results;
}
}
}