| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.replication; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CellBuilderType; |
| import org.apache.hadoop.hbase.CellUtil; |
| import org.apache.hadoop.hbase.ExtendedCellBuilder; |
| import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; |
| import org.apache.hadoop.hbase.wal.WALEdit; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.base.Predicate; |
| |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; |
| |
| @InterfaceAudience.Private |
| public class BulkLoadCellFilter { |
| private static final Logger LOG = LoggerFactory.getLogger(BulkLoadCellFilter.class); |
| |
| private final ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); |
| /** |
| * Filters the bulk load cell using the supplied predicate. |
| * @param cell The WAL cell to filter. |
| * @param famPredicate Returns true of given family should be removed. |
| * @return The filtered cell. |
| */ |
| public Cell filterCell(Cell cell, Predicate<byte[]> famPredicate) { |
| byte[] fam; |
| BulkLoadDescriptor bld = null; |
| try { |
| bld = WALEdit.getBulkLoadDescriptor(cell); |
| } catch (IOException e) { |
| LOG.warn("Failed to get bulk load events information from the WAL file.", e); |
| return cell; |
| } |
| List<StoreDescriptor> storesList = bld.getStoresList(); |
| // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList |
| List<StoreDescriptor> copiedStoresList = new ArrayList<>(storesList); |
| Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator(); |
| boolean anyStoreRemoved = false; |
| while (copiedStoresListIterator.hasNext()) { |
| StoreDescriptor sd = copiedStoresListIterator.next(); |
| fam = sd.getFamilyName().toByteArray(); |
| if (famPredicate.apply(fam)) { |
| copiedStoresListIterator.remove(); |
| anyStoreRemoved = true; |
| } |
| } |
| |
| if (!anyStoreRemoved) { |
| return cell; |
| } else if (copiedStoresList.isEmpty()) { |
| return null; |
| } |
| BulkLoadDescriptor.Builder newDesc = |
| BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName()) |
| .setEncodedRegionName(bld.getEncodedRegionName()) |
| .setBulkloadSeqNum(bld.getBulkloadSeqNum()); |
| newDesc.addAllStores(copiedStoresList); |
| BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build(); |
| return cellBuilder.clear() |
| .setRow(CellUtil.cloneRow(cell)) |
| .setFamily(WALEdit.METAFAMILY) |
| .setQualifier(WALEdit.BULK_LOAD) |
| .setTimestamp(cell.getTimestamp()) |
| .setType(cell.getTypeByte()) |
| .setValue(newBulkLoadDescriptor.toByteArray()) |
| .build(); |
| } |
| } |