| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.distributed.test; |
| |
| import java.io.IOException; |
| import java.util.List; |
| |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import org.apache.cassandra.db.Keyspace; |
| import org.apache.cassandra.distributed.Cluster; |
| import org.apache.cassandra.distributed.api.IInvokableInstance; |
| import org.apache.cassandra.distributed.api.SimpleQueryResult; |
| import org.apache.cassandra.exceptions.OverloadedException; |
| import org.apache.cassandra.service.StorageService; |
| |
| import static org.apache.cassandra.config.ReplicaFilteringProtectionOptions.DEFAULT_FAIL_THRESHOLD; |
| import static org.apache.cassandra.config.ReplicaFilteringProtectionOptions.DEFAULT_WARN_THRESHOLD; |
| import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; |
| import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; |
| import static org.apache.cassandra.distributed.shared.AssertUtils.row; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Exercises the functionality of {@link org.apache.cassandra.service.ReplicaFilteringProtection}, the |
| * mechanism that ensures distributed index and filtering queries at read consistency levels > ONE/LOCAL_ONE |
| * avoid stale replica results. |
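| * |
| * Each test writes rows at {@code ALL}, updates them on a single replica (bypassing the |
| * coordinator), and then issues filtering queries of the form |
| * {@code SELECT * FROM ... WHERE v = ? LIMIT ? ALLOW FILTERING} at {@code ALL}, forcing the |
| * coordinator to cache and, where necessary, complete potentially stale rows. |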
| */ |
| public class ReplicaFilteringProtectionTest extends TestBaseImpl |
| { |
| private static final int REPLICAS = 2; |
| private static final int ROWS = 3; |
| |
| private static Cluster cluster; |
| |
| @BeforeClass |
| public static void setup() throws IOException |
| { |
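| // Hinted handoff is disabled so that single-replica writes stay divergent, batch commitlog |
| // sync makes writes durable before they are acknowledged, and a single token per node keeps |
| // replica placement deterministic for the assertions below. |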
| cluster = init(Cluster.build() |
| .withNodes(REPLICAS) |
| .withConfig(config -> config.set("hinted_handoff_enabled", false) |
| .set("commitlog_sync", "batch") |
| .set("num_tokens", 1)).start()); |
| |
| // Make sure we start w/ the correct defaults: |
| cluster.get(1).runOnInstance(() -> assertEquals(DEFAULT_WARN_THRESHOLD, StorageService.instance.getCachedReplicaRowsWarnThreshold())); |
| cluster.get(1).runOnInstance(() -> assertEquals(DEFAULT_FAIL_THRESHOLD, StorageService.instance.getCachedReplicaRowsFailThreshold())); |
| } |
| |
| @AfterClass |
| public static void teardown() |
| { |
| cluster.close(); |
| } |
| |
| @Test |
| public void testMissedUpdatesBelowCachingWarnThreshold() |
| { |
| String tableName = "missed_updates_no_warning"; |
| cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)")); |
| |
| // The warning threshold provided is exactly the total number of rows returned |
| // to the coordinator from all replicas and therefore should not be triggered. |
| testMissedUpdates(tableName, REPLICAS * ROWS, Integer.MAX_VALUE, false); |
| } |
| |
| @Test |
| public void testMissedUpdatesAboveCachingWarnThreshold() |
| { |
| String tableName = "missed_updates_cache_warn"; |
| cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)")); |
| |
| // The warning threshold provided is one less than the total number of rows returned |
| // to the coordinator from all replicas and therefore should be triggered but not fail the query. |
| testMissedUpdates(tableName, REPLICAS * ROWS - 1, Integer.MAX_VALUE, true); |
| } |
| |
| @Test |
| public void testMissedUpdatesAroundCachingFailThreshold() |
| { |
| String tableName = "missed_updates_cache_fail"; |
| cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)")); |
| |
| // The failure threshold provided is exactly the total number of rows returned |
| // to the coordinator from all replicas and therefore should just warn. |
| testMissedUpdates(tableName, 1, REPLICAS * ROWS, true); |
| |
| try |
| { |
| // The failure threshold provided is one less than the total number of rows returned |
| // to the coordinator from all replicas and therefore should fail the query. |
| testMissedUpdates(tableName, 1, REPLICAS * ROWS - 1, true); |
| fail("Expected the failure threshold to abort the query"); |
| } |
| catch (RuntimeException e) |
| { |
| assertEquals(OverloadedException.class.getName(), e.getCause().getClass().getName()); |
| } |
| } |
| |
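| /** |
| * Runs the four-case scenario below against {@code tableName} after setting the given |
| * per-query cached-row thresholds on the coordinator (node 1). |
| * |
| * @param warnThreshold cached rows per query above which the coordinator emits a client warning |
| * @param failThreshold cached rows per query above which the coordinator aborts the query |
| * @param shouldWarn whether the queries that resolve stale rows are expected to warn |
| */ |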
| private void testMissedUpdates(String tableName, int warnThreshold, int failThreshold, boolean shouldWarn) |
| { |
| cluster.get(1).runOnInstance(() -> StorageService.instance.setCachedReplicaRowsWarnThreshold(warnThreshold)); |
| cluster.get(1).runOnInstance(() -> StorageService.instance.setCachedReplicaRowsFailThreshold(failThreshold)); |
| |
| String fullTableName = KEYSPACE + '.' + tableName; |
| |
| // Case 1: Insert and query rows at ALL to establish a baseline. |
| for (int i = 0; i < ROWS; i++) |
| { |
| cluster.coordinator(1).execute("INSERT INTO " + fullTableName + "(k, v) VALUES (?, 'old')", ALL, i); |
| } |
| |
| long histogramSampleCount = rowsCachedPerQueryCount(cluster.get(1), tableName); |
| |
| String query = "SELECT * FROM " + fullTableName + " WHERE v = ? LIMIT ? ALLOW FILTERING"; |
| |
| Object[][] initialRows = cluster.coordinator(1).execute(query, ALL, "old", ROWS); |
| assertRows(initialRows, row(1, "old"), row(0, "old"), row(2, "old")); |
| |
| // Make sure only one sample was recorded for the query. |
| assertEquals(histogramSampleCount + 1, rowsCachedPerQueryCount(cluster.get(1), tableName)); |
| |
| // Case 2: Update all rows on only one replica, leaving the entire dataset of the remaining replica out-of-date. |
| updateAllRowsOn(1, fullTableName, "new"); |
| |
| // The replica that missed the updates creates a mismatch at every row, and we therefore |
| // cache a version of that row for all replicas. |
| SimpleQueryResult oldResult = cluster.coordinator(1).executeWithResult(query, ALL, "old", ROWS); |
| assertRows(oldResult.toObjectArrays()); |
| verifyWarningState(shouldWarn, oldResult); |
| |
| // We should have made 3 row "completion" requests. |
| assertEquals(ROWS, protectionQueryCount(cluster.get(1), tableName)); |
| |
| // In all cases above, the queries should be caching 1 row per partition per replica, but |
| // 6 for the whole query, given every row is potentially stale. |
| assertEquals(ROWS * REPLICAS, maxRowsCachedPerQuery(cluster.get(1), tableName)); |
| |
| // Make sure only one more sample was recorded for the query. |
| assertEquals(histogramSampleCount + 2, rowsCachedPerQueryCount(cluster.get(1), tableName)); |
| |
| // Case 3: Observe the effects of blocking read-repair. |
| |
| // The previous query performs a blocking read-repair, which removes the replica divergence. |
| // It therefore only warns if the warning threshold is actually below the number of replicas. |
| // (i.e. The row cache counter is decremented/reset as each partition is consumed.) |
| SimpleQueryResult newResult = cluster.coordinator(1).executeWithResult(query, ALL, "new", ROWS); |
| Object[][] newRows = newResult.toObjectArrays(); |
| assertRows(newRows, row(1, "new"), row(0, "new"), row(2, "new")); |
| |
| verifyWarningState(warnThreshold < REPLICAS, newResult); |
| |
| // We should still have made only 3 row "completion" requests, with no replica divergence in the last query. |
| assertEquals(ROWS, protectionQueryCount(cluster.get(1), tableName)); |
| |
| // With no replica divergence, we only cache a single partition at a time across 2 replicas. |
| assertEquals(REPLICAS, minRowsCachedPerQuery(cluster.get(1), tableName)); |
| |
| // Make sure only one more sample was recorded for the query. |
| assertEquals(histogramSampleCount + 3, rowsCachedPerQueryCount(cluster.get(1), tableName)); |
| |
| // Case 4: Introduce another mismatch by updating all rows on only one replica. |
| |
| updateAllRowsOn(1, fullTableName, "future"); |
| |
| // Another mismatch is introduced, and we once again cache a version of each row during resolution. |
| SimpleQueryResult futureResult = cluster.coordinator(1).executeWithResult(query, ALL, "future", ROWS); |
| Object[][] futureRows = futureResult.toObjectArrays(); |
| assertRows(futureRows, row(1, "future"), row(0, "future"), row(2, "future")); |
| |
| verifyWarningState(shouldWarn, futureResult); |
| |
| // We should have made 3 more row "completion" requests. |
| assertEquals(ROWS * 2, protectionQueryCount(cluster.get(1), tableName)); |
| |
| // In all cases above, the queries should be caching 1 row per partition per replica, but |
| // 6 for the whole query, given every row is potentially stale. |
| assertEquals(ROWS * REPLICAS, maxRowsCachedPerQuery(cluster.get(1), tableName)); |
| |
| // Make sure only one more sample was recorded for the query. |
| assertEquals(histogramSampleCount + 4, rowsCachedPerQueryCount(cluster.get(1), tableName)); |
| } |
| |
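| /** |
| * Updates all test rows on the given node via {@code executeInternal()}, which applies the |
| * mutation locally without coordination, leaving the other replica out-of-date. |
| */ |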
| private void updateAllRowsOn(int node, String table, String value) |
| { |
| for (int i = 0; i < ROWS; i++) |
| { |
| cluster.get(node).executeInternal("UPDATE " + table + " SET v = ? WHERE k = ?", value, i); |
| } |
| } |
| |
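| /** |
| * Asserts that a warning mentioning {@code cached_replica_rows_warn_threshold} is present |
| * exactly when expected, and that no other warnings accompany the result. |
| */ |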
| private void verifyWarningState(boolean shouldWarn, SimpleQueryResult futureResult) |
| { |
| List<String> futureWarnings = futureResult.warnings(); |
| assertEquals(shouldWarn, futureWarnings.stream().anyMatch(w -> w.contains("cached_replica_rows_warn_threshold"))); |
| assertEquals(shouldWarn ? 1 : 0, futureWarnings.size()); |
| } |
| |
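| /** Reads the table's count of row "completion" requests made by replica filtering protection. */ |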
| private long protectionQueryCount(IInvokableInstance instance, String tableName) |
| { |
| return instance.callOnInstance(() -> Keyspace.open(KEYSPACE) |
| .getColumnFamilyStore(tableName) |
| .metric.replicaFilteringProtectionRequests.getCount()); |
| } |
| |
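| /** Reads the maximum of the table's histogram of rows cached per query for replica filtering protection. */ |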
| private long maxRowsCachedPerQuery(IInvokableInstance instance, String tableName) |
| { |
| return instance.callOnInstance(() -> Keyspace.open(KEYSPACE) |
| .getColumnFamilyStore(tableName) |
| .metric.rfpRowsCachedPerQuery.getSnapshot().getMax()); |
| } |
| |
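| /** Reads the minimum of the table's histogram of rows cached per query for replica filtering protection. */ |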
| private long minRowsCachedPerQuery(IInvokableInstance instance, String tableName) |
| { |
| return instance.callOnInstance(() -> Keyspace.open(KEYSPACE) |
| .getColumnFamilyStore(tableName) |
| .metric.rfpRowsCachedPerQuery.getSnapshot().getMin()); |
| } |
| |
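| /** Reads the sample count of the rows-cached-per-query histogram (one sample per measured query). */ |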
| private long rowsCachedPerQueryCount(IInvokableInstance instance, String tableName) |
| { |
| return instance.callOnInstance(() -> Keyspace.open(KEYSPACE) |
| .getColumnFamilyStore(tableName) |
| .metric.rfpRowsCachedPerQuery.getCount()); |
| } |
| } |