blob: dfd52abd533d6e85a47ec479f6a50cafb4d61f1a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.client.rfile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.sample.RowSampler;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.client.summary.CounterSummary;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.client.summary.Summary;
import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer;
import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.crypto.impl.NoCryptoService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.io.Text;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
 * Tests for the RFile client API: writing files with {@code RFile.newWriter()}, reading them back
 * with {@code RFile.newScanner()} and reading summary data with {@code RFile.summaries()}. All files
 * are written to the local file system under {@code target/rfile-test}.
 */
public class RFileTest {

  /**
   * Reserves a unique path for a test RFile under {@code <user.dir>/target/rfile-test}.
   *
   * <p>
   * {@link File#createTempFile} is only used to obtain a unique name. The placeholder file it
   * creates is deleted again, so the returned path does not exist when this method returns.
   *
   * @return absolute path of a not-yet-existing file ending in {@code .rf}
   * @throws IOException if the temporary file cannot be created
   */
  @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path is set by test, not user")
  private String createTmpTestFile() throws IOException {
    File dir = new File(System.getProperty("user.dir") + "/target/rfile-test");
    assertTrue(dir.mkdirs() || dir.isDirectory());
    File testFile = File.createTempFile("test", ".rf", dir);
    assertTrue(testFile.delete() || !testFile.exists());
    return testFile.getAbsolutePath();
  }

  /**
   * Formats a row number as 6 zero-padded lowercase hex digits, so lexicographic order of the
   * generated rows matches numeric order (the range and bounds tests rely on this).
   */
  String rowStr(int r) {
    return String.format("%06x", r);
  }

  /** Formats a column family/qualifier number as 4 zero-padded lowercase hex digits. */
  String colStr(int c) {
    return String.format("%04x", c);
  }

  /**
   * Creates test data starting at row 0 and family 0 with an empty visibility.
   *
   * @see #createTestData(int, int, int, int, int, String...)
   */
  private SortedMap<Key,Value> createTestData(int rows, int families, int qualifiers) {
    return createTestData(0, rows, 0, families, qualifiers);
  }

  /**
   * Creates test data with an empty visibility.
   *
   * @see #createTestData(int, int, int, int, int, String...)
   */
  private SortedMap<Key,Value> createTestData(int startRow, int rows, int startFamily, int families,
      int qualifiers) {
    return createTestData(startRow, rows, startFamily, families, qualifiers, "");
  }

  /**
   * Builds a sorted map containing one entry per combination of row, family, qualifier and
   * visibility.
   *
   * @param startRow first row number (formatted with {@link #rowStr(int)})
   * @param rows number of consecutive rows
   * @param startFamily first column family number (formatted with {@link #colStr(int)})
   * @param families number of consecutive column families
   * @param qualifiers number of qualifiers per family, always starting at 0
   * @param vis column visibilities to generate for every row/family/qualifier
   * @return the generated data; each value is the decimal string of the key's hash code
   */
  private SortedMap<Key,Value> createTestData(int startRow, int rows, int startFamily, int families,
      int qualifiers, String... vis) {
    TreeMap<Key,Value> testData = new TreeMap<>();
    for (int r = 0; r < rows; r++) {
      String row = rowStr(r + startRow);
      for (int f = 0; f < families; f++) {
        String fam = colStr(f + startFamily);
        for (int q = 0; q < qualifiers; q++) {
          String qual = colStr(q);
          for (String v : vis) {
            Key k = new Key(row, fam, qual, v);
            testData.put(k, new Value((k.hashCode() + "").getBytes()));
          }
        }
      }
    }
    return testData;
  }

  /**
   * Writes {@code testData} to a new RFile on the local file system.
   *
   * @return absolute path of the written file
   */
  private String createRFile(SortedMap<Key,Value> testData) throws Exception {
    String testFile = createTmpTestFile();
    try (RFileWriter writer = RFile.newWriter().to(testFile)
        .withFileSystem(FileSystem.getLocal(new Configuration())).build()) {
      writer.append(testData.entrySet());
      // TODO ensure compressors are returned
    }
    return testFile;
  }

  @Test
  public void testIndependance() throws Exception {
    // test to ensure two iterators allocated from same RFile scanner are independent.
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());

    SortedMap<Key,Value> testData = createTestData(10, 10, 10);

    String testFile = createRFile(testData);

    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
    Range range1 = Range.exact(rowStr(5));
    scanner.setRange(range1);
    Iterator<Entry<Key,Value>> scnIter1 = scanner.iterator();
    Iterator<Entry<Key,Value>> mapIter1 = testData.subMap(range1.getStartKey(), range1.getEndKey())
        .entrySet().iterator();

    // changing the range must not affect the iterator that was already created
    Range range2 = new Range(rowStr(3), true, rowStr(4), true);
    scanner.setRange(range2);
    Iterator<Entry<Key,Value>> scnIter2 = scanner.iterator();
    Iterator<Entry<Key,Value>> mapIter2 = testData.subMap(range2.getStartKey(), range2.getEndKey())
        .entrySet().iterator();

    // interleave the two scanner iterators and compare each against its expected map iterator
    while (scnIter1.hasNext() || scnIter2.hasNext()) {
      if (scnIter1.hasNext()) {
        assertTrue(mapIter1.hasNext());
        assertEquals(scnIter1.next(), mapIter1.next());
      } else {
        assertFalse(mapIter1.hasNext());
      }

      if (scnIter2.hasNext()) {
        assertTrue(mapIter2.hasNext());
        assertEquals(scnIter2.next(), mapIter2.next());
      } else {
        assertFalse(mapIter2.hasNext());
      }
    }

    assertFalse(mapIter1.hasNext());
    assertFalse(mapIter2.hasNext());

    scanner.close();
  }

  /** Drains {@code scanner} into a new sorted map. */
  SortedMap<Key,Value> toMap(Scanner scanner) {
    TreeMap<Key,Value> map = new TreeMap<>();
    for (Entry<Key,Value> entry : scanner) {
      map.put(entry.getKey(), entry.getValue());
    }
    return map;
  }

  @Test
  public void testMultipleSources() throws Exception {
    SortedMap<Key,Value> testData1 = createTestData(10, 10, 10);
    SortedMap<Key,Value> testData2 = createTestData(0, 10, 0, 10, 10);

    String testFile1 = createRFile(testData1);
    String testFile2 = createRFile(testData2);

    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    Scanner scanner = RFile.newScanner().from(testFile1, testFile2).withFileSystem(localFs).build();

    // the scanner should present the merged view of both files
    TreeMap<Key,Value> expected = new TreeMap<>(testData1);
    expected.putAll(testData2);

    assertEquals(expected, toMap(scanner));

    Range range = new Range(rowStr(3), true, rowStr(14), true);
    scanner.setRange(range);
    assertEquals(expected.subMap(range.getStartKey(), range.getEndKey()), toMap(scanner));

    scanner.close();
  }

  @Test
  public void testWriterTableProperties() throws Exception {
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());

    String testFile = createTmpTestFile();

    // small block sizes so that the written file ends up with many index entries
    Map<String,String> props = new HashMap<>();
    props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K");
    props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), "1K");
    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs)
        .withTableProperties(props).build();

    SortedMap<Key,Value> testData1 = createTestData(10, 10, 10);
    writer.append(testData1.entrySet());
    writer.close();

    Reader reader = getReader(localFs, testFile);
    FileSKVIterator iiter = reader.getIndex();

    int count = 0;
    while (iiter.hasTop()) {
      count++;
      iiter.next();
    }

    // if settings are used then should create multiple index entries
    assertTrue(count > 10);

    reader.close();

    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
    assertEquals(testData1, toMap(scanner));
    scanner.close();
  }

  @Test
  public void testLocalityGroups() throws Exception {

    SortedMap<Key,Value> testData1 = createTestData(0, 10, 0, 2, 10);
    SortedMap<Key,Value> testData2 = createTestData(0, 10, 2, 1, 10);
    SortedMap<Key,Value> defaultData = createTestData(0, 10, 3, 7, 10);

    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();
    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build();

    // families 0-1 -> group "z", family 2 -> group "h", families 3-9 -> default group
    writer.startNewLocalityGroup("z", colStr(0), colStr(1));
    writer.append(testData1.entrySet());

    writer.startNewLocalityGroup("h", colStr(2));
    writer.append(testData2.entrySet());

    writer.startDefaultLocalityGroup();
    writer.append(defaultData.entrySet());

    writer.close();

    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();

    scanner.fetchColumnFamily(new Text(colStr(0)));
    scanner.fetchColumnFamily(new Text(colStr(1)));
    assertEquals(testData1, toMap(scanner));

    scanner.clearColumns();
    scanner.fetchColumnFamily(new Text(colStr(2)));
    assertEquals(testData2, toMap(scanner));

    scanner.clearColumns();
    for (int i = 3; i < 10; i++) {
      scanner.fetchColumnFamily(new Text(colStr(i)));
    }
    assertEquals(defaultData, toMap(scanner));

    // with no column restriction, all three groups together form the full data set
    scanner.clearColumns();
    assertEquals(createTestData(10, 10, 10), toMap(scanner));

    scanner.close();

    Reader reader = getReader(localFs, testFile);
    Map<String,ArrayList<ByteSequence>> lGroups = reader.getLocalityGroupCF();
    assertTrue(lGroups.containsKey("z"));
    assertEquals(2, lGroups.get("z").size());
    assertTrue(lGroups.get("z").contains(new ArrayByteSequence(colStr(0))));
    assertTrue(lGroups.get("z").contains(new ArrayByteSequence(colStr(1))));
    assertTrue(lGroups.containsKey("h"));
    assertEquals(Arrays.asList(new ArrayByteSequence(colStr(2))), lGroups.get("h"));

    reader.close();
  }

  @Test
  public void testIterators() throws Exception {
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    SortedMap<Key,Value> testData = createTestData(10, 10, 10);
    String testFile = createRFile(testData);

    // keep only rows 000007 and 000008
    IteratorSetting is = new IteratorSetting(50, "regex", RegExFilter.class);
    RegExFilter.setRegexs(is, ".*00000[78].*", null, null, null, false);

    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
    scanner.addScanIterator(is);

    assertEquals(createTestData(7, 2, 0, 10, 10), toMap(scanner));

    scanner.close();
  }

  @Test
  public void testAuths() throws Exception {
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();
    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build();
    Key k1 = new Key("r1", "f1", "q1", "A&B");
    Key k2 = new Key("r1", "f1", "q2", "A");
    Key k3 = new Key("r1", "f1", "q3");

    Value v1 = new Value("p".getBytes());
    Value v2 = new Value("c".getBytes());
    Value v3 = new Value("t".getBytes());

    writer.append(k1, v1);
    writer.append(k2, v2);
    writer.append(k3, v3);
    writer.close();

    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs)
        .withAuthorizations(new Authorizations("A")).build();
    assertEquals(ImmutableMap.of(k2, v2, k3, v3), toMap(scanner));
    assertEquals(new Authorizations("A"), scanner.getAuthorizations());
    scanner.close();

    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs)
        .withAuthorizations(new Authorizations("A", "B")).build();
    assertEquals(ImmutableMap.of(k1, v1, k2, v2, k3, v3), toMap(scanner));
    assertEquals(new Authorizations("A", "B"), scanner.getAuthorizations());
    scanner.close();

    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs)
        .withAuthorizations(new Authorizations("B")).build();
    assertEquals(ImmutableMap.of(k3, v3), toMap(scanner));
    assertEquals(new Authorizations("B"), scanner.getAuthorizations());
    scanner.close();
  }

  @Test
  public void testNoSystemIters() throws Exception {
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();
    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build();

    Key k1 = new Key("r1", "f1", "q1");
    k1.setTimestamp(3);

    // newer delete marker for the same column as k1
    Key k2 = new Key("r1", "f1", "q1");
    k2.setTimestamp(6);
    k2.setDeleted(true);

    Value v1 = new Value("p".getBytes());
    Value v2 = new Value("".getBytes());

    writer.append(k2, v2);
    writer.append(k1, v1);
    writer.close();

    // by default the delete marker hides k1 and is itself not returned
    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
    assertFalse(scanner.iterator().hasNext());
    scanner.close();

    // without system iterators both the delete marker and k1 are visible
    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withoutSystemIterators()
        .build();
    assertEquals(ImmutableMap.of(k2, v2, k1, v1), toMap(scanner));
    scanner.setRange(new Range("r2"));
    assertFalse(scanner.iterator().hasNext());
    scanner.close();
  }

  @Test
  public void testBounds() throws Exception {
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    SortedMap<Key,Value> testData = createTestData(10, 10, 10);
    String testFile = createRFile(testData);

    // set a lower bound row
    Range bounds = new Range(rowStr(3), false, null, true);
    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withBounds(bounds)
        .build();
    assertEquals(createTestData(4, 6, 0, 10, 10), toMap(scanner));
    scanner.close();

    // set an upper bound row
    bounds = new Range(null, false, rowStr(7), true);
    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withBounds(bounds).build();
    assertEquals(createTestData(8, 10, 10), toMap(scanner));
    scanner.close();

    // set row bounds
    bounds = new Range(rowStr(3), false, rowStr(7), true);
    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withBounds(bounds).build();
    assertEquals(createTestData(4, 4, 0, 10, 10), toMap(scanner));
    scanner.close();

    // set a row family bound
    bounds = Range.exact(rowStr(3), colStr(5));
    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withBounds(bounds).build();
    assertEquals(createTestData(3, 1, 5, 1, 10), toMap(scanner));
    scanner.close();
  }

  @Test
  public void testScannerTableProperties() throws Exception {
    NewTableConfiguration ntc = new NewTableConfiguration();

    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();
    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build();

    // two versions of the same column
    Key k1 = new Key("r1", "f1", "q1");
    k1.setTimestamp(3);

    Key k2 = new Key("r1", "f1", "q1");
    k2.setTimestamp(6);

    Value v1 = new Value("p".getBytes());
    Value v2 = new Value("q".getBytes());

    writer.append(k2, v2);
    writer.append(k1, v1);
    writer.close();

    // pass in table config that has versioning iterator configured
    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs)
        .withTableProperties(ntc.getProperties()).build();
    assertEquals(ImmutableMap.of(k2, v2), toMap(scanner));
    scanner.close();

    // without table properties both versions are returned
    scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
    assertEquals(ImmutableMap.of(k2, v2, k1, v1), toMap(scanner));
    scanner.close();
  }

  @Test
  public void testSampling() throws Exception {

    SortedMap<Key,Value> testData1 = createTestData(1000, 2, 1);

    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();

    SamplerConfiguration sc = new SamplerConfiguration(RowSampler.class)
        .setOptions(ImmutableMap.of("hasher", "murmur3_32", "modulus", "19"));

    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).withSampler(sc)
        .build();
    writer.append(testData1.entrySet());
    writer.close();

    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();
    scanner.setSamplerConfiguration(sc);

    // compute the expected sample independently with the same sampler configuration
    RowSampler rowSampler = new RowSampler();
    rowSampler.init(sc);

    SortedMap<Key,Value> sampleData = new TreeMap<>();
    for (Entry<Key,Value> e : testData1.entrySet()) {
      if (rowSampler.accept(e.getKey())) {
        sampleData.put(e.getKey(), e.getValue());
      }
    }

    assertTrue(sampleData.size() < testData1.size());

    assertEquals(sampleData, toMap(scanner));

    scanner.clearSamplerConfiguration();

    assertEquals(testData1, toMap(scanner));

    // release the scanner like every other test in this class does
    scanner.close();
  }

  @Test
  public void testAppendScanner() throws Exception {
    SortedMap<Key,Value> testData = createTestData(10000, 1, 1);
    String testFile = createRFile(testData);

    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());

    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).build();

    String testFile2 = createTmpTestFile();
    // note: unlike the other writers in this class, no withFileSystem(...) is given here
    RFileWriter writer = RFile.newWriter().to(testFile2).build();
    writer.append(scanner);
    writer.close();
    scanner.close();

    scanner = RFile.newScanner().from(testFile2).withFileSystem(localFs).build();
    assertEquals(testData, toMap(scanner));
    scanner.close();
  }

  @Test
  public void testCache() throws Exception {
    SortedMap<Key,Value> testData = createTestData(10000, 1, 1);
    String testFile = createRFile(testData);

    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs)
        .withIndexCache(1000000).withDataCache(10000000).build();

    // random single-row lookups; each row holds exactly one entry
    Random rand = new SecureRandom();

    for (int i = 0; i < 100; i++) {
      int r = rand.nextInt(10000);
      scanner.setRange(new Range(rowStr(r)));
      Iterator<Entry<Key,Value>> iter = scanner.iterator();
      assertTrue(iter.hasNext());
      assertEquals(rowStr(r), iter.next().getKey().getRow().toString());
      assertFalse(iter.hasNext());
    }

    scanner.close();
  }

  @Test
  public void testSummaries() throws Exception {
    SummarizerConfiguration sc1 = SummarizerConfiguration.builder(VisibilitySummarizer.class)
        .build();
    SummarizerConfiguration sc2 = SummarizerConfiguration.builder(FamilySummarizer.class).build();

    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();

    // 100 rows x 4 families x 1 qualifier x 2 visibilities = 800 entries
    SortedMap<Key,Value> testData1 = createTestData(0, 100, 0, 4, 1, "A&B", "A&B&C");

    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs)
        .withSummarizers(sc1, sc2).build();
    writer.append(testData1.entrySet());
    writer.close();

    // verify summary data
    Collection<Summary> summaries = RFile.summaries().from(testFile).withFileSystem(localFs).read();
    assertEquals(2, summaries.size());
    for (Summary summary : summaries) {
      assertEquals(0, summary.getFileStatistics().getInaccurate());
      assertEquals(1, summary.getFileStatistics().getTotal());
      String className = summary.getSummarizerConfiguration().getClassName();
      CounterSummary counterSummary = new CounterSummary(summary);
      if (className.equals(FamilySummarizer.class.getName())) {
        Map<String,Long> counters = counterSummary.getCounters();
        Map<String,Long> expected = ImmutableMap.of("0000", 200L, "0001", 200L, "0002", 200L,
            "0003", 200L);
        assertEquals(expected, counters);
      } else if (className.equals(VisibilitySummarizer.class.getName())) {
        Map<String,Long> counters = counterSummary.getCounters();
        Map<String,Long> expected = ImmutableMap.of("A&B", 400L, "A&B&C", 400L);
        assertEquals(expected, counters);
      } else {
        fail("Unexpected classname " + className);
      }
    }

    // check if writing summary data impacted normal rfile functionality
    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs)
        .withAuthorizations(new Authorizations("A", "B", "C")).build();
    assertEquals(testData1, toMap(scanner));
    scanner.close();

    // second file covers rows 100-199 with the same layout
    String testFile2 = createTmpTestFile();

    SortedMap<Key,Value> testData2 = createTestData(100, 100, 0, 4, 1, "A&B", "A&B&C");
    writer = RFile.newWriter().to(testFile2).withFileSystem(localFs).withSummarizers(sc1, sc2)
        .build();
    writer.append(testData2.entrySet());
    writer.close();

    // verify reading summaries from multiple files works
    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).read();
    assertEquals(2, summaries.size());
    for (Summary summary : summaries) {
      assertEquals(0, summary.getFileStatistics().getInaccurate());
      assertEquals(2, summary.getFileStatistics().getTotal());
      String className = summary.getSummarizerConfiguration().getClassName();
      CounterSummary counterSummary = new CounterSummary(summary);
      if (className.equals(FamilySummarizer.class.getName())) {
        Map<String,Long> counters = counterSummary.getCounters();
        Map<String,Long> expected = ImmutableMap.of("0000", 400L, "0001", 400L, "0002", 400L,
            "0003", 400L);
        assertEquals(expected, counters);
      } else if (className.equals(VisibilitySummarizer.class.getName())) {
        Map<String,Long> counters = counterSummary.getCounters();
        Map<String,Long> expected = ImmutableMap.of("A&B", 800L, "A&B&C", 800L);
        assertEquals(expected, counters);
      } else {
        fail("Unexpected classname " + className);
      }
    }

    // verify reading a subset of summaries works
    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 0);

    // the following test check boundary conditions for start row and end row
    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(99)).read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 400L, "A&B&C", 400L), 0);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(98)).read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 1);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(0)).read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 1);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).startRow("#").read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 0);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(100)).read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 400L, "A&B&C", 400L), 1);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(99)).read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 400L, "A&B&C", 400L), 0);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(100)).read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 1);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(199)).read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 0);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(50)).endRow(rowStr(150)).read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 2);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(120)).endRow(rowStr(150)).read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 400L, "A&B&C", 400L), 1);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(50)).endRow(rowStr(199)).read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 1);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).startRow("#").endRow(rowStr(150)).read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 800L, "A&B&C", 800L), 1);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(199)).read();
    checkSummaries(summaries, ImmutableMap.of(), 0);
    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(200)).read();
    checkSummaries(summaries, ImmutableMap.of(), 0);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).endRow("#").read();
    checkSummaries(summaries, ImmutableMap.of(), 0);

    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs)
        .selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(0)).read();
    checkSummaries(summaries, ImmutableMap.of("A&B", 400L, "A&B&C", 400L), 1);
  }

  /**
   * Asserts that {@code summaries} holds exactly one {@link VisibilitySummarizer} summary computed
   * over two files, with the given counters.
   *
   * @param summaries summaries read from the two test files
   * @param expected expected visibility counters
   * @param extra expected number of files reported as both inaccurate and extra
   */
  private void checkSummaries(Collection<Summary> summaries, Map<String,Long> expected, int extra) {
    assertEquals(1, summaries.size());
    for (Summary summary : summaries) {
      assertEquals(extra, summary.getFileStatistics().getInaccurate());
      assertEquals(extra, summary.getFileStatistics().getExtra());
      assertEquals(2, summary.getFileStatistics().getTotal());
      String className = summary.getSummarizerConfiguration().getClassName();
      CounterSummary counterSummary = new CounterSummary(summary);
      if (className.equals(VisibilitySummarizer.class.getName())) {
        Map<String,Long> counters = counterSummary.getCounters();
        assertEquals(expected, counters);
      } else {
        fail("Unexpected classname " + className);
      }
    }
  }

  @Test(expected = IllegalArgumentException.class)
  public void testOutOfOrder() throws Exception {
    // test that exception declared in API is thrown
    Key k1 = new Key("r1", "f1", "q1");
    Value v1 = new Value("1".getBytes());

    Key k2 = new Key("r2", "f1", "q1");
    Value v2 = new Value("2".getBytes());

    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();
    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
      writer.append(k2, v2);
      writer.append(k1, v1);
    }
  }

  @Test(expected = IllegalArgumentException.class)
  public void testOutOfOrderIterable() throws Exception {
    // test that exception declared in API is thrown
    Key k1 = new Key("r1", "f1", "q1");
    Value v1 = new Value("1".getBytes());

    Key k2 = new Key("r2", "f1", "q1");
    Value v2 = new Value("2".getBytes());

    ArrayList<Entry<Key,Value>> data = new ArrayList<>();
    data.add(new AbstractMap.SimpleEntry<>(k2, v2));
    data.add(new AbstractMap.SimpleEntry<>(k1, v1));

    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();
    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
      writer.append(data);
    }
  }

  @Test(expected = IllegalArgumentException.class)
  public void testBadVis() throws Exception {
    // this test has two purposes: ensure an exception is thrown, and ensure it is the exception
    // documented in the javadoc
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();
    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
      writer.startDefaultLocalityGroup();
      Key k1 = new Key("r1", "f1", "q1", "(A&(B");
      writer.append(k1, new Value("".getBytes()));
    }
  }

  @Test(expected = IllegalArgumentException.class)
  public void testBadVisIterable() throws Exception {
    // test append(iterable) method
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();
    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
      writer.startDefaultLocalityGroup();
      Key k1 = new Key("r1", "f1", "q1", "(A&(B");
      Entry<Key,Value> entry = new AbstractMap.SimpleEntry<>(k1, new Value("".getBytes()));
      writer.append(Collections.singletonList(entry));
    }
  }

  @Test(expected = IllegalStateException.class)
  public void testDoubleStart() throws Exception {
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();
    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
      writer.startDefaultLocalityGroup();
      writer.startDefaultLocalityGroup();
    }
  }

  @Test(expected = IllegalStateException.class)
  public void testAppendStartDefault() throws Exception {
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();
    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
      writer.append(new Key("r1", "f1", "q1"), new Value("1".getBytes()));
      writer.startDefaultLocalityGroup();
    }
  }

  @Test(expected = IllegalStateException.class)
  public void testStartAfter() throws Exception {
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();
    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
      Key k1 = new Key("r1", "f1", "q1");
      writer.append(k1, new Value("".getBytes()));
      writer.startNewLocalityGroup("lg1", "fam1");
    }
  }

  @Test(expected = IllegalArgumentException.class)
  public void testIllegalColumn() throws Exception {
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();
    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
      writer.startNewLocalityGroup("lg1", "fam1");
      Key k1 = new Key("r1", "f1", "q1");
      // should not be able to append the column family f1
      writer.append(k1, new Value("".getBytes()));
    }
  }

  @Test(expected = IllegalArgumentException.class)
  public void testWrongGroup() throws Exception {
    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
    String testFile = createTmpTestFile();
    try (RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build()) {
      writer.startNewLocalityGroup("lg1", "fam1");
      Key k1 = new Key("r1", "fam1", "q1");
      writer.append(k1, new Value("".getBytes()));
      writer.startDefaultLocalityGroup();
      // should not be able to append the column family fam1 to default locality group
      Key k2 = new Key("r1", "fam1", "q2");
      writer.append(k2, new Value("".getBytes()));
    }
  }

  /**
   * Opens {@code testFile} with the low-level file API, using the default table configuration and
   * no encryption. The caller must close the returned reader.
   */
  private Reader getReader(LocalFileSystem localFs, String testFile) throws IOException {
    return (Reader) FileOperations.getInstance().newReaderBuilder()
        .forFile(testFile, localFs, localFs.getConf())
        .withTableConfiguration(DefaultConfiguration.getInstance())
        .withCryptoService(new NoCryptoService()).build();
  }

  @Test
  public void testMultipleFilesAndCache() throws Exception {
    SortedMap<Key,Value> testData = createTestData(100, 10, 10);
    List<String> files = Arrays.asList(createTmpTestFile(), createTmpTestFile(),
        createTmpTestFile());

    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());

    // spread the test data over the files by key hash. Math.floorMod keeps the bucket in
    // [0, files.size()) even for a negative hash code; a plain % would give a negative remainder
    // that matches no file and silently drops that entry.
    for (int i = 0; i < files.size(); i++) {
      try (
          RFileWriter writer = RFile.newWriter().to(files.get(i)).withFileSystem(localFs).build()) {
        for (Entry<Key,Value> entry : testData.entrySet()) {
          if (Math.floorMod(entry.getKey().hashCode(), files.size()) == i) {
            writer.append(entry.getKey(), entry.getValue());
          }
        }
      }
    }

    Scanner scanner = RFile.newScanner().from(files.toArray(new String[files.size()]))
        .withFileSystem(localFs).withIndexCache(1000000).withDataCache(10000000).build();
    assertEquals(testData, toMap(scanner));
    scanner.close();
  }
}