blob: 810ad6bb7f1912ca02a64172e5391c82563321f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.log.entity.meta;
import java.beans.PropertyDescriptor;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.RowkeyBuilder;
import org.apache.commons.beanutils.PropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.eagle.query.parser.ANDExpression;
import org.apache.eagle.query.parser.AtomicExpression;
import org.apache.eagle.query.parser.ComparisonOperator;
import org.apache.eagle.query.parser.ORExpression;
import org.apache.eagle.common.ByteUtil;
/**
* Eagle index schema definition. 1. Index schema can be defined in entity class by annotation. 2. One index
* schema can contain multiple fields/tags, defined in order 3. We only support immutable indexing for now 4.
* When entity is created or deleted, the corresponding index entity should be created or deleted at the same
* time 5. Index transparency to queries. Queries go through index when and only when index can serve all
* search conditions after query rewrite
*/
public class IndexDefinition {
public enum IndexType {
UNIQUE_INDEX,
NON_CLUSTER_INDEX,
NON_INDEX
}
private final EntityDefinition entityDef;
private final Index index;
private final IndexColumn[] columns;
private final String indexPrefix;
private static final byte[] EMPTY_VALUE = new byte[0];
private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
public static final int EMPTY_PARTITION_DEFAULT_HASH_CODE = 0;
public static final int MAX_INDEX_VALUE_BYTE_LENGTH = 65535;
private static final String FIELD_NAME_PATTERN_STRING = "^@(.*)$";
private static final Pattern FIELD_NAME_PATTERN = Pattern.compile(FIELD_NAME_PATTERN_STRING);
private static final Logger LOG = LoggerFactory.getLogger(IndexDefinition.class);
public IndexDefinition(EntityDefinition entityDef, Index index) {
this.entityDef = entityDef;
this.index = index;
this.indexPrefix = entityDef.getPrefix() + "_" + index.name();
final String[] indexColumns = index.columns();
this.columns = new IndexColumn[indexColumns.length];
for (int i = 0; i < indexColumns.length; ++i) {
final String name = indexColumns[i];
final boolean isTag = entityDef.isTag(name);
final Qualifier qualifier = isTag ? null : entityDef.getDisplayNameMap().get(name);
columns[i] = new IndexColumn(name, isTag, qualifier);
}
LOG.info("Created index " + index.name() + " for " + entityDef.getEntityClass().getSimpleName());
}
public EntityDefinition getEntityDefinition() {
return entityDef;
}
public Index getIndex() {
return index;
}
public String getIndexName() {
return index.name();
}
public IndexColumn[] getIndexColumns() {
return columns;
}
public String getIndexPrefix() {
return indexPrefix;
}
public boolean isUnique() {
return index.unique();
}
/**
* Check if the query is suitable to go through index. If true, then return the value of index fields in
* order. Otherwise return null. TODO: currently index fields should be string type.
*
* @param query query expression after re-write
* @param rowkeys if the query can go through the index, all rowkeys will be added into rowkeys.
* @return true if the query can go through the index, otherwise return false
*/
public IndexType canGoThroughIndex(ORExpression query, List<byte[]> rowkeys) {
if (query == null || query.getANDExprList() == null || query.getANDExprList().isEmpty()) {
return IndexType.NON_CLUSTER_INDEX;
}
if (rowkeys != null) {
rowkeys.clear();
}
final Map<String, String> indexfieldMap = new HashMap<String, String>();
for (ANDExpression andExpr : query.getANDExprList()) {
indexfieldMap.clear();
for (AtomicExpression ae : andExpr.getAtomicExprList()) {
// TODO temporarily ignore those fields which are not for attributes
final String fieldName = parseEntityAttribute(ae.getKey());
if (fieldName != null && ComparisonOperator.EQUAL.equals(ae.getOp())) {
indexfieldMap.put(fieldName, ae.getValue());
}
}
final String[] partitions = entityDef.getPartitions();
int[] partitionValueHashs = null;
if (partitions != null) {
partitionValueHashs = new int[partitions.length];
for (int i = 0; i < partitions.length; ++i) {
final String value = indexfieldMap.get(partitions[i]);
if (value == null) {
throw new IllegalArgumentException("Partition " + partitions[i]
+ " is not defined in the query: "
+ query.toString());
}
partitionValueHashs[i] = value.hashCode();
}
}
final byte[][] indexFieldValues = new byte[columns.length][];
for (int i = 0; i < columns.length; ++i) {
final IndexColumn col = columns[i];
if (!indexfieldMap.containsKey(col.getColumnName())) {
// If we have to use scan anyway, there's no need to go through index
return IndexType.NON_INDEX;
}
final String value = indexfieldMap.get(col.getColumnName());
indexFieldValues[i] = value.getBytes();
}
final byte[] rowkey = generateUniqueIndexRowkey(indexFieldValues, partitionValueHashs, null);
if (rowkeys != null) {
rowkeys.add(rowkey);
}
}
if (index.unique()) {
return IndexType.UNIQUE_INDEX;
}
return IndexType.NON_CLUSTER_INDEX;
}
private String parseEntityAttribute(String fieldName) {
Matcher m = FIELD_NAME_PATTERN.matcher(fieldName);
if (m.find()) {
return m.group(1);
}
return null;
}
// TODO: We should move index rowkey generation later since this class is for general purpose, not only
// for hbase.
public byte[] generateIndexRowkey(TaggedLogAPIEntity entity)
throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
if (entity.getClass() != entityDef.getEntityClass()) {
throw new IllegalArgumentException("Expected entity class: "
+ entityDef.getEntityClass().getName() + ", but got class "
+ entity.getClass().getName());
}
final byte[][] indexValues = generateIndexValues(entity);
final int[] partitionHashCodes = generatePartitionHashCodes(entity);
SortedMap<Integer, Integer> tagMap = null;
if (!index.unique()) {
// non cluster index
tagMap = RowkeyBuilder.generateSortedTagMap(entityDef.getPartitions(), entity.getTags());
}
return generateUniqueIndexRowkey(indexValues, partitionHashCodes, tagMap);
}
private byte[] generateUniqueIndexRowkey(byte[][] indexValues, int[] partitionHashCodes,
SortedMap<Integer, Integer> tagMap) {
final int prefixHashCode = indexPrefix.hashCode();
int totalLength = 4;
totalLength += (partitionHashCodes != null) ? (4 * partitionHashCodes.length) : 0;
totalLength += (2 * indexValues.length);
for (int i = 0; i < indexValues.length; ++i) {
final byte[] value = indexValues[i];
totalLength += value.length;
}
if (tagMap != null && (!tagMap.isEmpty())) {
totalLength += tagMap.size() * 8;
}
int offset = 0;
final byte[] rowkey = new byte[totalLength];
// 1. set prefix
ByteUtil.intToBytes(prefixHashCode, rowkey, offset);
offset += 4;
// 2. set partition
if (partitionHashCodes != null) {
for (Integer partitionHashCode : partitionHashCodes) {
ByteUtil.intToBytes(partitionHashCode, rowkey, offset);
offset += 4;
}
}
// 3. set index values
for (int i = 0; i < columns.length; ++i) {
ByteUtil.shortToBytes((short)indexValues[i].length, rowkey, offset);
offset += 2;
for (int j = 0; j < indexValues[i].length; ++j) {
rowkey[offset++] = indexValues[i][j];
}
}
// Check if it's non clustered index, then set the tag/value hash code
if (tagMap != null && (!tagMap.isEmpty())) {
// 4. set tag key/value hashes
for (Map.Entry<Integer, Integer> entry : tagMap.entrySet()) {
ByteUtil.intToBytes(entry.getKey(), rowkey, offset);
offset += 4;
ByteUtil.intToBytes(entry.getValue(), rowkey, offset);
offset += 4;
}
}
return rowkey;
}
private int[] generatePartitionHashCodes(TaggedLogAPIEntity entity) {
final String[] partitions = entityDef.getPartitions();
int[] result = null;
if (partitions != null) {
result = new int[partitions.length];
final Map<String, String> tags = entity.getTags();
for (int i = 0; i < partitions.length; ++i) {
final String partition = partitions[i];
final String tagValue = tags.get(partition);
if (tagValue != null) {
result[i] = tagValue.hashCode();
} else {
result[i] = EMPTY_PARTITION_DEFAULT_HASH_CODE;
}
}
}
return result;
}
private byte[][] generateIndexValues(TaggedLogAPIEntity entity)
throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
final byte[][] result = new byte[columns.length][];
for (int i = 0; i < columns.length; ++i) {
final IndexColumn column = columns[i];
final String columnName = column.getColumnName();
if (column.isTag) {
final Map<String, String> tags = entity.getTags();
if (tags == null || tags.get(columnName) == null) {
result[i] = EMPTY_VALUE;
} else {
result[i] = tags.get(columnName).getBytes(UTF_8_CHARSET);
}
} else {
PropertyDescriptor pd = column.getPropertyDescriptor();
if (pd == null) {
pd = PropertyUtils.getPropertyDescriptor(entity, columnName);
column.setPropertyDescriptor(pd);
}
final Object value = pd.getReadMethod().invoke(entity);
if (value == null) {
result[i] = EMPTY_VALUE;
} else {
final Qualifier q = column.getQualifier();
result[i] = q.getSerDeser().serialize(value);
}
}
if (result[i].length > MAX_INDEX_VALUE_BYTE_LENGTH) {
throw new IllegalArgumentException("Index field value exceeded the max length: "
+ MAX_INDEX_VALUE_BYTE_LENGTH + ", actual length: "
+ result[i].length);
}
}
return result;
}
/**
* Index column definition class
*/
public static class IndexColumn {
private final String columnName;
private final boolean isTag;
private final Qualifier qualifier;
private PropertyDescriptor propertyDescriptor;
public IndexColumn(String columnName, boolean isTag, Qualifier qualifier) {
this.columnName = columnName;
this.isTag = isTag;
this.qualifier = qualifier;
}
public String getColumnName() {
return columnName;
}
public boolean isTag() {
return isTag;
}
public Qualifier getQualifier() {
return qualifier;
}
public PropertyDescriptor getPropertyDescriptor() {
return propertyDescriptor;
}
public void setPropertyDescriptor(PropertyDescriptor propertyDescriptor) {
this.propertyDescriptor = propertyDescriptor;
}
}
}