blob: 79129db30e7b32c44de8964b9c14e8f67cc3c1be [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ClientSideIteratorScanner;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.sample.RowSampler;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.OfflineScanner;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.junit.Test;
import com.google.common.collect.Iterables;
public class SampleIT extends AccumuloClusterHarness {

  /** Number of rows written by {@link #writeData}; each row holds two entries. */
  private static final int NUM_ROWS = 10000;

  private static final Map<String,String> OPTIONS_1 =
      Map.of("hasher", "murmur3_32", "modulus", "1009");
  private static final Map<String,String> OPTIONS_2 =
      Map.of("hasher", "murmur3_32", "modulus", "997");

  private static final SamplerConfiguration SC1 =
      new SamplerConfiguration(RowSampler.class.getName()).setOptions(OPTIONS_1);
  private static final SamplerConfiguration SC2 =
      new SamplerConfiguration(RowSampler.class.getName()).setOptions(OPTIONS_2);

  /**
   * Test iterator that consults sample data before deciding whether to return anything. On each
   * seek it counts the entries in the seek range of a deep copy of its source created with sampling
   * enabled. If fewer than {@link #SAMPLE_THRESHOLD} sample entries are seen, the seek is passed
   * through to the wrapped source; otherwise the iterator reports no data.
   */
  public static class IteratorThatUsesSample extends WrappingIterator {

    /** Sample entry count at or above which the iterator returns no data. */
    private static final int SAMPLE_THRESHOLD = 10;

    private SortedKeyValueIterator<Key,Value> sampleDC;
    private boolean hasTop;

    @Override
    public boolean hasTop() {
      return hasTop && super.hasTop();
    }

    @Override
    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
        throws IOException {
      int sampleCount = 0;
      sampleDC.seek(range, columnFamilies, inclusive);
      while (sampleDC.hasTop()) {
        sampleCount++;
        sampleDC.next();
      }

      if (sampleCount < SAMPLE_THRESHOLD) {
        hasTop = true;
        super.seek(range, columnFamilies, inclusive);
      } else {
        // it's too much data
        hasTop = false;
      }
    }

    @Override
    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
        IteratorEnvironment env) throws IOException {
      super.init(source, options, env);
      // a second view over the same source, but reading the sample data
      IteratorEnvironment sampleEnv = env.cloneWithSamplingEnabled();
      sampleDC = source.deepCopy(sampleEnv);
    }
  }

  /**
   * Verifies that every scanner type returns exactly the expected sample, first from in-memory data
   * and then from flushed files (including an offline clone). It also checks that sampling can be
   * turned off and on again per scanner, and that updates to a sampled row show up in the sample.
   */
  @Test
  public void testBasic() throws Exception {
    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
      String tableName = getUniqueNames(1)[0];
      String clone = tableName + "_clone";

      client.tableOperations().create(tableName, new NewTableConfiguration().enableSampling(SC1));

      // try-with-resources so the writer and scanners are released even when an assertion fails
      try (BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig());
          Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY);
          Scanner isoScanner =
              new IsolatedScanner(client.createScanner(tableName, Authorizations.EMPTY));
          Scanner csiScanner =
              new ClientSideIteratorScanner(client.createScanner(tableName, Authorizations.EMPTY));
          BatchScanner bScanner = client.createBatchScanner(tableName)) {

        TreeMap<Key,Value> expected = new TreeMap<>();
        String someRow = writeData(bw, SC1, expected);
        assertEquals(20, expected.size());

        scanner.setSamplerConfiguration(SC1);
        csiScanner.setSamplerConfiguration(SC1);
        isoScanner.setSamplerConfiguration(SC1);
        isoScanner.setBatchSize(10);

        bScanner.setSamplerConfiguration(SC1);
        bScanner.setRanges(Arrays.asList(new Range()));

        check(expected, scanner, bScanner, isoScanner, csiScanner);

        client.tableOperations().flush(tableName, null, null, true);

        Scanner oScanner = newOfflineScanner(client, tableName, clone, SC1);
        try {
          check(expected, scanner, bScanner, isoScanner, csiScanner, oScanner);

          // ensure non sample data can be scanned after scanning sample data
          for (ScannerBase sb : Arrays.asList(scanner, bScanner, isoScanner, csiScanner,
              oScanner)) {
            sb.clearSamplerConfiguration();
            assertEquals(2 * NUM_ROWS, Iterables.size(sb));
            sb.setSamplerConfiguration(SC1);
          }

          expected.keySet().removeIf(k -> k.getRow().toString().equals(someRow));
          expected.put(new Key(someRow, "cf1", "cq1", 8), new Value("42"));
          expected.put(new Key(someRow, "cf1", "cq3", 8), new Value("suprise"));

          Mutation m = new Mutation(someRow);
          m.put("cf1", "cq1", 8, "42");
          m.putDelete("cf1", "cq2", 8);
          m.put("cf1", "cq3", 8, "suprise");
          bw.addMutation(m);
          // flush rather than close: the writer is closed by try-with-resources
          bw.flush();

          check(expected, scanner, bScanner, isoScanner, csiScanner);

          client.tableOperations().flush(tableName, null, null, true);

          // newOfflineScanner deletes the previous clone, so release the old scanner first
          oScanner.close();
          oScanner = newOfflineScanner(client, tableName, clone, SC1);
          check(expected, scanner, bScanner, isoScanner, csiScanner, oScanner);

          scanner.setRange(new Range(someRow));
          isoScanner.setRange(new Range(someRow));
          csiScanner.setRange(new Range(someRow));
          oScanner.setRange(new Range(someRow));
          bScanner.setRanges(Arrays.asList(new Range(someRow)));

          expected.clear();
          expected.put(new Key(someRow, "cf1", "cq1", 8), new Value("42"));
          expected.put(new Key(someRow, "cf1", "cq3", 8), new Value("suprise"));

          check(expected, scanner, bScanner, isoScanner, csiScanner, oScanner);
        } finally {
          oScanner.close();
        }
      }
    }
  }

  /**
   * Creates a fresh offline clone of {@code tableName} named {@code clone} and returns an
   * {@link OfflineScanner} over it. Any existing table named {@code clone} is deleted first.
   *
   * @param sc sampler configuration to set on the returned scanner, or null for none
   */
  private Scanner newOfflineScanner(AccumuloClient client, String tableName, String clone,
      SamplerConfiguration sc) throws Exception {
    if (client.tableOperations().exists(clone)) {
      client.tableOperations().delete(clone);
    }
    Map<String,String> em = Collections.emptyMap();
    Set<String> es = Collections.emptySet();
    client.tableOperations().clone(tableName, clone, false, em, es);
    client.tableOperations().offline(clone, true);
    TableId cloneID = TableId.of(client.tableOperations().tableIdMap().get(clone));
    OfflineScanner oScanner =
        new OfflineScanner((ClientContext) client, cloneID, Authorizations.EMPTY);
    if (sc != null) {
      oScanner.setSamplerConfiguration(sc);
    }
    return oScanner;
  }

  /** Returns the row id {@link #writeData} uses for row number {@code i}. */
  private static String rowId(int i) {
    return String.format("r_%06d", i);
  }

  /**
   * Adds to {@code expected} the entries of row number {@code i} (exactly as written by
   * {@link #writeData}) that {@code sampler} accepts.
   *
   * @return true if the row's {@code cf1:cq1} entry was accepted by the sampler
   */
  private static boolean addSampledEntries(RowSampler sampler, int i,
      TreeMap<Key,Value> expected) {
    String row = rowId(i);

    Key k1 = new Key(row, "cf1", "cq1", 7);
    boolean k1Accepted = sampler.accept(k1);
    if (k1Accepted) {
      expected.put(k1, new Value("" + i));
    }

    Key k2 = new Key(row, "cf1", "cq2", 7);
    if (sampler.accept(k2)) {
      expected.put(k2, new Value("" + (100000000 - i)));
    }
    return k1Accepted;
  }

  /**
   * Replaces the contents of {@code expected} with the entries written by {@link #writeData} that
   * the sampler described by {@code sc} accepts.
   */
  private void updateExpected(SamplerConfiguration sc, TreeMap<Key,Value> expected) {
    expected.clear();

    RowSampler sampler = new RowSampler();
    sampler.init(sc);

    for (int i = 0; i < NUM_ROWS; i++) {
      addSampledEntries(sampler, i, expected);
    }
  }

  /**
   * Writes {@link #NUM_ROWS} rows of two entries each and flushes the writer. Every written entry
   * that the sampler described by {@code sc} accepts is added to {@code expected}.
   *
   * @return the row of the fifth {@code cf1:cq1} entry accepted by the sampler, or null if fewer
   *         than five were accepted
   */
  private String writeData(BatchWriter bw, SamplerConfiguration sc, TreeMap<Key,Value> expected)
      throws MutationsRejectedException {
    int count = 0;
    String someRow = null;

    RowSampler sampler = new RowSampler();
    sampler.init(sc);

    for (int i = 0; i < NUM_ROWS; i++) {
      String row = rowId(i);

      Mutation m = new Mutation(row);
      m.put("cf1", "cq1", 7, "" + i);
      m.put("cf1", "cq2", 7, "" + (100000000 - i));
      bw.addMutation(m);

      if (addSampledEntries(sampler, i, expected)) {
        count++;
        if (count == 5) {
          someRow = row;
        }
      }
    }

    bw.flush();

    return someRow;
  }

  /** Fully iterates {@code scanner} and returns the number of entries seen. */
  private int countEntries(Iterable<Entry<Key,Value>> scanner) {
    return Iterables.size(scanner);
  }

  /** Restricts every scanner in {@code scanners} to {@code range}. */
  private void setRange(Range range, List<? extends ScannerBase> scanners) {
    for (ScannerBase s : scanners) {
      if (s instanceof Scanner) {
        ((Scanner) s).setRange(range);
      } else {
        ((BatchScanner) s).setRanges(Collections.singleton(range));
      }
    }
  }

  /**
   * Verifies that an iterator reading sample data through
   * {@link IteratorEnvironment#cloneWithSamplingEnabled()} works from every scanner type. It
   * then checks that scans fail with {@link SampleNotPresentException} once the table's sampler
   * configuration no longer matches the files.
   */
  @Test
  public void testIterator() throws Exception {
    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
      String tableName = getUniqueNames(1)[0];
      String clone = tableName + "_clone";

      client.tableOperations().create(tableName, new NewTableConfiguration().enableSampling(SC1));

      TreeMap<Key,Value> expected = new TreeMap<>();
      try (BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig())) {
        writeData(bw, SC1, expected);
      }

      ArrayList<Key> keys = new ArrayList<>(expected.keySet());

      Range range1 = new Range(keys.get(6), true, keys.get(11), true);

      Scanner oScanner = null;
      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY);
          Scanner isoScanner =
              new IsolatedScanner(client.createScanner(tableName, Authorizations.EMPTY));
          ClientSideIteratorScanner csiScanner =
              new ClientSideIteratorScanner(client.createScanner(tableName, Authorizations.EMPTY));
          BatchScanner bScanner = client.createBatchScanner(tableName, Authorizations.EMPTY, 2)) {

        csiScanner.setIteratorSamplerConfiguration(SC1);

        List<? extends ScannerBase> scanners =
            Arrays.asList(scanner, isoScanner, bScanner, csiScanner);

        for (ScannerBase s : scanners) {
          s.addScanIterator(new IteratorSetting(100, IteratorThatUsesSample.class));
        }

        // the iterator should see fewer than 10 entries in sample data, and return data
        setRange(range1, scanners);
        for (ScannerBase s : scanners) {
          assertEquals(2954, countEntries(s));
        }

        Range range2 = new Range(keys.get(5), true, keys.get(18), true);
        setRange(range2, scanners);

        // the iterator should see at least 10 entries in sample data, and return no data
        for (ScannerBase s : scanners) {
          assertEquals(0, countEntries(s));
        }

        // flush and rerun same test against files
        client.tableOperations().flush(tableName, null, null, true);

        oScanner = newOfflineScanner(client, tableName, clone, null);
        oScanner.addScanIterator(new IteratorSetting(100, IteratorThatUsesSample.class));

        scanners = Arrays.asList(scanner, isoScanner, bScanner, csiScanner, oScanner);

        setRange(range1, scanners);
        for (ScannerBase s : scanners) {
          assertEquals(2954, countEntries(s));
        }

        setRange(range2, scanners);
        for (ScannerBase s : scanners) {
          assertEquals(0, countEntries(s));
        }

        updateSamplingConfig(client, tableName, SC2);

        csiScanner.setIteratorSamplerConfiguration(SC2);

        // newOfflineScanner deletes the previous clone, so release the old scanner first
        oScanner.close();
        oScanner = newOfflineScanner(client, tableName, clone, null);
        oScanner.addScanIterator(new IteratorSetting(100, IteratorThatUsesSample.class));
        scanners = Arrays.asList(scanner, isoScanner, bScanner, csiScanner, oScanner);

        for (ScannerBase s : scanners) {
          try {
            countEntries(s);
            fail("Expected SampleNotPresentException, but it did not happen : "
                + s.getClass().getSimpleName());
          } catch (SampleNotPresentException ignored) {
            // expected: files were written with SC1 but the table is now configured with SC2
          }
        }
      } finally {
        if (oScanner != null) {
          oScanner.close();
        }
      }
    }
  }

  /** Sets {@code sc} as the sampler configuration of every scanner in {@code scanners}. */
  private void setSamplerConfig(SamplerConfiguration sc, ScannerBase... scanners) {
    for (ScannerBase s : scanners) {
      s.setSamplerConfiguration(sc);
    }
  }

  /**
   * Verifies that sample scans fail with {@link SampleNotPresentException} until every file has
   * been rewritten with the table's current sampler configuration, and succeed afterwards.
   */
  @Test
  public void testSampleNotPresent() throws Exception {
    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
      String tableName = getUniqueNames(1)[0];
      String clone = tableName + "_clone";

      client.tableOperations().create(tableName);

      TreeMap<Key,Value> expected = new TreeMap<>();
      try (BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig())) {
        writeData(bw, SC1, expected);
      }

      // try-with-resources so the scanners are released even when an assertion fails
      try (Scanner scanner = client.createScanner(tableName);
          Scanner isoScanner = new IsolatedScanner(client.createScanner(tableName));
          Scanner csiScanner = new ClientSideIteratorScanner(client.createScanner(tableName));
          BatchScanner bScanner = client.createBatchScanner(tableName)) {

        isoScanner.setBatchSize(10);
        bScanner.setRanges(Arrays.asList(new Range()));

        // ensure sample not present exception occurs when sampling is not configured
        assertSampleNotPresent(SC1, scanner, isoScanner, bScanner, csiScanner);

        client.tableOperations().flush(tableName, null, null, true);

        Scanner oScanner = newOfflineScanner(client, tableName, clone, SC1);
        try {
          assertSampleNotPresent(SC1, scanner, isoScanner, bScanner, csiScanner, oScanner);

          // configure sampling; however, an rfile without sample data still exists, so the
          // sample not present exception should still occur
          updateSamplingConfig(client, tableName, SC1);

          // create clone with new config (newOfflineScanner deletes the previous clone)
          oScanner.close();
          oScanner = newOfflineScanner(client, tableName, clone, SC1);

          assertSampleNotPresent(SC1, scanner, isoScanner, bScanner, csiScanner, oScanner);

          // create rfile with sample data present
          client.tableOperations().compact(tableName, new CompactionConfig().setWait(true));

          // should be able to scan sample now
          oScanner.close();
          oScanner = newOfflineScanner(client, tableName, clone, SC1);
          setSamplerConfig(SC1, scanner, csiScanner, isoScanner, bScanner, oScanner);
          check(expected, scanner, isoScanner, bScanner, csiScanner, oScanner);

          // change sampling config
          updateSamplingConfig(client, tableName, SC2);

          // create clone with new config
          oScanner.close();
          oScanner = newOfflineScanner(client, tableName, clone, SC2);

          // rfile should have different sample config than table, and scan should not work
          assertSampleNotPresent(SC2, scanner, isoScanner, bScanner, csiScanner, oScanner);

          // create rfile that has same sample data as table config
          client.tableOperations().compact(tableName, new CompactionConfig().setWait(true));

          // should be able to scan sample now
          updateExpected(SC2, expected);
          oScanner.close();
          oScanner = newOfflineScanner(client, tableName, clone, SC2);
          setSamplerConfig(SC2, scanner, csiScanner, isoScanner, bScanner, oScanner);
          check(expected, scanner, isoScanner, bScanner, csiScanner, oScanner);
        } finally {
          oScanner.close();
        }
      }
    }
  }

  /**
   * Sets the table's sampler configuration to {@code sc}, then takes the table offline and back
   * online (waiting for each) so that the configuration change takes effect before returning.
   */
  private void updateSamplingConfig(AccumuloClient client, String tableName,
      SamplerConfiguration sc)
      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
    client.tableOperations().setSamplerConfiguration(tableName, sc);
    // cycle the table so the new sampler configuration is in effect
    client.tableOperations().offline(tableName, true);
    client.tableOperations().online(tableName, true);
  }

  /**
   * For each scanner, asserts that scanning with sampler configuration {@code sc} throws
   * {@link SampleNotPresentException} and that scanning without sampling succeeds. The scanner's
   * original sampler configuration is restored afterwards.
   */
  private void assertSampleNotPresent(SamplerConfiguration sc, ScannerBase... scanners) {
    for (ScannerBase scanner : scanners) {
      SamplerConfiguration csc = scanner.getSamplerConfiguration();

      scanner.setSamplerConfiguration(sc);

      try {
        countEntries(scanner);
        fail("Expected SampleNotPresentException, but it did not happen : "
            + scanner.getClass().getSimpleName());
      } catch (SampleNotPresentException ignored) {
        // expected: no matching sample data is available for this scanner
      }

      // scanning the non-sample data must still work
      scanner.clearSamplerConfiguration();
      countEntries(scanner);

      // restore whatever sampler configuration the scanner had on entry
      if (csc == null) {
        scanner.clearSamplerConfiguration();
      } else {
        scanner.setSamplerConfiguration(csc);
      }
    }
  }

  /**
   * Asserts that each scanner returns exactly the key/value pairs in {@code expected}. The failure
   * message names the scanner type and both entry counts.
   */
  private void check(TreeMap<Key,Value> expected, ScannerBase... scanners) {
    TreeMap<Key,Value> actual = new TreeMap<>();
    for (ScannerBase s : scanners) {
      actual.clear();
      for (Entry<Key,Value> entry : s) {
        actual.put(entry.getKey(), entry.getValue());
      }
      assertEquals(String.format("Saw %d instead of %d entries using %s", actual.size(),
          expected.size(), s.getClass().getSimpleName()), expected, actual);
    }
  }
}