blob: cdcd0e61b725b1e1b3e0e6b6db62ff6041f3cfff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.testing.randomwalk.shard;
import java.net.InetAddress;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.testing.randomwalk.Fixture;
import org.apache.accumulo.testing.randomwalk.RandWalkEnv;
import org.apache.accumulo.testing.randomwalk.State;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
public class ShardFixture extends Fixture {

  /**
   * Generates evenly spaced split points for a table whose row space runs from 0 to {@code max}.
   * <p>
   * A table with {@code numTablets} tablets needs {@code numTablets - 1} split points; each split
   * value is rendered with {@code format} (e.g. {@code "%06x"}) so the resulting rows sort
   * lexicographically the same way they sort numerically.
   *
   * @param max upper bound of the row value space (exclusive of the last tablet's end)
   * @param numTablets desired number of tablets; must be &gt;= 1
   * @param format {@link String#format} pattern applied to each numeric split value
   * @return the generated split rows, sorted
   */
  static SortedSet<Text> genSplits(long max, int numTablets, String format) {
    int numSplits = numTablets - 1;
    long distance = max / numTablets;
    long split = distance;
    TreeSet<Text> splits = new TreeSet<>();
    for (int i = 0; i < numSplits; i++) {
      splits.add(new Text(String.format(format, split)));
      split += distance;
    }
    return splits;
  }

  /**
   * Creates the index table named {@code state.get("indexTableName") + suffix}, pre-splits it into
   * a random number of tablets (1 to {@code numPartitions}), and optionally enables index/block
   * caching when the {@code cacheIndex} flag stored in {@code state} is set.
   *
   * @param log logger to record table creation details
   * @param state randomwalk state holding {@code indexTableName}, {@code numPartitions}, and
   *        {@code cacheIndex}
   * @param env environment providing the {@link AccumuloClient}
   * @param suffix appended to the base index table name (empty for the primary index table)
   * @param rand source of randomness for the split count
   * @throws Exception if table creation or configuration fails
   */
  static void createIndexTable(Logger log, State state, RandWalkEnv env, String suffix, Random rand)
      throws Exception {
    AccumuloClient client = env.getAccumuloClient();
    String name = state.get("indexTableName") + suffix;
    int numPartitions = (Integer) state.get("numPartitions");
    boolean enableCache = (Boolean) state.get("cacheIndex");
    client.tableOperations().create(name);
    String tableId = client.tableOperations().tableIdMap().get(name);
    log.info("Created index table " + name + "(id:" + tableId + ")");
    // Index rows are partition ids formatted with %06x, so splits use the same format.
    SortedSet<Text> splits = genSplits(numPartitions, rand.nextInt(numPartitions) + 1, "%06x");
    client.tableOperations().addSplits(name, splits);
    log.info("Added " + splits.size() + " splits to " + name);
    if (enableCache) {
      client.tableOperations().setProperty(name, Property.TABLE_INDEXCACHE_ENABLED.getKey(),
          "true");
      client.tableOperations().setProperty(name, Property.TABLE_BLOCKCACHE_ENABLED.getKey(),
          "true");
      log.info("Enabled caching for table " + name);
    }
  }

  /**
   * Initializes the shard test: seeds the state with unique table names (hostname + pid +
   * timestamp), a partition count, randomness flags, and a document id counter, then creates the
   * index and document tables.
   */
  @Override
  public void setUp(State state, RandWalkEnv env) throws Exception {
    // Hostname characters that are illegal in table names are replaced with underscores.
    String hostname = InetAddress.getLocalHost().getHostName().replaceAll("[-.]", "_");
    String pid = env.getPid();
    Random rand = new Random();
    int numPartitions = rand.nextInt(90) + 10;
    state.set("indexTableName",
        String.format("ST_index_%s_%s_%d", hostname, pid, System.currentTimeMillis()));
    state.set("docTableName",
        String.format("ST_docs_%s_%s_%d", hostname, pid, System.currentTimeMillis()));
    state.set("numPartitions", Integer.valueOf(numPartitions));
    state.set("cacheIndex", rand.nextDouble() < .5);
    state.set("rand", rand);
    state.set("nextDocID", Long.valueOf(0));
    createIndexTable(this.log, state, env, "", rand);
    createDocTable(state, env, rand);
  }

  /**
   * Creates and pre-splits the document table named by {@code state.get("docTableName")},
   * randomly enabling bloom filters half the time.
   */
  private void createDocTable(State state, RandWalkEnv env, Random rand) throws Exception {
    AccumuloClient client = env.getAccumuloClient();
    String docTableName = (String) state.get("docTableName");
    client.tableOperations().create(docTableName);
    String tableId = client.tableOperations().tableIdMap().get(docTableName);
    log.info("Created doc table " + docTableName + " (id:" + tableId + ")");
    // Doc rows begin with a 2-hex-digit hash prefix, so splits cover 0x00-0xff with %02x.
    SortedSet<Text> splits = genSplits(0xff, rand.nextInt(32) + 1, "%02x");
    client.tableOperations().addSplits(docTableName, splits);
    log.info("Added " + splits.size() + " splits to " + docTableName);
    if (rand.nextDouble() < .5) {
      client.tableOperations().setProperty(docTableName, Property.TABLE_BLOOM_ENABLED.getKey(),
          "true");
      log.info("Enabled bloom filters for table " + docTableName);
    }
  }

  /**
   * Cleans up resources created by this fixture: flushes and closes the shared
   * {@link MultiTableBatchWriter} (if one was created) and deletes both test tables.
   */
  @Override
  public void tearDown(State state, RandWalkEnv env) throws Exception {
    // We have resources we need to clean up
    if (env.isMultiTableBatchWriterInitialized()) {
      MultiTableBatchWriter mtbw = env.getMultiTableBatchWriter();
      try {
        mtbw.close();
      } catch (MutationsRejectedException e) {
        // Best-effort close during teardown: the tables are about to be deleted,
        // so rejected mutations are logged rather than rethrown.
        log.error("Ignoring mutations that weren't flushed", e);
      }
      // Reset the MTBW on the state to null
      env.resetMultiTableBatchWriter();
    }
    AccumuloClient client = env.getAccumuloClient();
    log.info("Deleting index and doc tables");
    client.tableOperations().delete((String) state.get("indexTableName"));
    client.tableOperations().delete((String) state.get("docTableName"));
    log.debug("Exiting shard test");
  }
}