blob: 58b768452cd6cf2e1ae83b9b56051fd8f294697b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.compile;
import java.sql.SQLException;
import java.text.Format;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.log.QueryLogger;
import org.apache.phoenix.monitoring.OverAllQueryMetrics;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PTime;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.NumberUtil;
import org.apache.phoenix.util.ReadOnlyProps;
/**
*
* Class that keeps common state used across processing the various clauses in a
* top level JDBC statement such as SELECT, UPSERT, DELETE, etc.
*
*
* @since 0.1
*/
public class StatementContext {
private ColumnResolver resolver;
private final PhoenixConnection connection;
private final BindManager binds;
private final Scan scan;
private final ExpressionManager expressions;
private final AggregationManager aggregates;
private final String numberFormat;
private final ImmutableBytesWritable tempPtr;
private final PhoenixStatement statement;
private final Map<PColumn, Integer> dataColumns;
private Map<Long, Boolean> retryingPersistentCache;
private long currentTime = QueryConstants.UNSET_TIMESTAMP;
private ScanRanges scanRanges = ScanRanges.EVERYTHING;
private final SequenceManager sequences;
private TableRef currentTable;
private List<Pair<byte[], byte[]>> whereConditionColumns;
private Map<SelectStatement, Object> subqueryResults;
private final ReadMetricQueue readMetricsQueue;
private final OverAllQueryMetrics overAllQueryMetrics;
private QueryLogger queryLogger;
private boolean isClientSideUpsertSelect;
private boolean isUncoveredIndex;
private AtomicBoolean hasFirstValidResult;
public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
}
public StatementContext(StatementContext context) {
this.resolver = context.resolver;
this.connection = context.connection;
this.binds = context.binds;
this.scan = context.scan;
this.expressions = context.expressions;
this.aggregates = context.aggregates;
this.numberFormat = context.numberFormat;
this.tempPtr = context.tempPtr;
this.statement = context.statement;
this.dataColumns = context.dataColumns;
this.retryingPersistentCache = context.retryingPersistentCache;
this.currentTime = context.currentTime;
this.scanRanges = context.scanRanges;
this.sequences = context.sequences;
this.currentTable = context.currentTable;
this.whereConditionColumns = context.whereConditionColumns;
this.subqueryResults = context.subqueryResults;
this.readMetricsQueue = context.readMetricsQueue;
this.overAllQueryMetrics = context.overAllQueryMetrics;
this.queryLogger = context.queryLogger;
this.isClientSideUpsertSelect = context.isClientSideUpsertSelect;
this.isUncoveredIndex = context.isUncoveredIndex;
this.hasFirstValidResult = new AtomicBoolean(context.getHasFirstValidResult());
}
/**
* Constructor that lets you override whether or not to collect request level metrics.
*/
public StatementContext(PhoenixStatement statement, boolean collectRequestLevelMetrics) {
this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new BindManager(statement.getParameters()), new Scan(), new SequenceManager(statement), collectRequestLevelMetrics);
}
public StatementContext(PhoenixStatement statement, Scan scan) {
this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new BindManager(statement.getParameters()), scan, new SequenceManager(statement));
}
public StatementContext(PhoenixStatement statement, ColumnResolver resolver) {
this(statement, resolver, new BindManager(statement.getParameters()), new Scan(), new SequenceManager(statement));
}
public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager) {
this(statement, resolver, new BindManager(statement.getParameters()), scan, seqManager);
}
public StatementContext(PhoenixStatement statement, ColumnResolver resolver, BindManager binds, Scan scan, SequenceManager seqManager) {
this(statement, resolver, binds, scan, seqManager, statement.getConnection().isRequestLevelMetricsEnabled());
}
public StatementContext(PhoenixStatement statement, ColumnResolver resolver, BindManager binds, Scan scan, SequenceManager seqManager, boolean isRequestMetricsEnabled) {
this.statement = statement;
this.resolver = resolver;
this.scan = scan;
this.sequences = seqManager;
this.binds = binds;
this.aggregates = new AggregationManager();
this.expressions = new ExpressionManager();
this.connection = statement.getConnection();
ReadOnlyProps props = connection.getQueryServices().getProps();
this.numberFormat = props.get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT);
this.tempPtr = new ImmutableBytesWritable();
this.currentTable = resolver != null && !resolver.getTables().isEmpty() ? resolver.getTables().get(0) : null;
this.whereConditionColumns = new ArrayList<Pair<byte[], byte[]>>();
this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer> emptyMap() : Maps
.<PColumn, Integer> newLinkedHashMap();
this.subqueryResults = Maps.<SelectStatement, Object> newConcurrentMap();
this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled,connection.getLogLevel());
this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled,connection.getLogLevel());
this.retryingPersistentCache = Maps.<Long, Boolean> newHashMap();
this.hasFirstValidResult = new AtomicBoolean(false);
}
/**
* build map from dataColumn to what will be its position in single KeyValue value bytes
* returned from the coprocessor that joins from the index row back to the data row.
* @param column
* @return
*/
public int getDataColumnPosition(PColumn column) {
Integer pos = dataColumns.get(column);
if (pos == null) {
pos = dataColumns.size();
dataColumns.put(column, pos);
}
return pos;
}
/**
* @return return set of data columns.
*/
public Set<PColumn> getDataColumns() {
return dataColumns.keySet();
}
/**
* @return map of data columns and their positions.
*/
public Map<PColumn, Integer> getDataColumnsMap() {
return dataColumns;
}
public String getDateFormatTimeZoneId() {
return connection.getDateFormatTimeZoneId();
}
public String getDateFormat() {
return connection.getDatePattern();
}
public Format getDateFormatter() {
return connection.getFormatter(PDate.INSTANCE);
}
public String getTimeFormat() {
return connection.getTimePattern();
}
public Format getTimeFormatter() {
return connection.getFormatter(PTime.INSTANCE);
}
public String getTimestampFormat() {
return connection.getTimestampPattern();
}
public Format getTimestampFormatter() {
return connection.getFormatter(PTimestamp.INSTANCE);
}
public String getNumberFormat() {
return numberFormat;
}
public Scan getScan() {
return scan;
}
public BindManager getBindManager() {
return binds;
}
public TableRef getCurrentTable() {
return currentTable;
}
public boolean getHasFirstValidResult() {
return hasFirstValidResult.get();
}
public void setHasFirstValidResult(boolean hasValidResult) {
hasFirstValidResult.set(hasValidResult);
}
public void setCurrentTable(TableRef table) {
this.currentTable = table;
}
public AggregationManager getAggregationManager() {
return aggregates;
}
public ColumnResolver getResolver() {
return resolver;
}
public void setResolver(ColumnResolver resolver) {
this.resolver = resolver;
}
public ExpressionManager getExpressionManager() {
return expressions;
}
public ImmutableBytesWritable getTempPtr() {
return tempPtr;
}
public ScanRanges getScanRanges() {
return this.scanRanges;
}
public void setScanRanges(ScanRanges scanRanges) {
this.scanRanges = scanRanges;
scanRanges.initializeScan(scan);
}
public PhoenixConnection getConnection() {
return connection;
}
public PhoenixStatement getStatement() {
return statement;
}
public long getCurrentTime() throws SQLException {
long ts = this.getCurrentTable().getTimeStamp();
// if the table is transactional then it is only resolved once per query, so we can't use the table timestamp
if (this.getCurrentTable().getTable().getType() != PTableType.SUBQUERY
&& this.getCurrentTable().getTable().getType() != PTableType.PROJECTED
&& !this.getCurrentTable().getTable().isTransactional()
&& ts != QueryConstants.UNSET_TIMESTAMP) {
return ts;
}
if (currentTime != QueryConstants.UNSET_TIMESTAMP) {
return currentTime;
}
/*
* For an UPSERT VALUES where autocommit off, we won't hit the server until the commit.
* However, if the statement has a CURRENT_DATE() call as a value, we need to know the
* current time at execution time. In that case, we'll call MetaDataClient.updateCache
* purely to bind the current time based on the server time.
*/
PTable table = this.getCurrentTable().getTable();
MetaDataClient client = new MetaDataClient(connection);
currentTime = client.getCurrentTime(table.getSchemaName().getString(), table.getTableName().getString());
return currentTime;
}
public long getCurrentTimeWithDisplacement() throws SQLException {
if (connection.isApplyTimeZoneDisplacement()) {
return DateUtil.applyInputDisplacement(new java.sql.Date(getCurrentTime()),
statement.getLocalCalendar().getTimeZone()).getTime();
} else {
return getCurrentTime();
}
}
public SequenceManager getSequenceManager(){
return sequences;
}
public void addWhereConditionColumn(byte[] cf, byte[] q) {
whereConditionColumns.add(new Pair<byte[], byte[]>(cf, q));
}
public List<Pair<byte[], byte[]>> getWhereConditionColumns() {
return whereConditionColumns;
}
public boolean isSubqueryResultAvailable(SelectStatement select) {
return subqueryResults.containsKey(select);
}
public Object getSubqueryResult(SelectStatement select) {
return subqueryResults.get(select);
}
public void setSubqueryResult(SelectStatement select, Object result) {
subqueryResults.put(select, result);
}
public ReadMetricQueue getReadMetricsQueue() {
return readMetricsQueue;
}
public OverAllQueryMetrics getOverallQueryMetrics() {
return overAllQueryMetrics;
}
public void setQueryLogger(QueryLogger queryLogger) {
this.queryLogger=queryLogger;
}
public QueryLogger getQueryLogger() {
return queryLogger;
}
public boolean isClientSideUpsertSelect() {
return isClientSideUpsertSelect;
}
public void setClientSideUpsertSelect(boolean isClientSideUpsertSelect) {
this.isClientSideUpsertSelect = isClientSideUpsertSelect;
}
public boolean isUncoveredIndex() {
return isUncoveredIndex;
}
public void setUncoveredIndex(boolean isUncoveredIndex) {
this.isUncoveredIndex = isUncoveredIndex;
}
/*
* setRetryingPersistentCache can be used to override the USE_PERSISTENT_CACHE hint and disable the use of the
* persistent cache for a specific cache ID. This can be used to retry queries that failed when using the persistent
* cache.
*/
public void setRetryingPersistentCache(long cacheId) {
retryingPersistentCache.put(cacheId, true);
}
public boolean getRetryingPersistentCache(long cacheId) {
Boolean retrying = retryingPersistentCache.get(cacheId);
if (retrying == null) {
return false;
} else {
return retrying;
}
}
}