blob: 28a83dd25baf30d2d23f7414be11edee615cb539 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
 * Keeps KVs that are scoped other than local.
 * <p>
 * The replication scopes are read from the entry's key. A cell is kept only when its column
 * family is present in that scope map with a value different from
 * {@link HConstants#REPLICATION_SCOPE_LOCAL}. Bulk load marker cells are handled separately:
 * their embedded store list is narrowed down to the families that pass the same test.
 */
@InterfaceAudience.Private
public class ScopeWALEntryFilter implements WALEntryFilter {
  private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class);

  /**
   * Removes, in place, every cell of the entry's edit whose family is not scoped for
   * replication.
   * @param entry the WAL entry to filter; its cell list is modified directly
   * @return {@code null} when the entry carries no replication scopes at all, otherwise the
   *         same {@code entry} instance with its cell list filtered (possibly down to zero cells)
   */
  @Override
  public Entry filter(Entry entry) {
    NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
    // The scope will be null or empty if there's nothing to replicate in that WALEdit
    if (scopes == null || scopes.isEmpty()) {
      return null;
    }
    ArrayList<Cell> cells = entry.getEdit().getCells();
    int size = cells.size();
    // Walk from the tail so that removing by index never shifts a cell we still have to visit.
    for (int i = size - 1; i >= 0; i--) {
      Cell cell = cells.get(i);
      // If a bulk load entry has a scope then that means user has enabled replication for bulk load
      // hfiles.
      // TODO There is a similar logic in TableCfWALEntryFilter but data structures are different so
      // cannot refactor into one now, can revisit and see if any way to unify them.
      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
        Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell);
        if (filteredBulkLoadEntryCell != null) {
          cells.set(i, filteredBulkLoadEntryCell);
        } else {
          cells.remove(i);
        }
      } else if (!hasNonLocalScope(scopes, CellUtil.cloneFamily(cell))) {
        cells.remove(i);
      }
    }
    // Give back the backing array's spare capacity once more than half of the cells are gone.
    if (cells.size() < size / 2) {
      cells.trimToSize();
    }
    return entry;
  }

  /**
   * Tells whether a column family should be kept by this filter.
   * <p>
   * Uses a single map lookup. A family that is absent from the map, or mapped to a
   * {@code null} value, is treated as not replicated instead of failing on unboxing.
   * @param scopes replication scopes keyed by column family
   * @param family the column family to test
   * @return {@code true} if {@code family} has a scope other than
   *         {@link HConstants#REPLICATION_SCOPE_LOCAL}
   */
  private static boolean hasNonLocalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
    Integer scope = scopes.get(family);
    return scope != null && scope.intValue() != HConstants.REPLICATION_SCOPE_LOCAL;
  }

  /**
   * Narrows the store list of a bulk load marker cell down to the replicated families.
   * @param scopes replication scopes keyed by column family
   * @param cell a cell matching {@code WALEdit.METAFAMILY}/{@code WALEdit.BULK_LOAD}
   * @return the original {@code cell} if its descriptor cannot be read or no store had to be
   *         dropped; {@code null} if every store was dropped; otherwise a new cell with the same
   *         row, timestamp and type whose value is the descriptor rebuilt with the kept stores
   */
  private Cell filterBulkLoadEntries(NavigableMap<byte[], Integer> scopes, Cell cell) {
    BulkLoadDescriptor bld;
    try {
      bld = WALEdit.getBulkLoadDescriptor(cell);
    } catch (IOException e) {
      // Best effort: an unreadable descriptor is passed through unchanged rather than dropped.
      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
      return cell;
    }
    List<StoreDescriptor> storesList = bld.getStoresList();
    // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
    List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
    boolean anyStoreRemoved = false;
    while (copiedStoresListIterator.hasNext()) {
      StoreDescriptor sd = copiedStoresListIterator.next();
      if (!hasNonLocalScope(scopes, sd.getFamilyName().toByteArray())) {
        copiedStoresListIterator.remove();
        anyStoreRemoved = true;
      }
    }

    if (!anyStoreRemoved) {
      // Nothing changed, so the original cell can be reused without re-serializing.
      return cell;
    }
    if (copiedStoresList.isEmpty()) {
      // No replicated store is left; the caller drops the cell altogether.
      return null;
    }

    BulkLoadDescriptor.Builder newDesc =
        BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
            .setEncodedRegionName(bld.getEncodedRegionName())
            .setBulkloadSeqNum(bld.getBulkloadSeqNum());
    newDesc.addAllStores(copiedStoresList);
    BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
    return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
      cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
  }
}