blob: 53da97d21cdcd0e20a6d250e6e2162e0c7c3671d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.filedata;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import junit.framework.TestCase;
public class ChunkCombinerTest extends TestCase {
public static class MapIterator implements SortedKeyValueIterator<Key,Value> {
private Iterator<Entry<Key,Value>> iter;
private Entry<Key,Value> entry;
Collection<ByteSequence> columnFamilies;
private SortedMap<Key,Value> map;
private Range range;
@Override
public MapIterator deepCopy(IteratorEnvironment env) {
return new MapIterator(map);
}
private MapIterator(SortedMap<Key,Value> map) {
this.map = map;
iter = map.entrySet().iterator();
this.range = new Range();
if (iter.hasNext())
entry = iter.next();
else
entry = null;
}
@Override
public Key getTopKey() {
return entry.getKey();
}
@Override
public Value getTopValue() {
return entry.getValue();
}
@Override
public boolean hasTop() {
return entry != null;
}
@Override
public void next() throws IOException {
entry = null;
while (iter.hasNext()) {
entry = iter.next();
if (columnFamilies.size() > 0
&& !columnFamilies.contains(entry.getKey().getColumnFamilyData())) {
entry = null;
continue;
}
if (range.afterEndKey(entry.getKey()))
entry = null;
break;
}
}
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {
if (!inclusive) {
throw new IllegalArgumentException("can only do inclusive colf filtering");
}
this.columnFamilies = columnFamilies;
this.range = range;
Key key = range.getStartKey();
if (key == null) {
key = new Key();
}
iter = map.tailMap(key).entrySet().iterator();
next();
while (hasTop() && range.beforeStartKey(getTopKey())) {
next();
}
}
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
IteratorEnvironment env) throws IOException {
throw new UnsupportedOperationException();
}
}
private TreeMap<Key,Value> row1;
private TreeMap<Key,Value> row2;
private TreeMap<Key,Value> row3;
private TreeMap<Key,Value> allRows;
private TreeMap<Key,Value> cRow1;
private TreeMap<Key,Value> cRow2;
private TreeMap<Key,Value> cRow3;
private TreeMap<Key,Value> allCRows;
private TreeMap<Key,Value> cOnlyRow1;
private TreeMap<Key,Value> cOnlyRow2;
private TreeMap<Key,Value> cOnlyRow3;
private TreeMap<Key,Value> allCOnlyRows;
private TreeMap<Key,Value> badrow;
@Override
protected void setUp() {
row1 = new TreeMap<>();
row2 = new TreeMap<>();
row3 = new TreeMap<>();
allRows = new TreeMap<>();
cRow1 = new TreeMap<>();
cRow2 = new TreeMap<>();
cRow3 = new TreeMap<>();
allCRows = new TreeMap<>();
cOnlyRow1 = new TreeMap<>();
cOnlyRow2 = new TreeMap<>();
cOnlyRow3 = new TreeMap<>();
allCOnlyRows = new TreeMap<>();
badrow = new TreeMap<>();
String refs = FileDataIngest.REFS_CF.toString();
String fileext = FileDataIngest.REFS_FILE_EXT;
String filename = FileDataIngest.REFS_ORIG_FILE;
String chunk_cf = FileDataIngest.CHUNK_CF.toString();
row1.put(new Key("row1", refs, "hash1\0" + fileext, "C"), new Value("jpg".getBytes()));
row1.put(new Key("row1", refs, "hash1\0" + filename, "D"), new Value("foo1.jpg".getBytes()));
row1.put(new Key("row1", chunk_cf, "0000", "A"), new Value("V1".getBytes()));
row1.put(new Key("row1", chunk_cf, "0000", "B"), new Value("V1".getBytes()));
row1.put(new Key("row1", chunk_cf, "0001", "A"), new Value("V2".getBytes()));
row1.put(new Key("row1", chunk_cf, "0001", "B"), new Value("V2".getBytes()));
cRow1.put(new Key("row1", refs, "hash1\0" + fileext, "C"), new Value("jpg".getBytes()));
cRow1.put(new Key("row1", refs, "hash1\0" + filename, "D"), new Value("foo1.jpg".getBytes()));
cRow1.put(new Key("row1", chunk_cf, "0000", "(C)|(D)"), new Value("V1".getBytes()));
cRow1.put(new Key("row1", chunk_cf, "0001", "(C)|(D)"), new Value("V2".getBytes()));
cOnlyRow1.put(new Key("row1", chunk_cf, "0000", "(C)|(D)"), new Value("V1".getBytes()));
cOnlyRow1.put(new Key("row1", chunk_cf, "0001", "(C)|(D)"), new Value("V2".getBytes()));
row2.put(new Key("row2", refs, "hash1\0" + fileext, "A"), new Value("jpg".getBytes()));
row2.put(new Key("row2", refs, "hash1\0" + filename, "B"), new Value("foo1.jpg".getBytes()));
row2.put(new Key("row2", chunk_cf, "0000", "A|B"), new Value("V1".getBytes()));
row2.put(new Key("row2", chunk_cf, "0000", "A"), new Value("V1".getBytes()));
row2.put(new Key("row2", chunk_cf, "0000", "(A)|(B)"), new Value("V1".getBytes()));
row2.put(new Key("row2a", chunk_cf, "0000", "C"), new Value("V1".getBytes()));
cRow2.put(new Key("row2", refs, "hash1\0" + fileext, "A"), new Value("jpg".getBytes()));
cRow2.put(new Key("row2", refs, "hash1\0" + filename, "B"), new Value("foo1.jpg".getBytes()));
cRow2.put(new Key("row2", chunk_cf, "0000", "(A)|(B)"), new Value("V1".getBytes()));
cOnlyRow2.put(new Key("row2", chunk_cf, "0000", "(A)|(B)"), new Value("V1".getBytes()));
row3.put(new Key("row3", refs, "hash1\0w", "(A&B)|(C&(D|E))"), new Value("".getBytes()));
row3.put(new Key("row3", refs, "hash1\0x", "A&B"), new Value("".getBytes()));
row3.put(new Key("row3", refs, "hash1\0y", "(A&B)"), new Value("".getBytes()));
row3.put(new Key("row3", refs, "hash1\0z", "(F|G)&(D|E)"), new Value("".getBytes()));
row3.put(new Key("row3", chunk_cf, "0000", "(A&B)|(C&(D|E))", 10), new Value("V1".getBytes()));
row3.put(new Key("row3", chunk_cf, "0000", "A&B", 20), new Value("V1".getBytes()));
row3.put(new Key("row3", chunk_cf, "0000", "(A&B)", 10), new Value("V1".getBytes()));
row3.put(new Key("row3", chunk_cf, "0000", "(F|G)&(D|E)", 10), new Value("V1".getBytes()));
cRow3.put(new Key("row3", refs, "hash1\0w", "(A&B)|(C&(D|E))"), new Value("".getBytes()));
cRow3.put(new Key("row3", refs, "hash1\0x", "A&B"), new Value("".getBytes()));
cRow3.put(new Key("row3", refs, "hash1\0y", "(A&B)"), new Value("".getBytes()));
cRow3.put(new Key("row3", refs, "hash1\0z", "(F|G)&(D|E)"), new Value("".getBytes()));
cRow3.put(new Key("row3", chunk_cf, "0000", "((F|G)&(D|E))|(A&B)|(C&(D|E))", 20),
new Value("V1".getBytes()));
cOnlyRow3.put(new Key("row3", chunk_cf, "0000", "((F|G)&(D|E))|(A&B)|(C&(D|E))", 20),
new Value("V1".getBytes()));
badrow.put(new Key("row1", chunk_cf, "0000", "A"), new Value("V1".getBytes()));
badrow.put(new Key("row1", chunk_cf, "0000", "B"), new Value("V2".getBytes()));
allRows.putAll(row1);
allRows.putAll(row2);
allRows.putAll(row3);
allCRows.putAll(cRow1);
allCRows.putAll(cRow2);
allCRows.putAll(cRow3);
allCOnlyRows.putAll(cOnlyRow1);
allCOnlyRows.putAll(cOnlyRow2);
allCOnlyRows.putAll(cOnlyRow3);
}
private static final Collection<ByteSequence> emptyColfs = new HashSet<>();
public void test1() throws IOException {
runTest(false, allRows, allCRows, emptyColfs);
runTest(true, allRows, allCRows, emptyColfs);
runTest(false, allRows, allCOnlyRows, Collections.singleton(FileDataIngest.CHUNK_CF_BS));
runTest(true, allRows, allCOnlyRows, Collections.singleton(FileDataIngest.CHUNK_CF_BS));
try {
runTest(true, badrow, null, emptyColfs);
assertNotNull(null);
} catch (RuntimeException e) {
assertNull(null);
}
}
private void runTest(boolean reseek, TreeMap<Key,Value> source, TreeMap<Key,Value> result,
Collection<ByteSequence> cols) throws IOException {
MapIterator src = new MapIterator(source);
SortedKeyValueIterator<Key,Value> iter = new ChunkCombiner();
iter.init(src, null, null);
iter = iter.deepCopy(null);
iter.seek(new Range(), cols, true);
TreeMap<Key,Value> seen = new TreeMap<>();
while (iter.hasTop()) {
assertFalse("already contains " + iter.getTopKey(), seen.containsKey(iter.getTopKey()));
seen.put(new Key(iter.getTopKey()), new Value(iter.getTopValue()));
if (reseek)
iter.seek(new Range(iter.getTopKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL), true,
null, true), cols, true);
else
iter.next();
}
assertEquals(result, seen);
}
}