// blob: 114dc276a3d0e8b1aa5764844ef5ee0c0c49ceff
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.functional;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.test.categories.MiniClusterOnlyTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.hash.Hashing;
@Category(MiniClusterOnlyTests.class)
public class ZooMutatorIT extends AccumuloClusterHarness {

  /** Number of worker threads that concurrently mutate the shared ZooKeeper node. */
  private static final int NUM_THREADS = 16;

  /** Each worker keeps mutating the node until it observes a count of at least this value. */
  private static final int MIN_FINAL_COUNT = 200;

  /** Path of the single ZooKeeper node that every worker updates. */
  private static final String ZK_PATH = "/test-zm";

  /**
   * This test uses multiple threads to update the data in a single zookeeper node using
   * {@link ZooReaderWriter#mutateOrCreate(String, byte[], org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator)}
   * and tries to detect errors and race conditions in that code. Each thread uses
   * {@link #nextValue(String)} to compute a new value for the ZK node based on the current value,
   * producing a new unique value. It is expected that multiple threads calling
   * {@link #nextValue(String)} as previously described should yield the same final value as a
   * single thread repeatedly calling {@link #nextValue(String)} the same number of times. There are
   * many things that can go wrong in the multithreaded case. This test tries to ensure the
   * following are true for the multithreaded case.
   *
   * <ul>
   * <li>All expected updates are made, none were skipped.
   * <li>No updates are made twice. For example if two threads wrote the exact same value to the
   * node this should be detected by the test. Would expect each update to be unique.
   * <li>The updates are made in the same order as a single thread repeatedly calling
   * {@link #nextValue(String)}.
   * </ul>
   *
   * <p>
   * If any of the expectations above are not met it should cause the hash, the count, and/or the
   * count tracking done in the test to not match what is computed by the single threaded code at
   * the end of the test.
   *
   * <p>
   * A hash and a counter are stored in ZK. The hashes form a chain of hashes as each new value is
   * written because it is a hash of the previous value. The chain of hashes is useful for
   * detecting missing and out of order updates, but not duplicates. The counter and associated map
   * that tracks which counts were seen is useful for detecting missing and duplicate updates. The
   * counter is also used to weakly check for out of order updates, but the chain of hashes provides
   * a much stronger check for this.
   *
   * @throws Exception
   *           if the client cannot be built, a ZooKeeper operation fails, or a worker thread fails
   *           (surfaced through {@link Future#get()})
   */
  @Test
  public void concurrentMutatorTest() throws Exception {
    // Only one resource is declared: the context is the very same object as the client, so listing
    // both in the try header would invoke close() on it twice.
    try (var client = Accumulo.newClient().from(getClientProps()).build()) {
      var context = (ClientContext) client;

      String secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
      ZooReaderWriter zk = new ZooReaderWriter(context.getZooKeepers(),
          context.getZooKeepersSessionTimeOut(), secret);

      String initialData = hash("Accumulo Zookeeper Mutator test data") + " 0";

      // This map is used to ensure multiple threads do not successfully write the same value and no
      // values are skipped. The hash in the value also verifies similar things in a different way.
      ConcurrentHashMap<Integer,Integer> countCounts = new ConcurrentHashMap<>();

      var executor = Executors.newFixedThreadPool(NUM_THREADS);
      try {
        List<Future<?>> futures = new ArrayList<>();

        for (int i = 0; i < NUM_THREADS; i++) {
          futures.add(executor.submit(() -> {
            try {
              int count = -1;
              while (count < MIN_FINAL_COUNT) {
                byte[] val =
                    zk.mutateOrCreate(ZK_PATH, initialData.getBytes(UTF_8), this::nextValue);
                int nextCount = getCount(val);
                // every value a single thread gets back must be strictly newer than its last one
                assertTrue("nextCount <= count " + nextCount + " " + count, nextCount > count);
                count = nextCount;
                countCounts.merge(count, 1, Integer::sum);
              }
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          }));
        }

        // wait and check for errors in background threads
        for (Future<?> future : futures) {
          future.get();
        }
      } finally {
        // Always release the pool. On the success path every task has already finished; on the
        // failure path this also asks the remaining workers to stop instead of leaking them.
        executor.shutdownNow();
      }

      byte[] actual = zk.getData(ZK_PATH);
      int settledCount = getCount(actual);

      assertTrue("final count " + settledCount + " is less than " + MIN_FINAL_COUNT,
          settledCount >= MIN_FINAL_COUNT);

      // Replay the same number of updates on a single thread while checking that every count from
      // 0 through the final one was returned to exactly one worker. getOrDefault is used so that a
      // skipped count fails the assertion with a message rather than a NullPointerException.
      String expected = initialData;
      assertEquals("times count 0 was seen", 1, (int) countCounts.getOrDefault(0, 0));
      for (int i = 1; i <= settledCount; i++) {
        assertEquals("times count " + i + " was seen", 1, (int) countCounts.getOrDefault(i, 0));
        expected = nextValue(expected);
      }

      // no counts beyond the final one may have been recorded
      assertEquals(settledCount + 1, countCounts.size());
      assertEquals(expected, new String(actual, UTF_8));
    }
  }

  /**
   * Computes the SHA-256 hash of the UTF-8 encoding of the given string.
   *
   * @param data
   *          text to hash
   * @return the string form of the resulting hash code
   */
  private String hash(String data) {
    return Hashing.sha256().hashString(data, UTF_8).toString();
  }

  /**
   * Derives the successor of a node value. A value has the form {@code "<hash> <count>"}; the
   * successor is the hash of the current hash followed by a space and the count plus one.
   *
   * @param currString
   *          current node value
   * @return the next node value in the chain
   */
  private String nextValue(String currString) {
    String[] tokens = currString.split(" ");
    String currHash = tokens[0];
    int count = Integer.parseInt(tokens[1]);
    return (hash(currHash) + " " + (count + 1));
  }

  /**
   * Byte array variant of {@link #nextValue(String)}; decodes and encodes using UTF-8. This is the
   * overload passed as the mutator to {@code mutateOrCreate}.
   *
   * @param curr
   *          current node value as UTF-8 bytes
   * @return the next node value as UTF-8 bytes
   */
  private byte[] nextValue(byte[] curr) {
    return nextValue(new String(curr, UTF_8)).getBytes(UTF_8);
  }

  /**
   * Extracts the counter from a node value of the form {@code "<hash> <count>"}.
   *
   * @param val
   *          node value as UTF-8 bytes
   * @return the parsed count
   */
  private int getCount(byte[] val) {
    return Integer.parseInt(new String(val, UTF_8).split(" ")[1]);
  }
}