/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.asterix.column.operation.query;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.asterix.column.assembler.value.IValueGetterFactory;
import org.apache.asterix.column.filter.FalseColumnFilterEvaluator;
import org.apache.asterix.column.filter.FilterAccessorProvider;
import org.apache.asterix.column.filter.IColumnFilterEvaluator;
import org.apache.asterix.column.filter.TrueColumnFilterEvaluator;
import org.apache.asterix.column.filter.iterable.IColumnIterableFilterEvaluator;
import org.apache.asterix.column.filter.iterable.IColumnIterableFilterEvaluatorFactory;
import org.apache.asterix.column.filter.normalized.IColumnFilterNormalizedValueAccessor;
import org.apache.asterix.column.filter.normalized.IColumnNormalizedFilterEvaluatorFactory;
import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
import org.apache.asterix.column.metadata.FieldNamesDictionary;
import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
import org.apache.asterix.column.metadata.schema.visitor.SchemaClipperVisitor;
import org.apache.asterix.column.util.SchemaStringBuilderVisitor;
import org.apache.asterix.column.values.IColumnValuesReader;
import org.apache.asterix.column.values.IColumnValuesReaderFactory;
import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.projection.DataProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
import org.apache.hyracks.util.LogRedactionUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Query column metadata used to resolve the requested values in a query
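 * <p>
 * Illustrative usage sketch (the variables below are placeholders, not part of this class):
 * <pre>{@code
 * QueryColumnMetadata metadata = QueryColumnMetadata.create(datasetType, numberOfPrimaryKeys,
 *         serializedMetadata, readerFactory, valueGetterFactory, requestedType, functionCallInfoMap,
 *         normalizedEvaluatorFactory, columnFilterEvaluatorFactory, warningCollector, context);
 * AbstractColumnTupleReader tupleReader = metadata.createTupleReader();
 * }</pre>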
*/
public class QueryColumnMetadata extends AbstractColumnImmutableReadMetadata {
private static final Logger LOGGER = LogManager.getLogger();
private final FieldNamesDictionary fieldNamesDictionary;
private final PrimitiveColumnValuesReader[] primaryKeyReaders;
private final IColumnFilterEvaluator normalizedFilterEvaluator;
private final List<IColumnFilterNormalizedValueAccessor> filterValueAccessors;
private final IColumnIterableFilterEvaluator columnFilterEvaluator;
private final List<IColumnValuesReader> filterColumnReaders;
protected final ColumnAssembler assembler;
protected QueryColumnMetadata(ARecordType datasetType, ARecordType metaType,
PrimitiveColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, IColumnValuesReaderFactory readerFactory,
IValueGetterFactory valueGetterFactory, IColumnFilterEvaluator normalizedFilterEvaluator,
List<IColumnFilterNormalizedValueAccessor> filterValueAccessors,
IColumnIterableFilterEvaluator columnFilterEvaluator, List<IColumnValuesReader> filterColumnReaders)
throws HyracksDataException {
super(datasetType, metaType, primaryKeyReaders.length, serializedMetadata, -1);
this.fieldNamesDictionary = fieldNamesDictionary;
this.assembler = new ColumnAssembler(root, datasetType, this, readerFactory, valueGetterFactory);
this.primaryKeyReaders = primaryKeyReaders;
this.normalizedFilterEvaluator = normalizedFilterEvaluator;
this.filterValueAccessors = filterValueAccessors;
this.columnFilterEvaluator = columnFilterEvaluator;
this.filterColumnReaders = filterColumnReaders;
}
public final ColumnAssembler getAssembler() {
return assembler;
}
public final FieldNamesDictionary getFieldNamesDictionary() {
return fieldNamesDictionary;
}
public final PrimitiveColumnValuesReader[] getPrimaryKeyReaders() {
return primaryKeyReaders;
}
public final IColumnFilterEvaluator getNormalizedFilterEvaluator() {
return normalizedFilterEvaluator;
}
public final List<IColumnFilterNormalizedValueAccessor> getFilterValueAccessors() {
return filterValueAccessors;
}
public final IColumnIterableFilterEvaluator getColumnFilterEvaluator() {
return columnFilterEvaluator;
}
public final List<IColumnValuesReader> getFilterColumnReaders() {
return filterColumnReaders;
}
/* *****************************************************
* Non-final methods
* *****************************************************
*/
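    // Whether the columns include a meta record; this metadata carries data records only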
public boolean containsMeta() {
return false;
}
@Override
public int getColumnIndex(int ordinal) {
return assembler.getColumnIndex(ordinal);
}
@Override
public int getFilteredColumnIndex(int ordinal) {
return filterColumnReaders.get(ordinal).getColumnIndex();
}
@Override
public int getNumberOfProjectedColumns() {
return assembler.getNumberOfColumns();
}
@Override
public int getNumberOfFilteredColumns() {
return filterColumnReaders.size();
}
@Override
public int getNumberOfColumns() {
return assembler.getNumberOfColumns();
}
@Override
public AbstractColumnTupleReader createTupleReader() {
return new QueryColumnTupleReader(this);
}
    /**
     * Creates a {@link QueryColumnMetadata} that is used to determine the requested values
     *
     * @param datasetType                  dataset declared type
     * @param numberOfPrimaryKeys          number of PKs
     * @param serializedMetadata           inferred metadata (schema)
     * @param readerFactory                column reader factory
     * @param valueGetterFactory           value serializer
     * @param requestedType                the requested schema
     * @param functionCallInfoMap          function-call information used when reporting warnings
     * @param normalizedEvaluatorFactory   factory for the normalized (min/max) filter evaluator
     * @param columnFilterEvaluatorFactory factory for the iterable filter evaluator over the filter columns
     * @param warningCollector             collector for warnings raised while clipping the schema
     * @param context                      Hyracks task context ({@code null} disables filter evaluation)
     * @return query metadata
     */
public static QueryColumnMetadata create(ARecordType datasetType, int numberOfPrimaryKeys,
IValueReference serializedMetadata, IColumnValuesReaderFactory readerFactory,
IValueGetterFactory valueGetterFactory, ARecordType requestedType,
Map<String, FunctionCallInformation> functionCallInfoMap,
IColumnNormalizedFilterEvaluatorFactory normalizedEvaluatorFactory,
IColumnIterableFilterEvaluatorFactory columnFilterEvaluatorFactory, IWarningCollector warningCollector,
IHyracksTaskContext context) throws IOException {
byte[] bytes = serializedMetadata.getByteArray();
int offset = serializedMetadata.getStartOffset();
int length = serializedMetadata.getLength();
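        // The serialized metadata begins with pointers that locate the field-names, meta-schema,
        // and path-info sections within the byte array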
int fieldNamesStart = offset + IntegerPointable.getInteger(bytes, offset + FIELD_NAMES_POINTER);
int metaRootStart = IntegerPointable.getInteger(bytes, offset + META_SCHEMA_POINTER);
int metaRootSize =
metaRootStart < 0 ? 0 : IntegerPointable.getInteger(bytes, offset + PATH_INFO_POINTER) - metaRootStart;
DataInput input = new DataInputStream(new ByteArrayInputStream(bytes, fieldNamesStart, length));
//FieldNames
FieldNamesDictionary fieldNamesDictionary = FieldNamesDictionary.deserialize(input);
//Schema
ObjectSchemaNode root = (ObjectSchemaNode) AbstractSchemaNode.deserialize(input, null);
        // Skip the meta record's schema root (if it exists)
input.skipBytes(metaRootSize);
//Clip schema
SchemaClipperVisitor clipperVisitor =
new SchemaClipperVisitor(fieldNamesDictionary, functionCallInfoMap, warningCollector);
ObjectSchemaNode clippedRoot = clip(requestedType, root, clipperVisitor);
IColumnFilterEvaluator normalizedFilterEvaluator = TrueColumnFilterEvaluator.INSTANCE;
IColumnIterableFilterEvaluator columnFilterEvaluator = TrueColumnFilterEvaluator.INSTANCE;
List<IColumnValuesReader> filterColumnReaders = Collections.emptyList();
List<IColumnFilterNormalizedValueAccessor> filterValueAccessors = Collections.emptyList();
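        // Defaults: always-true filters with no filter readers/accessors; refined below when a task
        // context is available and filter evaluators can be created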
if (context != null) {
FilterAccessorProvider filterAccessorProvider =
new FilterAccessorProvider(root, clipperVisitor, readerFactory, valueGetterFactory);
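            // Make the provider available to other components of this task through the task context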
TaskUtil.put(FilterAccessorProvider.FILTER_ACCESSOR_PROVIDER_KEY, filterAccessorProvider, context);
// Min/Max filters in page0
normalizedFilterEvaluator = normalizedEvaluatorFactory.create(filterAccessorProvider);
filterValueAccessors = filterAccessorProvider.getFilterAccessors();
            // Filter columns (i.e., columns that appear in the WHERE clause)
IEvaluatorContext evaluatorContext = new EvaluatorContext(context);
// ignore atomic (or flat) types information
clipperVisitor.setIgnoreFlatType(true);
filterAccessorProvider.reset();
columnFilterEvaluator = columnFilterEvaluatorFactory.create(filterAccessorProvider, evaluatorContext);
filterColumnReaders = filterAccessorProvider.getFilterColumnReaders();
}
// log normalized filter
logFilter(normalizedFilterEvaluator, normalizedEvaluatorFactory.toString());
// log requested schema
logSchema(clippedRoot, SchemaStringBuilderVisitor.RECORD_SCHEMA, fieldNamesDictionary);
// Primary key readers
PrimitiveColumnValuesReader[] primaryKeyReaders =
createPrimaryKeyReaders(input, readerFactory, numberOfPrimaryKeys);
return new QueryColumnMetadata(datasetType, null, primaryKeyReaders, serializedMetadata, fieldNamesDictionary,
clippedRoot, readerFactory, valueGetterFactory, normalizedFilterEvaluator, filterValueAccessors,
columnFilterEvaluator, filterColumnReaders);
}
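
    /**
     * Clips the inferred schema to the requested type. If all fields are requested,
     * the inferred schema root is returned as-is.
     */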
protected static ObjectSchemaNode clip(ARecordType requestedType, ObjectSchemaNode root,
SchemaClipperVisitor clipperVisitor) {
ObjectSchemaNode clippedRoot;
if (requestedType.getTypeName().equals(DataProjectionFiltrationInfo.ALL_FIELDS_TYPE.getTypeName())) {
clippedRoot = root;
} else {
clippedRoot = (ObjectSchemaNode) requestedType.accept(clipperVisitor, root);
}
return clippedRoot;
}
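
    /**
     * Creates readers for the primary key columns from the serialized metadata. The leading
     * integer (the number of columns) is skipped, as only the primary key readers are needed here.
     */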
protected static PrimitiveColumnValuesReader[] createPrimaryKeyReaders(DataInput input,
IColumnValuesReaderFactory readerFactory, int numberOfPrimaryKeys) throws IOException {
        // Skip the number of columns; only the primary key readers are created here
input.readInt();
PrimitiveColumnValuesReader[] primaryKeyReaders = new PrimitiveColumnValuesReader[numberOfPrimaryKeys];
for (int i = 0; i < numberOfPrimaryKeys; i++) {
primaryKeyReaders[i] = (PrimitiveColumnValuesReader) readerFactory.createValueReader(input);
}
return primaryKeyReaders;
}
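
    // Logs the normalized filter expression, or "SKIP_ALL" when the filter is the always-false
    // evaluator; nothing is logged for the always-true filter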
protected static void logFilter(IColumnFilterEvaluator normalizedFilterEvaluator, String filterExpression) {
if (LOGGER.isInfoEnabled() && normalizedFilterEvaluator != TrueColumnFilterEvaluator.INSTANCE) {
String filterString = normalizedFilterEvaluator == FalseColumnFilterEvaluator.INSTANCE ? "SKIP_ALL"
: LogRedactionUtil.userData(filterExpression);
LOGGER.info("Filter: {}", filterString);
}
}
protected static void logSchema(ObjectSchemaNode root, String schemaSource,
FieldNamesDictionary fieldNamesDictionary) throws HyracksDataException {
if (LOGGER.isInfoEnabled()) {
SchemaStringBuilderVisitor schemaBuilder = new SchemaStringBuilderVisitor(fieldNamesDictionary);
String schema = LogRedactionUtil.userData(schemaBuilder.build(root));
LOGGER.info("Queried {} schema: \n {}", schemaSource, schema);
}
}
}