/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.clientImpl.AccumuloServerException;
import org.apache.accumulo.core.clientImpl.Namespace;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LargeSplitRowIT extends ConfigurableMacBase {
private static final Logger log = LoggerFactory.getLogger(LargeSplitRowIT.class);
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setNumTservers(1);
Map<String,String> siteConfig = new HashMap<>();
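// A short major compaction delay makes the tablet server check tablets for work (including
// possible splits) frequently, so the tests below do not have to wait long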
siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "50ms");
cfg.setSiteConfig(siteConfig);
}
@Override
protected int defaultTimeoutSeconds() {
return 60;
}
// User added split
@Test
public void userAddedSplit() throws Exception {
log.info("User added split");
// make a table and lower the TABLE_END_ROW_MAX_SIZE property
final String tableName = getUniqueNames(1)[0];
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
client.tableOperations().create(tableName);
client.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(),
"1000");
// Create a BatchWriter and add a mutation to the table
BatchWriter batchWriter = client.createBatchWriter(tableName, new BatchWriterConfig());
Mutation m = new Mutation("Row");
m.put("cf", "cq", "value");
batchWriter.addMutation(m);
batchWriter.close();
// Create a split point that is too large to be an end row and fill it with all 'm'
SortedSet<Text> partitionKeys = new TreeSet<>();
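// Sized from the property's default value plus two bytes, the row comfortably exceeds the
// 1000-byte limit configured above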
byte[] data = new byte[(int) (ConfigurationTypeHelper
.getFixedMemoryAsBytes(Property.TABLE_MAX_END_ROW_SIZE.getDefaultValue()) + 2)];
for (int i = 0; i < data.length; i++) {
data[i] = 'm';
}
partitionKeys.add(new Text(data));
// Try to add the split point that is too large; if the split is created, the test fails.
try {
client.tableOperations().addSplits(tableName, partitionKeys);
fail("Expected addSplits to be rejected for an oversized split point");
} catch (AccumuloServerException e) {
// expected: the server rejects a split point larger than TABLE_MAX_END_ROW_SIZE
}
// Make sure that the information written to the table before the failed split attempt is
// still correct
int counter = 0;
try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
for (Entry<Key,Value> entry : scanner) {
counter++;
Key k = entry.getKey();
assertEquals("Row", k.getRow().toString());
assertEquals("cf", k.getColumnFamily().toString());
assertEquals("cq", k.getColumnQualifier().toString());
assertEquals("value", entry.getValue().toString());
}
}
// Make sure there is only one entry in the table
assertEquals(1, counter);
}
}
// Test tablet server split with 250 entries that all share the same prefix
@Test(timeout = 60 * 1000)
public void automaticSplitWith250Same() throws Exception {
log.info("Automatic with 250 with same prefix");
// make a table and lower the configured properties
final String tableName = getUniqueNames(1)[0];
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
client.tableOperations().create(tableName);
client.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(),
"10K");
client.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSION_TYPE.getKey(),
"none");
client.tableOperations().setProperty(tableName,
Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64");
client.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(),
"1000");
// Create a BatchWriter and a key for a table entry that is longer than the allowed end row
// size. Fill this key with all 'm's except the last byte.
BatchWriter batchWriter = client.createBatchWriter(tableName, new BatchWriterConfig());
byte[] data = new byte[(int) (ConfigurationTypeHelper
.getFixedMemoryAsBytes(Property.TABLE_MAX_END_ROW_SIZE.getDefaultValue()) + 2)];
for (int i = 0; i < data.length - 1; i++) {
data[i] = (byte) 'm';
}
// Make the last place in the key different for every entry added to the table
for (int i = 0; i < 250; i++) {
data[data.length - 1] = (byte) i;
Mutation m = new Mutation(data);
m.put("cf", "cq", "value");
batchWriter.addMutation(m);
}
// Flush the BatchWriter and table and sleep for a bit to make sure that there is enough time
// for the table to split if need be.
batchWriter.close();
client.tableOperations().flush(tableName, new Text(), new Text("z"), true);
Thread.sleep(500);
// Make sure all the data that was put in the table is still correct
int count = 0;
try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
for (Entry<Key,Value> entry : scanner) {
Key k = entry.getKey();
data[data.length - 1] = (byte) count;
String expected = new String(data, UTF_8);
assertEquals(expected, k.getRow().toString());
assertEquals("cf", k.getColumnFamily().toString());
assertEquals("cq", k.getColumnQualifier().toString());
assertEquals("value", entry.getValue().toString());
count++;
}
}
assertEquals(250, count);
// Make sure no splits occurred in the table
assertEquals(0, client.tableOperations().listSplits(tableName).size());
}
}
// 10 0's; 10 2's; 10 4's ... 10 28's
@Test(timeout = 60 * 1000)
public void automaticSplitWithGaps() throws Exception {
log.info("Automatic Split With Gaps");
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
automaticSplit(client, 30, 2);
}
}
// 10 0's; 10 1's; 10 2's ... 10 14's
@Test(timeout = 60 * 1000)
public void automaticSplitWithoutGaps() throws Exception {
log.info("Automatic Split Without Gaps");
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
automaticSplit(client, 15, 1);
}
}
@Test(timeout = 60 * 1000)
public void automaticSplitLater() throws Exception {
log.info("Split later");
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
automaticSplit(client, 15, 1);
String tableName = "";
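// automaticSplit() created the only user table; find it by skipping the built-in
// accumulo.* system tables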
for (String curr : client.tableOperations().list()) {
if (!curr.startsWith(Namespace.ACCUMULO.name() + ".")) {
tableName = curr;
}
}
// Create a BatchWriter and a key for the table entries; unlike the tests above, this key is
// short enough to be a valid end row
BatchWriter batchWriter = client.createBatchWriter(tableName, new BatchWriterConfig());
byte[] data = new byte[10];
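// These 10-byte rows are well under the max end row size, so they are legal split point
// candidates and the table should now be able to split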
// Fill the key with the current j value, except for the last byte, which cycles through
// 0-24 for each j
for (int j = 15; j < 150; j++) {
for (int i = 0; i < data.length - 1; i++) {
data[i] = (byte) j;
}
for (int i = 0; i < 25; i++) {
data[data.length - 1] = (byte) i;
Mutation m = new Mutation(data);
m.put("cf", "cq", "value");
batchWriter.addMutation(m);
}
}
// Flush the BatchWriter and the table, then wait for a split to occur
batchWriter.close();
client.tableOperations().flush(tableName, new Text(), new Text("z"), true);
// Make sure a split occurs
while (client.tableOperations().listSplits(tableName).isEmpty()) {
Thread.sleep(250);
}
assertFalse(client.tableOperations().listSplits(tableName).isEmpty());
}
}
private void automaticSplit(AccumuloClient client, int max, int spacing) throws Exception {
// make a table and lower the configured properties
final String tableName = getUniqueNames(1)[0];
client.tableOperations().create(tableName);
client.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
client.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSION_TYPE.getKey(),
"none");
client.tableOperations().setProperty(tableName,
Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64");
client.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(),
"1000");
// Create a BatchWriter and key for a table entry that is longer than the allowed size for an
// end row
BatchWriter batchWriter = client.createBatchWriter(tableName, new BatchWriterConfig());
byte[] data = new byte[(int) (ConfigurationTypeHelper
.getFixedMemoryAsBytes(Property.TABLE_MAX_END_ROW_SIZE.getDefaultValue()) + 2)];
// Fill the key with the current j value, except for the last byte, which cycles through
// 0-9 for each j
for (int j = 0; j < max; j += spacing) {
for (int i = 0; i < data.length - 1; i++) {
data[i] = (byte) j;
}
for (int i = 0; i < 10; i++) {
data[data.length - 1] = (byte) i;
Mutation m = new Mutation(data);
m.put("cf", "cq", "value");
batchWriter.addMutation(m);
}
}
// Flush the BatchWriter and table and sleep for a bit to make sure that there is enough time
// for the table to split if need be.
batchWriter.close();
client.tableOperations().flush(tableName, new Text(), new Text("z"), true);
Thread.sleep(500);
// Make sure all the data that was put in the table is still correct
int count = 0;
int extra = 10;
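// Rows were written in groups of 10 that share a prefix byte: 'extra' cycles through the
// varying last byte, and 'count' tracks the prefix, advancing by 'spacing' as each new
// group starts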
try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
for (Entry<Key,Value> entry : scanner) {
if (extra == 10) {
extra = 0;
for (int i = 0; i < data.length - 1; i++) {
data[i] = (byte) count;
}
count += spacing;
}
Key k = entry.getKey();
data[data.length - 1] = (byte) extra;
String expected = new String(data, UTF_8);
assertEquals(expected, k.getRow().toString());
assertEquals("cf", k.getColumnFamily().toString());
assertEquals("cq", k.getColumnQualifier().toString());
assertEquals("value", entry.getValue().toString());
extra++;
}
}
assertEquals(10, extra);
assertEquals(max, count);
// Make sure no splits occurred in the table
assertEquals(0, client.tableOperations().listSplits(tableName).size());
}
}