| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.accumulo.core.client.rfile; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.security.SecureRandom; |
| import java.util.AbstractMap; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Random; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| |
| import org.apache.accumulo.core.client.IteratorSetting; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.admin.NewTableConfiguration; |
| import org.apache.accumulo.core.client.sample.RowSampler; |
| import org.apache.accumulo.core.client.sample.SamplerConfiguration; |
| import org.apache.accumulo.core.client.summary.CounterSummary; |
| import org.apache.accumulo.core.client.summary.SummarizerConfiguration; |
| import org.apache.accumulo.core.client.summary.Summary; |
| import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer; |
| import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer; |
| import org.apache.accumulo.core.conf.DefaultConfiguration; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.data.ArrayByteSequence; |
| import org.apache.accumulo.core.data.ByteSequence; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Range; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.file.FileOperations; |
| import org.apache.accumulo.core.file.FileSKVIterator; |
| import org.apache.accumulo.core.file.rfile.RFile.Reader; |
| import org.apache.accumulo.core.iterators.user.RegExFilter; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.core.security.crypto.impl.NoCryptoService; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.io.Text; |
| import org.junit.Test; |
| |
| import com.google.common.collect.ImmutableMap; |
| |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| |
| public class RFileTest { |
| |
| @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path is set by test, not user") |
| private String createTmpTestFile() throws IOException { |
| File dir = new File(System.getProperty("user.dir") + "/target/rfile-test"); |
| assertTrue(dir.mkdirs() || dir.isDirectory()); |
| File testFile = File.createTempFile("test", ".rf", dir); |
| assertTrue(testFile.delete() || !testFile.exists()); |
| return testFile.getAbsolutePath(); |
| } |
| |
| String rowStr(int r) { |
| return String.format("%06x", r); |
| } |
| |
| String colStr(int c) { |
| return String.format("%04x", c); |
| } |
| |
| private SortedMap<Key,Value> createTestData(int rows, int families, int qualifiers) { |
| return createTestData(0, rows, 0, families, qualifiers); |
| } |
| |
| private SortedMap<Key,Value> createTestData(int startRow, int rows, int startFamily, int families, |
| int qualifiers) { |
| return createTestData(startRow, rows, startFamily, families, qualifiers, ""); |
| } |
| |
| private SortedMap<Key,Value> createTestData(int startRow, int rows, int startFamily, int families, |
| int qualifiers, String... vis) { |
| TreeMap<Key,Value> testData = new TreeMap<>(); |
| |
| for (int r = 0; r < rows; r++) { |
| String row = rowStr(r + startRow); |
| for (int f = 0; f < families; f++) { |
| String fam = colStr(f + startFamily); |
| for (int q = 0; q < qualifiers; q++) { |
| String qual = colStr(q); |
| for (String v : vis) { |
| Key k = new Key(row, fam, qual, v); |
| testData.put(k, new Value((k.hashCode() + "").getBytes())); |
| } |
| } |
| } |
| } |
| |
| return testData; |
| } |
| |
| private String createRFile(SortedMap<Key,Value> testData) throws Exception { |
| String testFile = createTmpTestFile(); |
| |
| try (RFileWriter writer = RFile.newWriter().to(testFile) |
| .withFileSystem(FileSystem.getLocal(new Configuration())).build()) { |
| writer.append(testData.entrySet()); |
| // TODO ensure compressors are returned |
| } |
| |
| return testFile; |
| } |
| |
| @Test |
| public void testIndependance() throws Exception { |
| // test to ensure two iterators allocated from same RFile scanner are independent. |
| |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| |
| SortedMap<Key,Value> testData = createTestData(10, 10, 10); |
| |
| String testFile = createRFile(testData); |
| |
| Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build(); |
| Range range1 = Range.exact(rowStr(5)); |
| scanner.setRange(range1); |
| Iterator<Entry<Key,Value>> scnIter1 = scanner.iterator(); |
| Iterator<Entry<Key,Value>> mapIter1 = testData.subMap(range1.getStartKey(), range1.getEndKey()) |
| .entrySet().iterator(); |
| |
| Range range2 = new Range(rowStr(3), true, rowStr(4), true); |
| scanner.setRange(range2); |
| Iterator<Entry<Key,Value>> scnIter2 = scanner.iterator(); |
| Iterator<Entry<Key,Value>> mapIter2 = testData.subMap(range2.getStartKey(), range2.getEndKey()) |
| .entrySet().iterator(); |
| |
| while (scnIter1.hasNext() || scnIter2.hasNext()) { |
| if (scnIter1.hasNext()) { |
| assertTrue(mapIter1.hasNext()); |
| assertEquals(scnIter1.next(), mapIter1.next()); |
| } else { |
| assertFalse(mapIter1.hasNext()); |
| } |
| |
| if (scnIter2.hasNext()) { |
| assertTrue(mapIter2.hasNext()); |
| assertEquals(scnIter2.next(), mapIter2.next()); |
| } else { |
| assertFalse(mapIter2.hasNext()); |
| } |
| } |
| |
| assertFalse(mapIter1.hasNext()); |
| assertFalse(mapIter2.hasNext()); |
| |
| scanner.close(); |
| } |
| |
| SortedMap<Key,Value> toMap(Scanner scanner) { |
| TreeMap<Key,Value> map = new TreeMap<>(); |
| for (Entry<Key,Value> entry : scanner) { |
| map.put(entry.getKey(), entry.getValue()); |
| } |
| return map; |
| } |
| |
| @Test |
| public void testMultipleSources() throws Exception { |
| SortedMap<Key,Value> testData1 = createTestData(10, 10, 10); |
| SortedMap<Key,Value> testData2 = createTestData(0, 10, 0, 10, 10); |
| |
| String testFile1 = createRFile(testData1); |
| String testFile2 = createRFile(testData2); |
| |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| Scanner scanner = RFile.newScanner().from(testFile1, testFile2).withFileSystem(localFs).build(); |
| |
| TreeMap<Key,Value> expected = new TreeMap<>(testData1); |
| expected.putAll(testData2); |
| |
| assertEquals(expected, toMap(scanner)); |
| |
| Range range = new Range(rowStr(3), true, rowStr(14), true); |
| scanner.setRange(range); |
| assertEquals(expected.subMap(range.getStartKey(), range.getEndKey()), toMap(scanner)); |
| |
| scanner.close(); |
| } |
| |
| @Test |
| public void testWriterTableProperties() throws Exception { |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| |
| String testFile = createTmpTestFile(); |
| |
| Map<String,String> props = new HashMap<>(); |
| props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K"); |
| props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), "1K"); |
| RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs) |
| .withTableProperties(props).build(); |
| |
| SortedMap<Key,Value> testData1 = createTestData(10, 10, 10); |
| writer.append(testData1.entrySet()); |
| writer.close(); |
| |
| Reader reader = getReader(localFs, testFile); |
| FileSKVIterator iiter = reader.getIndex(); |
| |
| int count = 0; |
| while (iiter.hasTop()) { |
| count++; |
| iiter.next(); |
| } |
| |
| // if settings are used then should create multiple index entries |
| assertTrue(count > 10); |
| |
| reader.close(); |
| |
| Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build(); |
| assertEquals(testData1, toMap(scanner)); |
| scanner.close(); |
| } |
| |
| @Test |
| public void testLocalityGroups() throws Exception { |
| |
| SortedMap<Key,Value> testData1 = createTestData(0, 10, 0, 2, 10); |
| SortedMap<Key,Value> testData2 = createTestData(0, 10, 2, 1, 10); |
| SortedMap<Key,Value> defaultData = createTestData(0, 10, 3, 7, 10); |
| |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); |
| |
| writer.startNewLocalityGroup("z", colStr(0), colStr(1)); |
| writer.append(testData1.entrySet()); |
| |
| writer.startNewLocalityGroup("h", colStr(2)); |
| writer.append(testData2.entrySet()); |
| |
| writer.startDefaultLocalityGroup(); |
| writer.append(defaultData.entrySet()); |
| |
| writer.close(); |
| |
| Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build(); |
| |
| scanner.fetchColumnFamily(new Text(colStr(0))); |
| scanner.fetchColumnFamily(new Text(colStr(1))); |
| assertEquals(testData1, toMap(scanner)); |
| |
| scanner.clearColumns(); |
| scanner.fetchColumnFamily(new Text(colStr(2))); |
| assertEquals(testData2, toMap(scanner)); |
| |
| scanner.clearColumns(); |
| for (int i = 3; i < 10; i++) { |
| scanner.fetchColumnFamily(new Text(colStr(i))); |
| } |
| assertEquals(defaultData, toMap(scanner)); |
| |
| scanner.clearColumns(); |
| assertEquals(createTestData(10, 10, 10), toMap(scanner)); |
| |
| scanner.close(); |
| |
| Reader reader = getReader(localFs, testFile); |
| Map<String,ArrayList<ByteSequence>> lGroups = reader.getLocalityGroupCF(); |
| assertTrue(lGroups.containsKey("z")); |
| assertEquals(2, lGroups.get("z").size()); |
| assertTrue(lGroups.get("z").contains(new ArrayByteSequence(colStr(0)))); |
| assertTrue(lGroups.get("z").contains(new ArrayByteSequence(colStr(1)))); |
| assertTrue(lGroups.containsKey("h")); |
| assertEquals(Arrays.asList(new ArrayByteSequence(colStr(2))), lGroups.get("h")); |
| reader.close(); |
| } |
| |
| @Test |
| public void testIterators() throws Exception { |
| |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| SortedMap<Key,Value> testData = createTestData(10, 10, 10); |
| String testFile = createRFile(testData); |
| |
| IteratorSetting is = new IteratorSetting(50, "regex", RegExFilter.class); |
| RegExFilter.setRegexs(is, ".*00000[78].*", null, null, null, false); |
| |
| Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build(); |
| scanner.addScanIterator(is); |
| |
| assertEquals(createTestData(7, 2, 0, 10, 10), toMap(scanner)); |
| |
| scanner.close(); |
| } |
| |
| @Test |
| public void testAuths() throws Exception { |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); |
| |
| Key k1 = new Key("r1", "f1", "q1", "A&B"); |
| Key k2 = new Key("r1", "f1", "q2", "A"); |
| Key k3 = new Key("r1", "f1", "q3"); |
| |
| Value v1 = new Value("p".getBytes()); |
| Value v2 = new Value("c".getBytes()); |
| Value v3 = new Value("t".getBytes()); |
| |
| writer.append(k1, v1); |
| writer.append(k2, v2); |
| writer.append(k3, v3); |
| writer.close(); |
| |
| Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs) |
| .withAuthorizations(new Authorizations("A")).build(); |
| assertEquals(ImmutableMap.of(k2, v2, k3, v3), toMap(scanner)); |
| assertEquals(new Authorizations("A"), scanner.getAuthorizations()); |
| scanner.close(); |
| |
| scanner = RFile.newScanner().from(testFile).withFileSystem(localFs) |
| .withAuthorizations(new Authorizations("A", "B")).build(); |
| assertEquals(ImmutableMap.of(k1, v1, k2, v2, k3, v3), toMap(scanner)); |
| assertEquals(new Authorizations("A", "B"), scanner.getAuthorizations()); |
| scanner.close(); |
| |
| scanner = RFile.newScanner().from(testFile).withFileSystem(localFs) |
| .withAuthorizations(new Authorizations("B")).build(); |
| assertEquals(ImmutableMap.of(k3, v3), toMap(scanner)); |
| assertEquals(new Authorizations("B"), scanner.getAuthorizations()); |
| scanner.close(); |
| } |
| |
| @Test |
| public void testNoSystemIters() throws Exception { |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); |
| |
| Key k1 = new Key("r1", "f1", "q1"); |
| k1.setTimestamp(3); |
| |
| Key k2 = new Key("r1", "f1", "q1"); |
| k2.setTimestamp(6); |
| k2.setDeleted(true); |
| |
| Value v1 = new Value("p".getBytes()); |
| Value v2 = new Value("".getBytes()); |
| |
| writer.append(k2, v2); |
| writer.append(k1, v1); |
| writer.close(); |
| |
| Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build(); |
| assertFalse(scanner.iterator().hasNext()); |
| scanner.close(); |
| |
| scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withoutSystemIterators() |
| .build(); |
| assertEquals(ImmutableMap.of(k2, v2, k1, v1), toMap(scanner)); |
| scanner.setRange(new Range("r2")); |
| assertFalse(scanner.iterator().hasNext()); |
| scanner.close(); |
| } |
| |
| @Test |
| public void testBounds() throws Exception { |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| SortedMap<Key,Value> testData = createTestData(10, 10, 10); |
| String testFile = createRFile(testData); |
| |
| // set a lower bound row |
| Range bounds = new Range(rowStr(3), false, null, true); |
| Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withBounds(bounds) |
| .build(); |
| assertEquals(createTestData(4, 6, 0, 10, 10), toMap(scanner)); |
| scanner.close(); |
| |
| // set an upper bound row |
| bounds = new Range(null, false, rowStr(7), true); |
| scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withBounds(bounds).build(); |
| assertEquals(createTestData(8, 10, 10), toMap(scanner)); |
| scanner.close(); |
| |
| // set row bounds |
| bounds = new Range(rowStr(3), false, rowStr(7), true); |
| scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withBounds(bounds).build(); |
| assertEquals(createTestData(4, 4, 0, 10, 10), toMap(scanner)); |
| scanner.close(); |
| |
| // set a row family bound |
| bounds = Range.exact(rowStr(3), colStr(5)); |
| scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withBounds(bounds).build(); |
| assertEquals(createTestData(3, 1, 5, 1, 10), toMap(scanner)); |
| scanner.close(); |
| } |
| |
| @Test |
| public void testScannerTableProperties() throws Exception { |
| NewTableConfiguration ntc = new NewTableConfiguration(); |
| |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); |
| |
| Key k1 = new Key("r1", "f1", "q1"); |
| k1.setTimestamp(3); |
| |
| Key k2 = new Key("r1", "f1", "q1"); |
| k2.setTimestamp(6); |
| |
| Value v1 = new Value("p".getBytes()); |
| Value v2 = new Value("q".getBytes()); |
| |
| writer.append(k2, v2); |
| writer.append(k1, v1); |
| writer.close(); |
| |
| // pass in table config that has versioning iterator configured |
| Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs) |
| .withTableProperties(ntc.getProperties()).build(); |
| assertEquals(ImmutableMap.of(k2, v2), toMap(scanner)); |
| scanner.close(); |
| |
| scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build(); |
| assertEquals(ImmutableMap.of(k2, v2, k1, v1), toMap(scanner)); |
| scanner.close(); |
| } |
| |
| @Test |
| public void testSampling() throws Exception { |
| |
| SortedMap<Key,Value> testData1 = createTestData(1000, 2, 1); |
| |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| |
| SamplerConfiguration sc = new SamplerConfiguration(RowSampler.class) |
| .setOptions(ImmutableMap.of("hasher", "murmur3_32", "modulus", "19")); |
| |
| RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).withSampler(sc) |
| .build(); |
| writer.append(testData1.entrySet()); |
| writer.close(); |
| |
| Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build(); |
| scanner.setSamplerConfiguration(sc); |
| |
| RowSampler rowSampler = new RowSampler(); |
| rowSampler.init(sc); |
| |
| SortedMap<Key,Value> sampleData = new TreeMap<>(); |
| for (Entry<Key,Value> e : testData1.entrySet()) { |
| if (rowSampler.accept(e.getKey())) { |
| sampleData.put(e.getKey(), e.getValue()); |
| } |
| } |
| |
| assertTrue(sampleData.size() < testData1.size()); |
| |
| assertEquals(sampleData, toMap(scanner)); |
| |
| scanner.clearSamplerConfiguration(); |
| |
| assertEquals(testData1, toMap(scanner)); |
| |
| } |
| |
| @Test |
| public void testAppendScanner() throws Exception { |
| SortedMap<Key,Value> testData = createTestData(10000, 1, 1); |
| String testFile = createRFile(testData); |
| |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| |
| Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build(); |
| |
| String testFile2 = createTmpTestFile(); |
| RFileWriter writer = RFile.newWriter().to(testFile2).build(); |
| writer.append(scanner); |
| writer.close(); |
| scanner.close(); |
| |
| scanner = RFile.newScanner().from(testFile2).withFileSystem(localFs).build(); |
| assertEquals(testData, toMap(scanner)); |
| scanner.close(); |
| } |
| |
| @Test |
| public void testCache() throws Exception { |
| SortedMap<Key,Value> testData = createTestData(10000, 1, 1); |
| String testFile = createRFile(testData); |
| |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs) |
| .withIndexCache(1000000).withDataCache(10000000).build(); |
| |
| Random rand = new SecureRandom(); |
| |
| for (int i = 0; i < 100; i++) { |
| int r = rand.nextInt(10000); |
| scanner.setRange(new Range(rowStr(r))); |
| Iterator<Entry<Key,Value>> iter = scanner.iterator(); |
| assertTrue(iter.hasNext()); |
| assertEquals(rowStr(r), iter.next().getKey().getRow().toString()); |
| assertFalse(iter.hasNext()); |
| } |
| |
| scanner.close(); |
| } |
| |
| @Test |
| public void testSummaries() throws Exception { |
| SummarizerConfiguration sc1 = SummarizerConfiguration.builder(VisibilitySummarizer.class) |
| .build(); |
| SummarizerConfiguration sc2 = SummarizerConfiguration.builder(FamilySummarizer.class).build(); |
| |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| |
| SortedMap<Key,Value> testData1 = createTestData(0, 100, 0, 4, 1, "A&B", "A&B&C"); |
| |
| RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs) |
| .withSummarizers(sc1, sc2).build(); |
| writer.append(testData1.entrySet()); |
| writer.close(); |
| |
| // verify summary data |
| Collection<Summary> summaries = RFile.summaries().from(testFile).withFileSystem(localFs).read(); |
| assertEquals(2, summaries.size()); |
| for (Summary summary : summaries) { |
| assertEquals(0, summary.getFileStatistics().getInaccurate()); |
| assertEquals(1, summary.getFileStatistics().getTotal()); |
| String className = summary.getSummarizerConfiguration().getClassName(); |
| CounterSummary counterSummary = new CounterSummary(summary); |
| if (className.equals(FamilySummarizer.class.getName())) { |
| Map<String,Long> counters = counterSummary.getCounters(); |
| Map<String,Long> expected = ImmutableMap.of("0000", 200L, "0001", 200L, "0002", 200L, |
| "0003", 200L); |
| assertEquals(expected, counters); |
| } else if (className.equals(VisibilitySummarizer.class.getName())) { |
| Map<String,Long> counters = counterSummary.getCounters(); |
| Map<String,Long> expected = ImmutableMap.of("A&B", 400L, "A&B&C", 400L); |
| assertEquals(expected, counters); |
| } else { |
| fail("Unexpected classname " + className); |
| } |
| } |
| |
| // check if writing summary data impacted normal rfile functionality |
| Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs) |
| .withAuthorizations(new Authorizations("A", "B", "C")).build(); |
| assertEquals(testData1, toMap(scanner)); |
| scanner.close(); |
| |
| String testFile2 = createTmpTestFile(); |
| SortedMap<Key,Value> testData2 = createTestData(100, 100, 0, 4, 1, "A&B", "A&B&C"); |
| writer = RFile.newWriter().to(testFile2).withFileSystem(localFs).withSummarizers(sc1, sc2) |
| .build(); |
| writer.append(testData2.entrySet()); |
| writer.close(); |
| |
| // verify reading summaries from multiple files works |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).read(); |
| assertEquals(2, summaries.size()); |
| for (Summary summary : summaries) { |
| assertEquals(0, summary.getFileStatistics().getInaccurate()); |
| assertEquals(2, summary.getFileStatistics().getTotal()); |
| String className = summary.getSummarizerConfiguration().getClassName(); |
| CounterSummary counterSummary = new CounterSummary(summary); |
| if (className.equals(FamilySummarizer.class.getName())) { |
| Map<String,Long> counters = counterSummary.getCounters(); |
| Map<String,Long> expected = ImmutableMap.of("0000", 400L, "0001", 400L, "0002", 400L, |
| "0003", 400L); |
| assertEquals(expected, counters); |
| } else if (className.equals(VisibilitySummarizer.class.getName())) { |
| Map<String,Long> counters = counterSummary.getCounters(); |
| Map<String,Long> expected = ImmutableMap.of("A&B", 800L, "A&B&C", 800L); |
| assertEquals(expected, counters); |
| } else { |
| fail("Unexpected classname " + className); |
| } |
| } |
| |
| // verify reading a subset of summaries works |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 0); |
| |
| // the following test check boundary conditions for start row and end row |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(99)).read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 400L, "A&B&C", 400L), 0); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(98)).read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 1); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(0)).read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 1); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).startRow("#").read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 0); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(100)).read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 400L, "A&B&C", 400L), 1); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(99)).read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 400L, "A&B&C", 400L), 0); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(100)).read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 1); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(199)).read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 0); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(50)).endRow(rowStr(150)).read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 2); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(120)).endRow(rowStr(150)).read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 400L, "A&B&C", 400L), 1); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(50)).endRow(rowStr(199)).read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 1); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).startRow("#").endRow(rowStr(150)).read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 1); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(199)).read(); |
| checkSummaries(summaries, ImmutableMap.of(), 0); |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(200)).read(); |
| checkSummaries(summaries, ImmutableMap.of(), 0); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).endRow("#").read(); |
| checkSummaries(summaries, ImmutableMap.of(), 0); |
| |
| summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs) |
| .selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(0)).read(); |
| checkSummaries(summaries, ImmutableMap.of("A&B", 400L, "A&B&C", 400L), 1); |
| } |
| |
| private void checkSummaries(Collection<Summary> summaries, Map<String,Long> expected, int extra) { |
| assertEquals(1, summaries.size()); |
| for (Summary summary : summaries) { |
| assertEquals(extra, summary.getFileStatistics().getInaccurate()); |
| assertEquals(extra, summary.getFileStatistics().getExtra()); |
| assertEquals(2, summary.getFileStatistics().getTotal()); |
| String className = summary.getSummarizerConfiguration().getClassName(); |
| CounterSummary counterSummary = new CounterSummary(summary); |
| if (className.equals(VisibilitySummarizer.class.getName())) { |
| Map<String,Long> counters = counterSummary.getCounters(); |
| |
| assertEquals(expected, counters); |
| } else { |
| fail("Unexpected classname " + className); |
| } |
| } |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testOutOfOrder() throws Exception { |
| // test that exception declared in API is thrown |
| Key k1 = new Key("r1", "f1", "q1"); |
| Value v1 = new Value("1".getBytes()); |
| |
| Key k2 = new Key("r2", "f1", "q1"); |
| Value v2 = new Value("2".getBytes()); |
| |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) { |
| writer.append(k2, v2); |
| writer.append(k1, v1); |
| } |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testOutOfOrderIterable() throws Exception { |
| // test that exception declared in API is thrown |
| Key k1 = new Key("r1", "f1", "q1"); |
| Value v1 = new Value("1".getBytes()); |
| |
| Key k2 = new Key("r2", "f1", "q1"); |
| Value v2 = new Value("2".getBytes()); |
| |
| ArrayList<Entry<Key,Value>> data = new ArrayList<>(); |
| data.add(new AbstractMap.SimpleEntry<>(k2, v2)); |
| data.add(new AbstractMap.SimpleEntry<>(k1, v1)); |
| |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) { |
| writer.append(data); |
| } |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testBadVis() throws Exception { |
| // this test has two purposes ensure an exception is thrown and ensure the exception document in |
| // the javadoc is thrown |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) { |
| writer.startDefaultLocalityGroup(); |
| Key k1 = new Key("r1", "f1", "q1", "(A&(B"); |
| writer.append(k1, new Value("".getBytes())); |
| } |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testBadVisIterable() throws Exception { |
| // test append(iterable) method |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) { |
| writer.startDefaultLocalityGroup(); |
| Key k1 = new Key("r1", "f1", "q1", "(A&(B"); |
| Entry<Key,Value> entry = new AbstractMap.SimpleEntry<>(k1, new Value("".getBytes())); |
| writer.append(Collections.singletonList(entry)); |
| } |
| } |
| |
| @Test(expected = IllegalStateException.class) |
| public void testDoubleStart() throws Exception { |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) { |
| writer.startDefaultLocalityGroup(); |
| writer.startDefaultLocalityGroup(); |
| } |
| } |
| |
| @Test(expected = IllegalStateException.class) |
| public void testAppendStartDefault() throws Exception { |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) { |
| writer.append(new Key("r1", "f1", "q1"), new Value("1".getBytes())); |
| writer.startDefaultLocalityGroup(); |
| } |
| } |
| |
| @Test(expected = IllegalStateException.class) |
| public void testStartAfter() throws Exception { |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) { |
| Key k1 = new Key("r1", "f1", "q1"); |
| writer.append(k1, new Value("".getBytes())); |
| writer.startNewLocalityGroup("lg1", "fam1"); |
| } |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testIllegalColumn() throws Exception { |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) { |
| writer.startNewLocalityGroup("lg1", "fam1"); |
| Key k1 = new Key("r1", "f1", "q1"); |
| // should not be able to append the column family f1 |
| writer.append(k1, new Value("".getBytes())); |
| } |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testWrongGroup() throws Exception { |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| String testFile = createTmpTestFile(); |
| try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) { |
| writer.startNewLocalityGroup("lg1", "fam1"); |
| Key k1 = new Key("r1", "fam1", "q1"); |
| writer.append(k1, new Value("".getBytes())); |
| writer.startDefaultLocalityGroup(); |
| // should not be able to append the column family fam1 to default locality group |
| Key k2 = new Key("r1", "fam1", "q2"); |
| writer.append(k2, new Value("".getBytes())); |
| } |
| } |
| |
| private Reader getReader(LocalFileSystem localFs, String testFile) throws IOException { |
| return (Reader) FileOperations.getInstance().newReaderBuilder() |
| .forFile(testFile, localFs, localFs.getConf()) |
| .withTableConfiguration(DefaultConfiguration.getInstance()) |
| .withCryptoService(new NoCryptoService()).build(); |
| } |
| |
| @Test |
| public void testMultipleFilesAndCache() throws Exception { |
| SortedMap<Key,Value> testData = createTestData(100, 10, 10); |
| List<String> files = Arrays.asList(createTmpTestFile(), createTmpTestFile(), |
| createTmpTestFile()); |
| |
| LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); |
| |
| for (int i = 0; i < files.size(); i++) { |
| try ( |
| RFileWriter writer = RFile.newWriter().to(files.get(i)).withFileSystem(localFs).build()) { |
| for (Entry<Key,Value> entry : testData.entrySet()) { |
| if (entry.getKey().hashCode() % files.size() == i) { |
| writer.append(entry.getKey(), entry.getValue()); |
| } |
| } |
| } |
| } |
| |
| Scanner scanner = RFile.newScanner().from(files.toArray(new String[files.size()])) |
| .withFileSystem(localFs).withIndexCache(1000000).withDataCache(10000000).build(); |
| assertEquals(testData, toMap(scanner)); |
| scanner.close(); |
| } |
| } |