/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.testing.gcs;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeSet;
import java.util.function.Function;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.hash.Hashing;
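
/**
 * Persistence layer for the GCS test. All state lives in a single Accumulo table using these row
 * layouts (every numeric field is zero-padded hex, and <hash> is a murmur3_32 hash of the ids
 * that follow it):
 *
 *   I:<hash>:<clientId>:<groupId>            family "item", qualifier <itemId>, value = ItemState
 *   R:<bucket>                               family "ref", qualifier <clientId>:<groupId>:<itemId>
 *   G:<hash>:<clientId>:<groupId>            empty marker entry for a group reference
 *   C:<hash>:<clientId>:<groupId>:<itemId>   empty marker entry for a deletion candidate
 */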
public class Persistence {
private BatchWriter writer;
private String table;
private AccumuloClient client;
Persistence(GcsEnv env) {
this.client = env.getAccumuloClient();
this.table = env.getTableName();
try {
this.writer = client.createBatchWriter(table);
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
}
}
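
  // Ids are encoded as fixed-width, zero-padded hex so they sort lexicographically within rows
  // and qualifiers.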
static String toHex(int i) {
return Strings.padStart(Integer.toHexString(i), 8, '0');
}
static String toHex(long l) {
return Strings.padStart(Long.toHexString(l), 16, '0');
}
private void write(Mutation m) {
try {
writer.addMutation(m);
} catch (MutationsRejectedException e) {
throw new RuntimeException(e);
}
}
private String toHex(long l1, long l2, long l3) {
return toHex(l1) + ":" + toHex(l2) + ":" + toHex(l3);
}
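
  // Prefixing a murmur3 hash of the ids spreads rows evenly over the hash range that
  // initialSplits() presplits, rather than clustering them by clientId.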
private String toHexWithHash(long l1, long l2) {
int hc = Hashing.murmur3_32().newHasher().putLong(l1).putLong(l2).hash().asInt();
return toHex(hc) + ":" + toHex(l1) + ":" + toHex(l2);
}
private String toHexWithHash(long l1, long l2, long l3) {
int hc = Hashing.murmur3_32().newHasher().putLong(l1).putLong(l2).putLong(l3).hash().asInt();
return toHex(hc) + ":" + toHex(l1) + ":" + toHex(l2) + ":" + toHex(l3);
}
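
  // Item rows: one column per itemId under family "item", with the item's state as the value.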
void save(Item item, ItemState state) {
Mutation m = new Mutation("I:" + toHexWithHash(item.clientId, item.groupId));
m.put("item", toHex(item.itemId), state.name());
write(m);
}
void save(ItemRef itemRef) {
Mutation m = new Mutation("R:" + toHex(itemRef.bucket));
m.put("ref", toHex(itemRef.clientId, itemRef.groupId, itemRef.itemId), "");
write(m);
}
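
  // Groups the refs by bucket so that each bucket row receives a single mutation.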
public void save(Collection<ItemRef> refsToAdd) {
Map<Integer,Mutation> mutations = new HashMap<>();
for (ItemRef itemRef : refsToAdd) {
Mutation m = mutations.computeIfAbsent(itemRef.bucket,
bucket -> new Mutation("R:" + toHex(bucket)));
m.put("ref", toHex(itemRef.clientId, itemRef.groupId, itemRef.itemId), "");
}
mutations.values().forEach(m -> write(m));
}
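
  // Adds the new ref and deletes the old ones in a single mutation; the precondition below ensures
  // they all share one bucket row, so Accumulo applies the swap atomically.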
public void replace(Collection<ItemRef> refsToDelete, ItemRef refToAdd) {
Mutation m = new Mutation("R:" + toHex(refToAdd.bucket));
m.put("ref", toHex(refToAdd.clientId, refToAdd.groupId, refToAdd.itemId), "");
for (ItemRef ir : refsToDelete) {
Preconditions.checkArgument(refToAdd.bucket == ir.bucket);
m.putDelete("ref", toHex(ir.clientId, ir.groupId, ir.itemId));
}
write(m);
}
void save(GroupRef groupRef) {
Mutation m = new Mutation("G:" + toHexWithHash(groupRef.clientId, groupRef.groupId));
m.put("", "", "");
write(m);
}
void save(Candidate c) {
Mutation m = new Mutation("C:" + toHexWithHash(c.clientId, c.groupId, c.itemId));
m.put("", "", "");
write(m);
}
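
  // Pushes any buffered mutations to the tablet servers.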
public void flush() {
try {
writer.flush();
} catch (MutationsRejectedException e) {
      throw new RuntimeException(e);
}
}
public void delete(ItemRef itemRef) {
Mutation m = new Mutation("R:" + toHex(itemRef.bucket));
m.putDelete("ref", toHex(itemRef.clientId, itemRef.groupId, itemRef.itemId));
write(m);
}
public void delete(Collection<ItemRef> refsToDelete) {
Map<Integer,Mutation> mutations = new HashMap<>();
for (ItemRef itemRef : refsToDelete) {
Mutation m = mutations.computeIfAbsent(itemRef.bucket,
bucket -> new Mutation("R:" + toHex(bucket)));
m.putDelete("ref", toHex(itemRef.clientId, itemRef.groupId, itemRef.itemId));
}
mutations.values().forEach(m -> write(m));
}
public void delete(GroupRef groupRef) {
Mutation m = new Mutation("G:" + toHexWithHash(groupRef.clientId, groupRef.groupId));
m.putDelete("", "");
write(m);
}
public void delete(Item item) {
Mutation m = new Mutation("I:" + toHexWithHash(item.clientId, item.groupId));
m.putDelete("item", toHex(item.itemId));
write(m);
}
public void delete(Candidate c) {
Mutation m = new Mutation("C:" + toHexWithHash(c.clientId, c.groupId, c.itemId));
m.putDelete("", "");
write(m);
}
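
  // IsolatedScanner provides row isolation, so readers never see a row with only part of a
  // multi-column mutation applied.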
private Scanner createScanner(String prefix) {
Scanner scanner;
try {
scanner = new IsolatedScanner(client.createScanner(table));
} catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) {
throw new RuntimeException(e);
}
scanner.setRange(Range.prefix(prefix));
return scanner;
}
private <T> Iterable<T> transformRange(String prefix, Function<Key,T> func) {
return Iterables.transform(createScanner(prefix), entry -> func.apply(entry.getKey()));
}
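
  // Candidate rows look like C:<8-hex hash>:<clientId>:<groupId>:<itemId>; substring(11) skips
  // the "C:" prefix, the hash, and its trailing ':'.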
Iterable<Candidate> candidates() {
return transformRange("C:", k -> {
String row = k.getRowData().toString();
String[] fields = row.substring(11).split(":");
Preconditions.checkState(fields.length == 3, "Bad candidate row %s", row);
long clientId = Long.parseLong(fields[0], 16);
long groupId = Long.parseLong(fields[1], 16);
long itemId = Long.parseLong(fields[2], 16);
return new Candidate(clientId, groupId, itemId);
});
}
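
  // Ref rows look like R:<8-hex bucket> with qualifiers of the form <clientId>:<groupId>:<itemId>.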
Iterable<ItemRef> itemRefs() {
return transformRange("R:", k -> {
String row = k.getRowData().toString();
String qual = k.getColumnQualifierData().toString();
int bucket = Integer.parseInt(row.substring(2), 16);
String[] fields = qual.split(":");
Preconditions.checkState(fields.length == 3, "Bad item ref %s", k);
long clientId = Long.parseLong(fields[0], 16);
long groupId = Long.parseLong(fields[1], 16);
long itemId = Long.parseLong(fields[2], 16);
return new ItemRef(bucket, clientId, groupId, itemId);
});
}
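
  // Group ref rows look like G:<8-hex hash>:<clientId>:<groupId>.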
Iterable<GroupRef> groupRefs() {
return transformRange("G:", k -> {
String row = k.getRowData().toString();
String[] fields = row.substring(11).split(":");
Preconditions.checkState(fields.length == 2, "Bad group ref row %s", row);
long clientId = Long.parseLong(fields[0], 16);
long groupId = Long.parseLong(fields[1], 16);
return new GroupRef(clientId, groupId);
});
}
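
  // Scans the I: section and keeps only the entries whose value matches one of the requested
  // states.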
Iterable<Item> items(ItemState... states) {
EnumSet<ItemState> stateSet = EnumSet.of(states[0]);
for (int i = 1; i < states.length; i++) {
stateSet.add(states[i]);
}
Iterable<Entry<Key,Value>> itemIter = Iterables.filter(createScanner("I:"),
entry -> stateSet.contains(ItemState.valueOf(entry.getValue().toString())));
return Iterables.transform(itemIter, entry -> {
Key k = entry.getKey();
String row = k.getRowData().toString();
String qual = k.getColumnQualifierData().toString();
String[] fields = row.substring(11).split(":");
      Preconditions.checkState(fields.length == 2, "Bad item row %s", row);
long clientId = Long.parseLong(fields[0], 16);
long groupId = Long.parseLong(fields[1], 16);
long itemId = Long.parseLong(qual, 16);
return new Item(clientId, groupId, itemId);
});
}
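
  // Presplits each key section (G:, C:, I:, R:) into env.getInitialTablets() tablets by spacing
  // split points evenly over that section's key range; the trailing <prefix>~ split separates
  // adjacent sections.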
private static TreeSet<Text> initialSplits(GcsEnv env) {
TreeSet<Text> splits = new TreeSet<>();
int tabletsPerSection = env.getInitialTablets();
for (String prefix : new String[] {"G:", "C:", "I:", "R:"}) {
int numSplits = tabletsPerSection - 1;
long max;
if (prefix.equals("R:")) {
max = env.getMaxBuckets();
} else {
        max = 1L << 32; // the 8-hex-digit hash prefix spans the full unsigned int range
}
long distance = (max / tabletsPerSection) + 1;
long split = distance;
for (int i = 0; i < numSplits; i++) {
String s = String.format("%08x", split);
while (s.charAt(s.length() - 1) == '0') {
s = s.substring(0, s.length() - 1);
}
splits.add(new Text(prefix + s));
split += distance;
}
splits.add(new Text(prefix + "~"));
}
return splits;
}
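
  // Creates the presplit table and sets table.compaction.major.ratio to 1 so major compactions
  // run more aggressively than with the default ratio.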
public static void init(GcsEnv env) throws Exception {
NewTableConfiguration ntc = new NewTableConfiguration();
ntc.withSplits(initialSplits(env));
ntc.setProperties(ImmutableMap.of("table.compaction.major.ratio", "1"));
env.getAccumuloClient().tableOperations().create(env.getTableName(), ntc);
}
public void flushTable() {
try {
client.tableOperations().flush(table);
} catch (AccumuloException | AccumuloSecurityException e) {
throw new RuntimeException(e);
}
}
}