| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.test; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.TreeMap; |
| |
| import org.apache.accumulo.core.client.Accumulo; |
| import org.apache.accumulo.core.client.AccumuloClient; |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.AccumuloSecurityException; |
| import org.apache.accumulo.core.client.BatchScanner; |
| import org.apache.accumulo.core.client.BatchWriter; |
| import org.apache.accumulo.core.client.BatchWriterConfig; |
| import org.apache.accumulo.core.client.ClientSideIteratorScanner; |
| import org.apache.accumulo.core.client.IsolatedScanner; |
| import org.apache.accumulo.core.client.IteratorSetting; |
| import org.apache.accumulo.core.client.MutationsRejectedException; |
| import org.apache.accumulo.core.client.SampleNotPresentException; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.ScannerBase; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.accumulo.core.client.admin.CompactionConfig; |
| import org.apache.accumulo.core.client.admin.NewTableConfiguration; |
| import org.apache.accumulo.core.client.sample.RowSampler; |
| import org.apache.accumulo.core.client.sample.SamplerConfiguration; |
| import org.apache.accumulo.core.clientImpl.ClientContext; |
| import org.apache.accumulo.core.clientImpl.OfflineScanner; |
| import org.apache.accumulo.core.data.ByteSequence; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Mutation; |
| import org.apache.accumulo.core.data.Range; |
| import org.apache.accumulo.core.data.TableId; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.iterators.IteratorEnvironment; |
| import org.apache.accumulo.core.iterators.SortedKeyValueIterator; |
| import org.apache.accumulo.core.iterators.WrappingIterator; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.harness.AccumuloClusterHarness; |
| import org.junit.Test; |
| |
| import com.google.common.collect.Iterables; |
| |
| public class SampleIT extends AccumuloClusterHarness { |
| |
| private static final Map<String,String> OPTIONS_1 = |
| Map.of("hasher", "murmur3_32", "modulus", "1009"); |
| private static final Map<String,String> OPTIONS_2 = |
| Map.of("hasher", "murmur3_32", "modulus", "997"); |
| |
| private static final SamplerConfiguration SC1 = |
| new SamplerConfiguration(RowSampler.class.getName()).setOptions(OPTIONS_1); |
| private static final SamplerConfiguration SC2 = |
| new SamplerConfiguration(RowSampler.class.getName()).setOptions(OPTIONS_2); |
| |
| public static class IteratorThatUsesSample extends WrappingIterator { |
| private SortedKeyValueIterator<Key,Value> sampleDC; |
| private boolean hasTop; |
| |
| @Override |
| public boolean hasTop() { |
| return hasTop && super.hasTop(); |
| } |
| |
| @Override |
| public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) |
| throws IOException { |
| |
| int sampleCount = 0; |
| sampleDC.seek(range, columnFamilies, inclusive); |
| |
| while (sampleDC.hasTop()) { |
| sampleCount++; |
| sampleDC.next(); |
| } |
| |
| if (sampleCount < 10) { |
| hasTop = true; |
| super.seek(range, columnFamilies, inclusive); |
| } else { |
| // its too much data |
| hasTop = false; |
| } |
| } |
| |
| @Override |
| public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, |
| IteratorEnvironment env) throws IOException { |
| super.init(source, options, env); |
| |
| IteratorEnvironment sampleEnv = env.cloneWithSamplingEnabled(); |
| |
| sampleDC = source.deepCopy(sampleEnv); |
| } |
| } |
| |
| @Test |
| public void testBasic() throws Exception { |
| |
| try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { |
| String tableName = getUniqueNames(1)[0]; |
| String clone = tableName + "_clone"; |
| |
| client.tableOperations().create(tableName, new NewTableConfiguration().enableSampling(SC1)); |
| |
| BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig()); |
| |
| TreeMap<Key,Value> expected = new TreeMap<>(); |
| String someRow = writeData(bw, SC1, expected); |
| assertEquals(20, expected.size()); |
| |
| Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY); |
| Scanner isoScanner = |
| new IsolatedScanner(client.createScanner(tableName, Authorizations.EMPTY)); |
| Scanner csiScanner = |
| new ClientSideIteratorScanner(client.createScanner(tableName, Authorizations.EMPTY)); |
| scanner.setSamplerConfiguration(SC1); |
| csiScanner.setSamplerConfiguration(SC1); |
| isoScanner.setSamplerConfiguration(SC1); |
| isoScanner.setBatchSize(10); |
| |
| try (BatchScanner bScanner = client.createBatchScanner(tableName)) { |
| bScanner.setSamplerConfiguration(SC1); |
| bScanner.setRanges(Arrays.asList(new Range())); |
| |
| check(expected, scanner, bScanner, isoScanner, csiScanner); |
| |
| client.tableOperations().flush(tableName, null, null, true); |
| |
| Scanner oScanner = newOfflineScanner(client, tableName, clone, SC1); |
| check(expected, scanner, bScanner, isoScanner, csiScanner, oScanner); |
| |
| // ensure non sample data can be scanned after scanning sample data |
| for (ScannerBase sb : Arrays.asList(scanner, bScanner, isoScanner, csiScanner, oScanner)) { |
| sb.clearSamplerConfiguration(); |
| assertEquals(20000, Iterables.size(sb)); |
| sb.setSamplerConfiguration(SC1); |
| } |
| |
| expected.keySet().removeIf(k -> k.getRow().toString().equals(someRow)); |
| |
| expected.put(new Key(someRow, "cf1", "cq1", 8), new Value("42")); |
| expected.put(new Key(someRow, "cf1", "cq3", 8), new Value("suprise")); |
| |
| Mutation m = new Mutation(someRow); |
| |
| m.put("cf1", "cq1", 8, "42"); |
| m.putDelete("cf1", "cq2", 8); |
| m.put("cf1", "cq3", 8, "suprise"); |
| |
| bw.addMutation(m); |
| bw.close(); |
| |
| check(expected, scanner, bScanner, isoScanner, csiScanner); |
| |
| client.tableOperations().flush(tableName, null, null, true); |
| |
| oScanner = newOfflineScanner(client, tableName, clone, SC1); |
| check(expected, scanner, bScanner, isoScanner, csiScanner, oScanner); |
| |
| scanner.setRange(new Range(someRow)); |
| isoScanner.setRange(new Range(someRow)); |
| csiScanner.setRange(new Range(someRow)); |
| oScanner.setRange(new Range(someRow)); |
| bScanner.setRanges(Arrays.asList(new Range(someRow))); |
| |
| expected.clear(); |
| |
| expected.put(new Key(someRow, "cf1", "cq1", 8), new Value("42")); |
| expected.put(new Key(someRow, "cf1", "cq3", 8), new Value("suprise")); |
| |
| check(expected, scanner, bScanner, isoScanner, csiScanner, oScanner); |
| } |
| } |
| } |
| |
| private Scanner newOfflineScanner(AccumuloClient client, String tableName, String clone, |
| SamplerConfiguration sc) throws Exception { |
| if (client.tableOperations().exists(clone)) { |
| client.tableOperations().delete(clone); |
| } |
| Map<String,String> em = Collections.emptyMap(); |
| Set<String> es = Collections.emptySet(); |
| client.tableOperations().clone(tableName, clone, false, em, es); |
| client.tableOperations().offline(clone, true); |
| TableId cloneID = TableId.of(client.tableOperations().tableIdMap().get(clone)); |
| OfflineScanner oScanner = |
| new OfflineScanner((ClientContext) client, cloneID, Authorizations.EMPTY); |
| if (sc != null) { |
| oScanner.setSamplerConfiguration(sc); |
| } |
| return oScanner; |
| } |
| |
| private void updateExpected(SamplerConfiguration sc, TreeMap<Key,Value> expected) { |
| expected.clear(); |
| |
| RowSampler sampler = new RowSampler(); |
| sampler.init(sc); |
| |
| for (int i = 0; i < 10000; i++) { |
| String row = String.format("r_%06d", i); |
| |
| Key k1 = new Key(row, "cf1", "cq1", 7); |
| if (sampler.accept(k1)) { |
| expected.put(k1, new Value("" + i)); |
| } |
| |
| Key k2 = new Key(row, "cf1", "cq2", 7); |
| if (sampler.accept(k2)) { |
| expected.put(k2, new Value("" + (100000000 - i))); |
| } |
| } |
| } |
| |
| private String writeData(BatchWriter bw, SamplerConfiguration sc, TreeMap<Key,Value> expected) |
| throws MutationsRejectedException { |
| int count = 0; |
| String someRow = null; |
| |
| RowSampler sampler = new RowSampler(); |
| sampler.init(sc); |
| |
| for (int i = 0; i < 10000; i++) { |
| String row = String.format("r_%06d", i); |
| Mutation m = new Mutation(row); |
| |
| m.put("cf1", "cq1", 7, "" + i); |
| m.put("cf1", "cq2", 7, "" + (100000000 - i)); |
| |
| bw.addMutation(m); |
| |
| Key k1 = new Key(row, "cf1", "cq1", 7); |
| if (sampler.accept(k1)) { |
| expected.put(k1, new Value("" + i)); |
| count++; |
| if (count == 5) { |
| someRow = row; |
| } |
| } |
| |
| Key k2 = new Key(row, "cf1", "cq2", 7); |
| if (sampler.accept(k2)) { |
| expected.put(k2, new Value("" + (100000000 - i))); |
| } |
| } |
| |
| bw.flush(); |
| |
| return someRow; |
| } |
| |
| private int countEntries(Iterable<Entry<Key,Value>> scanner) { |
| return Iterables.size(scanner); |
| } |
| |
| private void setRange(Range range, List<? extends ScannerBase> scanners) { |
| for (ScannerBase s : scanners) { |
| if (s instanceof Scanner) { |
| ((Scanner) s).setRange(range); |
| } else { |
| ((BatchScanner) s).setRanges(Collections.singleton(range)); |
| } |
| |
| } |
| } |
| |
| @Test |
| public void testIterator() throws Exception { |
| try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { |
| String tableName = getUniqueNames(1)[0]; |
| String clone = tableName + "_clone"; |
| |
| client.tableOperations().create(tableName, new NewTableConfiguration().enableSampling(SC1)); |
| |
| BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig()); |
| |
| TreeMap<Key,Value> expected = new TreeMap<>(); |
| writeData(bw, SC1, expected); |
| |
| ArrayList<Key> keys = new ArrayList<>(expected.keySet()); |
| |
| Range range1 = new Range(keys.get(6), true, keys.get(11), true); |
| |
| Scanner scanner = null; |
| Scanner isoScanner = null; |
| ClientSideIteratorScanner csiScanner = null; |
| BatchScanner bScanner = null; |
| Scanner oScanner = null; |
| try { |
| scanner = client.createScanner(tableName, Authorizations.EMPTY); |
| isoScanner = new IsolatedScanner(client.createScanner(tableName, Authorizations.EMPTY)); |
| csiScanner = |
| new ClientSideIteratorScanner(client.createScanner(tableName, Authorizations.EMPTY)); |
| bScanner = client.createBatchScanner(tableName, Authorizations.EMPTY, 2); |
| |
| csiScanner.setIteratorSamplerConfiguration(SC1); |
| |
| List<? extends ScannerBase> scanners = |
| Arrays.asList(scanner, isoScanner, bScanner, csiScanner); |
| |
| for (ScannerBase s : scanners) { |
| s.addScanIterator(new IteratorSetting(100, IteratorThatUsesSample.class)); |
| } |
| |
| // the iterator should see less than 10 entries in sample data, and return data |
| setRange(range1, scanners); |
| for (ScannerBase s : scanners) { |
| assertEquals(2954, countEntries(s)); |
| } |
| |
| Range range2 = new Range(keys.get(5), true, keys.get(18), true); |
| setRange(range2, scanners); |
| |
| // the iterator should see more than 10 entries in sample data, and return no data |
| for (ScannerBase s : scanners) { |
| assertEquals(0, countEntries(s)); |
| } |
| |
| // flush an rerun same test against files |
| client.tableOperations().flush(tableName, null, null, true); |
| |
| oScanner = newOfflineScanner(client, tableName, clone, null); |
| oScanner.addScanIterator(new IteratorSetting(100, IteratorThatUsesSample.class)); |
| scanners = Arrays.asList(scanner, isoScanner, bScanner, csiScanner, oScanner); |
| |
| setRange(range1, scanners); |
| for (ScannerBase s : scanners) { |
| assertEquals(2954, countEntries(s)); |
| } |
| |
| setRange(range2, scanners); |
| for (ScannerBase s : scanners) { |
| assertEquals(0, countEntries(s)); |
| } |
| |
| updateSamplingConfig(client, tableName, SC2); |
| |
| csiScanner.setIteratorSamplerConfiguration(SC2); |
| |
| oScanner = newOfflineScanner(client, tableName, clone, null); |
| oScanner.addScanIterator(new IteratorSetting(100, IteratorThatUsesSample.class)); |
| scanners = Arrays.asList(scanner, isoScanner, bScanner, csiScanner, oScanner); |
| |
| for (ScannerBase s : scanners) { |
| try { |
| countEntries(s); |
| fail("Expected SampleNotPresentException, but it did not happen : " |
| + s.getClass().getSimpleName()); |
| } catch (SampleNotPresentException e) { |
| |
| } |
| } |
| } finally { |
| if (scanner != null) { |
| scanner.close(); |
| } |
| if (bScanner != null) { |
| bScanner.close(); |
| } |
| if (isoScanner != null) { |
| isoScanner.close(); |
| } |
| if (csiScanner != null) { |
| csiScanner.close(); |
| } |
| if (oScanner != null) { |
| oScanner.close(); |
| } |
| } |
| } |
| } |
| |
| private void setSamplerConfig(SamplerConfiguration sc, ScannerBase... scanners) { |
| for (ScannerBase s : scanners) { |
| s.setSamplerConfiguration(sc); |
| } |
| } |
| |
| @Test |
| public void testSampleNotPresent() throws Exception { |
| |
| try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { |
| String tableName = getUniqueNames(1)[0]; |
| String clone = tableName + "_clone"; |
| |
| client.tableOperations().create(tableName); |
| |
| TreeMap<Key,Value> expected = new TreeMap<>(); |
| try (BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig())) { |
| writeData(bw, SC1, expected); |
| } |
| |
| Scanner scanner = client.createScanner(tableName); |
| Scanner isoScanner = new IsolatedScanner(client.createScanner(tableName)); |
| isoScanner.setBatchSize(10); |
| Scanner csiScanner = new ClientSideIteratorScanner(client.createScanner(tableName)); |
| try (BatchScanner bScanner = client.createBatchScanner(tableName)) { |
| bScanner.setRanges(Arrays.asList(new Range())); |
| |
| // ensure sample not present exception occurs when sampling is not configured |
| assertSampleNotPresent(SC1, scanner, isoScanner, bScanner, csiScanner); |
| |
| client.tableOperations().flush(tableName, null, null, true); |
| |
| Scanner oScanner = newOfflineScanner(client, tableName, clone, SC1); |
| assertSampleNotPresent(SC1, scanner, isoScanner, bScanner, csiScanner, oScanner); |
| |
| // configure sampling, however there exist an rfile w/o sample data... so should still see |
| // sample not present exception |
| |
| updateSamplingConfig(client, tableName, SC1); |
| |
| // create clone with new config |
| oScanner = newOfflineScanner(client, tableName, clone, SC1); |
| |
| assertSampleNotPresent(SC1, scanner, isoScanner, bScanner, csiScanner, oScanner); |
| |
| // create rfile with sample data present |
| client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); |
| |
| // should be able to scan sample now |
| oScanner = newOfflineScanner(client, tableName, clone, SC1); |
| setSamplerConfig(SC1, scanner, csiScanner, isoScanner, bScanner, oScanner); |
| check(expected, scanner, isoScanner, bScanner, csiScanner, oScanner); |
| |
| // change sampling config |
| updateSamplingConfig(client, tableName, SC2); |
| |
| // create clone with new config |
| oScanner = newOfflineScanner(client, tableName, clone, SC2); |
| |
| // rfile should have different sample config than table, and scan should not work |
| assertSampleNotPresent(SC2, scanner, isoScanner, bScanner, csiScanner, oScanner); |
| |
| // create rfile that has same sample data as table config |
| client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); |
| |
| // should be able to scan sample now |
| updateExpected(SC2, expected); |
| oScanner = newOfflineScanner(client, tableName, clone, SC2); |
| setSamplerConfig(SC2, scanner, csiScanner, isoScanner, bScanner, oScanner); |
| check(expected, scanner, isoScanner, bScanner, csiScanner, oScanner); |
| } |
| } |
| } |
| |
| private void updateSamplingConfig(AccumuloClient client, String tableName, |
| SamplerConfiguration sc) |
| throws TableNotFoundException, AccumuloException, AccumuloSecurityException { |
| client.tableOperations().setSamplerConfiguration(tableName, sc); |
| // wait for for config change |
| client.tableOperations().offline(tableName, true); |
| client.tableOperations().online(tableName, true); |
| } |
| |
| private void assertSampleNotPresent(SamplerConfiguration sc, ScannerBase... scanners) { |
| |
| for (ScannerBase scanner : scanners) { |
| SamplerConfiguration csc = scanner.getSamplerConfiguration(); |
| |
| scanner.setSamplerConfiguration(sc); |
| |
| try { |
| for (Entry<Key,Value> entry : scanner) { |
| entry.getKey(); |
| } |
| fail("Expected SampleNotPresentException, but it did not happen : " |
| + scanner.getClass().getSimpleName()); |
| } catch (SampleNotPresentException e) { |
| |
| } |
| |
| scanner.clearSamplerConfiguration(); |
| for (Entry<Key,Value> entry : scanner) { |
| entry.getKey(); |
| } |
| |
| if (csc == null) { |
| scanner.clearSamplerConfiguration(); |
| } else { |
| scanner.setSamplerConfiguration(csc); |
| } |
| } |
| } |
| |
| private void check(TreeMap<Key,Value> expected, ScannerBase... scanners) { |
| TreeMap<Key,Value> actual = new TreeMap<>(); |
| for (ScannerBase s : scanners) { |
| actual.clear(); |
| for (Entry<Key,Value> entry : s) { |
| actual.put(entry.getKey(), entry.getValue()); |
| } |
| assertEquals(String.format("Saw %d instead of %d entries using %s", actual.size(), |
| expected.size(), s.getClass().getSimpleName()), expected, actual); |
| } |
| } |
| } |