blob: 65ad1224f98f4596f83c7d65da428f7bd2f1687d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.accumulo.store;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mock.MockConnector;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.mock.MockTabletLocator;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyIterator;
import org.apache.accumulo.core.iterators.user.TimestampFilter;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
import org.apache.gora.accumulo.encoders.Encoder;
import org.apache.gora.accumulo.query.AccumuloQuery;
import org.apache.gora.accumulo.query.AccumuloResult;
import org.apache.gora.persistency.ListGenericArray;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.State;
import org.apache.gora.persistency.StateManager;
import org.apache.gora.persistency.StatefulHashMap;
import org.apache.gora.persistency.StatefulMap;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.query.impl.PartitionQueryImpl;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.store.impl.DataStoreBase;
import org.apache.gora.util.AvroUtils;
import org.apache.hadoop.io.Text;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
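/**
* Gora data store backed by Apache Accumulo. Persistent fields are mapped to
* Accumulo column families/qualifiers as described by an XML mapping file
* (by default {@code gora-accumulo-mapping.xml}); keys and primitive values are
* serialized through a pluggable {@link Encoder}.
*/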
public class AccumuloStore<K,T extends Persistent> extends DataStoreBase<K,T> {
// Configuration property names looked up through DataStoreFactory in initialize().
/** When the value of this property is "true", initialize() connects to a MockInstance instead of ZooKeeper. */
protected static final String MOCK_PROPERTY = "accumulo.mock";
/** Name of the Accumulo instance passed to ZooKeeperInstance. */
protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance";
/** ZooKeeper connection string passed to ZooKeeperInstance. */
protected static final String ZOOKEEPERS_NAME_PROPERTY = "accumulo.zookeepers";
/** User name used to obtain the Connector. */
protected static final String USERNAME_PROPERTY = "accumulo.user";
/** Password used to obtain the Connector. */
protected static final String PASSWORD_PROPERTY = "accumulo.password";
/** Mapping file used when the properties do not name another one. */
protected static final String DEFAULT_MAPPING_FILE = "gora-accumulo-mapping.xml";
/** Accumulo connection; assigned in initialize(). */
private Connector conn;
/** Created lazily by getBatchWriter(); reset to null by close() and deleteSchema(). */
private BatchWriter batchWriter;
/** Table name, column mapping and table configuration parsed by readMapping(). */
private AccumuloMapping mapping;
/** Credentials handed to TabletLocator in getPartitions(); only assigned for non-mock connections. */
private AuthInfo authInfo;
/** Serializer for keys and primitive values; chosen in initialize() from the mapping's encoder attribute. */
private Encoder encoder;
/**
 * Decodes a serialized value of the given Avro schema type using the encoder
 * configured for this store.
 *
 * @param schema Avro schema describing the value
 * @param data serialized bytes
 * @return the decoded value
 */
public Object fromBytes(Schema schema, byte[] data) {
  final Encoder storeEncoder = this.encoder;
  return fromBytes(storeEncoder, schema, data);
}
/**
 * Decodes a serialized value according to its Avro schema type.
 * Strings become {@link Utf8}, bytes are wrapped (not copied) in a
 * {@link ByteBuffer}, and enums are resolved from their encoded int.
 *
 * @param encoder encoder used for the primitive types
 * @param schema Avro schema describing the value
 * @param data serialized bytes
 * @return the decoded value
 * @throws IllegalArgumentException if the schema type is not one of the supported types
 */
public static Object fromBytes(Encoder encoder, Schema schema, byte[] data) {
  final Object decoded;
  switch (schema.getType()) {
    case BOOLEAN:
      decoded = encoder.decodeBoolean(data);
      break;
    case DOUBLE:
      decoded = encoder.decodeDouble(data);
      break;
    case FLOAT:
      decoded = encoder.decodeFloat(data);
      break;
    case INT:
      decoded = encoder.decodeInt(data);
      break;
    case LONG:
      decoded = encoder.decodeLong(data);
      break;
    case STRING:
      decoded = new Utf8(data);
      break;
    case BYTES:
      decoded = ByteBuffer.wrap(data);
      break;
    case ENUM:
      decoded = AvroUtils.getEnumValue(schema, encoder.decodeInt(data));
      break;
    default:
      throw new IllegalArgumentException("Unknown type " + schema.getType());
  }
  return decoded;
}
/**
 * Decodes a serialized key using the encoder configured for this store.
 *
 * @param clazz key class to decode into
 * @param val serialized key bytes
 * @return the decoded key
 */
public K fromBytes(Class<K> clazz, byte[] val) {
  final Encoder storeEncoder = this.encoder;
  return fromBytes(storeEncoder, clazz, val);
}
/**
 * Decodes serialized bytes into an instance of the requested class.
 * Primitive types and their wrappers go through the encoder; {@link String}
 * is decoded as UTF-8 and {@link Utf8} wraps the bytes directly.
 *
 * @param encoder encoder used for the numeric and boolean types
 * @param clazz target class
 * @param val serialized bytes
 * @return the decoded value
 * @throws IllegalArgumentException if the class is not supported
 * @throws RuntimeException wrapping any IOException raised while decoding
 */
@SuppressWarnings("unchecked")
public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) {
  try {
    final Object decoded;
    if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
      decoded = Byte.valueOf(encoder.decodeByte(val));
    } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
      decoded = Boolean.valueOf(encoder.decodeBoolean(val));
    } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
      decoded = Short.valueOf(encoder.decodeShort(val));
    } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
      decoded = Integer.valueOf(encoder.decodeInt(val));
    } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
      decoded = Long.valueOf(encoder.decodeLong(val));
    } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
      decoded = Float.valueOf(encoder.decodeFloat(val));
    } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
      decoded = Double.valueOf(encoder.decodeDouble(val));
    } else if (clazz.equals(String.class)) {
      decoded = new String(val, "UTF-8");
    } else if (clazz.equals(Utf8.class)) {
      decoded = new Utf8(val);
    } else {
      throw new IllegalArgumentException("Unknown type " + clazz.getName());
    }
    return (K) decoded;
  } catch (IOException ioe) {
    throw new RuntimeException(ioe);
  }
}
/**
 * Returns the {@code len} bytes of {@code b} starting at {@code offset}.
 * When the requested window is the whole array, the array itself is
 * returned without copying; otherwise a fresh array is allocated.
 */
private static byte[] copyIfNeeded(byte[] b, int offset, int len) {
  final boolean coversWholeArray = offset == 0 && len == b.length;
  if (coversWholeArray) {
    return b;
  }
  final byte[] window = new byte[len];
  System.arraycopy(b, offset, window, 0, window.length);
  return window;
}
/**
 * Serializes a value using the encoder configured for this store.
 *
 * @param o value to serialize
 * @return the serialized bytes
 */
public byte[] toBytes(Object o) {
  final Encoder storeEncoder = this.encoder;
  return toBytes(storeEncoder, o);
}
/**
 * Serializes a value to bytes. Strings are written as UTF-8, {@link Utf8}
 * and {@link ByteBuffer} contents are returned (copied only when needed),
 * numeric and boolean wrappers go through the encoder, and enums are
 * written as their encoded ordinal.
 *
 * @param encoder encoder used for the numeric and boolean types
 * @param o value to serialize
 * @return the serialized bytes
 * @throws IllegalArgumentException if the value's type is not supported
 * @throws NullPointerException if {@code o} is null
 * @throws RuntimeException wrapping any IOException raised while encoding
 */
public static byte[] toBytes(Encoder encoder, Object o) {
  try {
    if (o instanceof String) {
      return ((String) o).getBytes("UTF-8");
    } else if (o instanceof Utf8) {
      Utf8 utf8 = (Utf8) o;
      return copyIfNeeded(utf8.getBytes(), 0, utf8.getLength());
    } else if (o instanceof ByteBuffer) {
      ByteBuffer buf = (ByteBuffer) o;
      if (buf.hasArray()) {
        return copyIfNeeded(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
      }
      // Direct and read-only buffers expose no backing array; read through a
      // duplicate so the caller's position and limit are left untouched.
      byte[] copy = new byte[buf.remaining()];
      buf.duplicate().get(copy);
      return copy;
    } else if (o instanceof Long) {
      return encoder.encodeLong((Long) o);
    } else if (o instanceof Integer) {
      return encoder.encodeInt((Integer) o);
    } else if (o instanceof Short) {
      return encoder.encodeShort((Short) o);
    } else if (o instanceof Byte) {
      return encoder.encodeByte((Byte) o);
    } else if (o instanceof Boolean) {
      return encoder.encodeBoolean((Boolean) o);
    } else if (o instanceof Float) {
      return encoder.encodeFloat((Float) o);
    } else if (o instanceof Double) {
      return encoder.encodeDouble((Double) o);
    } else if (o instanceof Enum) {
      // Mirrors fromBytes(), which resolves enums from the encoded int.
      return encoder.encodeInt(((Enum<?>) o).ordinal());
    }
  } catch (IOException ioe) {
    throw new RuntimeException(ioe);
  }
  throw new IllegalArgumentException("Unknown type " + o.getClass().getName());
}
/**
 * Returns the shared batch writer for the mapped table, creating it on
 * first use.
 *
 * @throws IOException if the mapped table does not exist
 */
private BatchWriter getBatchWriter() throws IOException {
  if (batchWriter != null) {
    return batchWriter;
  }
  try {
    batchWriter = conn.createBatchWriter(mapping.tableName, 10000000, 60000L, 4);
  } catch (TableNotFoundException e) {
    throw new IOException(e);
  }
  return batchWriter;
}
/**
 * Reads the mapping file, instantiates the configured encoder and opens the
 * Accumulo connection (a {@link MockInstance} when the mock property is
 * "true", otherwise a {@link ZooKeeperInstance}). Creates the table when
 * {@code autoCreateSchema} is set.
 *
 * @param keyClass key class of this store
 * @param persistentClass persistent class of this store
 * @param properties configuration properties
 * @throws IOException if the mapping cannot be read, the encoder cannot be
 *         instantiated, the password is missing for a non-mock connection,
 *         or Accumulo rejects the connection
 */
@Override
public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws IOException {
  super.initialize(keyClass, persistentClass, properties);
  String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null);
  String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
  String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null);
  String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null);
  mapping = readMapping(mappingFile);
  if (mapping.encoder == null || mapping.encoder.equals("")) {
    encoder = new org.apache.gora.accumulo.encoders.BinaryEncoder();
  } else {
    try {
      encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
    } catch (InstantiationException e) {
      throw new IOException(e);
    } catch (IllegalAccessException e) {
      throw new IOException(e);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }
  try {
    if (!"true".equals(mock)) {
      if (password == null) {
        // Fail with a readable message instead of a bare NullPointerException.
        throw new IOException("No value configured for property '" + PASSWORD_PROPERTY + "'");
      }
      String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null);
      String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null);
      conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, password);
      // Explicit charset: the platform default would make the credential bytes
      // vary between JVMs for non-ASCII passwords.
      authInfo = new AuthInfo(user, ByteBuffer.wrap(password.getBytes("UTF-8")), conn.getInstance().getInstanceID());
    } else {
      conn = new MockInstance().getConnector(user, password);
    }
    if (autoCreateSchema) {
      createSchema();
    }
  } catch (AccumuloException e) {
    throw new IOException(e);
  } catch (AccumuloSecurityException e) {
    throw new IOException(e);
  }
}
/**
 * Parses the XML mapping file from the classpath. The {@code class} element
 * whose {@code keyClass} and {@code name} attributes match this store supplies
 * the table name, the encoder class name and the field-to-column mapping;
 * {@code table} elements whose name matches the mapped table supply table
 * configuration key/value pairs.
 *
 * @param filename classpath resource name of the mapping file
 * @return the parsed mapping
 * @throws IOException if the resource is missing or cannot be parsed
 */
protected AccumuloMapping readMapping(String filename) throws IOException {
  java.io.InputStream in = null;
  try {
    in = getClass().getClassLoader().getResourceAsStream(filename);
    if (in == null) {
      throw new IOException("Mapping file '" + filename + "' not found on the classpath");
    }
    AccumuloMapping parsed = new AccumuloMapping();
    DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder();
    Document dom = db.parse(in);
    Element root = dom.getDocumentElement();
    NodeList nl = root.getElementsByTagName("class");
    for (int i = 0; i < nl.getLength(); i++) {
      Element classElement = (Element) nl.item(i);
      if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
          && classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
        parsed.tableName = getSchemaName(classElement.getAttribute("table"), persistentClass);
        parsed.encoder = classElement.getAttribute("encoder");
        NodeList fields = classElement.getElementsByTagName("field");
        for (int j = 0; j < fields.getLength(); j++) {
          Element fieldElement = (Element) fields.item(j);
          String name = fieldElement.getAttribute("name");
          String family = fieldElement.getAttribute("family");
          String qualifier = fieldElement.getAttribute("qualifier");
          // A missing qualifier attribute reads as "" and means the field owns the whole family.
          if (qualifier.equals("")) {
            qualifier = null;
          }
          Pair<Text,Text> col = new Pair<Text,Text>(new Text(family), qualifier == null ? null : new Text(qualifier));
          parsed.fieldMap.put(name, col);
          parsed.columnMap.put(col, name);
        }
      }
    }
    nl = root.getElementsByTagName("table");
    for (int i = 0; i < nl.getLength(); i++) {
      Element tableElement = (Element) nl.item(i);
      if (tableElement.getAttribute("name").equals(parsed.tableName)) {
        NodeList configs = tableElement.getElementsByTagName("config");
        for (int j = 0; j < configs.getLength(); j++) {
          Element configElement = (Element) configs.item(j);
          String key = configElement.getAttribute("key");
          String val = configElement.getAttribute("value");
          parsed.tableConfig.put(key, val);
        }
      }
    }
    return parsed;
  } catch (IOException ex) {
    // Already the declared type; no need to wrap it a second time.
    throw ex;
  } catch (Exception ex) {
    throw new IOException(ex);
  } finally {
    if (in != null) {
      try {
        in.close();
      } catch (IOException ignored) {
        // Nothing useful can be done if closing a read-only classpath stream fails.
      }
    }
  }
}
/**
 * @return the name of the Accumulo table this store is mapped to
 */
@Override
public String getSchemaName() {
  final AccumuloMapping current = mapping;
  return current.tableName;
}
/**
 * Creates the mapped table and applies the table configuration from the
 * mapping. If the table already exists nothing else is done (the
 * configuration is not re-applied).
 *
 * @throws IOException if Accumulo reports an error
 */
@Override
public void createSchema() throws IOException {
  try {
    conn.tableOperations().create(mapping.tableName);
    for (Entry<String,String> property : mapping.tableConfig.entrySet()) {
      conn.tableOperations().setProperty(mapping.tableName, property.getKey(), property.getValue());
    }
  } catch (TableExistsException e) {
    // The table is already there; treat creation as a no-op.
    return;
  } catch (AccumuloSecurityException e) {
    throw new IOException(e);
  } catch (AccumuloException e) {
    throw new IOException(e);
  }
}
/**
 * Closes the batch writer (if one is open) and deletes the mapped table.
 * A table that does not exist is silently ignored.
 *
 * @throws IOException if closing the writer or deleting the table fails
 */
@Override
public void deleteSchema() throws IOException {
  try {
    if (batchWriter != null) {
      try {
        batchWriter.close();
      } finally {
        // Drop the reference even when close() fails, so a later
        // getBatchWriter() creates a fresh writer instead of reusing this one.
        batchWriter = null;
      }
    }
    conn.tableOperations().delete(mapping.tableName);
  } catch (AccumuloException e) {
    throw new IOException(e);
  } catch (AccumuloSecurityException e) {
    throw new IOException(e);
  } catch (TableNotFoundException e) {
    return;
  }
}
/**
 * @return true if the mapped table exists in Accumulo
 */
@Override
public boolean schemaExists() throws IOException {
  final boolean tablePresent = conn.tableOperations().exists(mapping.tableName);
  return tablePresent;
}
/**
* Fills {@code persistent} from the key/value entries supplied by {@code iter}.
* MAP and ARRAY fields are spread over several entries that share a column
* family: they are accumulated while consecutive entries keep the same family
* and stored on the object when the family changes or the iterator ends.
* RECORD fields are Avro-decoded from a single value; every other type is
* decoded with {@link #fromBytes(Schema, byte[])}. The object's dirty state is
* cleared before returning.
*
* @param iter entries to consume; presumably all entries of one row in scan order - confirm with callers
* @param persistent object whose fields are set
* @return the row of the first entry consumed, or null if the iterator was empty
* @throws IOException if decoding a record fails
*/
public ByteSequence populate(Iterator<Entry<Key,Value>> iter, T persistent) throws IOException {
ByteSequence row = null;
// State of the multi-entry field (map or array) currently being accumulated, if any.
Map currentMap = null;
ArrayList currentArray = null;
Text currentFam = null;
int currentPos = 0;
Schema currentSchema = null;
Field currentField = null;
while (iter.hasNext()) {
Entry<Key,Value> entry = iter.next();
if (currentMap != null) {
if (currentFam.equals(entry.getKey().getColumnFamily())) {
// Same family: one more map entry, keyed by the column qualifier.
currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), fromBytes(currentSchema, entry.getValue().get()));
continue;
} else {
// Family changed: the map is complete; fall through to handle this entry as a new field.
persistent.put(currentPos, currentMap);
currentMap = null;
}
} else if (currentArray != null) {
if (currentFam.equals(entry.getKey().getColumnFamily())) {
// Same family: one more array element, appended in iteration order.
currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
continue;
} else {
// Family changed: the array is complete; fall through to handle this entry as a new field.
persistent.put(currentPos, new ListGenericArray<T>(currentField.schema(), currentArray));
currentArray = null;
}
}
if (row == null)
row = entry.getKey().getRowData();
// Look up the field by exact (family, qualifier) first, then by family alone.
String fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()));
if (fieldName == null)
fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(), null));
// NOTE(review): if neither lookup matches, field is presumably null and the switch below
// throws NullPointerException - confirm that callers only feed mapped columns.
Field field = fieldMap.get(fieldName);
switch (field.schema().getType()) {
case MAP:
// Start accumulating a map; this entry is its first element.
currentMap = new StatefulHashMap();
currentPos = field.pos();
currentFam = entry.getKey().getColumnFamily();
currentSchema = field.schema().getValueType();
currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), fromBytes(currentSchema, entry.getValue().get()));
break;
case ARRAY:
// Start accumulating an array; this entry is its first element.
currentArray = new ArrayList();
currentPos = field.pos();
currentFam = entry.getKey().getColumnFamily();
currentSchema = field.schema().getElementType();
currentField = field;
currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
break;
case RECORD:
// Nested records are stored as a single Avro binary blob.
SpecificDatumReader reader = new SpecificDatumReader(field.schema());
byte[] val = entry.getValue().get();
// TODO reuse decoder
BinaryDecoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(val, null);
persistent.put(field.pos(), reader.read(null, decoder));
break;
default:
persistent.put(field.pos(), fromBytes(field.schema(), entry.getValue().get()));
}
}
// Store a map or array that was still being accumulated when the iterator ended.
if (currentMap != null) {
persistent.put(currentPos, currentMap);
} else if (currentArray != null) {
persistent.put(currentPos, new ListGenericArray<T>(currentField.schema(), currentArray));
}
persistent.clearDirty();
return row;
}
/**
 * Restricts the scanner to the columns backing the requested fields. A field
 * mapped without a qualifier fetches its whole column family.
 *
 * @param scanner scanner to configure
 * @param fields field names, resolved through {@code getFieldsToQuery}
 */
private void setFetchColumns(Scanner scanner, String[] fields) {
  final String[] requested = getFieldsToQuery(fields);
  for (int i = 0; i < requested.length; i++) {
    final Pair<Text,Text> column = mapping.fieldMap.get(requested[i]);
    if (column.getSecond() != null) {
      scanner.fetchColumn(column.getFirst(), column.getSecond());
    } else {
      scanner.fetchColumnFamily(column.getFirst());
    }
  }
}
/**
 * Reads a single row with an isolated scanner and builds a persistent
 * object from it.
 *
 * @param key row key
 * @param fields fields to fetch
 * @return the populated object, or null if the row has no entries or the
 *         table does not exist
 */
@Override
public T get(K key, String[] fields) throws IOException {
  try {
    // TODO make isolated scanner optional?
    final Scanner rowScanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Constants.NO_AUTHS));
    rowScanner.setRange(new Range(new Text(toBytes(key))));
    setFetchColumns(rowScanner, fields);
    final T loaded = newPersistent();
    final ByteSequence foundRow = populate(rowScanner.iterator(), loaded);
    return foundRow == null ? null : loaded;
  } catch (TableNotFoundException e) {
    return null;
  }
}
/**
 * Builds one mutation from the dirty fields of {@code val} and queues it on
 * the shared batch writer. Map fields write one column per entry (qualifier =
 * map key), array fields one column per element (qualifier = element index),
 * record fields a single Avro binary blob, and all other fields a single
 * encoded value. Nothing is queued when no column was written.
 *
 * @param key row key
 * @param val object whose dirty fields are persisted
 * @throws IOException if the batch writer cannot be created or rejects the mutation
 */
@Override
public void put(K key, T val) throws IOException {
  Mutation m = new Mutation(new Text(toBytes(key)));
  Schema schema = val.getSchema();
  StateManager stateManager = val.getStateManager();
  Iterator<Field> iter = schema.getFields().iterator();
  // Number of column updates added to the mutation.
  int count = 0;
  for (int i = 0; iter.hasNext(); i++) {
    Field field = iter.next();
    if (!stateManager.isDirty(val, i)) {
      continue;
    }
    Object o = val.get(i);
    Pair<Text,Text> col = mapping.fieldMap.get(field.name());
    switch (field.schema().getType()) {
      case MAP:
        if (o instanceof StatefulMap) {
          // Only entries flagged as changed are written or deleted.
          StatefulMap map = (StatefulMap) o;
          Set<?> es = map.states().entrySet();
          for (Object entry : es) {
            Object mapKey = ((Entry) entry).getKey();
            State state = (State) ((Entry) entry).getValue();
            switch (state) {
              case NEW:
              case DIRTY:
                m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(map.get(mapKey))));
                count++;
                break;
              case DELETED:
                m.putDelete(col.getFirst(), new Text(toBytes(mapKey)));
                count++;
                break;
            }
          }
        } else {
          // Plain map without change tracking: write every entry.
          Map map = (Map) o;
          Set<?> es = map.entrySet();
          for (Object entry : es) {
            Object mapKey = ((Entry) entry).getKey();
            Object mapVal = ((Entry) entry).getValue();
            m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(mapVal)));
            count++;
          }
        }
        break;
      case ARRAY:
        GenericArray array = (GenericArray) o;
        int j = 0;
        for (Object item : array) {
          m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
          count++;
        }
        break;
      case RECORD:
        SpecificDatumWriter writer = new SpecificDatumWriter(field.schema());
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        // Named to avoid shadowing the store's own 'encoder' field.
        BinaryEncoder avroEncoder = new BinaryEncoder(os);
        writer.write(o, avroEncoder);
        avroEncoder.flush();
        m.put(col.getFirst(), col.getSecond(), new Value(os.toByteArray()));
        // Count the record too; otherwise a mutation that only touches record
        // fields is dropped by the count check below.
        count++;
        break;
      default:
        m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
        count++;
    }
  }
  if (count > 0) {
    try {
      getBatchWriter().addMutation(m);
    } catch (MutationsRejectedException e) {
      throw new IOException(e);
    }
  }
}
/**
 * Deletes the row stored under {@code key} by running a single-key query
 * through {@link #deleteByQuery(Query)}.
 *
 * @param key row key to delete
 * @return true if at least one row was deleted
 */
@Override
public boolean delete(K key) throws IOException {
  final Query<K,T> singleKeyQuery = newQuery();
  singleKeyQuery.setKey(key);
  final long removedRows = deleteByQuery(singleKeyQuery);
  return removedRows > 0;
}
/**
 * Scans the rows matched by the query and queues one delete mutation per
 * row, covering every column entry returned by the scan.
 *
 * @param query query selecting the rows (and columns) to delete
 * @return number of rows for which a delete mutation was queued
 * @throws IOException if the table is missing or the writer rejects a mutation
 */
@Override
public long deleteByQuery(Query<K,T> query) throws IOException {
  try {
    final Scanner scanner = createScanner(query);
    // add iterator that drops values on the server side
    scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, SortedKeyIterator.class));
    final RowIterator rows = new RowIterator(scanner.iterator());
    long deletedRows = 0;
    while (rows.hasNext()) {
      final Iterator<Entry<Key,Value>> cells = rows.next();
      Mutation rowDelete = null;
      while (cells.hasNext()) {
        final Key cell = cells.next().getKey();
        if (rowDelete == null) {
          rowDelete = new Mutation(cell.getRow());
        }
        // TODO optimize to avoid continually creating column vis? prob does not matter for empty
        final ColumnVisibility visibility = new ColumnVisibility(cell.getColumnVisibility());
        rowDelete.putDelete(cell.getColumnFamily(), cell.getColumnQualifier(), visibility, cell.getTimestamp());
      }
      getBatchWriter().addMutation(rowDelete);
      deletedRows++;
    }
    return deletedRows;
  } catch (MutationsRejectedException e) {
    throw new IOException(e);
  } catch (TableNotFoundException e) {
    // TODO return 0?
    throw new IOException(e);
  }
}
/**
 * Converts the query's start/end keys into an Accumulo row range that is
 * inclusive on both ends; a missing key leaves that side unbounded.
 */
private Range createRange(Query<K,T> query) {
  final K firstKey = query.getStartKey();
  final K lastKey = query.getEndKey();
  final Text lowerBound = firstKey == null ? null : new Text(toBytes(firstKey));
  final Text upperBound = lastKey == null ? null : new Text(toBytes(lastKey));
  return new Range(lowerBound, true, upperBound, true);
}
/**
 * Creates an isolated scanner limited to the query's fields and key range.
 * When the query carries a start and/or end time (any value other than -1),
 * a timestamp filter with inclusive bounds is attached.
 *
 * @throws TableNotFoundException if the mapped table does not exist
 */
private Scanner createScanner(Query<K,T> query) throws TableNotFoundException {
  // TODO make isolated scanner optional?
  final Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Constants.NO_AUTHS));
  setFetchColumns(scanner, query.getFields());
  scanner.setRange(createRange(query));
  final boolean hasStartTime = query.getStartTime() != -1;
  final boolean hasEndTime = query.getEndTime() != -1;
  if (!hasStartTime && !hasEndTime) {
    return scanner;
  }
  final IteratorSetting timeFilter = new IteratorSetting(30, TimestampFilter.class);
  if (hasStartTime) {
    TimestampFilter.setStart(timeFilter, query.getStartTime(), true);
  }
  if (hasEndTime) {
    TimestampFilter.setEnd(timeFilter, query.getEndTime(), true);
  }
  scanner.addScanIterator(timeFilter);
  return scanner;
}
/**
 * Runs the query and returns a result backed by a scanner over the mapped
 * table.
 *
 * @throws IOException if the mapped table does not exist
 */
@Override
public Result<K,T> execute(Query<K,T> query) throws IOException {
  try {
    return new AccumuloResult<K,T>(this, query, createScanner(query));
  } catch (TableNotFoundException e) {
    // TODO return empty result?
    throw new IOException(e);
  }
}
/**
 * @return a new, empty query bound to this store
 */
@Override
public Query<K,T> newQuery() {
  final Query<K,T> fresh = new AccumuloQuery<K,T>(this);
  return fresh;
}
/**
 * Right-pads a key with zero bytes up to the given length. A key that is
 * already long enough is returned as is; otherwise a padded copy is
 * returned and the argument is left unmodified.
 */
Text pad(Text key, int bytes) {
  if (key.getLength() >= bytes) {
    return key;
  }
  final Text padded = new Text(key);
  final byte[] zero = new byte[] {0};
  do {
    padded.append(zero, 0, 1);
  } while (padded.getLength() < bytes);
  return padded;
}
/**
 * Splits the query into one partition query per tablet that overlaps the
 * query's key range, each tagged with the host name of the server holding
 * that tablet.
 *
 * @param query query to partition
 * @return one partition query per overlapping tablet
 * @throws IOException if the table cannot be found, Accumulo reports an
 *         error, or a tablet server address cannot be resolved
 */
@Override
public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws IOException {
  try {
    TabletLocator tl;
    if (conn instanceof MockConnector)
      tl = new MockTabletLocator();
    else
      tl = TabletLocator.getInstance(conn.getInstance(), authInfo, new Text(Tables.getTableId(conn.getInstance(), mapping.tableName)));
    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
    tl.invalidateCache();
    // binRanges returns the ranges it could not assign; retry until none are left.
    while (tl.binRanges(Collections.singletonList(createRange(query)), binnedRanges).size() > 0) {
      // TODO log?
      if (!Tables.exists(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)))
        throw new TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName));
      else if (Tables.getTableState(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE)
        throw new TableOfflineException(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName));
      // Discard whatever the failed attempt already binned; otherwise stale or
      // partial tablet assignments would be merged into the next attempt.
      binnedRanges.clear();
      UtilWaitThread.sleep(100);
      tl.invalidateCache();
    }
    List<PartitionQuery<K,T>> ret = new ArrayList<PartitionQuery<K,T>>();
    Text startRow = null;
    Text endRow = null;
    if (query.getStartKey() != null)
      startRow = new Text(toBytes(query.getStartKey()));
    if (query.getEndKey() != null)
      endRow = new Text(toBytes(query.getEndKey()));
    //hadoop expects hostnames, accumulo keeps track of IPs... so need to convert
    HashMap<String,String> hostNameCache = new HashMap<String,String>();
    for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
      // Location keys look like "ip:port"; keep the address part only.
      String ip = entry.getKey().split(":", 2)[0];
      String location = hostNameCache.get(ip);
      if (location == null) {
        InetAddress inetAddress = InetAddress.getByName(ip);
        location = inetAddress.getHostName();
        hostNameCache.put(ip, location);
      }
      Map<KeyExtent,List<Range>> tablets = entry.getValue();
      for (KeyExtent ke : tablets.keySet()) {
        // Clip the partition to the tablet: use the query's own bound when it
        // falls inside this tablet, otherwise the tablet's boundary.
        K startKey = null;
        if (startRow == null || !ke.contains(startRow)) {
          if (ke.getPrevEndRow() != null) {
            startKey = followingKey(encoder, getKeyClass(), TextUtil.getBytes(ke.getPrevEndRow()));
          }
        } else {
          startKey = fromBytes(getKeyClass(), TextUtil.getBytes(startRow));
        }
        K endKey = null;
        if (endRow == null || !ke.contains(endRow)) {
          if (ke.getEndRow() != null)
            endKey = lastPossibleKey(encoder, getKeyClass(), TextUtil.getBytes(ke.getEndRow()));
        } else {
          endKey = fromBytes(getKeyClass(), TextUtil.getBytes(endRow));
        }
        PartitionQuery<K,T> pqi = new PartitionQueryImpl<K,T>(query, startKey, endKey, new String[] {location});
        ret.add(pqi);
      }
    }
    return ret;
  } catch (TableNotFoundException e) {
    throw new IOException(e);
  } catch (AccumuloException e) {
    throw new IOException(e);
  } catch (AccumuloSecurityException e) {
    throw new IOException(e);
  }
}
/**
 * Decodes the last key of type {@code clazz} that can belong to a tablet
 * ending at row {@code er}. Fixed-width numeric types delegate to the
 * encoder with their byte width; {@link Utf8} keys use the end row as is.
 *
 * @param encoder encoder that computes and decodes the key bytes
 * @param clazz key class
 * @param er tablet end row
 * @return the decoded key
 * @throws UnsupportedOperationException for byte, boolean and String keys
 * @throws IllegalArgumentException for any other unsupported class
 */
static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) {
  if (clazz.equals(Utf8.class)) {
    return fromBytes(encoder, clazz, er);
  }
  final int width;
  if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
    width = 2;
  } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)
      || clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
    width = 4;
  } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)
      || clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
    width = 8;
  } else if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)
      || clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)
      || clazz.equals(String.class)) {
    throw new UnsupportedOperationException();
  } else {
    throw new IllegalArgumentException("Unknown type " + clazz.getName());
  }
  return fromBytes(encoder, clazz, encoder.lastPossibleKey(width, er));
}
/**
 * Decodes the first key of type {@code clazz} that follows row {@code per}.
 * Fixed-width numeric types delegate to the encoder with their byte width;
 * {@link Utf8} keys append a single zero byte to the row.
 *
 * @param encoder encoder that computes and decodes the key bytes
 * @param clazz key class
 * @param per row to step past (callers pass a tablet's previous end row)
 * @return the decoded key
 * @throws UnsupportedOperationException for boolean and String keys
 * @throws IllegalArgumentException for any other unsupported class
 */
static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) {
  if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
    @SuppressWarnings("unchecked")
    final K nextByte = (K) Byte.valueOf(encoder.followingKey(1, per)[0]);
    return nextByte;
  }
  if (clazz.equals(Utf8.class)) {
    return fromBytes(encoder, clazz, Arrays.copyOf(per, per.length + 1));
  }
  final int width;
  if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
    width = 2;
  } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)
      || clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
    width = 4;
  } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)
      || clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
    width = 8;
  } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)
      || clazz.equals(String.class)) {
    throw new UnsupportedOperationException();
  } else {
    throw new IllegalArgumentException("Unknown type " + clazz.getName());
  }
  return fromBytes(encoder, clazz, encoder.followingKey(width, per));
}
/**
 * Flushes mutations queued on the batch writer; does nothing when no
 * writer has been created yet.
 *
 * @throws IOException if the writer reports rejected mutations
 */
@Override
public void flush() throws IOException {
  if (batchWriter == null) {
    return;
  }
  try {
    batchWriter.flush();
  } catch (MutationsRejectedException e) {
    throw new IOException(e);
  }
}
/**
 * Closes the batch writer, if one was created. The writer reference is
 * dropped whether or not closing succeeds.
 *
 * @throws IOException if the writer reports rejected mutations while closing
 */
@Override
public void close() throws IOException {
  if (batchWriter == null) {
    return;
  }
  try {
    batchWriter.close();
  } catch (MutationsRejectedException e) {
    throw new IOException(e);
  } finally {
    // Drop the reference even when close() fails, so a later
    // getBatchWriter() creates a fresh writer instead of reusing this one.
    batchWriter = null;
  }
}
}