blob: 11530518af903a2d3709cabde0a151bede979761 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.client;
import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
import static org.apache.kudu.test.ClientTestUtil.getBasicTableOptionsWithNonCoveredRange;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.apache.kudu.Schema;
import org.apache.kudu.test.KuduTestHarness;
public class TestKuduPartitioner {
private KuduClient client;
@Rule
public KuduTestHarness harness = new KuduTestHarness();
@Before
public void setUp() {
client = harness.getClient();
}
@Test
public void testPartitioner() throws Exception {
// Create a table with the following 9 partitions:
//
// hash bucket
// key 0 1 2
// -----------------
// <3333 x x x
// 3333-6666 x x x
// >=6666 x x x
Schema basicSchema = getBasicSchema();
int numRanges = 3;
int numHashPartitions = 3;
String tableName = "TestPartitioner";
List<PartialRow> splitRows = new ArrayList<>();
for (int split : Arrays.asList(3333, 6666)) {
PartialRow row = basicSchema.newPartialRow();
row.addInt("key", split);
splitRows.add(row);
}
CreateTableOptions createOptions = new CreateTableOptions();
createOptions.addHashPartitions(Collections.singletonList("key"), numHashPartitions);
createOptions.setRangePartitionColumns(Collections.singletonList("key"));
for (PartialRow row : splitRows) {
createOptions.addSplitRow(row);
}
KuduTable table = client.createTable(tableName, basicSchema, createOptions);
Schema schema = table.getSchema();
KuduPartitioner part = new KuduPartitioner.KuduPartitionerBuilder(table).build();
assertEquals(numRanges * numHashPartitions, part.numPartitions());
// Partition a bunch of rows, counting how many fall into each partition.
int numRowsToPartition = 10000;
int[] countsByPartition = new int[part.numPartitions()];
Arrays.fill(countsByPartition, 0);
for (int i = 0; i < numRowsToPartition; i++) {
PartialRow row = schema.newPartialRow();
row.addInt("key", i);
int partIndex = part.partitionRow(row);
countsByPartition[partIndex]++;
}
// We don't expect a completely even division of rows into partitions, but
// we should be within 10% of that.
int expectedPerPartition = numRowsToPartition / part.numPartitions();
int fuzziness = expectedPerPartition / 10;
int minPerPartition = expectedPerPartition - fuzziness;
int maxPerPartition = expectedPerPartition + fuzziness;
for (int i = 0; i < part.numPartitions(); i++) {
assertTrue(minPerPartition <= countsByPartition[i]);
assertTrue(maxPerPartition >= countsByPartition[i]);
}
// Drop the first and third range partition.
AlterTableOptions alterOptions = new AlterTableOptions();
alterOptions.dropRangePartition(basicSchema.newPartialRow(), splitRows.get(0));
alterOptions.dropRangePartition(splitRows.get(1), basicSchema.newPartialRow());
client.alterTable(tableName, alterOptions);
// The existing partitioner should still return results based on the table
// state at the time it was created, and successfully return partitions
// for rows in the now-dropped range.
assertEquals(numRanges * numHashPartitions, part.numPartitions());
PartialRow row = schema.newPartialRow();
row.addInt("key", 1000);
assertEquals(0, part.partitionRow(row));
// If we recreate the partitioner, it should get the new partitioning info.
part = new KuduPartitioner.KuduPartitionerBuilder(table).build();
numRanges = 1;
assertEquals(numRanges * numHashPartitions, part.numPartitions());
}
@Test
public void testPartitionerNonCoveredRange() throws Exception {
Schema basicSchema = getBasicSchema();
int numHashPartitions = 3;
String tableName = "TestPartitionerNonCoveredRange";
CreateTableOptions createOptions = new CreateTableOptions();
createOptions.addHashPartitions(Collections.singletonList("key"), numHashPartitions);
createOptions.setRangePartitionColumns(Collections.singletonList("key"));
// Cover a range where 1000 <= key < 2000
PartialRow lower = basicSchema.newPartialRow();
lower.addInt("key", 1000);
PartialRow upper = basicSchema.newPartialRow();
upper.addInt("key", 2000);
createOptions.addRangePartition(lower, upper);
KuduTable table = client.createTable(tableName, basicSchema, createOptions);
Schema schema = table.getSchema();
KuduPartitioner part = new KuduPartitioner.KuduPartitionerBuilder(table).build();
try {
PartialRow under = schema.newPartialRow();
under.addInt("key", 999);
part.partitionRow(under);
fail("partitionRow did not throw a NonCoveredRangeException");
} catch (NonCoveredRangeException ex) {
// Expected
}
try {
PartialRow over = schema.newPartialRow();
over.addInt("key", 999);
part.partitionRow(over);
fail("partitionRow did not throw a NonCoveredRangeException");
} catch (NonCoveredRangeException ex) {
// Expected
}
}
@Test
public void testBuildTimeout() throws Exception {
Schema basicSchema = getBasicSchema();
String tableName = "TestBuildTimeout";
CreateTableOptions createOptions = new CreateTableOptions();
createOptions.addHashPartitions(Collections.singletonList("key"), 3);
createOptions.setRangePartitionColumns(Collections.singletonList("key"));
KuduTable table = client.createTable(tableName, basicSchema, createOptions);
// Ensure the table information can't be found to force a timeout.
harness.killAllMasterServers();
int timeoutMs = 2000;
long now = System.currentTimeMillis();
try {
new KuduPartitioner.KuduPartitionerBuilder(table).buildTimeout(timeoutMs).build();
fail("No NonRecoverableException was thrown");
} catch (NonRecoverableException ex) {
assertTrue(ex.getMessage().startsWith("cannot complete before timeout"));
}
long elapsed = System.currentTimeMillis() - now;
long upperBound = timeoutMs * 2L; // Add 100% to avoid flakiness.
assertTrue(String.format("Elapsed time %d exceeded upper bound %d", elapsed, upperBound),
elapsed <= upperBound);
}
@Test
public void testTableCache() throws Exception {
String tableName = "TestTableCache";
KuduTable table =
client.createTable(tableName, getBasicSchema(), getBasicTableOptionsWithNonCoveredRange());
// Populate the table cache by building the partitioner once.
KuduPartitioner partitioner = new KuduPartitioner.KuduPartitionerBuilder(table).build();
// Ensure the remote table information can't be found.
harness.killAllMasterServers();
// This partitioner should build correctly because the table cache holds the partitions
// from the previous partitioner.
KuduPartitioner partitionerFromCache = new KuduPartitioner.KuduPartitionerBuilder(table).build();
assertEquals(partitioner.numPartitions(), partitionerFromCache.numPartitions());
}
}