blob: bf34d112a6889402550d722776b1ad445e8353bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.crypto.CryptoServiceFactory;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.categories.SunnyDayTests;
import org.apache.accumulo.test.functional.NativeMapIT;
import org.apache.accumulo.tserver.InMemoryMap;
import org.apache.accumulo.tserver.MemKey;
import org.apache.accumulo.tserver.NativeMap;
import org.easymock.EasyMock;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Integration Test for https://issues.apache.org/jira/browse/ACCUMULO-4148
 * <p>
 * User had problem writing one Mutation with multiple KV pairs that had the same key. Doing so
 * should write out all pairs in all mutations with a unique id. In typical operation, you would
 * only see the last one when scanning. User had a combiner on the table, and they noticed that when
 * using InMemoryMap with NativeMapWrapper, only the last KV pair was ever written. When InMemoryMap
 * used DefaultMap, all KV pairs were added and the behavior worked as expected.
 * <p>
 * This IT inserts a variety of Mutations with and without the same KV pairs and then inspects the
 * result of InMemoryMap mutate, looking for unique id stored with each key. This unique id, shown
 * as mc= in the MemKey toString, was originally used for scan Isolation. Writing the same key
 * multiple times in the same mutation is a secondary use case, discussed in
 * https://issues.apache.org/jira/browse/ACCUMULO-227. In addition to NativeMapWrapper and
 * DefaultMap, LocalityGroupMap was added in https://issues.apache.org/jira/browse/ACCUMULO-112.
 * <p>
 * This test has to be an IT in accumulo-test, because libaccumulo is built in 'integration-test'
 * phase of accumulo-native, which currently runs right before accumulo-test. The tests for
 * DefaultMap could move to a unit test in tserver, but they are here for convenience of viewing
 * both at the same time.
 */
@Category(SunnyDayTests.class)
public class InMemoryMapIT {

  private static final Logger log = LoggerFactory.getLogger(InMemoryMapIT.class);

  /** Per-test scratch directory, created under {@code <user.dir>/target}. */
  @Rule
  public TemporaryFolder tempFolder =
      new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));

  /**
   * Points {@code accumulo.native.lib.path} at the location reported by
   * {@link NativeMapIT#nativeMapLocation()} and fails the whole class if the native map library is
   * not loaded afterwards.
   */
  @BeforeClass
  public static void ensureNativeLibrary() {
    File nativeMapLocation = NativeMapIT.nativeMapLocation();
    System.setProperty("accumulo.native.lib.path", nativeMapLocation.getAbsolutePath());
    if (!NativeMap.isLoaded()) {
      fail("Missing the native library from " + nativeMapLocation.getAbsolutePath()
          + "\nYou need to build the libaccumulo binary first. "
          + "\nTry running 'mvn clean verify -Dit.test=InMemoryMapIT -Dtest=foo"
          + " -DfailIfNoTests=false -Dspotbugs.skip -Dcheckstyle.skip'");
    }
  }

  /**
   * Creates a mocked {@link ServerContext} whose {@code getCryptoService()} returns a default
   * crypto service any number of times. No other call is stubbed on the mock.
   *
   * @return the mock, already switched to replay mode
   */
  public static ServerContext getServerContext() {
    ServerContext context = EasyMock.createMock(ServerContext.class);
    EasyMock.expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
        .anyTimes();
    EasyMock.replay(context);
    return context;
  }

  /** One mutation holding a single key/value pair. */
  @Test
  public void testOneMutationOneKey() {
    Mutation m = new Mutation("a");
    m.put("1cf", "1cq", "vala");
    assertEquivalentMutate(m);
  }

  /** One mutation holding five pairs, each with a distinct column. */
  @Test
  public void testOneMutationManyKeys() {
    Mutation m = new Mutation("a");
    for (int i = 1; i < 6; i++) {
      m.put("2cf" + i, "2cq" + i, Integer.toString(i));
    }
    assertEquivalentMutate(m);
  }

  /** One mutation holding five pairs that all share the same key. */
  @Test
  public void testOneMutationManySameKeys() {
    Mutation m = new Mutation("a");
    for (int i = 1; i <= 5; i++) {
      // same keys
      m.put("3cf", "3cq", Integer.toString(i));
    }
    assertEquivalentMutate(m);
  }

  /** Two mutations on different rows, each with the same single column. */
  @Test
  public void testMultipleMutationsOneKey() {
    Mutation m1 = new Mutation("a");
    m1.put("4cf", "4cq", "vala");
    Mutation m2 = new Mutation("b");
    m2.put("4cf", "4cq", "vala");
    assertEquivalentMutate(Arrays.asList(m1, m2));
  }

  /** Two mutations on the same row, each writing the same single key. */
  @Test
  public void testMultipleMutationsSameOneKey() {
    Mutation m1 = new Mutation("a");
    m1.put("5cf", "5cq", "vala");
    Mutation m2 = new Mutation("a");
    m2.put("5cf", "5cq", "vala");
    assertEquivalentMutate(Arrays.asList(m1, m2));
  }

  /** Two mutations on different rows, each holding several distinct columns. */
  @Test
  public void testMutlipleMutationsMultipleKeys() {
    Mutation m1 = new Mutation("a");
    for (int i = 1; i < 6; i++) {
      m1.put("6cf" + i, "6cq" + i, Integer.toString(i));
    }
    Mutation m2 = new Mutation("b");
    for (int i = 1; i < 3; i++) {
      m2.put("6cf" + i, "6cq" + i, Integer.toString(i));
    }
    assertEquivalentMutate(Arrays.asList(m1, m2));
  }

  /** Two mutations on the same row, each repeating the same key several times. */
  @Test
  public void testMultipleMutationsMultipleSameKeys() {
    Mutation m1 = new Mutation("a");
    for (int i = 1; i < 3; i++) {
      m1.put("7cf", "7cq", Integer.toString(i));
    }
    Mutation m2 = new Mutation("a");
    for (int i = 1; i < 4; i++) {
      m2.put("7cf", "7cq", Integer.toString(i));
    }
    assertEquivalentMutate(Arrays.asList(m1, m2));
  }

  /** Three mutations across two rows that mix repeated keys with distinct ones. */
  @Test
  public void testMultipleMutationsMultipleKeysSomeSame() {
    Mutation m1 = new Mutation("a");
    for (int i = 1; i < 2; i++) {
      m1.put("8cf", "8cq", Integer.toString(i));
    }
    for (int i = 1; i < 3; i++) {
      m1.put("8cf" + i, "8cq" + i, Integer.toString(i));
    }
    for (int i = 1; i < 2; i++) {
      m1.put("8cf" + i, "8cq" + i, Integer.toString(i));
    }
    Mutation m2 = new Mutation("a");
    for (int i = 1; i < 3; i++) {
      m2.put("8cf", "8cq", Integer.toString(i));
    }
    for (int i = 1; i < 4; i++) {
      m2.put("8cf" + i, "8cq" + i, Integer.toString(i));
    }
    Mutation m3 = new Mutation("b");
    for (int i = 1; i < 3; i++) {
      m3.put("8cf" + i, "8cq" + i, Integer.toString(i));
    }
    assertEquivalentMutate(Arrays.asList(m1, m2, m3));
  }

  /** Convenience overload for a single mutation. */
  private void assertEquivalentMutate(Mutation m) {
    assertEquivalentMutate(Collections.singletonList(m));
  }

  /**
   * Applies the given mutations to four differently configured InMemoryMaps (default, native,
   * locality group, locality group with native) and asserts that every one of them ends up
   * equivalent to the default map, as defined by
   * {@link #assertMutatesEquivalent(List, InMemoryMap, InMemoryMap)}.
   *
   * @param mutations
   *          List of mutations to apply to every map
   */
  private void assertEquivalentMutate(List<Mutation> mutations) {
    InMemoryMap defaultMap;
    InMemoryMap nativeMapWrapper;
    InMemoryMap localityGroupMap;
    InMemoryMap localityGroupMapWithNative;
    try {
      defaultMap = newInMemoryMap(false, false);
      nativeMapWrapper = newInMemoryMap(true, false);
      localityGroupMap = newInMemoryMap(false, true);
      localityGroupMapWithNative = newInMemoryMap(true, true);
    } catch (Exception e) {
      log.error("Error getting new InMemoryMap ", e);
      // Keep the cause attached; e.getMessage() alone may be null and loses the stack trace.
      throw new AssertionError("Error getting new InMemoryMap: " + e, e);
    }
    // NOTE(review): the maps are never explicitly released here; confirm whether the native-backed
    // ones need cleanup after each test.

    // ensure the maps are correct type
    assertEquals("Not a DefaultMap", InMemoryMap.TYPE_DEFAULT_MAP, defaultMap.getMapType());
    assertEquals("Not a NativeMapWrapper", InMemoryMap.TYPE_NATIVE_MAP_WRAPPER,
        nativeMapWrapper.getMapType());
    assertEquals("Not a LocalityGroupMap", InMemoryMap.TYPE_LOCALITY_GROUP_MAP,
        localityGroupMap.getMapType());
    assertEquals("Not a LocalityGroupMap with native", InMemoryMap.TYPE_LOCALITY_GROUP_MAP_NATIVE,
        localityGroupMapWithNative.getMapType());

    int count = countKVPairs(mutations);
    defaultMap.mutate(mutations, count);
    nativeMapWrapper.mutate(mutations, count);
    localityGroupMap.mutate(mutations, count);
    localityGroupMapWithNative.mutate(mutations, count);

    // let's use the transitive property to assert all four are equivalent
    assertMutatesEquivalent(mutations, defaultMap, nativeMapWrapper);
    assertMutatesEquivalent(mutations, defaultMap, localityGroupMap);
    assertMutatesEquivalent(mutations, defaultMap, localityGroupMapWithNative);
  }

  /**
   * Builds one InMemoryMap for table id "TEST" with its own fresh memory dump directory and a
   * freshly mocked server context.
   *
   * @param useNativeMap
   *          value written to {@link Property#TSERV_NATIVEMAP_ENABLED}
   * @param useLocalityGroups
   *          when true, the locality groups from {@link #getLocalityGroups()} are configured;
   *          otherwise {@link Property#TABLE_LOCALITY_GROUPS} is set to the empty string
   * @return the newly constructed map
   * @throws IOException
   *           if the temporary dump directory cannot be created
   */
  private InMemoryMap newInMemoryMap(boolean useNativeMap, boolean useLocalityGroups)
      throws IOException {
    Map<String,String> props = new HashMap<>();
    props.put(Property.TSERV_NATIVEMAP_ENABLED.getKey(), Boolean.toString(useNativeMap));
    props.put(Property.TSERV_MEMDUMP_DIR.getKey(), tempFolder.newFolder().getAbsolutePath());
    ConfigurationCopy conf;
    if (useLocalityGroups) {
      conf = updateConfigurationForLocalityGroups(new ConfigurationCopy(props));
    } else {
      props.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "");
      conf = new ConfigurationCopy(props);
    }
    return new InMemoryMap(conf, getServerContext(), TableId.of("TEST"));
  }

  /**
   * Assert that a set of mutations mutate to equivalent map in both of the InMemoryMaps.
   * <p>
   * In this case, equivalent means 3 things.
   * <ul>
   * <li>The size of the map generated from the first InMemoryMap is equal to the number of key
   * value pairs in all mutations passed</li>
   * <li>The size of the map generated from the first InMemoryMap equals the size of the map
   * generated from the second</li>
   * <li>Each key value pair in each mutated map has a unique id (kvCount)</li>
   * </ul>
   *
   * @param mutations
   *          List of mutations
   * @param imm1
   *          InMemoryMap to compare
   * @param imm2
   *          InMemoryMap to compare
   */
  private void assertMutatesEquivalent(List<Mutation> mutations, InMemoryMap imm1,
      InMemoryMap imm2) {
    int mutationKVPairs = countKVPairs(mutations);
    List<MemKey> memKeys1 = getArrayOfMemKeys(imm1);
    List<MemKey> memKeys2 = getArrayOfMemKeys(imm2);
    assertEquals("Not all key value pairs included: " + dumpInMemoryMap(imm1, memKeys1),
        mutationKVPairs, memKeys1.size());
    assertEquals("InMemoryMaps differ in size: " + dumpInMemoryMap(imm1, memKeys1) + "\n"
        + dumpInMemoryMap(imm2, memKeys2), memKeys1.size(), memKeys2.size());
    assertEquals("InMemoryMap did not have distinct kvCounts " + dumpInMemoryMap(imm1, memKeys1),
        mutationKVPairs, getUniqKVCount(memKeys1));
    assertEquals("InMemoryMap did not have distinct kvCounts " + dumpInMemoryMap(imm2, memKeys2),
        mutationKVPairs, getUniqKVCount(memKeys2));
  }

  /**
   * Sums {@code Mutation.size()} over all mutations.
   *
   * @param mutations
   *          List of mutations
   * @return total of the sizes of the given mutations
   */
  private int countKVPairs(List<Mutation> mutations) {
    int count = 0;
    for (Mutation m : mutations) {
      count += m.size();
    }
    return count;
  }

  /**
   * Reads every key from the map's compaction iterator, in iteration order.
   *
   * @param imm
   *          InMemoryMap to read
   * @return the keys seen, each cast to {@link MemKey}
   * @throws UncheckedIOException
   *           if the iterator reports an {@link IOException}
   */
  private List<MemKey> getArrayOfMemKeys(InMemoryMap imm) {
    SortedKeyValueIterator<Key,Value> skvi = imm.compactionIterator();
    List<MemKey> memKeys = new ArrayList<>();
    try {
      skvi.seek(new Range(), new ArrayList<>(), false); // everything
      while (skvi.hasTop()) {
        memKeys.add((MemKey) skvi.getTopKey());
        skvi.next();
      }
    } catch (IOException ex) {
      log.error("Error getting memkeys", ex);
      throw new UncheckedIOException(ex);
    }
    return memKeys;
  }

  /**
   * Renders the map type followed by one indented line per key, for use in assertion messages.
   *
   * @param map
   *          InMemoryMap whose type is reported
   * @param memkeys
   *          keys to list
   * @return multi-line description
   */
  private String dumpInMemoryMap(InMemoryMap map, List<MemKey> memkeys) {
    StringBuilder sb = new StringBuilder();
    sb.append("InMemoryMap type ");
    sb.append(map.getMapType());
    sb.append("\n");
    for (MemKey mk : memkeys) {
      sb.append(" ");
      sb.append(mk);
      sb.append("\n");
    }
    return sb.toString();
  }

  /**
   * Counts how many distinct kvCount values occur among the given keys.
   *
   * @param memKeys
   *          keys to inspect
   * @return number of distinct kvCounts
   */
  private int getUniqKVCount(List<MemKey> memKeys) {
    Set<Integer> distinctKvCounts = new HashSet<>();
    for (MemKey m : memKeys) {
      distinctKvCounts.add(m.getKVCount());
    }
    return distinctKvCounts.size();
  }

  /**
   * Adds the locality groups from {@link #getLocalityGroups()} to the given configuration: one
   * {@code table.group.<name>} entry per group holding its comma separated column families, plus
   * {@link Property#TABLE_LOCALITY_GROUPS} listing the group names.
   *
   * @param configuration
   *          configuration to modify in place
   * @return the same configuration instance
   */
  private ConfigurationCopy updateConfigurationForLocalityGroups(ConfigurationCopy configuration) {
    Map<String,Set<ByteSequence>> locGroups = getLocalityGroups();
    List<String> enabledLGs = new ArrayList<>();
    for (Entry<String,Set<ByteSequence>> entry : locGroups.entrySet()) {
      List<String> families = new ArrayList<>();
      for (ByteSequence bytes : entry.getValue()) {
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        families.add(new String(bytes.toArray(), StandardCharsets.UTF_8));
      }
      configuration.set("table.group." + entry.getKey(), String.join(",", families));
      enabledLGs.add(entry.getKey());
    }
    configuration.set(Property.TABLE_LOCALITY_GROUPS, String.join(",", enabledLGs));
    return configuration;
  }

  /**
   * Returns the two locality groups used by the locality group maps: "a" with families cf and
   * cf2, "b" with families cf3 and cf4.
   * <p>
   * NOTE(review): none of the column families written by the tests in this class ("1cf", "2cf1",
   * ...) match these names, so presumably no test data lands in group "a" or "b" - confirm that
   * this is intended.
   */
  private Map<String,Set<ByteSequence>> getLocalityGroups() {
    Map<String,Set<ByteSequence>> locgro = new HashMap<>();
    locgro.put("a", newCFSet("cf", "cf2"));
    locgro.put("b", newCFSet("cf3", "cf4"));
    return locgro;
  }

  // from InMemoryMapTest
  /** Wraps each given column family name in an {@link ArrayByteSequence}. */
  private Set<ByteSequence> newCFSet(String... cfs) {
    HashSet<ByteSequence> cfSet = new HashSet<>();
    for (String cf : cfs) {
      cfSet.add(new ArrayByteSequence(cf));
    }
    return cfSet;
  }
}