| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.coprocessor; |
| |
| import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES; |
| import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn; |
| import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY; |
| import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; |
| import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; |
| import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; |
| import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; |
| import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; |
| import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB; |
| import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone; |
| import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT; |
| import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.Closeable; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.security.PrivilegedExceptionAction; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| |
| import javax.annotation.concurrent.GuardedBy; |
| |
| import com.google.common.collect.Maps; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CoprocessorEnvironment; |
| import org.apache.hadoop.hbase.DoNotRetryIOException; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.KeepDeletedCells; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.NamespaceDescriptor; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; |
| import org.apache.hadoop.hbase.client.Connection; |
| import org.apache.hadoop.hbase.client.ConnectionFactory; |
| import org.apache.hadoop.hbase.client.Delete; |
| import org.apache.hadoop.hbase.client.Durability; |
| import org.apache.hadoop.hbase.client.Mutation; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.RegionInfo; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.ResultScanner; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.client.TableDescriptor; |
| import org.apache.hadoop.hbase.client.TableDescriptorBuilder; |
| import org.apache.hadoop.hbase.coprocessor.ObserverContext; |
| import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; |
| import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; |
| import org.apache.hadoop.hbase.coprocessor.RegionObserver; |
| import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| import org.apache.hadoop.hbase.ipc.RpcControllerFactory; |
| import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; |
| import org.apache.hadoop.hbase.regionserver.InternalScanner; |
| import org.apache.hadoop.hbase.regionserver.Region; |
| import org.apache.hadoop.hbase.regionserver.RegionScanner; |
| import org.apache.hadoop.hbase.regionserver.ScanOptions; |
| import org.apache.hadoop.hbase.regionserver.ScanType; |
| import org.apache.hadoop.hbase.regionserver.Store; |
| import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; |
| import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; |
| import org.apache.hadoop.hbase.security.User; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.hadoop.io.WritableUtils; |
| import org.apache.phoenix.cache.GlobalCache; |
| import org.apache.phoenix.cache.TenantCache; |
| import org.apache.phoenix.compile.ScanRanges; |
| import org.apache.phoenix.coprocessor.generated.PTableProtos; |
| import org.apache.phoenix.exception.DataExceedsCapacityException; |
| import org.apache.phoenix.exception.SQLExceptionCode; |
| import org.apache.phoenix.execute.TupleProjector; |
| import org.apache.phoenix.expression.Expression; |
| import org.apache.phoenix.expression.ExpressionType; |
| import org.apache.phoenix.expression.aggregator.Aggregator; |
| import org.apache.phoenix.expression.aggregator.Aggregators; |
| import org.apache.phoenix.expression.aggregator.ServerAggregators; |
| import org.apache.phoenix.filter.SkipScanFilter; |
| import org.apache.phoenix.hbase.index.ValueGetter; |
| import org.apache.phoenix.hbase.index.covered.update.ColumnReference; |
| import org.apache.phoenix.hbase.index.exception.IndexWriteException; |
| import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure; |
| import org.apache.phoenix.hbase.index.parallel.Task; |
| import org.apache.phoenix.hbase.index.parallel.TaskBatch; |
| import org.apache.phoenix.hbase.index.parallel.TaskRunner; |
| import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder; |
| import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; |
| import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner; |
| import org.apache.phoenix.hbase.index.table.HTableFactory; |
| import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; |
| import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; |
| import org.apache.phoenix.hbase.index.util.KeyValueBuilder; |
| import org.apache.phoenix.index.IndexMaintainer; |
| import org.apache.phoenix.index.PhoenixIndexCodec; |
| import org.apache.phoenix.index.PhoenixIndexFailurePolicy; |
| import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand; |
| import org.apache.phoenix.index.PhoenixIndexMetaData; |
| import org.apache.phoenix.jdbc.PhoenixConnection; |
| import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; |
| import org.apache.phoenix.join.HashJoinInfo; |
| import org.apache.phoenix.memory.MemoryManager.MemoryChunk; |
| import org.apache.phoenix.query.KeyRange; |
| import org.apache.phoenix.query.QueryConstants; |
| import org.apache.phoenix.query.QueryServices; |
| import org.apache.phoenix.query.QueryServicesOptions; |
| import org.apache.phoenix.schema.ColumnFamilyNotFoundException; |
| import org.apache.phoenix.schema.PColumn; |
| import org.apache.phoenix.schema.PRow; |
| import org.apache.phoenix.schema.PTable; |
| import org.apache.phoenix.schema.PTableImpl; |
| import org.apache.phoenix.schema.PTableType; |
| import org.apache.phoenix.schema.RowKeySchema; |
| import org.apache.phoenix.schema.SortOrder; |
| import org.apache.phoenix.schema.TableNotFoundException; |
| import org.apache.phoenix.schema.TableRef; |
| import org.apache.phoenix.schema.ValueSchema.Field; |
| import org.apache.phoenix.schema.stats.NoOpStatisticsCollector; |
| import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; |
| import org.apache.phoenix.schema.stats.StatisticsCollector; |
| import org.apache.phoenix.schema.stats.StatisticsCollectorFactory; |
| import org.apache.phoenix.schema.stats.StatsCollectionDisabledOnServerException; |
| import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; |
| import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; |
| import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; |
| import org.apache.phoenix.schema.tuple.Tuple; |
| import org.apache.phoenix.schema.types.PBinary; |
| import org.apache.phoenix.schema.types.PChar; |
| import org.apache.phoenix.schema.types.PDataType; |
| import org.apache.phoenix.schema.types.PDouble; |
| import org.apache.phoenix.schema.types.PFloat; |
| import org.apache.phoenix.schema.types.PLong; |
| import org.apache.phoenix.schema.types.PVarbinary; |
| import org.apache.phoenix.transaction.PhoenixTransactionContext; |
| import org.apache.phoenix.transaction.PhoenixTransactionProvider; |
| import org.apache.phoenix.transaction.TransactionFactory; |
| import org.apache.phoenix.util.ByteUtil; |
| import org.apache.phoenix.util.EncodedColumnsUtil; |
| import org.apache.phoenix.util.EnvironmentEdgeManager; |
| import org.apache.phoenix.util.ExpressionUtil; |
| import org.apache.phoenix.util.IndexUtil; |
| import org.apache.phoenix.util.LogUtil; |
| import org.apache.phoenix.util.PhoenixKeyValueUtil; |
| import org.apache.phoenix.util.PhoenixRuntime; |
| import org.apache.phoenix.util.PropertiesUtil; |
| import org.apache.phoenix.util.QueryUtil; |
| import org.apache.phoenix.util.ReadOnlyProps; |
| import org.apache.phoenix.util.ScanUtil; |
| import org.apache.phoenix.util.SchemaUtil; |
| import org.apache.phoenix.util.ServerUtil; |
| import org.apache.phoenix.util.ServerUtil.ConnectionType; |
| import org.apache.phoenix.util.StringUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| import com.google.common.primitives.Ints; |
| |
| |
| /** |
| * Region observer that aggregates ungrouped rows(i.e. SQL query with aggregation function and no GROUP BY). |
| * |
| * |
| * @since 0.1 |
| */ |
| public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver implements RegionCoprocessor { |
| // TODO: move all constants into a single class |
| public static final String UNGROUPED_AGG = "UngroupedAgg"; |
| public static final String DELETE_AGG = "DeleteAgg"; |
| public static final String DELETE_CQ = "DeleteCQ"; |
| public static final String DELETE_CF = "DeleteCF"; |
| public static final String EMPTY_CF = "EmptyCF"; |
| /** |
| * This lock used for synchronizing the state of |
| * {@link UngroupedAggregateRegionObserver#scansReferenceCount}, |
| * {@link UngroupedAggregateRegionObserver#isRegionClosingOrSplitting} variables used to avoid possible |
| * dead lock situation in case below steps: |
| * 1. We get read lock when we start writing local indexes, deletes etc.. |
| * 2. when memstore reach threshold, flushes happen. Since they use read (shared) lock they |
| * happen without any problem until someone tries to obtain write lock. |
| * 3. at one moment we decide to split/bulkload/close and try to acquire write lock. |
| * 4. Since that moment all attempts to get read lock will be blocked. I.e. no more |
| * flushes will happen. But we continue to fill memstore with local index batches and |
| * finally we get RTBE. |
| * |
| * The solution to this is to not allow or delay operations acquire the write lock. |
| * 1) In case of split we just throw IOException so split won't happen but it will not cause any harm. |
| * 2) In case of bulkload failing it by throwing the exception. |
| * 3) In case of region close by balancer/move wait before closing the reason and fail the query which |
| * does write after reading. |
| * |
| * See PHOENIX-3111 for more info. |
| */ |
| |
| private final Object lock = new Object(); |
| /** |
| * To maintain the number of scans used for create index, delete and upsert select operations |
| * which reads and writes to same region in coprocessors. |
| */ |
| @GuardedBy("lock") |
| private int scansReferenceCount = 0; |
| @GuardedBy("lock") |
| private boolean isRegionClosingOrSplitting = false; |
| private static final Logger LOGGER = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class); |
| private KeyValueBuilder kvBuilder; |
| private Configuration upsertSelectConfig; |
| private Configuration compactionConfig; |
| private Configuration indexWriteConfig; |
| private ReadOnlyProps indexWriteProps; |
| |
| @Override |
| public Optional<RegionObserver> getRegionObserver() { |
| return Optional.of(this); |
| } |
| |
| |
| @Override |
| public void start(CoprocessorEnvironment e) throws IOException { |
| // Can't use ClientKeyValueBuilder on server-side because the memstore expects to |
| // be able to get a single backing buffer for a KeyValue. |
| this.kvBuilder = GenericKeyValueBuilder.INSTANCE; |
| /* |
| * We need to create a copy of region's configuration since we don't want any side effect of |
| * setting the RpcControllerFactory. |
| */ |
| upsertSelectConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); |
| /* |
| * Till PHOENIX-3995 is fixed, we need to use the |
| * InterRegionServerIndexRpcControllerFactory. Although this would cause remote RPCs to use |
| * index handlers on the destination region servers, it is better than using the regular |
| * priority handlers which could result in a deadlock. |
| */ |
| upsertSelectConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, |
| InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); |
| |
| compactionConfig = ServerUtil.getCompactionConfig(e.getConfiguration()); |
| |
| // For retries of index write failures, use the same # of retries as the rebuilder |
| indexWriteConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); |
| indexWriteConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, |
| e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, |
| QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER)); |
| indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator()); |
| } |
| |
| public void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException { |
| try { |
| commitBatch(region, localRegionMutations, blockingMemstoreSize); |
| } catch (IOException e) { |
| handleIndexWriteException(localRegionMutations, e, new MutateCommand() { |
| @Override |
| public void doMutation() throws IOException { |
| commitBatch(region, localRegionMutations, blockingMemstoreSize); |
| } |
| |
| @Override |
| public List<Mutation> getMutationList() { |
| return localRegionMutations; |
| } |
| }); |
| } |
| } |
| |
| private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException { |
| if (mutations.isEmpty()) { |
| return; |
| } |
| |
| Mutation[] mutationArray = new Mutation[mutations.size()]; |
        // When the memstore size reaches blockingMemstoreSize, wait up to 3 seconds for a flush
        // to happen, which decreases the memstore size so that writes are allowed on the region
        // again.
| for (int i = 0; blockingMemstoreSize > 0 && (region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize()) > blockingMemstoreSize |
| && i < 30; i++) { |
| try { |
| checkForRegionClosingOrSplitting(); |
| Thread.sleep(100); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException(e); |
| } |
| } |
| // TODO: should we use the one that is all or none? |
| LOGGER.debug("Committing batch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString()); |
| region.batchMutate(mutations.toArray(mutationArray)); |
| } |
| |
| private void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID, |
| byte[] indexMaintainersPtr, byte[] txState, byte[] clientVersionBytes, |
| boolean useIndexProto) { |
| for (Mutation m : mutations) { |
| if (indexMaintainersPtr != null) { |
| m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr); |
| } |
| if (indexUUID != null) { |
| m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID); |
| } |
| if (txState != null) { |
| m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); |
| } |
| if (clientVersionBytes != null) { |
| m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes); |
| } |
| } |
| } |
| |
| private void commitBatchWithHTable(Table table, List<Mutation> mutations) throws IOException { |
| if (mutations.isEmpty()) { |
| return; |
| } |
| |
| LOGGER.debug("Committing batch of " + mutations.size() + " mutations for " + table); |
| try { |
| table.batch(mutations, null); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * There is a chance that region might be closing while running balancer/move/merge. In this |
| * case if the memstore size reaches blockingMemstoreSize better to fail query because there is |
| * a high chance that flush might not proceed and memstore won't be freed up. |
| * @throws IOException |
| */ |
| public void checkForRegionClosingOrSplitting() throws IOException { |
| synchronized (lock) { |
| if(isRegionClosingOrSplitting) { |
| lock.notifyAll(); |
| throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock."); |
| } |
| } |
| } |
| |
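    /**
     * Marks the scan as an ungrouped aggregation so that this observer handles it (see
     * {@link #isRegionObserverFor(Scan)}).
     */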
| public static void serializeIntoScan(Scan scan) { |
| scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, QueryConstants.TRUE); |
| } |
| |
| @Override |
    public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
            throws IOException {
| super.preScannerOpen(c, scan); |
| if (ScanUtil.isAnalyzeTable(scan)) { |
            // We are setting the start row and stop row such that it covers the entire region. As
            // part of PHOENIX-1263 we are storing the guideposts against the physical table rather
            // than individual tenant-specific tables.
| scan.withStartRow(HConstants.EMPTY_START_ROW); |
| scan.withStopRow(HConstants.EMPTY_END_ROW); |
| scan.setFilter(null); |
| } |
| } |
| |
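    /**
     * ArrayList of mutations that also tracks the cumulative byte size of the mutations added to
     * it, so callers can commit a batch once a size threshold is reached.
     */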
| public static class MutationList extends ArrayList<Mutation> { |
        private long byteSize = 0L;
| public MutationList() { |
| super(); |
| } |
| |
| public MutationList(int size){ |
| super(size); |
| } |
| |
| @Override |
| public boolean add(Mutation e) { |
| boolean r = super.add(e); |
| if (r) { |
| this.byteSize += PhoenixKeyValueUtil.calculateMutationDiskSize(e); |
| } |
| return r; |
| } |
| |
| public long byteSize() { |
| return byteSize; |
| } |
| |
| @Override |
| public void clear() { |
            byteSize = 0L;
| super.clear(); |
| } |
| } |
| |
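    /**
     * Returns the memstore size at which writes issued from this coprocessor start being
     * throttled: the table's flush size multiplied by (block multiplier - 1).
     */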
| public static long getBlockingMemstoreSize(Region region, Configuration conf) { |
| long flushSize = region.getTableDescriptor().getMemStoreFlushSize(); |
| |
| if (flushSize <= 0) { |
| flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, |
| TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE); |
| } |
| return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, |
| HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ; |
| } |
| |
| @Override |
| protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException { |
| RegionCoprocessorEnvironment env = c.getEnvironment(); |
| Region region = env.getRegion(); |
| long ts = scan.getTimeRange().getMax(); |
| boolean localIndexScan = ScanUtil.isLocalIndex(scan); |
| if (ScanUtil.isAnalyzeTable(scan)) { |
| byte[] gp_width_bytes = |
| scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_WIDTH_BYTES); |
| byte[] gp_per_region_bytes = |
| scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_PER_REGION); |
| // Let this throw, as this scan is being done for the sole purpose of collecting stats |
| StatisticsCollector statsCollector = StatisticsCollectorFactory.createStatisticsCollector( |
| env, region.getRegionInfo().getTable().getNameAsString(), ts, |
| gp_width_bytes, gp_per_region_bytes); |
| if (statsCollector instanceof NoOpStatisticsCollector) { |
| throw new StatsCollectionDisabledOnServerException(); |
| } else { |
| return collectStats(s, statsCollector, region, scan, env.getConfiguration()); |
| } |
| } else if (ScanUtil.isIndexRebuild(scan)) { |
| return rebuildIndices(s, region, scan, env); |
| } |
| |
| PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); |
| boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); |
| int offsetToBe = 0; |
| if (localIndexScan) { |
| /* |
| * For local indexes, we need to set an offset on row key expressions to skip |
| * the region start key. |
| */ |
| offsetToBe = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length : |
| region.getRegionInfo().getEndKey().length; |
| ScanUtil.setRowKeyOffset(scan, offsetToBe); |
| } |
| final int offset = offsetToBe; |
| |
| PTable projectedTable = null; |
| PTable writeToTable = null; |
| byte[][] values = null; |
| byte[] descRowKeyTableBytes = scan.getAttribute(UPGRADE_DESC_ROW_KEY); |
| boolean isDescRowKeyOrderUpgrade = descRowKeyTableBytes != null; |
| if (isDescRowKeyOrderUpgrade) { |
| LOGGER.debug("Upgrading row key for " + region.getRegionInfo().getTable().getNameAsString()); |
| projectedTable = deserializeTable(descRowKeyTableBytes); |
| try { |
| writeToTable = PTableImpl.builderWithColumns(projectedTable, |
| getColumnsToClone(projectedTable)) |
| .setRowKeyOrderOptimizable(true) |
| .build(); |
| } catch (SQLException e) { |
| ServerUtil.throwIOException("Upgrade failed", e); // Impossible |
| } |
| values = new byte[projectedTable.getPKColumns().size()][]; |
| } |
| boolean useProto = false; |
| byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO); |
| useProto = localIndexBytes != null; |
| if (localIndexBytes == null) { |
| localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); |
| } |
| List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto); |
| MutationList indexMutations = localIndexBytes == null ? new MutationList() : new MutationList(1024); |
| |
| RegionScanner theScanner = s; |
| |
| byte[] replayMutations = scan.getAttribute(BaseScannerRegionObserver.REPLAY_WRITES); |
| byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID); |
| byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); |
| byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); |
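        // If the scan carries serialized transaction state, resolve the transaction provider that
        // produced it so index/upsert mutations can be marked appropriately.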
| PhoenixTransactionProvider txnProvider = null; |
| if (txState != null) { |
| int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes); |
| txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion); |
| } |
| List<Expression> selectExpressions = null; |
| byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE); |
| boolean isUpsert = false; |
| boolean isDelete = false; |
| byte[] deleteCQ = null; |
| byte[] deleteCF = null; |
| byte[] emptyCF = null; |
| Table targetHTable = null; |
| Connection targetHConn = null; |
| boolean isPKChanging = false; |
| ImmutableBytesWritable ptr = new ImmutableBytesWritable(); |
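        // A server-side UPSERT SELECT carries the serialized target table and select expressions
        // in scan attributes; otherwise check for the server-side DELETE attributes.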
| if (upsertSelectTable != null) { |
| isUpsert = true; |
| projectedTable = deserializeTable(upsertSelectTable); |
| targetHConn = ConnectionFactory.createConnection(upsertSelectConfig); |
| targetHTable = targetHConn.getTable( |
| TableName.valueOf(projectedTable.getPhysicalName().getBytes())); |
| selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS)); |
| values = new byte[projectedTable.getPKColumns().size()][]; |
| isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions); |
| } else { |
| byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG); |
| isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0; |
| if (!isDelete) { |
| deleteCF = scan.getAttribute(BaseScannerRegionObserver.DELETE_CF); |
| deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ); |
| } |
| emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF); |
| } |
| TupleProjector tupleProjector = null; |
| byte[][] viewConstants = null; |
| ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); |
| final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); |
| final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); |
| boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan)); |
| if ((localIndexScan && !isDelete && !isDescRowKeyOrderUpgrade) || (j == null && p != null)) { |
| if (dataColumns != null) { |
| tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); |
| viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); |
| } |
| ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); |
| theScanner = |
| getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, |
| region, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex); |
| } |
| |
| if (j != null) { |
| theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier); |
| } |
| |
| int maxBatchSize = 0; |
| long maxBatchSizeBytes = 0L; |
| MutationList mutations = new MutationList(); |
| boolean needToWrite = false; |
| Configuration conf = env.getConfiguration(); |
| |
| /** |
| * Slow down the writes if the memstore size more than |
| * (hbase.hregion.memstore.block.multiplier - 1) times hbase.hregion.memstore.flush.size |
| * bytes. This avoids flush storm to hdfs for cases like index building where reads and |
| * write happen to all the table regions in the server. |
| */ |
| final long blockingMemStoreSize = getBlockingMemstoreSize(region, conf); |
| |
| boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; |
| if(buildLocalIndex) { |
| checkForLocalIndexColumnFamilies(region, indexMaintainers); |
| } |
| if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) { |
| needToWrite = true; |
| maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); |
| mutations = new MutationList(Ints.saturatedCast(maxBatchSize + maxBatchSize / 10)); |
| maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, |
| QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); |
| } |
| boolean hasMore; |
| int rowCount = 0; |
| boolean hasAny = false; |
| boolean acquiredLock = false; |
| boolean incrScanRefCount = false; |
| Aggregators aggregators = null; |
| Aggregator[] rowAggregators = null; |
| final RegionScanner innerScanner = theScanner; |
| final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan)); |
| try (MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) { |
| aggregators = ServerAggregators.deserialize( |
| scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), conf, em); |
| rowAggregators = aggregators.getAggregators(); |
| Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); |
| Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); |
| } |
| boolean useIndexProto = true; |
| byte[] indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD); |
            // for backward compatibility, fall back to looking up the old attribute
| if (indexMaintainersPtr == null) { |
| indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); |
| useIndexProto = false; |
| } |
| |
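            // Register this writing scan so that region close/split waits for it and bulkload is
            // rejected while it is in flight (see PHOENIX-3111).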
| if(needToWrite) { |
| synchronized (lock) { |
| if (isRegionClosingOrSplitting) { |
| throw new IOException("Temporarily unable to write from scan because region is closing or splitting"); |
| } |
| scansReferenceCount++; |
| incrScanRefCount = true; |
| lock.notifyAll(); |
| } |
| } |
| region.startRegionOperation(); |
| acquiredLock = true; |
| synchronized (innerScanner) { |
| do { |
| List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>(); |
                    // Results are potentially returned even when the return value of nextRaw() is
                    // false, since that value only indicates whether or not there are more rows
                    // after the ones returned.
| hasMore = innerScanner.nextRaw(results); |
| if (!results.isEmpty()) { |
| rowCount++; |
| result.setKeyValues(results); |
| if (isDescRowKeyOrderUpgrade) { |
| Arrays.fill(values, null); |
| Cell firstKV = results.get(0); |
| RowKeySchema schema = projectedTable.getRowKeySchema(); |
| int maxOffset = schema.iterator(firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), ptr); |
| for (int i = 0; i < schema.getFieldCount(); i++) { |
| Boolean hasValue = schema.next(ptr, i, maxOffset); |
| if (hasValue == null) { |
| break; |
| } |
| Field field = schema.getField(i); |
| if (field.getSortOrder() == SortOrder.DESC) { |
| // Special case for re-writing DESC ARRAY, as the actual byte value needs to change in this case |
| if (field.getDataType().isArrayType()) { |
| field.getDataType().coerceBytes(ptr, null, field.getDataType(), |
| field.getMaxLength(), field.getScale(), field.getSortOrder(), |
| field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte |
| } |
| // Special case for re-writing DESC CHAR or DESC BINARY, to force the re-writing of trailing space characters |
| else if (field.getDataType() == PChar.INSTANCE || field.getDataType() == PBinary.INSTANCE) { |
| int len = ptr.getLength(); |
| while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) { |
| len--; |
| } |
| ptr.set(ptr.get(), ptr.getOffset(), len); |
| // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171) |
| } else if (field.getDataType() == PFloat.INSTANCE || field.getDataType() == PDouble.INSTANCE) { |
| byte[] invertedBytes = SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength()); |
| ptr.set(invertedBytes); |
| } |
| } else if (field.getDataType() == PBinary.INSTANCE) { |
                                    // Remove trailing space characters so that the row key rebuild below will replace
                                    // them with the correct zero byte character. Note this is somewhat dangerous as
                                    // these could be legitimate, but it is not clear what the alternative is.
| int len = ptr.getLength(); |
| while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) { |
| len--; |
| } |
| ptr.set(ptr.get(), ptr.getOffset(), len); |
| } |
| values[i] = ptr.copyBytes(); |
| } |
| writeToTable.newKey(ptr, values); |
| if (Bytes.compareTo( |
| firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), |
| ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) { |
| continue; |
| } |
| byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr); |
| if (offset > 0) { // for local indexes (prepend region start key) |
| byte[] newRowWithOffset = new byte[offset + newRow.length]; |
                                    System.arraycopy(firstKV.getRowArray(), firstKV.getRowOffset(), newRowWithOffset, 0, offset);
| System.arraycopy(newRow, 0, newRowWithOffset, offset, newRow.length); |
| newRow = newRowWithOffset; |
| } |
| byte[] oldRow = Bytes.copy(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength()); |
| for (Cell cell : results) { |
| // Copy existing cell but with new row key |
| Cell newCell = new KeyValue(newRow, 0, newRow.length, |
| cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), |
| cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), |
| cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), |
| cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); |
| switch (KeyValue.Type.codeToType(cell.getTypeByte())) { |
| case Put: |
| // If Put, point delete old Put |
| Delete del = new Delete(oldRow); |
| del.addDeleteMarker(new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), |
| cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), |
| cell.getQualifierArray(), cell.getQualifierOffset(), |
| cell.getQualifierLength(), cell.getTimestamp(), KeyValue.Type.Delete, |
| ByteUtil.EMPTY_BYTE_ARRAY, 0, 0)); |
| mutations.add(del); |
| |
| Put put = new Put(newRow); |
| put.add(newCell); |
| mutations.add(put); |
| break; |
| case Delete: |
| case DeleteColumn: |
| case DeleteFamily: |
| case DeleteFamilyVersion: |
| Delete delete = new Delete(newRow); |
| delete.addDeleteMarker(newCell); |
| mutations.add(delete); |
| break; |
| } |
| } |
| } else if (buildLocalIndex) { |
| for (IndexMaintainer maintainer : indexMaintainers) { |
| if (!results.isEmpty()) { |
| result.getKey(ptr); |
| ValueGetter valueGetter = |
| maintainer.createGetterFromKeyValues( |
| ImmutableBytesPtr.copyBytesIfNecessary(ptr), |
| results); |
| Put put = maintainer.buildUpdateMutation(kvBuilder, |
| valueGetter, ptr, results.get(0).getTimestamp(), |
| env.getRegion().getRegionInfo().getStartKey(), |
| env.getRegion().getRegionInfo().getEndKey()); |
| |
| if (txnProvider != null) { |
| put = txnProvider.markPutAsCommitted(put, ts, ts); |
| } |
| indexMutations.add(put); |
| } |
| } |
| result.setKeyValues(results); |
| } else if (isDelete) { |
| // FIXME: the version of the Delete constructor without the lock |
| // args was introduced in 0.94.4, thus if we try to use it here |
| // we can no longer use the 0.94.2 version of the client. |
| Cell firstKV = results.get(0); |
| Delete delete = new Delete(firstKV.getRowArray(), |
| firstKV.getRowOffset(), firstKV.getRowLength(),ts); |
| if (replayMutations != null) { |
| delete.setAttribute(REPLAY_WRITES, replayMutations); |
| } |
| mutations.add(delete); |
                            // force Tephra to ignore this delete
| delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); |
| } else if (isUpsert) { |
| Arrays.fill(values, null); |
| int bucketNumOffset = 0; |
| if (projectedTable.getBucketNum() != null) { |
| values[0] = new byte[] { 0 }; |
| bucketNumOffset = 1; |
| } |
| int i = bucketNumOffset; |
| List<PColumn> projectedColumns = projectedTable.getColumns(); |
| for (; i < projectedTable.getPKColumns().size(); i++) { |
| Expression expression = selectExpressions.get(i - bucketNumOffset); |
| if (expression.evaluate(result, ptr)) { |
| values[i] = ptr.copyBytes(); |
| // If SortOrder from expression in SELECT doesn't match the |
| // column being projected into then invert the bits. |
| if (expression.getSortOrder() != |
| projectedColumns.get(i).getSortOrder()) { |
| SortOrder.invert(values[i], 0, values[i], 0, |
| values[i].length); |
| } |
                                } else {
| values[i] = ByteUtil.EMPTY_BYTE_ARRAY; |
| } |
| } |
| projectedTable.newKey(ptr, values); |
| PRow row = projectedTable.newRow(kvBuilder, ts, ptr, false); |
| for (; i < projectedColumns.size(); i++) { |
| Expression expression = selectExpressions.get(i - bucketNumOffset); |
| if (expression.evaluate(result, ptr)) { |
| PColumn column = projectedColumns.get(i); |
| if (!column.getDataType().isSizeCompatible(ptr, null, |
| expression.getDataType(), expression.getSortOrder(), |
| expression.getMaxLength(), expression.getScale(), |
| column.getMaxLength(), column.getScale())) { |
| throw new DataExceedsCapacityException( |
| column.getDataType(), column.getMaxLength(), |
| column.getScale(), column.getName().getString(), ptr); |
| } |
| column.getDataType().coerceBytes(ptr, null, |
| expression.getDataType(), expression.getMaxLength(), |
| expression.getScale(), expression.getSortOrder(), |
| column.getMaxLength(), column.getScale(), |
| column.getSortOrder(), projectedTable.rowKeyOrderOptimizable()); |
| byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr); |
| row.setValue(column, bytes); |
| } |
| } |
| for (Mutation mutation : row.toRowMutations()) { |
| if (replayMutations != null) { |
| mutation.setAttribute(REPLAY_WRITES, replayMutations); |
| } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) { |
| mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, ts); |
| } |
| mutations.add(mutation); |
| } |
| for (i = 0; i < selectExpressions.size(); i++) { |
| selectExpressions.get(i).reset(); |
| } |
| } else if (deleteCF != null && deleteCQ != null) { |
                            // No need to search for the delete column, since we project only it
                            // when no empty key value is being set
| if (emptyCF == null || |
| result.getValue(deleteCF, deleteCQ) != null) { |
| Delete delete = new Delete(results.get(0).getRowArray(), |
| results.get(0).getRowOffset(), |
| results.get(0).getRowLength()); |
| delete.addColumns(deleteCF, deleteCQ, ts); |
                                // force Tephra to ignore this delete
| delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); |
| mutations.add(delete); |
| } |
| } |
| if (emptyCF != null) { |
| /* |
| * If we've specified an emptyCF, then we need to insert an empty |
| * key value "retroactively" for any key value that is visible at |
| * the timestamp that the DDL was issued. Key values that are not |
| * visible at this timestamp will not ever be projected up to |
| * scans past this timestamp, so don't need to be considered. |
| * We insert one empty key value per row per timestamp. |
| */ |
| Set<Long> timeStamps = |
| Sets.newHashSetWithExpectedSize(results.size()); |
| for (Cell kv : results) { |
| long kvts = kv.getTimestamp(); |
| if (!timeStamps.contains(kvts)) { |
| Put put = new Put(kv.getRowArray(), kv.getRowOffset(), |
| kv.getRowLength()); |
| put.addColumn(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts, |
| ByteUtil.EMPTY_BYTE_ARRAY); |
| mutations.add(put); |
| } |
| } |
| } |
| if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { |
| commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, |
| txState, targetHTable, useIndexProto, isPKChanging, clientVersionBytes); |
| mutations.clear(); |
| } |
                    // Commit index mutations in batches based on MUTATE_BATCH_SIZE_BYTES_ATTRIB
                    // in config
| |
| if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { |
| setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto); |
| commitBatch(region, indexMutations, blockingMemStoreSize); |
| indexMutations.clear(); |
| } |
| aggregators.aggregate(rowAggregators, result); |
| hasAny = true; |
| } |
| } while (hasMore); |
| if (!mutations.isEmpty()) { |
| commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState, |
| targetHTable, useIndexProto, isPKChanging, clientVersionBytes); |
| mutations.clear(); |
| } |
| |
| if (!indexMutations.isEmpty()) { |
| commitBatch(region, indexMutations, blockingMemStoreSize); |
| indexMutations.clear(); |
| } |
| } |
| } finally { |
| if (needToWrite && incrScanRefCount) { |
| synchronized (lock) { |
| scansReferenceCount--; |
| if (scansReferenceCount < 0) { |
| LOGGER.warn( |
| "Scan reference count went below zero. Something isn't correct. Resetting it back to zero"); |
| scansReferenceCount = 0; |
| } |
| lock.notifyAll(); |
| } |
| } |
| try { |
| tryClosingResourceSilently(targetHTable); |
| tryClosingResourceSilently(targetHConn); |
| } finally { |
| try { |
| innerScanner.close(); |
| } finally { |
| if (acquiredLock) region.closeRegionOperation(); |
| } |
| } |
| } |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan))); |
| } |
| |
| final boolean hadAny = hasAny; |
| Cell keyValue = null; |
| if (hadAny) { |
| byte[] value = aggregators.toBytes(rowAggregators); |
| keyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length); |
| } |
| final Cell aggKeyValue = keyValue; |
| |
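        // Return a single-row scanner that emits the serialized aggregator state (or nothing if
        // no rows were aggregated).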
| RegionScanner scanner = new BaseRegionScanner(innerScanner) { |
| private boolean done = !hadAny; |
| |
| @Override |
| public boolean isFilterDone() { |
| return done; |
| } |
| |
| @Override |
| public boolean next(List<Cell> results) throws IOException { |
| if (done) return false; |
| done = true; |
| results.add(aggKeyValue); |
| return false; |
| } |
| |
| @Override |
| public long getMaxResultSize() { |
| return scan.getMaxResultSize(); |
| } |
| }; |
| return scanner; |
| |
| } |
| |
| private static void tryClosingResourceSilently(Closeable res) { |
| if (res != null) { |
| try { |
| res.close(); |
| } catch (IOException e) { |
| LOGGER.error("Closing resource: " + res + " failed: ", e); |
| } |
| } |
| } |
| |
| private void checkForLocalIndexColumnFamilies(Region region, |
| List<IndexMaintainer> indexMaintainers) throws IOException { |
| TableDescriptor tableDesc = region.getTableDescriptor(); |
| String schemaName = |
| tableDesc.getTableName().getNamespaceAsString() |
| .equals(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR) ? SchemaUtil |
| .getSchemaNameFromFullName(tableDesc.getTableName().getNameAsString()) |
| : tableDesc.getTableName().getNamespaceAsString(); |
| String tableName = SchemaUtil.getTableNameFromFullName(tableDesc.getTableName().getNameAsString()); |
| for (IndexMaintainer indexMaintainer : indexMaintainers) { |
| Set<ColumnReference> coveredColumns = indexMaintainer.getCoveredColumns(); |
| if(coveredColumns.isEmpty()) { |
| byte[] localIndexCf = indexMaintainer.getEmptyKeyValueFamily().get(); |
                // When covered columns are empty we store index data in the default column family,
                // so check for it.
| if (tableDesc.getColumnFamily(localIndexCf) == null) { |
| ServerUtil.throwIOException("Column Family Not Found", |
| new ColumnFamilyNotFoundException(schemaName, tableName, Bytes |
| .toString(localIndexCf))); |
| } |
| } |
| for (ColumnReference reference : coveredColumns) { |
| byte[] cf = IndexUtil.getLocalIndexColumnFamily(reference.getFamily()); |
| ColumnFamilyDescriptor family = region.getTableDescriptor().getColumnFamily(cf); |
| if (family == null) { |
| ServerUtil.throwIOException("Column Family Not Found", |
| new ColumnFamilyNotFoundException(schemaName, tableName, Bytes.toString(cf))); |
| } |
| } |
| } |
| } |
| |
| private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, |
| long blockingMemStoreSize, byte[] indexMaintainersPtr, byte[] txState, |
| Table targetHTable, boolean useIndexProto, boolean isPKChanging, |
| byte[] clientVersionBytes) throws IOException { |
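        // Split mutations into those targeting this region and those targeting other regions or
        // tables, then commit each set through the appropriate path (direct batch vs. HTable).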
| List<Mutation> localRegionMutations = Lists.newArrayList(); |
| List<Mutation> remoteRegionMutations = Lists.newArrayList(); |
| setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, |
| clientVersionBytes, useIndexProto); |
| separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations, |
| isPKChanging); |
| commitBatchWithRetries(region, localRegionMutations, blockingMemStoreSize); |
| try { |
| commitBatchWithHTable(targetHTable, remoteRegionMutations); |
| } catch (IOException e) { |
| handleIndexWriteException(remoteRegionMutations, e, new MutateCommand() { |
| @Override |
| public void doMutation() throws IOException { |
| commitBatchWithHTable(targetHTable, remoteRegionMutations); |
| } |
| |
| @Override |
| public List<Mutation> getMutationList() { |
| return remoteRegionMutations; |
| } |
| }); |
| } |
| localRegionMutations.clear(); |
| remoteRegionMutations.clear(); |
| } |
| |
| private void handleIndexWriteException(final List<Mutation> localRegionMutations, IOException origIOE, |
| MutateCommand mutateCommand) throws IOException { |
| long serverTimestamp = ServerUtil.parseTimestampFromRemoteException(origIOE); |
| SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(origIOE); |
| if (inferredE != null && inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) { |
| // For an index write failure, the data table write succeeded, |
| // so when we retry we need to set REPLAY_WRITES |
| for (Mutation mutation : localRegionMutations) { |
| if (PhoenixIndexMetaData.isIndexRebuild(mutation.getAttributesMap())) { |
| mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, |
| BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES); |
| } else { |
| mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, |
| BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); |
| } |
| // use the server timestamp for index write retries |
| PhoenixKeyValueUtil.setTimestamp(mutation, serverTimestamp); |
| } |
| IndexWriteException iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE); |
| try (PhoenixConnection conn = |
| QueryUtil.getConnectionOnServer(indexWriteConfig) |
| .unwrap(PhoenixConnection.class)) { |
| PhoenixIndexFailurePolicy.doBatchWithRetries(mutateCommand, iwe, conn, |
| indexWriteProps); |
| } catch (Exception e) { |
| throw new DoNotRetryIOException(e); |
| } |
| } else { |
| throw origIOE; |
| } |
| } |
| |
| private void separateLocalAndRemoteMutations(Table targetHTable, Region region, List<Mutation> mutations, |
| List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations, |
| boolean isPKChanging){ |
| boolean areMutationsInSameTable = areMutationsInSameTable(targetHTable, region); |
        // if we're writing to the same table, but the PK can change, that means that some
        // mutations might be in our current region, and others in a different one.
| if (areMutationsInSameTable && isPKChanging) { |
| RegionInfo regionInfo = region.getRegionInfo(); |
| for (Mutation mutation : mutations){ |
| if (regionInfo.containsRow(mutation.getRow())){ |
| localRegionMutations.add(mutation); |
| } else { |
| remoteRegionMutations.add(mutation); |
| } |
| } |
| } else if (areMutationsInSameTable && !isPKChanging) { |
| localRegionMutations.addAll(mutations); |
| } else { |
| remoteRegionMutations.addAll(mutations); |
| } |
| } |
| |
| private boolean areMutationsInSameTable(Table targetHTable, Region region) { |
| return (targetHTable == null || Bytes.compareTo(targetHTable.getName().getName(), |
| region.getTableDescriptor().getTableName().getName()) == 0); |
| } |
| |
| @Override |
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
            InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
            CompactionRequest request) throws IOException {
| if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) { |
| final TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); |
| // Compaction and split upcalls run with the effective user context of the requesting user. |
| // This will lead to failure of cross cluster RPC if the effective user is not |
| // the login user. Switch to the login user context to ensure we have the expected |
| // security context. |
| return User.runAsLoginUser( |
| new PrivilegedExceptionAction<InternalScanner>() { |
| @Override |
| public InternalScanner run() throws Exception { |
| InternalScanner internalScanner = scanner; |
| try { |
| long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis(); |
| DelegateRegionCoprocessorEnvironment compactionConfEnv = |
| new DelegateRegionCoprocessorEnvironment( |
| c.getEnvironment(), ConnectionType.COMPACTION_CONNECTION); |
| StatisticsCollector statisticsCollector = |
| StatisticsCollectorFactory.createStatisticsCollector( |
| compactionConfEnv, |
| table.getNameAsString(), |
| clientTimeStamp, |
| store.getColumnFamilyDescriptor().getName()); |
| statisticsCollector.init(); |
| internalScanner = statisticsCollector.createCompactionScanner(compactionConfEnv, store, scanner); |
| } catch (Exception e) { |
| // If we can't reach the stats table, don't interrupt the normal |
| // compaction operation, just log a warning. |
| if (LOGGER.isWarnEnabled()) { |
| LOGGER.warn("Unable to collect stats for " + table, e); |
| } |
| } |
| return internalScanner; |
| } |
| }); |
| } |
| return scanner; |
| } |
| |
| private static PTable deserializeTable(byte[] b) { |
| try { |
| PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b); |
| return PTableImpl.createFromProto(ptableProto); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan, |
| final RegionCoprocessorEnvironment env) throws IOException { |
| |
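        // Index rebuild scans are delegated to IndexRebuildRegionScanner, which rebuilds the
        // index rows for the data rows covered by this scan.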
| RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, env, this); |
| return scanner; |
| } |
| |
| private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats, |
| final Region region, final Scan scan, Configuration config) throws IOException { |
| StatsCollectionCallable callable = |
| new StatsCollectionCallable(stats, region, innerScanner, config, scan); |
| byte[] asyncBytes = scan.getAttribute(BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB); |
| boolean async = false; |
| if (asyncBytes != null) { |
| async = Bytes.toBoolean(asyncBytes); |
| } |
| long rowCount = 0; // in case of async, we report 0 as number of rows updated |
| StatisticsCollectionRunTracker statsRunTracker = |
| StatisticsCollectionRunTracker.getInstance(config); |
| final boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(),scan.getFamilyMap().keySet()); |
| if (runUpdateStats) { |
| if (!async) { |
| rowCount = callable.call(); |
| } else { |
| statsRunTracker.runTask(callable); |
| } |
| } else { |
| rowCount = CONCURRENT_UPDATE_STATS_ROW_COUNT; |
| LOGGER.info("UPDATE STATISTICS didn't run because another UPDATE STATISTICS command was already running on the region " |
| + region.getRegionInfo().getRegionNameAsString()); |
| } |
| byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount)); |
| final Cell aggKeyValue = |
| PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, |
| SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); |
| RegionScanner scanner = new BaseRegionScanner(innerScanner) { |
| @Override |
| public RegionInfo getRegionInfo() { |
| return region.getRegionInfo(); |
| } |
| |
| @Override |
| public boolean isFilterDone() { |
| return true; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| // If we ran/scheduled StatsCollectionCallable the delegate |
| // scanner is closed there. Otherwise close it here. |
| if (!runUpdateStats) { |
| super.close(); |
| } |
| } |
| |
| @Override |
| public boolean next(List<Cell> results) throws IOException { |
| results.add(aggKeyValue); |
| return false; |
| } |
| |
| @Override |
| public long getMaxResultSize() { |
| return scan.getMaxResultSize(); |
| } |
| }; |
| return scanner; |
| } |
| |
| /** |
| * |
| * Callable to encapsulate the collection of stats triggered by |
| * UPDATE STATISTICS command. |
| * |
| * Package private for tests. |
| */ |
| static class StatsCollectionCallable implements Callable<Long> { |
| private final StatisticsCollector statsCollector; |
| private final Region region; |
| private final RegionScanner innerScanner; |
| private final Configuration config; |
| private final Scan scan; |
| |
| StatsCollectionCallable(StatisticsCollector s, Region r, RegionScanner rs, |
| Configuration config, Scan scan) { |
| this.statsCollector = s; |
| this.region = r; |
| this.innerScanner = rs; |
| this.config = config; |
| this.scan = scan; |
| } |
| |
| @Override |
| public Long call() throws IOException { |
| return collectStatsInternal(); |
| } |
| |
| private boolean areStatsBeingCollectedViaCompaction() { |
| return StatisticsCollectionRunTracker.getInstance(config) |
| .areStatsBeingCollectedOnCompaction(region.getRegionInfo()); |
| } |
| |
| private long collectStatsInternal() throws IOException { |
| long startTime = EnvironmentEdgeManager.currentTimeMillis(); |
| region.startRegionOperation(); |
| boolean hasMore = false; |
| boolean noErrors = false; |
| boolean compactionRunning = areStatsBeingCollectedViaCompaction(); |
| long rowCount = 0; |
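            // If a major compaction is already collecting stats for this region, skip the scan
            // and report COMPACTION_UPDATE_STATS_ROW_COUNT instead.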
| try { |
| if (!compactionRunning) { |
| statsCollector.init(); |
| synchronized (innerScanner) { |
| do { |
| List<Cell> results = new ArrayList<Cell>(); |
| hasMore = innerScanner.nextRaw(results); |
| statsCollector.collectStatistics(results); |
| rowCount++; |
| compactionRunning = areStatsBeingCollectedViaCompaction(); |
| } while (hasMore && !compactionRunning); |
| noErrors = true; |
| } |
| } |
| return compactionRunning ? COMPACTION_UPDATE_STATS_ROW_COUNT : rowCount; |
| } catch (IOException e) { |
| LOGGER.error("IOException in update stats: " + Throwables.getStackTraceAsString(e)); |
| throw e; |
| } finally { |
| try { |
| if (noErrors && !compactionRunning) { |
| statsCollector.updateStatistics(region, scan); |
| LOGGER.info("UPDATE STATISTICS finished successfully for scanner: " |
| + innerScanner + ". Number of rows scanned: " + rowCount |
| + ". Time: " + (EnvironmentEdgeManager.currentTimeMillis() - startTime)); |
| } |
| if (compactionRunning) { |
| LOGGER.info("UPDATE STATISTICS stopped in between because major compaction was running for region " |
| + region.getRegionInfo().getRegionNameAsString()); |
| } |
| } finally { |
| try { |
| StatisticsCollectionRunTracker.getInstance(config).removeUpdateStatsCommandRegion(region.getRegionInfo(),scan.getFamilyMap().keySet()); |
| statsCollector.close(); |
| } finally { |
| try { |
| innerScanner.close(); |
| } finally { |
| region.closeRegionOperation(); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| private static List<Expression> deserializeExpressions(byte[] b) { |
| ByteArrayInputStream stream = new ByteArrayInputStream(b); |
| try { |
| DataInputStream input = new DataInputStream(stream); |
| int size = WritableUtils.readVInt(input); |
| List<Expression> selectExpressions = Lists.newArrayListWithExpectedSize(size); |
| for (int i = 0; i < size; i++) { |
| ExpressionType type = ExpressionType.values()[WritableUtils.readVInt(input)]; |
| Expression selectExpression = type.newInstance(); |
| selectExpression.readFields(input); |
| selectExpressions.add(selectExpression); |
| } |
| return selectExpressions; |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } finally { |
| try { |
| stream.close(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| public static byte[] serialize(PTable projectedTable) { |
| PTableProtos.PTable ptableProto = PTableImpl.toProto(projectedTable); |
| return ptableProto.toByteArray(); |
| } |
| |
| public static byte[] serialize(List<Expression> selectExpressions) { |
| ByteArrayOutputStream stream = new ByteArrayOutputStream(); |
| try { |
| DataOutputStream output = new DataOutputStream(stream); |
| WritableUtils.writeVInt(output, selectExpressions.size()); |
| for (int i = 0; i < selectExpressions.size(); i++) { |
| Expression expression = selectExpressions.get(i); |
| WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); |
| expression.write(output); |
| } |
| return stream.toByteArray(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } finally { |
| try { |
| stream.close(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| @Override |
| public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> c, |
| List<Pair<byte[], String>> familyPaths) throws IOException { |
        // Don't allow bulkload while operations that need to read from and write to the same
        // region are going on in the coprocessors, to avoid a deadlock scenario. See PHOENIX-3111.
| synchronized (lock) { |
| if (scansReferenceCount > 0) { |
| throw new DoNotRetryIOException("Operations like local index building/delete/upsert select" |
| + " might be going on so not allowing to bulkload."); |
| } |
| } |
| } |
| |
| @Override |
| public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) |
| throws IOException { |
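        // Block the region close until all in-flight writing scans have finished
        // (see PHOENIX-3111).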
| synchronized (lock) { |
| isRegionClosingOrSplitting = true; |
| while (scansReferenceCount > 0) { |
| try { |
| lock.wait(1000); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| |
| @Override |
| protected boolean isRegionObserverFor(Scan scan) { |
| return scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG) != null; |
| } |
| |
| @Override |
| public void preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, |
| ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker, |
| final CompactionRequest request) throws IOException { |
| // Compaction and split upcalls run with the effective user context of the requesting user. |
| // This will lead to failure of cross cluster RPC if the effective user is not |
| // the login user. Switch to the login user context to ensure we have the expected |
| // security context. |
| final String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); |
| // since we will make a call to syscat, do nothing if we are compacting syscat itself |
| if (request.isMajor() && !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) { |
| User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| // If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index |
| try (PhoenixConnection conn = |
| QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) { |
| PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName); |
| List<PTable> indexes = PTableType.INDEX.equals(table.getType()) ? Lists.newArrayList(table) : table.getIndexes(); |
| // FIXME need to handle views and indexes on views as well |
| for (PTable index : indexes) { |
| if (index.getIndexDisableTimestamp() != 0) { |
| LOGGER.info( |
| "Modifying major compaction scanner to retain deleted cells for a table with disabled index: " |
| + fullTableName); |
| options.setKeepDeletedCells(KeepDeletedCells.TRUE); |
| options.readAllVersions(); |
| options.setTTL(Long.MAX_VALUE); |
| } |
| } |
| } catch (Exception e) { |
| if (e instanceof TableNotFoundException) { |
| LOGGER.debug("Ignoring HBase table that is not a Phoenix table: " + fullTableName); |
| // non-Phoenix HBase tables won't be found, do nothing |
| } else { |
| LOGGER.error("Unable to modify compaction scanner to retain deleted cells for a table with disabled Index; " |
| + fullTableName, |
| e); |
| } |
| } |
| return null; |
| } |
| }); |
| } |
| } |
| } |