blob: a0f23db29f4f28220afabb886e2cc5f67acf7dbc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.thrift.TMutation;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
public class LocalityGroupUtil {
private static final Logger log = LoggerFactory.getLogger(LocalityGroupUtil.class);
// using an immutable set (Set.of()) here for more efficient comparisons in LocalityGroupIterator
public static final Set<ByteSequence> EMPTY_CF_SET = Set.of();
/**
 * Collects the column families of the supplied columns into a set suitable for passing to a
 * SortedKeyValueIterator seek call. An immutable set is used to enable faster comparisons down in
 * the LocalityGroupIterator.
 *
 * @param columns
 *          the columns whose families are wanted
 * @return {@link #EMPTY_CF_SET} when {@code columns} is empty, otherwise an immutable set holding
 *         the family of every column wrapped in an {@link ArrayByteSequence}
 */
public static Set<ByteSequence> families(Collection<Column> columns) {
  if (columns.isEmpty()) {
    return EMPTY_CF_SET;
  }
  ImmutableSet.Builder<ByteSequence> familyBuilder = ImmutableSet.builder();
  for (Column column : columns) {
    familyBuilder.add(new ArrayByteSequence(column.getColumnFamily()));
  }
  return familyBuilder.build();
}
/**
 * Checked exception thrown by the methods of this class when a locality group configuration
 * cannot be interpreted.
 */
@SuppressWarnings("serial")
public static class LocalityGroupConfigurationError extends AccumuloException {
  /**
   * @param reason
   *          description of what is wrong with the configuration
   */
  LocalityGroupConfigurationError(String reason) {
    super(reason);
  }
}
/**
 * Tells whether a property key takes part in locality group configuration.
 *
 * @param prop
 *          the property key to test
 * @return true when {@code prop} starts with the key of
 *         {@link Property#TABLE_LOCALITY_GROUP_PREFIX} or equals the key of
 *         {@link Property#TABLE_LOCALITY_GROUPS}
 */
public static boolean isLocalityGroupProperty(String prop) {
  if (prop.startsWith(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey())) {
    return true;
  }
  return prop.equals(Property.TABLE_LOCALITY_GROUPS.getKey());
}
/**
 * Validates the locality group settings contained in the given properties by copying them into a
 * {@link ConfigurationCopy} and parsing them with {@link #getLocalityGroups(AccumuloConfiguration)}.
 * Nothing is parsed when the copy has no value for {@link Property#TABLE_LOCALITY_GROUPS}.
 *
 * @param config
 *          the property key/value pairs to validate
 * @throws LocalityGroupConfigurationError
 *           if parsing the locality groups fails
 */
public static void checkLocalityGroups(Iterable<Entry<String,String>> config)
    throws LocalityGroupConfigurationError {
  ConfigurationCopy copy = new ConfigurationCopy(config);
  if (copy.get(Property.TABLE_LOCALITY_GROUPS) == null) {
    return;
  }
  getLocalityGroups(copy);
}
/**
 * Best-effort variant of {@link #getLocalityGroups(AccumuloConfiguration)}: any
 * {@link LocalityGroupConfigurationError} or {@link RuntimeException} raised while parsing is
 * logged at warn level and an empty map is returned instead.
 *
 * @param acuconf
 *          the configuration to read locality groups from
 * @param tableId
 *          the table the configuration belongs to; only used in the log message
 * @return the parsed locality groups, or an empty map when parsing failed
 */
public static Map<String,Set<ByteSequence>>
    getLocalityGroupsIgnoringErrors(AccumuloConfiguration acuconf, TableId tableId) {
  try {
    return getLocalityGroups(acuconf);
  } catch (LocalityGroupConfigurationError | RuntimeException e) {
    // Parameterized logging; the trailing throwable is logged as the exception, not as an
    // argument of the message.
    log.warn(
        "Failed to get locality group config for tableId:{}, proceeding without locality groups.",
        tableId, e);
    return Collections.emptyMap();
  }
}
/**
 * Parses the locality groups out of a configuration.
 *
 * <p>
 * The group names come from the comma separated value of {@link Property#TABLE_LOCALITY_GROUPS}
 * (empty names are skipped). For each named group, the property whose key is
 * {@link Property#TABLE_LOCALITY_GROUP_PREFIX} followed by the group name supplies the encoded
 * column families of that group. Properties under the prefix that name an unlisted group, or that
 * have further dot separated components after the group name, are ignored.
 *
 * @param acuconf
 *          the configuration to read
 * @return a map from locality group name to the column families assigned to it
 * @throws LocalityGroupConfigurationError
 *           if a column family is assigned to more than one group, if a listed group ends up with
 *           no column families, or if a column family cannot be decoded
 */
public static Map<String,Set<ByteSequence>> getLocalityGroups(AccumuloConfiguration acuconf)
    throws LocalityGroupConfigurationError {
  Map<String,Set<ByteSequence>> result = new HashMap<>();
  for (String group : acuconf.get(Property.TABLE_LOCALITY_GROUPS).split(",")) {
    if (!group.isEmpty()) {
      result.put(group, new HashSet<>());
    }
  }

  // The prefix does not change while iterating, so look it up once.
  final String prefix = Property.TABLE_LOCALITY_GROUP_PREFIX.getKey();
  // Every column family seen so far, used to detect families claimed by two groups.
  Set<ByteSequence> all = new HashSet<>();
  for (Entry<String,String> entry : acuconf) {
    String property = entry.getKey();
    if (!property.startsWith(prefix)) {
      continue;
    }
    // this property configures a locality group, find out which one:
    String[] parts = property.substring(prefix.length()).split("\\.");
    // split() drops trailing empty strings, so a suffix made only of dots yields an empty array;
    // more than one part means the key is a sub-property of a group. Neither configures the
    // group's column families.
    if (parts.length != 1) {
      continue;
    }
    String group = parts[0];
    if (!result.containsKey(group)) {
      continue;
    }
    Set<ByteSequence> colFamsSet = decodeColumnFamilies(entry.getValue());
    if (!Collections.disjoint(all, colFamsSet)) {
      // narrow the set down to the offending families for the error message
      colFamsSet.retainAll(all);
      throw new LocalityGroupConfigurationError("Column families " + colFamsSet + " in group "
          + group + " is already used by another locality group");
    }
    all.addAll(colFamsSet);
    result.put(group, colFamsSet);
  }

  // A group that was listed but never received any column families is a configuration error.
  for (Entry<String,Set<ByteSequence>> entry : result.entrySet()) {
    if (entry.getValue().isEmpty()) {
      throw new LocalityGroupConfigurationError(
          "Locality group " + entry.getKey() + " specified but not declared");
    }
  }
  return result;
}
/**
 * Decodes a comma separated list of encoded column families.
 *
 * @param colFams
 *          the encoded column families, separated by commas
 * @return a mutable set with the result of {@link #decodeColumnFamily(String)} for each element
 * @throws LocalityGroupConfigurationError
 *           if any element cannot be decoded
 */
public static Set<ByteSequence> decodeColumnFamilies(String colFams)
    throws LocalityGroupConfigurationError {
  String[] encodedFamilies = colFams.split(",");
  HashSet<ByteSequence> decoded = new HashSet<>();
  for (int i = 0; i < encodedFamilies.length; i++) {
    decoded.add(decodeColumnFamily(encodedFamilies[i]));
  }
  return decoded;
}
/**
 * Decodes a single encoded column family into its bytes.
 *
 * <p>
 * A backslash starts an escape: {@code \\} stands for one backslash and {@code \xHH} stands for
 * the byte with hexadecimal value {@code HH}. Every other character contributes its low eight
 * bits as one byte.
 *
 * @param colFam
 *          the encoded column family
 * @return the decoded bytes
 * @throws LocalityGroupConfigurationError
 *           if a backslash is not followed by {@code x} or another backslash, or if {@code \x} is
 *           not followed by two characters that parse as a hexadecimal number
 */
public static ByteSequence decodeColumnFamily(String colFam)
    throws LocalityGroupConfigurationError {
  // An escape never produces more bytes than it has characters, so the input length is a
  // sufficient upper bound for the output.
  byte[] output = new byte[colFam.length()];
  int pos = 0;
  for (int i = 0; i < colFam.length(); i++) {
    char c = colFam.charAt(i);
    if (c != '\\') {
      output[pos++] = (byte) (0xff & c);
      continue;
    }
    // next char must be 'x' or '\'
    i++;
    if (i >= colFam.length()) {
      throw new LocalityGroupConfigurationError("Expected 'x' or '\\' after '\\' in " + colFam);
    }
    switch (colFam.charAt(i)) {
      case '\\':
        output[pos++] = '\\';
        break;
      case 'x':
        // next two chars must be hexadecimal digits
        i++;
        if (i + 2 > colFam.length()) {
          throw new LocalityGroupConfigurationError(
              "Expected two hex digits after '\\x' in " + colFam);
        }
        int value;
        try {
          value = Integer.parseInt(colFam.substring(i, i + 2), 16);
        } catch (NumberFormatException e) {
          // report malformed input through the declared exception type, keeping the cause
          LocalityGroupConfigurationError error = new LocalityGroupConfigurationError(
              "Expected two hex digits after '\\x' in " + colFam);
          error.initCause(e);
          throw error;
        }
        output[pos++] = (byte) (0xff & value);
        // skip the second hex digit; the loop increment moves past it
        i++;
        break;
      default:
        throw new LocalityGroupConfigurationError("Expected 'x' or '\\' after '\\' in " + colFam);
    }
  }
  return new ArrayByteSequence(output, 0, pos);
}
/**
 * Encodes a set of column families as a single comma separated string. The encoded families are
 * joined in their natural (sorted) string order.
 *
 * @param colFams
 *          the column families to encode
 * @return the encoded families joined with commas
 */
public static String encodeColumnFamilies(Set<Text> colFams) {
  // one builder is reused for every family; the encoder resets it on each call
  StringBuilder scratch = new StringBuilder();
  SortedSet<String> sortedEncoded = new TreeSet<>();
  for (Text family : colFams) {
    sortedEncoded.add(encodeColumnFamily(scratch, family.getBytes(), family.getLength()));
  }
  return Joiner.on(",").join(sortedEncoded);
}
/**
 * Encodes one column family held in a byte sequence.
 *
 * @param bs
 *          the column family; its offset must be zero
 * @return the encoded column family
 * @throws IllegalArgumentException
 *           if {@code bs} has a non-zero offset
 */
public static String encodeColumnFamily(ByteSequence bs) {
  if (bs.offset() == 0) {
    return encodeColumnFamily(new StringBuilder(), bs.getBackingArray(), bs.length());
  }
  throw new IllegalArgumentException("The offset cannot be non-zero.");
}
/**
 * Encodes the first {@code len} bytes of {@code ba} as text. A backslash becomes two
 * backslashes, bytes in the range 32 to 126 other than a comma are emitted as the matching
 * character, and every other byte becomes {@code \x} followed by two upper-case hex digits.
 *
 * @param sb
 *          scratch builder; it is cleared before use
 * @param ba
 *          the bytes to encode
 * @param len
 *          how many bytes of {@code ba}, starting at index 0, to encode
 * @return the encoded text
 */
private static String encodeColumnFamily(StringBuilder sb, byte[] ba, int len) {
  sb.setLength(0);
  for (int idx = 0; idx < len; idx++) {
    int unsigned = ba[idx] & 0xff;
    boolean printable = unsigned >= 32 && unsigned <= 126;
    if (unsigned == '\\') {
      sb.append('\\').append('\\');
    } else if (printable && unsigned != ',') {
      sb.append((char) unsigned);
    } else {
      sb.append(String.format("\\x%02X", unsigned));
    }
  }
  return sb.toString();
}
/**
 * A mutation view made of a row and an explicit list of column updates. It only supports reading
 * the row and the updates; {@code toThrift}, {@code hashCode} and both {@code equals} variants
 * throw {@link UnsupportedOperationException}.
 */
public static class PartitionedMutation extends Mutation {
  private final byte[] rowBytes;
  private final List<ColumnUpdate> columnUpdates;

  /**
   * @param row
   *          the row, stored as given without copying
   * @param updates
   *          the column updates, stored as given without copying
   */
  public PartitionedMutation(byte[] row, List<ColumnUpdate> updates) {
    this.rowBytes = row;
    this.columnUpdates = updates;
  }

  @Override
  public byte[] getRow() {
    return rowBytes;
  }

  @Override
  public List<ColumnUpdate> getUpdates() {
    return columnUpdates;
  }

  @Override
  public TMutation toThrift() {
    throw new UnsupportedOperationException();
  }

  @Override
  public int hashCode() {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean equals(Object o) {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean equals(Mutation m) {
    throw new UnsupportedOperationException();
  }
}
/**
 * Splits mutations by locality group. Group {@code i} of the array given to the constructor owns
 * the column families that are keys of its map; updates whose family belongs to no group go to
 * the extra slot at index {@code groups.length}.
 */
public static class Partitioner {
  private final Map<ByteSequence,Integer> colfamToLgidMap = new HashMap<>();
  private final PreAllocatedArray<Map<ByteSequence,MutableLong>> groups;

  /**
   * @param groups
   *          one map per locality group; only the key sets (column families) are read here
   */
  public Partitioner(PreAllocatedArray<Map<ByteSequence,MutableLong>> groups) {
    this.groups = groups;
    for (int lgid = 0; lgid < groups.length; lgid++) {
      for (ByteSequence family : groups.get(lgid).keySet()) {
        colfamToLgidMap.put(family, lgid);
      }
    }
  }

  /**
   * Distributes each mutation over the lists in {@code partitionedMutations}.
   *
   * <p>
   * A mutation whose updates all fall into one group is added unchanged to that group's list. A
   * mutation spanning several groups is replaced by one {@link PartitionedMutation} per group,
   * each carrying only that group's updates.
   *
   * @param mutations
   *          the mutations to split
   * @param partitionedMutations
   *          destination lists, indexed by locality group id, with the slot at
   *          {@code groups.length} receiving updates that belong to no group
   */
  public void partition(List<Mutation> mutations,
      PreAllocatedArray<List<Mutation>> partitionedMutations) {
    // reusable lookup key, re-pointed at each update's column family
    MutableByteSequence lookupKey = new MutableByteSequence(new byte[0], 0, 0);
    PreAllocatedArray<List<ColumnUpdate>> parts = new PreAllocatedArray<>(groups.length + 1);

    for (Mutation mutation : mutations) {
      List<ColumnUpdate> updates = mutation.getUpdates();
      if (updates.size() == 1) {
        int onlyGroup = getLgid(lookupKey, updates.get(0));
        partitionedMutations.get(onlyGroup).add(mutation);
        continue;
      }

      // clear the buckets left over from the previous mutation
      for (int slot = 0; slot < parts.length; slot++) {
        parts.set(slot, null);
      }

      int distinctGroups = 0;
      for (ColumnUpdate update : updates) {
        int lgid = getLgid(lookupKey, update);
        List<ColumnUpdate> bucket = parts.get(lgid);
        if (bucket == null) {
          bucket = new ArrayList<>();
          parts.set(lgid, bucket);
          distinctGroups++;
        }
        bucket.add(update);
      }

      boolean singleGroup = distinctGroups == 1;
      for (int slot = 0; slot < parts.length; slot++) {
        List<ColumnUpdate> bucket = parts.get(slot);
        if (bucket == null) {
          continue;
        }
        if (singleGroup) {
          // everything landed in one group, so the original mutation can be passed through
          partitionedMutations.get(slot).add(mutation);
          break;
        }
        partitionedMutations.get(slot).add(new PartitionedMutation(mutation.getRow(), bucket));
      }
    }
  }

  /**
   * Looks up the locality group id for an update's column family, falling back to
   * {@code groups.length} when the family is not mapped to any group.
   */
  private Integer getLgid(MutableByteSequence mbs, ColumnUpdate cu) {
    byte[] family = cu.getColumnFamily();
    mbs.setArray(family, 0, family.length);
    Integer known = colfamToLgidMap.get(mbs);
    return known != null ? known : Integer.valueOf(groups.length);
  }
}
/**
 * Seeks an rfile reader to one locality group, using a family map such as the one obtained from
 * {@link Reader#getLocalityGroupCF()}.
 *
 * <p>
 * For a named group the reader is seeked inclusively with the families mapped to
 * {@code lgName}. A {@code null} name selects the default locality group, whose family list may
 * be empty; in that case the reader is seeked non-inclusively with the union of the families of
 * every entry whose key is not {@code null}.
 *
 * @param reader
 *          the reader to seek
 * @param range
 *          the range passed through to the reader
 * @param lgName
 *          the locality group name, or {@code null} for the default locality group
 * @param localityGroupCF
 *          locality group name to column families
 * @throws IOException
 *           if the underlying seek fails
 * @see Reader#getLocalityGroupCF()
 */
public static void seek(FileSKVIterator reader, Range range, String lgName,
    Map<String,ArrayList<ByteSequence>> localityGroupCF) throws IOException {
  if (lgName != null) {
    reader.seek(range, localityGroupCF.get(lgName), true);
    return;
  }
  // this is the default locality group, create a set of all families not in the default group
  Set<ByteSequence> nonDefaultFamilies = new HashSet<>();
  localityGroupCF.forEach((name, groupFamilies) -> {
    if (name != null) {
      nonDefaultFamilies.addAll(groupFamilies);
    }
  });
  reader.seek(range, nonDefaultFamilies, false);
}
/**
 * Verifies that no column family appears in more than one group and that no group is empty.
 *
 * @param groups
 *          group name to the column families of that group
 * @throws IllegalArgumentException
 *           if a group shares a column family with a group visited earlier in the map's iteration
 *           order, or if a group has no column families
 */
public static void ensureNonOverlappingGroups(Map<String,Set<Text>> groups) {
  Set<Text> seen = new HashSet<>();
  groups.forEach((name, groupFamilies) -> {
    if (!Collections.disjoint(seen, groupFamilies)) {
      throw new IllegalArgumentException("Group " + name + " overlaps with another group");
    }
    if (groupFamilies.isEmpty()) {
      throw new IllegalArgumentException("Group " + name + " is empty");
    }
    seen.addAll(groupFamilies);
  });
}
}