blob: 4392e2337f265d15aa964936c5e891e96be4bb00 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.compile;
import java.sql.SQLException;
import java.util.Collections;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.schema.*;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import static org.apache.phoenix.util.IndexUtil.addEmptyColumnToScan;
/**
* Class that compiles plan to generate initial data values after a DDL command for
* index table.
*/
public class ServerBuildIndexCompiler {
private final PhoenixConnection connection;
private final String tableName;
private PTable dataTable;
private QueryPlan plan;
private class RowCountMutationPlan extends BaseMutationPlan {
private RowCountMutationPlan(StatementContext context, PhoenixStatement.Operation operation) {
super(context, operation);
}
@Override
public MutationState execute() throws SQLException {
connection.getMutationState().commitDDLFence(dataTable);
Tuple tuple = plan.iterator().next();
long rowCount = 0;
if (tuple != null) {
Cell kv = tuple.getValue(0);
ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
// A single Cell will be returned with the count(*) - we decode that here
rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
}
// The contract is to return a MutationState that contains the number of rows modified. In this
// case, it's the number of rows in the data table which corresponds to the number of index
// rows that were added.
return new MutationState(0, 0, connection, rowCount);
}
@Override
public QueryPlan getQueryPlan() {
return plan;
}
};
public ServerBuildIndexCompiler(PhoenixConnection connection, String tableName) {
this.connection = connection;
this.tableName = tableName;
}
public MutationPlan compile(PTable index) throws SQLException {
try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
String query = "SELECT count(*) FROM " + tableName;
this.plan = statement.compileQuery(query);
TableRef tableRef = plan.getTableRef();
Scan scan = plan.getContext().getScan();
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
dataTable = tableRef.getTable();
if (index.getIndexType() == PTable.IndexType.GLOBAL && dataTable.isTransactional()) {
throw new IllegalArgumentException(
"ServerBuildIndexCompiler does not support global indexes on transactional tables");
}
// By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
// However, in this case, we need to project all of the data columns that contribute to the index.
IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
scan.addFamily(columnRef.getFamily());
} else {
scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
}
}
IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(index), plan.getContext().getConnection());
// Set the scan attributes that UngroupedAggregateRegionObserver will switch on.
// For local indexes, the BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute, and
// for global indexes PhoenixIndexCodec.INDEX_PROTO_MD attribute is set to the serialized form of index
// metadata to build index rows from data table rows. For global indexes, we also need to set (1) the
// BaseScannerRegionObserver.REBUILD_INDEXES attribute in order to signal UngroupedAggregateRegionObserver
// that this scan is for building global indexes and (2) the MetaDataProtocol.PHOENIX_VERSION attribute
// that will be passed as a mutation attribute for the scanned mutations that will be applied on
// the index table possibly remotely
if (index.getIndexType() == PTable.IndexType.LOCAL) {
scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, ByteUtil.copyKeyBytesIfNecessary(ptr));
} else {
scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
addEmptyColumnToScan(scan, indexMaintainer.getDataEmptyKeyValueCF(), indexMaintainer.getEmptyKeyValueQualifier());
}
if (dataTable.isTransactional()) {
scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction());
}
// Go through MutationPlan abstraction so that we can create local indexes
// with a connectionless connection (which makes testing easier).
return new RowCountMutationPlan(plan.getContext(), PhoenixStatement.Operation.UPSERT);
}
}
}