blob: b5d164d4efc779370e71ed54a4d8b5c5bdc1c076 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.transaction;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.omid.HBaseShims;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.CompactorScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionConnectionFactory;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_NAME_KEY;
/**
* Garbage collector for stale data: triggered upon HBase
* compactions, it removes data from uncommitted transactions
* older than the low watermark using a special scanner
*/
public class OmidCompactor extends BaseRegionObserver {
private static final Logger LOG = LoggerFactory.getLogger(OmidCompactor.class);
private static final String HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY
= "omid.hbase.compactor.retain.tombstones";
private static final boolean HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT = true;
final static String OMID_COMPACTABLE_CF_FLAG = "OMID_ENABLED";
private boolean enableCompactorForAllFamilies = false;
private HBaseCommitTableConfig commitTableConf = null;
private RegionCoprocessorEnvironment env = null;
@VisibleForTesting
CommitTable.Client commitTableClient;
// When compacting, if a cell which has been marked by HBase as Delete or
// Delete Family (that is, non-transactionally deleted), we allow the user
// to decide what the compactor scanner should do with it: retain it or not
// If retained, the deleted cell will appear after a minor compaction, but
// will be deleted anyways after a major one
private boolean retainNonTransactionallyDeletedCells;
private Connection connection;
public OmidCompactor() {
this(false);
}
public OmidCompactor(boolean enableCompactorForAllFamilies) {
LOG.info("Compactor coprocessor initialized");
this.enableCompactorForAllFamilies = enableCompactorForAllFamilies;
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
LOG.info("Starting compactor coprocessor");
commitTableConf = new HBaseCommitTableConfig();
String commitTableName = env.getConfiguration().get(COMMIT_TABLE_NAME_KEY);
if (commitTableName != null) {
commitTableConf.setTableName(commitTableName);
}
connection = RegionConnectionFactory
.getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION, (RegionCoprocessorEnvironment) env);
commitTableClient = new HBaseCommitTable(connection, commitTableConf).getClient();
retainNonTransactionallyDeletedCells =
env.getConfiguration().getBoolean(HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY,
HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT);
LOG.info("Compactor coprocessor started");
}
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
LOG.info("Stopping compactor coprocessor");
LOG.info("Compactor coprocessor stopped");
}
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> env,
Store store,
InternalScanner scanner,
ScanType scanType,
CompactionRequest request) throws IOException {
boolean omidCompactable;
try {
if (enableCompactorForAllFamilies) {
omidCompactable = true;
} else {
omidCompactable = HBaseShims.OmidCompactionEnabled(env, store, OMID_COMPACTABLE_CF_FLAG);
}
// only column families tagged as compactable are compacted
// with omid compactor
if (!omidCompactable) {
return scanner;
} else {
boolean isMajorCompaction = request.isMajor();
return new CompactorScanner(env,
scanner,
commitTableClient,
isMajorCompaction,
retainNonTransactionallyDeletedCells);
}
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new DoNotRetryIOException(e);
}
}
}