/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.accumulo;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;
import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Collections;
/**
 * This default index scanner expects indexes to be in the same format as Presto's
 * Accumulo index tables, defined as:
 * [rowid=field value] [cf=cfname_cqname] [cq=rowid] [visibility] [value=""]
 * <p>
 * This handler looks for the following hive serde properties:
 * 'accumulo.indextable.name' = 'table_idx' (required - name of the corresponding index table)
 * 'accumulo.indexed.columns' = 'name,age,phone' (optional - comma-separated list of indexed
 *     hive columns; if not defined, or defined as '*', all columns are assumed to be indexed)
 * 'accumulo.index.rows.max' = '20000' (optional - maximum number of index matches to collect
 *     before falling back to a full table scan; default=20000.
 *     Note: this setting controls the size of the in-memory list of rowids kept for each
 *     search predicate. Large values for this setting, or very large rowid values, may
 *     require additional memory to prevent out-of-memory errors.)
 * 'accumulo.index.scanner' = 'org.apache.hadoop.hive.accumulo.AccumuloDefaultIndexScanner'
 *     (optional - name of the index scanner implementation)
 * <p>
 * To implement your own index table scheme, subclass this class and override
 * getIndexRowRanges(), and optionally init() if you need additional config settings.
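 * <p>
 * A minimal usage sketch (property values are illustrative; the Accumulo connection
 * parameters and column mapping settings are assumed to already be present in the
 * Configuration, as they are when Hive invokes this scanner):
 * <pre>{@code
 * Configuration conf = new Configuration();   // normally the table's serde/job configuration
 * conf.set("accumulo.indextable.name", "person_idx");
 * conf.set("accumulo.indexed.columns", "name,age");
 * conf.set("accumulo.index.rows.max", "20000");
 *
 * AccumuloIndexScanner scanner = new AccumuloDefaultIndexScanner();
 * scanner.init(conf);
 * if (scanner.isIndexed("name")) {
 *   // a null result means too many matches; fall back to a full table scan
 *   List<Range> rowIds = scanner.getIndexRowRanges("name", new Range("bob"));
 * }
 * }</pre>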
*/
public class AccumuloDefaultIndexScanner implements AccumuloIndexScanner {
private static final Logger LOG = LoggerFactory.getLogger(AccumuloDefaultIndexScanner.class);
private AccumuloConnectionParameters connectParams;
private AccumuloIndexParameters indexParams;
private int maxRowIds;
private Authorizations auths;
private String indexTable;
  private Set<String> indexColumns = Collections.emptySet();
private Connector connect;
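  // hive column name -> index column family name (cfname_cqname)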
private Map<String, String> colMap;
/**
* Initialize object based on configuration.
*
* @param conf - Hive configuration
*/
@Override
public void init(Configuration conf) {
connectParams = new AccumuloConnectionParameters(conf);
indexParams = new AccumuloIndexParameters(conf);
maxRowIds = indexParams.getMaxIndexRows();
auths = indexParams.getTableAuths();
indexTable = indexParams.getIndexTable();
indexColumns = indexParams.getIndexColumns();
colMap = createColumnMap(conf);
}
  /**
   * Get a list of rowid ranges by scanning a column index.
   *
   * @param column - the hive column name
   * @param indexRange - key range to scan on the index table
   * @return list of matching rowid ranges, or null if too many matches are found;
   *         if no index values match, an empty list is returned so the query can be
   *         short-circuited
   */
@Override
public List<Range> getIndexRowRanges(String column, Range indexRange) {
List<Range> rowIds = new ArrayList<Range>();
Scanner scan = null;
String col = this.colMap.get(column);
if (col != null) {
try {
LOG.debug("Searching tab=" + indexTable + " column=" + column + " range=" + indexRange);
Connector conn = getConnector();
scan = conn.createScanner(indexTable, auths);
scan.setRange(indexRange);
Text cf = new Text(col);
LOG.debug("Using Column Family=" + toString());
scan.fetchColumnFamily(cf);
for (Map.Entry<Key, Value> entry : scan) {
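          // the index entry's column qualifier holds the rowid of the matching row in the data table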
rowIds.add(new Range(entry.getKey().getColumnQualifier()));
// if we have too many results return null for a full scan
if (rowIds.size() > maxRowIds) {
return null;
}
}
        // an empty result list means no index hits, which short-circuits the query to no matching rows
if (rowIds.isEmpty()) {
LOG.debug("Found 0 index matches");
} else {
LOG.debug("Found " + rowIds.size() + " index matches");
}
return rowIds;
} catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
LOG.error("Failed to scan index table: " + indexTable, e);
} finally {
if (scan != null) {
scan.close();
}
}
}
// assume the index is bad and do a full scan
LOG.debug("Index lookup failed for table " + indexTable);
return null;
}
/**
* Test if column is defined in the index table.
*
* @param column - hive column name
* @return true if the column is defined as part of the index table
*/
@Override
public boolean isIndexed(String column) {
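    // an empty column list or '*' means every column is treated as indexed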
return indexTable != null
&& (indexColumns.isEmpty() || indexColumns.contains("*")
|| this.indexColumns.contains(column.toLowerCase())
|| this.indexColumns.contains(column.toUpperCase()));
}
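  /**
   * Build a map from hive column names to index column family names: the Accumulo
   * 'cf:cq' mapping strings with ':' replaced by '_', matching the cfname_cqname index format.
   */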
protected Map<String, String> createColumnMap(Configuration conf) {
Map<String, String> colsMap = new HashMap<String, String>();
String accColString = conf.get(AccumuloSerDeParameters.COLUMN_MAPPINGS);
if (accColString != null && !accColString.trim().isEmpty()) {
String[] accCols = accColString.split(",");
String[] hiveCols = conf.get(serdeConstants.LIST_COLUMNS).split(",");
for (int i = 0; i < accCols.length; i++) {
colsMap.put(hiveCols[i], accCols[i].replace(':', '_'));
}
}
return colsMap;
}
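  /**
   * Lazily create and cache the Accumulo Connector built from the connection parameters.
   */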
protected Connector getConnector() throws AccumuloSecurityException, AccumuloException {
if (connect == null) {
connect = connectParams.getConnector();
}
return connect;
}
public void setConnectParams(AccumuloConnectionParameters connectParams) {
this.connectParams = connectParams;
}
public AccumuloConnectionParameters getConnectParams() {
return connectParams;
}
public AccumuloIndexParameters getIndexParams() {
return indexParams;
}
public int getMaxRowIds() {
return maxRowIds;
}
public Authorizations getAuths() {
return auths;
}
public String getIndexTable() {
return indexTable;
}
public Set<String> getIndexColumns() {
return indexColumns;
}
public Connector getConnect() {
return connect;
}
}