blob: 14d49dc0ca77e22fcc1fbae5c70b7eb6fb6e6f0f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.AsymmetricRemoteSyncTask;
import org.apache.cassandra.repair.LocalSyncTask;
import org.apache.cassandra.repair.RepairJob;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.SyncTask;
import org.apache.cassandra.repair.TreeResponse;
import org.apache.cassandra.streaming.PreviewKind;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
 * In-JVM distributed tests for repair with optimised streaming
 * (the {@code nodetool repair -os} flag, which routes sync work through
 * {@code RepairJob.createOptimisedSyncingSyncTasks} — see {@link BBHelper}).
 *
 * Both tests follow the same post-conditions: after an optimised-streams repair,
 * a {@code -vd} repair must report "Repaired data is in sync" and a full preview
 * repair must report "Previewed data was in sync".
 */
public class OptimiseStreamsRepairTest extends TestBaseImpl
{
/**
 * Creates controlled divergence on node 2 (it is shut down while 2000 overwrites
 * land on nodes 1 and 3 at QUORUM), then runs an optimised-streams repair and
 * uses {@link BBHelper} to intercept the generated sync tasks and assert the
 * expected streaming topology before verifying the cluster ends up in sync.
 */
@Test
public void testBasic() throws Exception
{
// hinted_handoff is disabled so node 2 genuinely misses the writes performed
// while it is down; otherwise hints would repair it on restart and the repair
// would have nothing to stream.
try(Cluster cluster = init(Cluster.build(3)
.withInstanceInitializer(BBHelper::install)
.withConfig(config -> config.set("hinted_handoff_enabled", false)
.with(GOSSIP)
.with(NETWORK))
.start()))
{
cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}");
// Baseline data present on all three replicas.
for (int i = 0; i < 10000; i++)
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (id, t) values (?,?)", ConsistencyLevel.ALL, i, i);
cluster.forEach((i) -> i.flush(KEYSPACE));
// Take node 2 down, then overwrite the first 2000 rows at QUORUM so only
// nodes 1 and 3 have the new values — node 2 is now out of sync.
cluster.get(2).shutdown().get();
for (int i = 0; i < 2000; i++)
cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, t) values (?,?)", ConsistencyLevel.QUORUM, i, i * 2 + 2);
cluster.get(2).startup();
// Give the restarted node a moment to settle before flushing/compacting.
// NOTE(review): fixed sleep — potentially flaky; a condition-based wait would be sturdier.
Thread.sleep(1000);
cluster.forEach(c -> c.flush(KEYSPACE));
cluster.forEach(c -> c.forceCompact(KEYSPACE, "tbl"));
// Record log positions so we can later wait for the repair to fully finish
// on every node (repair completion is asynchronous w.r.t. the nodetool exit).
long [] marks = PreviewRepairTest.logMark(cluster);
// "-os": optimised-streams repair — BBHelper's interception of
// createOptimisedSyncingSyncTasks fires (and asserts) during this call.
NodeToolResult res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "-os");
res.asserts().success();
PreviewRepairTest.waitLogsRepairFullyFinished(cluster, marks);
// "-vd": presumably repaired-data validation — the notification below is the
// actual post-condition being checked.
res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "-vd");
res.asserts().success();
res.asserts().notificationContains("Repaired data is in sync");
// Full preview repair must find nothing left to stream.
res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "--preview", "--full");
res.asserts().success();
res.asserts().notificationContains("Previewed data was in sync");
}
}
/**
 * ByteBuddy hook installed on each instance: rebases {@code RepairJob} so that
 * calls to {@code createOptimisedSyncingSyncTasks} are delegated to the static
 * method of the same signature below, which runs the real implementation via
 * {@code @SuperCall} and then asserts properties of the produced sync tasks.
 */
public static class BBHelper
{
// Installed for every node id (no id filter) via withInstanceInitializer.
public static void install(ClassLoader cl, int id)
{
new ByteBuddy().rebase(RepairJob.class)
.method(named("createOptimisedSyncingSyncTasks"))
.intercept(MethodDelegation.to(BBHelper.class))
.make()
.load(cl, ClassLoadingStrategy.Default.INJECTION);
}
// Signature must mirror RepairJob.createOptimisedSyncingSyncTasks so that
// MethodDelegation binds to it; zuperCall invokes the original implementation.
public static List<SyncTask> createOptimisedSyncingSyncTasks(RepairJobDesc desc,
List<TreeResponse> trees,
InetAddressAndPort local,
Predicate<InetAddressAndPort> isTransient,
Function<InetAddressAndPort, String> getDC,
boolean isIncremental,
PreviewKind previewKind,
@SuperCall Callable<List<SyncTask>> zuperCall)
{
List<SyncTask> tasks = null;
try
{
// Run the real task-creation logic, then verify its output in-place.
tasks = zuperCall.call();
verifySyncTasks(tasks);
}
catch (Exception e)
{
// Rethrow unchecked so an assertion failure inside the instance surfaces
// and fails the repair (and hence the test).
throw new RuntimeException(e);
}
return tasks;
}
/**
 * Asserts the optimised streaming plan: local tasks only request (never
 * transfer) ranges, remote tasks are asymmetric, node 2 never fetches the
 * same range twice, and node 2 streams identical range sets to nodes 1 and 3.
 */
private static void verifySyncTasks(List<SyncTask> tasks) throws UnknownHostException
{
// coordinator -> (peer -> ranges the coordinator fetches from that peer)
Map<InetAddressAndPort, Map<InetAddressAndPort, List<Range<Token>>>> fetching = new HashMap<>();
for (SyncTask task : tasks)
{
if (task instanceof LocalSyncTask)
{
// Optimised repair should be pull-only from the local node's perspective.
assertFalse(((LocalSyncTask)task).transferRanges);
assertTrue(((LocalSyncTask)task).requestRanges);
}
else
assertTrue(task instanceof AsymmetricRemoteSyncTask);
Map<InetAddressAndPort, List<Range<Token>>> fetch = fetching.computeIfAbsent(task.nodePair().coordinator, k -> new HashMap<>());
fetch.computeIfAbsent(task.nodePair().peer, k -> new ArrayList<>()).addAll(task.rangesToSync);
}
// 127.0.0.2 is the node out of sync - make sure it does not receive multiple copies of the same range from the other nodes.
Map<InetAddressAndPort, List<Range<Token>>> node2 = fetching.get(InetAddressAndPort.getByName("127.0.0.2"));
Set<Range<Token>> allRanges = new HashSet<>();
// assertTrue on Set.add fails if any range appears twice across sources.
node2.values().forEach(ranges -> ranges.forEach(r -> assertTrue(allRanges.add(r))));
// 127.0.0.2 should stream the same ranges to .1 and .3
Set<Range<Token>> node2ToNode1 = new HashSet<>(fetching.get(InetAddressAndPort.getByName("127.0.0.1")).get(InetAddressAndPort.getByName("127.0.0.2")));
Set<Range<Token>> node2ToNode3 = new HashSet<>(fetching.get(InetAddressAndPort.getByName("127.0.0.3")).get(InetAddressAndPort.getByName("127.0.0.2")));
assertEquals(node2ToNode1, allRanges);
assertEquals(node2ToNode3, allRanges);
}
}
/**
 * Same end-to-end check as {@link #testBasic()} but with randomised divergence:
 * each node independently receives 500 rows with random keys (via executeInternal,
 * i.e. written locally without replication), so all three replicas differ.
 * No ByteBuddy interception here — only the final in-sync post-conditions.
 */
@Test
public void randomTest() throws IOException, TimeoutException
{
// hinted_handoff disabled for consistency with testBasic; divergence here comes
// from executeInternal writes, not downtime.
try(Cluster cluster = init(Cluster.build(3)
.withConfig(config -> config.set("hinted_handoff_enabled", false)
.with(GOSSIP)
.with(NETWORK))
.start()))
{
cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}");
for (int i = 0; i < 1000; i++)
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (id, t) values (?,?)", ConsistencyLevel.ALL, i, i);
cluster.forEach((i) -> i.flush(KEYSPACE));
// Unseeded Random is deliberate: each run exercises a different divergence
// pattern; the assertions below hold regardless of the keys chosen.
Random r = new Random();
for (int i = 0; i < 500; i++)
for (int j = 1; j <= 3; j++)
cluster.get(j).executeInternal("INSERT INTO "+KEYSPACE+".tbl (id, t) values (?,?)", r.nextInt(), i * 2 + 2);
long [] marks = PreviewRepairTest.logMark(cluster);
NodeToolResult res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "-os");
res.asserts().success();
// Repair completion is asynchronous; wait for all nodes' logs past the marks.
PreviewRepairTest.waitLogsRepairFullyFinished(cluster, marks);
res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "-vd");
res.asserts().success();
res.asserts().notificationContains("Repaired data is in sync");
res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "--preview", "--full");
res.asserts().success();
res.asserts().notificationContains("Previewed data was in sync");
}
}
}