| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.regionserver; |
| |
| import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet; |
| import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData; |
| import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Random; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CellBuilderType; |
| import org.apache.hadoop.hbase.CellUtil; |
| import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; |
| import org.apache.hadoop.hbase.HBaseClassTestRule; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; |
| import org.apache.hadoop.hbase.client.Durability; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.RegionInfo; |
| import org.apache.hadoop.hbase.client.RegionInfoBuilder; |
| import org.apache.hadoop.hbase.client.TableDescriptor; |
| import org.apache.hadoop.hbase.client.TableDescriptorBuilder; |
| import org.apache.hadoop.hbase.executor.ExecutorService; |
| import org.apache.hadoop.hbase.io.hfile.HFile; |
| import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; |
| import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; |
| import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; |
| import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; |
| import org.apache.hadoop.hbase.testclassification.LargeTests; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; |
| import org.apache.hadoop.hbase.util.FSUtils; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; |
| import org.apache.hadoop.hbase.wal.WAL; |
| import org.apache.hadoop.hbase.wal.WALEdit; |
| import org.apache.hadoop.hbase.wal.WALFactory; |
| import org.apache.hadoop.hbase.wal.WALKeyImpl; |
| import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; |
| import org.apache.hadoop.util.StringUtils; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.ClassRule; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.rules.TestName; |
| import org.mockito.Mockito; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.collect.Lists; |
| import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; |
| |
| import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; |
| |
| /** |
 * Tests of HRegion methods for replaying flush, compaction, region open, etc. events for
 * secondary region replicas.
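 * <p>
 * Most tests follow the same pattern: write and flush on the primary, then read the primary's
 * WAL with {@link #createWALReaderForPrimary()} and replay its entries into the secondary,
 * dispatching on the marker type. An illustrative sketch of that loop (mirroring the code in
 * the tests below):
 * <pre>
 * WAL.Entry entry;
 * while ((entry = reader.next()) != null) {
 *   FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
 *   if (flush != null) {
 *     secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
 *   } else {
 *     replayEdit(secondaryRegion, entry);
 *   }
 * }
 * </pre>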
| */ |
| @Category(LargeTests.class) |
| public class TestHRegionReplayEvents { |
| |
| @ClassRule |
| public static final HBaseClassTestRule CLASS_RULE = |
| HBaseClassTestRule.forClass(TestHRegionReplayEvents.class); |
| |
  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionReplayEvents.class);
| @Rule public TestName name = new TestName(); |
| |
| private static HBaseTestingUtility TEST_UTIL; |
| |
| public static Configuration CONF; |
| private String dir; |
| |
| private byte[][] families = new byte[][] { |
| Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")}; |
| |
| // Test names |
| protected byte[] tableName; |
| protected String method; |
| protected final byte[] row = Bytes.toBytes("rowA"); |
| protected final byte[] row2 = Bytes.toBytes("rowB"); |
| protected byte[] cq = Bytes.toBytes("cq"); |
| |
| // per test fields |
| private Path rootDir; |
| private TableDescriptor htd; |
| private RegionServerServices rss; |
| private RegionInfo primaryHri, secondaryHri; |
| private HRegion primaryRegion, secondaryRegion; |
| private WAL walPrimary, walSecondary; |
| private WAL.Reader reader; |
| |
| @BeforeClass |
| public static void setUpBeforeClass() throws Exception { |
| TEST_UTIL = new HBaseTestingUtility(); |
| TEST_UTIL.startMiniDFSCluster(1); |
| } |
| |
| @AfterClass |
| public static void tearDownAfterClass() throws Exception { |
| LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); |
| TEST_UTIL.cleanupTestDir(); |
| TEST_UTIL.shutdownMiniDFSCluster(); |
| } |
| |
| @Before |
| public void setUp() throws Exception { |
| CONF = TEST_UTIL.getConfiguration(); |
| dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString(); |
    method = name.getMethodName();
    tableName = Bytes.toBytes(method);
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
| TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method)); |
| for (byte[] family : families) { |
| builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); |
| } |
| htd = builder.build(); |
| |
| long time = System.currentTimeMillis(); |
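    // initialize the MemStoreLAB chunk creator up front; region/memstore creation relies on it
    // in these standalone tests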
| ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); |
| primaryHri = |
| RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build(); |
| secondaryHri = |
| RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build(); |
| |
| WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir); |
| walPrimary = wals.getWAL(primaryHri); |
| walSecondary = wals.getWAL(secondaryHri); |
| |
| rss = mock(RegionServerServices.class); |
| when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); |
| when(rss.getConfiguration()).thenReturn(CONF); |
| when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF)); |
    String executorName =
      org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString();
    ExecutorService es = new ExecutorService(executorName);
    es.startExecutorService(executorName + "-" + executorName, 1);
| when(rss.getExecutorService()).thenReturn(es); |
| primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary); |
| primaryRegion.close(); |
| List<HRegion> regions = new ArrayList<>(); |
| regions.add(primaryRegion); |
| Mockito.doReturn(regions).when(rss).getRegions(); |
| |
| primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); |
| secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null); |
| |
| reader = null; |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| if (reader != null) { |
| reader.close(); |
| } |
| |
| if (primaryRegion != null) { |
| HBaseTestingUtility.closeRegionAndWAL(primaryRegion); |
| } |
| if (secondaryRegion != null) { |
| HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); |
| } |
| |
| EnvironmentEdgeManagerTestHelper.reset(); |
| } |
| |
| String getName() { |
| return name.getMethodName(); |
| } |
| |
| // Some of the test cases are as follows: |
| // 1. replay flush start marker again |
| // 2. replay flush with smaller seqId than what is there in memstore snapshot |
| // 3. replay flush with larger seqId than what is there in memstore snapshot |
| // 4. replay flush commit without flush prepare. non droppable memstore |
| // 5. replay flush commit without flush prepare. droppable memstore |
| // 6. replay open region event |
| // 7. replay open region event after flush start |
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
| // 9. start flush does not prevent region from closing. |
| |
| @Test |
| public void testRegionReplicaSecondaryCannotFlush() throws IOException { |
    // load some data and flush; ensure that the secondary replica will not execute the flush
| |
| // load some data to secondary by replaying |
| putDataByReplay(secondaryRegion, 0, 1000, cq, families); |
| |
| verifyData(secondaryRegion, 0, 1000, cq, families); |
| |
| // flush region |
| FlushResultImpl flush = (FlushResultImpl)secondaryRegion.flush(true); |
| assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result); |
| |
| verifyData(secondaryRegion, 0, 1000, cq, families); |
| |
| // close the region, and inspect that it has not flushed |
| Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false); |
    // assert that no store files were created (the flush did not run)
| for (List<HStoreFile> f : files.values()) { |
| assertTrue(f.isEmpty()); |
| } |
| } |
| |
| /** |
 * Tests a case where we replay only a flush start marker, then the region is closed. This region
 * should not block indefinitely.
| */ |
| @Test |
| public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException { |
| // load some data to primary and flush |
| int start = 0; |
| LOG.info("-- Writing some data to primary from " + start + " to " + (start+100)); |
| putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); |
| LOG.info("-- Flushing primary, creating 3 files for 3 stores"); |
| primaryRegion.flush(true); |
| |
| // now replay the edits and the flush marker |
| reader = createWALReaderForPrimary(); |
| |
| LOG.info("-- Replaying edits and flush events in secondary"); |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flushDesc |
| = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| if (flushDesc != null) { |
| if (flushDesc.getAction() == FlushAction.START_FLUSH) { |
| LOG.info("-- Replaying flush start in secondary"); |
| secondaryRegion.replayWALFlushStartMarker(flushDesc); |
| } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { |
| LOG.info("-- NOT Replaying flush commit in secondary"); |
| } |
| } else { |
| replayEdit(secondaryRegion, entry); |
| } |
| } |
| |
| assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0); |
    // now close the region; it should not hang because of the un-committed flush
| secondaryRegion.close(); |
| |
| // verify that the memstore size is back to what it was |
| assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize()); |
| } |
| |
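  /**
   * Replays a single WAL data edit into the given region. Meta-family edits are skipped
   * (returning 0); otherwise the entry's cells are rebuilt into a Put and applied via
   * batchReplay with the entry's original sequence id. Returns the edit's integer row key.
   */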
| static int replayEdit(HRegion region, WAL.Entry entry) throws IOException { |
| if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) { |
| return 0; // handled elsewhere |
| } |
| Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0))); |
    for (Cell cell : entry.getEdit().getCells()) {
      put.add(cell);
    }
| put.setDurability(Durability.SKIP_WAL); |
| MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); |
| region.batchReplay(new MutationReplay[] {mutation}, |
| entry.getKey().getSequenceId()); |
| return Integer.parseInt(Bytes.toString(put.getRow())); |
| } |
| |
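  /**
   * Opens a reader over the primary region's current WAL file, so that tests can replay its
   * entries (edits plus flush/compaction/region-event markers) into the secondary region.
   */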
| WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException { |
| return WALFactory.createReader(TEST_UTIL.getTestFileSystem(), |
| AbstractFSWALProvider.getCurrentFileName(walPrimary), |
| TEST_UTIL.getConfiguration()); |
| } |
| |
| @Test |
| public void testBatchReplayWithMultipleNonces() throws IOException { |
| try { |
| MutationReplay[] mutations = new MutationReplay[100]; |
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.setDurability(Durability.SYNC_WAL);
        for (byte[] family : this.families) {
          put.addColumn(family, this.cq, null);
        }
        long nonceNum = i / 10;
        mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
      }
| primaryRegion.batchReplay(mutations, 20); |
| } catch (Exception e) { |
| String msg = "Error while replay of batch with multiple nonces. "; |
| LOG.error(msg, e); |
| fail(msg + e.getMessage()); |
| } |
| } |
| |
| @Test |
| public void testReplayFlushesAndCompactions() throws IOException { |
    // initialize a secondary region with some data.
| |
| // load some data to primary and flush. 3 flushes and some more unflushed data |
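    // (putDataWithFlushes is a helper defined later in this class: it writes the given number of
    // rows to the region, flushing after every "flush interval" rows, and then writes the trailing
    // count of rows without flushing)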
| putDataWithFlushes(primaryRegion, 100, 300, 100); |
| |
| // compaction from primary |
| LOG.info("-- Compacting primary, only 1 store"); |
| primaryRegion.compactStore(Bytes.toBytes("cf1"), |
| NoLimitThroughputController.INSTANCE); |
| |
| // now replay the edits and the flush marker |
| reader = createWALReaderForPrimary(); |
| |
| LOG.info("-- Replaying edits and flush events in secondary"); |
| int lastReplayed = 0; |
| int expectedStoreFileCount = 0; |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flushDesc |
| = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| CompactionDescriptor compactionDesc |
| = WALEdit.getCompaction(entry.getEdit().getCells().get(0)); |
| if (flushDesc != null) { |
| // first verify that everything is replayed and visible before flush event replay |
| verifyData(secondaryRegion, 0, lastReplayed, cq, families); |
| HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); |
| long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); |
| long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); |
| MemStoreSize mss = store.getFlushableSize(); |
| long storeSize = store.getSize(); |
| long storeSizeUncompressed = store.getStoreSizeUncompressed(); |
| if (flushDesc.getAction() == FlushAction.START_FLUSH) { |
| LOG.info("-- Replaying flush start in secondary"); |
| PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc); |
| assertNull(result.result); |
| assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber()); |
| |
| // assert that the store memstore is smaller now |
| long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); |
| LOG.info("Memstore size reduced by:" |
| + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); |
| assertTrue(storeMemstoreSize > newStoreMemstoreSize); |
| |
| } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { |
| LOG.info("-- Replaying flush commit in secondary"); |
| secondaryRegion.replayWALFlushCommitMarker(flushDesc); |
| |
| // assert that the flush files are picked |
| expectedStoreFileCount++; |
| for (HStore s : secondaryRegion.getStores()) { |
| assertEquals(expectedStoreFileCount, s.getStorefilesCount()); |
| } |
| MemStoreSize newMss = store.getFlushableSize(); |
| assertTrue(mss.getHeapSize() > newMss.getHeapSize()); |
| |
| // assert that the region memstore is smaller now |
| long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); |
| assertTrue(regionMemstoreSize > newRegionMemstoreSize); |
| |
| // assert that the store sizes are bigger |
| assertTrue(store.getSize() > storeSize); |
| assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed); |
| assertEquals(store.getSize(), store.getStorefilesSize()); |
| } |
| // after replay verify that everything is still visible |
| verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); |
| } else if (compactionDesc != null) { |
| secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE); |
| |
| // assert that the compaction is applied |
| for (HStore store : secondaryRegion.getStores()) { |
| if (store.getColumnFamilyName().equals("cf1")) { |
| assertEquals(1, store.getStorefilesCount()); |
| } else { |
| assertEquals(expectedStoreFileCount, store.getStorefilesCount()); |
| } |
| } |
| } else { |
| lastReplayed = replayEdit(secondaryRegion, entry); |
| } |
| } |
| |
| assertEquals(400-1, lastReplayed); |
| LOG.info("-- Verifying edits from secondary"); |
| verifyData(secondaryRegion, 0, 400, cq, families); |
| |
| LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted"); |
| verifyData(primaryRegion, 0, lastReplayed, cq, families); |
| for (HStore store : primaryRegion.getStores()) { |
| if (store.getColumnFamilyName().equals("cf1")) { |
| assertEquals(1, store.getStorefilesCount()); |
| } else { |
| assertEquals(expectedStoreFileCount, store.getStorefilesCount()); |
| } |
| } |
| } |
| |
| /** |
 * Tests cases where we prepare a flush with some seqId and then receive other flush start markers
 * with seqIds equal to, greater than, or less than that of the previous flush start marker.
| */ |
| @Test |
| public void testReplayFlushStartMarkers() throws IOException { |
| // load some data to primary and flush. 1 flush and some more unflushed data |
| putDataWithFlushes(primaryRegion, 100, 100, 100); |
| int numRows = 200; |
| |
| // now replay the edits and the flush marker |
| reader = createWALReaderForPrimary(); |
| |
| LOG.info("-- Replaying edits and flush events in secondary"); |
| |
| FlushDescriptor startFlushDesc = null; |
| |
| int lastReplayed = 0; |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flushDesc |
| = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| if (flushDesc != null) { |
| // first verify that everything is replayed and visible before flush event replay |
| HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); |
| long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); |
| long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); |
| MemStoreSize mss = store.getFlushableSize(); |
| |
| if (flushDesc.getAction() == FlushAction.START_FLUSH) { |
| startFlushDesc = flushDesc; |
| LOG.info("-- Replaying flush start in secondary"); |
| PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); |
| assertNull(result.result); |
| assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber()); |
| assertTrue(regionMemstoreSize > 0); |
| assertTrue(mss.getHeapSize() > 0); |
| |
| // assert that the store memstore is smaller now |
| long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); |
| LOG.info("Memstore size reduced by:" |
| + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); |
| assertTrue(storeMemstoreSize > newStoreMemstoreSize); |
| verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); |
| |
| } |
| // after replay verify that everything is still visible |
| verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); |
| } else { |
| lastReplayed = replayEdit(secondaryRegion, entry); |
| } |
| } |
| |
| // at this point, there should be some data (rows 0-100) in memstore snapshot |
| // and some more data in memstores (rows 100-200) |
| |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| // Test case 1: replay the same flush start marker again |
| LOG.info("-- Replaying same flush start in secondary again"); |
| PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); |
| assertNull(result); // this should return null. Ignoring the flush start marker |
| // assert that we still have prepared flush with the previous setup. |
| assertNotNull(secondaryRegion.getPrepareFlushResult()); |
| assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, |
| startFlushDesc.getFlushSequenceNumber()); |
| assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| // Test case 2: replay a flush start marker with a smaller seqId |
| FlushDescriptor startFlushDescSmallerSeqId |
| = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50); |
| LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId); |
| result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId); |
| assertNull(result); // this should return null. Ignoring the flush start marker |
| // assert that we still have prepared flush with the previous setup. |
| assertNotNull(secondaryRegion.getPrepareFlushResult()); |
| assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, |
| startFlushDesc.getFlushSequenceNumber()); |
| assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| // Test case 3: replay a flush start marker with a larger seqId |
| FlushDescriptor startFlushDescLargerSeqId |
| = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50); |
| LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId); |
| result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId); |
| assertNull(result); // this should return null. Ignoring the flush start marker |
| // assert that we still have prepared flush with the previous setup. |
| assertNotNull(secondaryRegion.getPrepareFlushResult()); |
| assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, |
| startFlushDesc.getFlushSequenceNumber()); |
| assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| LOG.info("-- Verifying edits from secondary"); |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| LOG.info("-- Verifying edits from primary."); |
| verifyData(primaryRegion, 0, numRows, cq, families); |
| } |
| |
| /** |
 * Tests the case where we prepare a flush with some seqId and then receive a flush commit marker
 * with a seqId smaller than that of the previous flush start marker.
| */ |
| @Test |
| public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException { |
| // load some data to primary and flush. 2 flushes and some more unflushed data |
| putDataWithFlushes(primaryRegion, 100, 200, 100); |
| int numRows = 300; |
| |
| // now replay the edits and the flush marker |
| reader = createWALReaderForPrimary(); |
| |
| LOG.info("-- Replaying edits and flush events in secondary"); |
| FlushDescriptor startFlushDesc = null; |
| FlushDescriptor commitFlushDesc = null; |
| |
| int lastReplayed = 0; |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flushDesc |
| = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| if (flushDesc != null) { |
| if (flushDesc.getAction() == FlushAction.START_FLUSH) { |
| // don't replay the first flush start marker, hold on to it, replay the second one |
| if (startFlushDesc == null) { |
| startFlushDesc = flushDesc; |
| } else { |
| LOG.info("-- Replaying flush start in secondary"); |
| startFlushDesc = flushDesc; |
| PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); |
| assertNull(result.result); |
| } |
| } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { |
| // do not replay any flush commit yet |
| if (commitFlushDesc == null) { |
| commitFlushDesc = flushDesc; // hold on to the first flush commit marker |
| } |
| } |
| // after replay verify that everything is still visible |
| verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); |
| } else { |
| lastReplayed = replayEdit(secondaryRegion, entry); |
| } |
| } |
| |
| // at this point, there should be some data (rows 0-200) in memstore snapshot |
| // and some more data in memstores (rows 200-300) |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| // no store files in the region |
| int expectedStoreFileCount = 0; |
| for (HStore s : secondaryRegion.getStores()) { |
| assertEquals(expectedStoreFileCount, s.getStorefilesCount()); |
| } |
| long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); |
| |
    // Test case 1: replay a flush commit marker smaller than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
        + startFlushDesc);
| assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber()); |
| |
| LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); |
| secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); |
| |
| // assert that the flush files are picked |
| expectedStoreFileCount++; |
| for (HStore s : secondaryRegion.getStores()) { |
| assertEquals(expectedStoreFileCount, s.getStorefilesCount()); |
| } |
| HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); |
| MemStoreSize mss = store.getFlushableSize(); |
| assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped |
| |
| // assert that the region memstore is same as before |
| long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); |
| assertEquals(regionMemstoreSize, newRegionMemstoreSize); |
| |
| assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped |
| |
| LOG.info("-- Verifying edits from secondary"); |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| LOG.info("-- Verifying edits from primary."); |
| verifyData(primaryRegion, 0, numRows, cq, families); |
| } |
| |
| /** |
 * Tests the case where we prepare a flush with some seqId and then receive a flush commit marker
 * with a seqId larger than that of the previous flush start marker.
| */ |
| @Test |
| public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException { |
| // load some data to primary and flush. 1 flush and some more unflushed data |
| putDataWithFlushes(primaryRegion, 100, 100, 100); |
| int numRows = 200; |
| |
| // now replay the edits and the flush marker |
| reader = createWALReaderForPrimary(); |
| |
| LOG.info("-- Replaying edits and flush events in secondary"); |
| FlushDescriptor startFlushDesc = null; |
| FlushDescriptor commitFlushDesc = null; |
| |
| int lastReplayed = 0; |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flushDesc |
| = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| if (flushDesc != null) { |
| if (flushDesc.getAction() == FlushAction.START_FLUSH) { |
| if (startFlushDesc == null) { |
| LOG.info("-- Replaying flush start in secondary"); |
| startFlushDesc = flushDesc; |
| PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); |
| assertNull(result.result); |
| } |
| } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { |
| // do not replay any flush commit yet |
| // hold on to the flush commit marker but simulate a larger |
| // flush commit seqId |
| commitFlushDesc = |
| FlushDescriptor.newBuilder(flushDesc) |
| .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50) |
| .build(); |
| } |
| // after replay verify that everything is still visible |
| verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); |
| } else { |
| lastReplayed = replayEdit(secondaryRegion, entry); |
| } |
| } |
| |
| // at this point, there should be some data (rows 0-100) in memstore snapshot |
| // and some more data in memstores (rows 100-200) |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| // no store files in the region |
| int expectedStoreFileCount = 0; |
| for (HStore s : secondaryRegion.getStores()) { |
| assertEquals(expectedStoreFileCount, s.getStorefilesCount()); |
| } |
| long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); |
| |
    // Test case 1: replay a flush commit marker larger than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
        + startFlushDesc);
| assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber()); |
| |
| LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); |
| secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); |
| |
| // assert that the flush files are picked |
| expectedStoreFileCount++; |
| for (HStore s : secondaryRegion.getStores()) { |
| assertEquals(expectedStoreFileCount, s.getStorefilesCount()); |
| } |
| HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); |
| MemStoreSize mss = store.getFlushableSize(); |
| assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped |
| |
| // assert that the region memstore is smaller than before, but not empty |
| long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); |
| assertTrue(newRegionMemstoreSize > 0); |
| assertTrue(regionMemstoreSize > newRegionMemstoreSize); |
| |
| assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped |
| |
| LOG.info("-- Verifying edits from secondary"); |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| LOG.info("-- Verifying edits from primary."); |
| verifyData(primaryRegion, 0, numRows, cq, families); |
| } |
| |
| /** |
| * Tests the case where we receive a flush commit before receiving any flush prepare markers. |
| * The memstore edits should be dropped after the flush commit replay since they should be in |
| * flushed files |
| */ |
| @Test |
| public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore() |
| throws IOException { |
| testReplayFlushCommitMarkerWithoutFlushStartMarker(true); |
| } |
| |
| /** |
| * Tests the case where we receive a flush commit before receiving any flush prepare markers. |
 * The memstore edits should not be dropped after the flush commit replay, since not every edit
| * will be in flushed files (based on seqId) |
| */ |
| @Test |
| public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore() |
| throws IOException { |
| testReplayFlushCommitMarkerWithoutFlushStartMarker(false); |
| } |
| |
| /** |
| * Tests the case where we receive a flush commit before receiving any flush prepare markers |
| */ |
| public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore) |
| throws IOException { |
    // load some data to primary and flush. 1 flush and some more unflushed data.
    // write more data after the flush depending on droppableMemstore
| putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100); |
| int numRows = droppableMemstore ? 100 : 200; |
| |
| // now replay the edits and the flush marker |
| reader = createWALReaderForPrimary(); |
| |
| LOG.info("-- Replaying edits and flush events in secondary"); |
| FlushDescriptor commitFlushDesc = null; |
| |
| int lastReplayed = 0; |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flushDesc |
| = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| if (flushDesc != null) { |
| if (flushDesc.getAction() == FlushAction.START_FLUSH) { |
| // do not replay flush start marker |
| } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { |
| commitFlushDesc = flushDesc; // hold on to the flush commit marker |
| } |
| // after replay verify that everything is still visible |
| verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); |
| } else { |
| lastReplayed = replayEdit(secondaryRegion, entry); |
| } |
| } |
| |
    // at this point, there should be some data (rows 0-100) in the memstore without a snapshot,
    // and, in the non-droppable case, some more data in the memstores (rows 100-200)
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| // no store files in the region |
| int expectedStoreFileCount = 0; |
| for (HStore s : secondaryRegion.getStores()) { |
| assertEquals(expectedStoreFileCount, s.getStorefilesCount()); |
| } |
| long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); |
| |
| // Test case 1: replay a flush commit marker without start flush marker |
| assertNull(secondaryRegion.getPrepareFlushResult()); |
| assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0); |
| |
| // ensure all files are visible in secondary |
| for (HStore store : secondaryRegion.getStores()) { |
| assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null)); |
| } |
| |
| LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); |
| secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); |
| |
| // assert that the flush files are picked |
| expectedStoreFileCount++; |
| for (HStore s : secondaryRegion.getStores()) { |
| assertEquals(expectedStoreFileCount, s.getStorefilesCount()); |
| } |
| HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); |
| MemStoreSize mss = store.getFlushableSize(); |
| if (droppableMemstore) { |
| // assert that the memstore is dropped |
| assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); |
| } else { |
| assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped |
| } |
| |
    // assert that the region memstore is either empty (dropped) or the same as before (not dropped)
| long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); |
| if (droppableMemstore) { |
| assertTrue(0 == newRegionMemstoreSize); |
| } else { |
| assertTrue(regionMemstoreSize == newRegionMemstoreSize); |
| } |
| |
| LOG.info("-- Verifying edits from secondary"); |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| LOG.info("-- Verifying edits from primary."); |
| verifyData(primaryRegion, 0, numRows, cq, families); |
| } |
| |
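  /**
   * Returns a copy of the given flush descriptor with its flush sequence number replaced by
   * the supplied value.
   */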
| private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) { |
| return FlushDescriptor.newBuilder(flush) |
| .setFlushSequenceNumber(flushSeqId) |
| .build(); |
| } |
| |
| /** |
 * Tests replaying region open markers from the primary region. Checks whether the store files
 * are picked up.
| */ |
| @Test |
| public void testReplayRegionOpenEvent() throws IOException { |
| putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush |
| int numRows = 100; |
| |
| // close the region and open again. |
| primaryRegion.close(); |
| primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); |
| |
| // now replay the edits and the flush marker |
| reader = createWALReaderForPrimary(); |
| List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); |
| |
| LOG.info("-- Replaying edits and region events in secondary"); |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flushDesc |
| = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| RegionEventDescriptor regionEventDesc |
| = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); |
| |
| if (flushDesc != null) { |
| // don't replay flush events |
| } else if (regionEventDesc != null) { |
| regionEvents.add(regionEventDesc); |
| } else { |
| // don't replay edits |
| } |
| } |
| |
| // we should have 1 open, 1 close and 1 open event |
| assertEquals(3, regionEvents.size()); |
| |
| // replay the first region open event. |
| secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0)); |
| |
| // replay the close event as well |
| secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1)); |
| |
| // no store files in the region |
| int expectedStoreFileCount = 0; |
| for (HStore s : secondaryRegion.getStores()) { |
| assertEquals(expectedStoreFileCount, s.getStorefilesCount()); |
| } |
| long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); |
| assertTrue(regionMemstoreSize == 0); |
| |
| // now replay the region open event that should contain new file locations |
| LOG.info("Testing replaying region open event " + regionEvents.get(2)); |
| secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); |
| |
| // assert that the flush files are picked |
| expectedStoreFileCount++; |
| for (HStore s : secondaryRegion.getStores()) { |
| assertEquals(expectedStoreFileCount, s.getStorefilesCount()); |
| } |
| HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); |
| MemStoreSize mss = store.getFlushableSize(); |
| assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); |
| |
| // assert that the region memstore is empty |
| long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); |
| assertTrue(newRegionMemstoreSize == 0); |
| |
| assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any |
| |
| LOG.info("-- Verifying edits from secondary"); |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| LOG.info("-- Verifying edits from primary."); |
| verifyData(primaryRegion, 0, numRows, cq, families); |
| } |
| |
| /** |
| * Tests the case where we replay a region open event after a flush start but before receiving |
| * flush commit |
| */ |
| @Test |
| public void testReplayRegionOpenEventAfterFlushStart() throws IOException { |
| putDataWithFlushes(primaryRegion, 100, 100, 100); |
| int numRows = 200; |
| |
| // close the region and open again. |
| primaryRegion.close(); |
| primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); |
| |
| // now replay the edits and the flush marker |
| reader = createWALReaderForPrimary(); |
| List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); |
| |
| LOG.info("-- Replaying edits and region events in secondary"); |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flushDesc |
| = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| RegionEventDescriptor regionEventDesc |
| = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); |
| |
| if (flushDesc != null) { |
| // only replay flush start |
| if (flushDesc.getAction() == FlushAction.START_FLUSH) { |
| secondaryRegion.replayWALFlushStartMarker(flushDesc); |
| } |
| } else if (regionEventDesc != null) { |
| regionEvents.add(regionEventDesc); |
| } else { |
| replayEdit(secondaryRegion, entry); |
| } |
| } |
| |
| // at this point, there should be some data (rows 0-100) in the memstore snapshot |
| // and some more data in memstores (rows 100-200) |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| // we should have 1 open, 1 close and 1 open event |
| assertEquals(3, regionEvents.size()); |
| |
| // no store files in the region |
| int expectedStoreFileCount = 0; |
| for (HStore s : secondaryRegion.getStores()) { |
| assertEquals(expectedStoreFileCount, s.getStorefilesCount()); |
| } |
| |
| // now replay the region open event that should contain new file locations |
| LOG.info("Testing replaying region open event " + regionEvents.get(2)); |
| secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); |
| |
| // assert that the flush files are picked |
| expectedStoreFileCount = 2; // two flushes happened |
| for (HStore s : secondaryRegion.getStores()) { |
| assertEquals(expectedStoreFileCount, s.getStorefilesCount()); |
| } |
| HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); |
| MemStoreSize newSnapshotSize = store.getSnapshotSize(); |
| assertTrue(newSnapshotSize.getDataSize() == 0); |
| |
| // assert that the region memstore is empty |
| long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); |
| assertTrue(newRegionMemstoreSize == 0); |
| |
| assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any |
| |
| LOG.info("-- Verifying edits from secondary"); |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| LOG.info("-- Verifying edits from primary."); |
| verifyData(primaryRegion, 0, numRows, cq, families); |
| } |
| |
| /** |
 * Tests that edits coming in for replay are skipped when they have a smaller seqId than the
 * seqId of the last replayed region open event.
| */ |
| @Test |
| public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException { |
| putDataWithFlushes(primaryRegion, 100, 100, 0); |
| int numRows = 100; |
| |
| // close the region and open again. |
| primaryRegion.close(); |
| primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); |
| |
| // now replay the edits and the flush marker |
| reader = createWALReaderForPrimary(); |
| List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); |
| List<WAL.Entry> edits = Lists.newArrayList(); |
| |
| LOG.info("-- Replaying edits and region events in secondary"); |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flushDesc |
| = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| RegionEventDescriptor regionEventDesc |
| = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); |
| |
| if (flushDesc != null) { |
| // don't replay flushes |
| } else if (regionEventDesc != null) { |
| regionEvents.add(regionEventDesc); |
| } else { |
| edits.add(entry); |
| } |
| } |
| |
    // replay the region open event of the first open, but with the seqId of the second open;
    // this way none of the flush files will be picked up.
| secondaryRegion.replayWALRegionEventMarker( |
| RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber( |
| regionEvents.get(2).getLogSequenceNumber()).build()); |
| |
| |
    // replay edits from before the region close. If replay does not skip these edits,
    // the verification below will pass instead of failing as expected.
| for (WAL.Entry entry: edits) { |
| replayEdit(secondaryRegion, entry); |
| } |
| |
| boolean expectedFail = false; |
| try { |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| } catch (AssertionError e) { |
| expectedFail = true; // expected |
| } |
| if (!expectedFail) { |
| fail("Should have failed this verification"); |
| } |
| } |
| |
| @Test |
| public void testReplayFlushSeqIds() throws IOException { |
| // load some data to primary and flush |
| int start = 0; |
| LOG.info("-- Writing some data to primary from " + start + " to " + (start+100)); |
| putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); |
| LOG.info("-- Flushing primary, creating 3 files for 3 stores"); |
| primaryRegion.flush(true); |
| |
| // now replay the flush marker |
| reader = createWALReaderForPrimary(); |
| |
| long flushSeqId = -1; |
| LOG.info("-- Replaying flush events in secondary"); |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flushDesc |
| = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| if (flushDesc != null) { |
| if (flushDesc.getAction() == FlushAction.START_FLUSH) { |
| LOG.info("-- Replaying flush start in secondary"); |
| secondaryRegion.replayWALFlushStartMarker(flushDesc); |
| flushSeqId = flushDesc.getFlushSequenceNumber(); |
| } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { |
| LOG.info("-- Replaying flush commit in secondary"); |
| secondaryRegion.replayWALFlushCommitMarker(flushDesc); |
| assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber()); |
| } |
| } |
| // else do not replay |
| } |
| |
| // TODO: what to do with this? |
| // assert that the newly picked up flush file is visible |
| long readPoint = secondaryRegion.getMVCC().getReadPoint(); |
| assertEquals(flushSeqId, readPoint); |
| |
| // after replay verify that everything is still visible |
| verifyData(secondaryRegion, 0, 100, cq, families); |
| } |
| |
| @Test |
| public void testSeqIdsFromReplay() throws IOException { |
    // test the case where seqIds coming from replayed WALEdits are persisted with their
    // original seqIds, and are made visible through the mvcc read point upon replay
| String method = name.getMethodName(); |
| byte[] tableName = Bytes.toBytes(method); |
| byte[] family = Bytes.toBytes("family"); |
| |
| HRegion region = initHRegion(tableName, method, family); |
| try { |
| // replay an entry that is bigger than current read point |
| long readPoint = region.getMVCC().getReadPoint(); |
| long origSeqId = readPoint + 100; |
| |
| Put put = new Put(row).addColumn(family, row, row); |
| put.setDurability(Durability.SKIP_WAL); // we replay with skip wal |
| replay(region, put, origSeqId); |
| |
| // read point should have advanced to this seqId |
| assertGet(region, family, row); |
| |
| // region seqId should have advanced at least to this seqId |
| assertEquals(origSeqId, region.getReadPoint(null)); |
| |
| // replay an entry that is smaller than current read point |
| // caution: adding an entry below current read point might cause partial dirty reads. Normal |
| // replay does not allow reads while replay is going on. |
| put = new Put(row2).addColumn(family, row2, row2); |
| put.setDurability(Durability.SKIP_WAL); |
| replay(region, put, origSeqId - 50); |
| |
| assertGet(region, family, row2); |
| } finally { |
| region.close(); |
| } |
| } |
| |
| /** |
 * Tests that a region opened in secondary mode does not write region open / close
 * events to its WAL.
| */ |
| @Test |
| public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException { |
| secondaryRegion.close(); |
| walSecondary = spy(walSecondary); |
| |
| // test for region open and close |
| secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); |
| verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), |
| any(WALEdit.class)); |
| |
| // test for replay prepare flush |
| putDataByReplay(secondaryRegion, 0, 10, cq, families); |
| secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder(). |
| setFlushSequenceNumber(10) |
| .setTableName(UnsafeByteOperations.unsafeWrap( |
| primaryRegion.getTableDescriptor().getTableName().getName())) |
| .setAction(FlushAction.START_FLUSH) |
| .setEncodedRegionName( |
| UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) |
| .setRegionName(UnsafeByteOperations.unsafeWrap( |
| primaryRegion.getRegionInfo().getRegionName())) |
| .build()); |
| |
| verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), |
| any(WALEdit.class)); |
| |
| secondaryRegion.close(); |
| verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), |
| any(WALEdit.class)); |
| } |
| |
| /** |
 * Tests the reads enabled flag for the region. When unset, all reads should be rejected.
| */ |
| @Test |
| public void testRegionReadsEnabledFlag() throws IOException { |
| |
| putDataByReplay(secondaryRegion, 0, 100, cq, families); |
| |
| verifyData(secondaryRegion, 0, 100, cq, families); |
| |
| // now disable reads |
| secondaryRegion.setReadsEnabled(false); |
| try { |
| verifyData(secondaryRegion, 0, 100, cq, families); |
| fail("Should have failed with IOException"); |
| } catch(IOException ex) { |
| // expected |
| } |
| |
| // verify that we can still replay data |
| putDataByReplay(secondaryRegion, 100, 100, cq, families); |
| |
| // now enable reads again |
| secondaryRegion.setReadsEnabled(true); |
| verifyData(secondaryRegion, 0, 200, cq, families); |
| } |
| |
| /** |
 * Tests the case where a flush cache request is sent to the region, but the region cannot
 * flush. It should write the flush request marker instead.
| */ |
| @Test |
| public void testWriteFlushRequestMarker() throws IOException { |
| // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false |
| FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY); |
| assertNotNull(result); |
| assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); |
| assertFalse(result.wroteFlushWalMarker); |
| |
| // request flush again, but this time with writeFlushRequestWalMarker = true |
| result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); |
| assertNotNull(result); |
| assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); |
| assertTrue(result.wroteFlushWalMarker); |
| |
| List<FlushDescriptor> flushes = Lists.newArrayList(); |
| reader = createWALReaderForPrimary(); |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| if (flush != null) { |
| flushes.add(flush); |
| } |
| } |
| |
| assertEquals(1, flushes.size()); |
| assertNotNull(flushes.get(0)); |
| assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction()); |
| } |
| |
| /** |
| * Test the case where the secondary region replica is not in reads enabled state because it is |
| * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH |
| * flush marker entry should restore the reads enabled status in the region and allow the reads |
| * to continue. |
| */ |
| @Test |
| public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException { |
| disableReads(secondaryRegion); |
| |
    // Test case 1: Test that replaying a CANNOT_FLUSH request marker (assuming it came from a
    // triggered flush) restores readsEnabled
| primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); |
| reader = createWALReaderForPrimary(); |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| if (flush != null) { |
| secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); |
| } |
| } |
| |
| // now reads should be enabled |
| secondaryRegion.get(new Get(Bytes.toBytes(0))); |
| } |
| |
| /** |
| * Test the case where the secondary region replica is not in reads enabled state because it is |
| * waiting for a flush or region open marker from primary region. Replaying flush start and commit |
| * entries should restore the reads enabled status in the region and allow the reads |
| * to continue. |
| */ |
| @Test |
| public void testReplayingFlushRestoresReadsEnabledState() throws IOException { |
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers (assuming these came
    // from a triggered flush) restores readsEnabled
| disableReads(secondaryRegion); |
| |
| // put some data in primary |
| putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); |
| primaryRegion.flush(true); |
    // I seem to need to push more edits through so the WAL flushes on local fs. This was not
    // needed before HBASE-15028. Not sure what's up. I can see that we have not flushed if I
    // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content..
    // Doing the same check before HBASE-15028 I can see all edits flushed to the WAL. Something's
    // up but can't figure it... and this is the only test that seems to suffer this flush issue.
    // St.Ack 20160201
| putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); |
| |
| reader = createWALReaderForPrimary(); |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| LOG.info(Objects.toString(entry)); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| if (flush != null) { |
| secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); |
| } else { |
| replayEdit(secondaryRegion, entry); |
| } |
| } |
| |
| // now reads should be enabled |
| verifyData(secondaryRegion, 0, 100, cq, families); |
| } |
| |
| /** |
| * Test the case where the secondary region replica is not in reads enabled state because it is |
| * waiting for a flush or region open marker from primary region. Replaying flush start and commit |
| * entries should restore the reads enabled status in the region and allow the reads |
| * to continue. |
| */ |
| @Test |
| public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException { |
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers (assuming these came
    // from a triggered flush) restores readsEnabled
| disableReads(secondaryRegion); |
| |
| // put some data in primary |
| putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); |
| primaryRegion.flush(true); |
| |
| reader = createWALReaderForPrimary(); |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| if (flush != null) { |
| secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); |
| } |
| } |
| |
| // now reads should be enabled |
| verifyData(secondaryRegion, 0, 100, cq, families); |
| } |
| |
| /** |
| * Test the case where the secondary region replica is not in reads enabled state because it is |
| * waiting for a flush or region open marker from primary region. Replaying region open event |
| * entry from primary should restore the reads enabled status in the region and allow the reads |
| * to continue. |
| */ |
| @Test |
| public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException { |
| // Test case 3: Test that replaying region open event markers restores readsEnabled |
| disableReads(secondaryRegion); |
| |
| primaryRegion.close(); |
| primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); |
| |
| reader = createWALReaderForPrimary(); |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| |
| RegionEventDescriptor regionEventDesc |
| = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); |
| |
| if (regionEventDesc != null) { |
| secondaryRegion.replayWALRegionEventMarker(regionEventDesc); |
| } |
| } |
| |
| // now reads should be enabled |
| secondaryRegion.get(new Get(Bytes.toBytes(0))); |
| } |
| |
| @Test |
  public void testRefreshStoreFiles() throws IOException {
| assertEquals(0, primaryRegion.getStoreFileList(families).size()); |
| assertEquals(0, secondaryRegion.getStoreFileList(families).size()); |
| |
| // Test case 1: refresh with an empty region |
| secondaryRegion.refreshStoreFiles(); |
| assertEquals(0, secondaryRegion.getStoreFileList(families).size()); |
| |
| // do one flush |
| putDataWithFlushes(primaryRegion, 100, 100, 0); |
| int numRows = 100; |
| |
| // refresh the store file list, and ensure that the files are picked up. |
| secondaryRegion.refreshStoreFiles(); |
| assertPathListsEqual(primaryRegion.getStoreFileList(families), |
| secondaryRegion.getStoreFileList(families)); |
| assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); |
| |
| LOG.info("-- Verifying edits from secondary"); |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| // Test case 2: do 3 more flushes |
| putDataWithFlushes(primaryRegion, 100, 300, 0); |
| numRows = 300; |
| |
| // refresh the store file list, and ensure that the files are picked up. |
| secondaryRegion.refreshStoreFiles(); |
| assertPathListsEqual(primaryRegion.getStoreFileList(families), |
| secondaryRegion.getStoreFileList(families)); |
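| // one file per family from the initial flush plus three from these flushes = 4 per family |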
| assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size()); |
| |
| LOG.info("-- Verifying edits from secondary"); |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| if (FSUtils.WINDOWS) { |
| // compaction cannot move files while they are open in secondary on windows. Skip remaining. |
| return; |
| } |
| |
| // Test case 3: compact primary files |
| primaryRegion.compactStores(); |
| List<HRegion> regions = new ArrayList<>(); |
| regions.add(primaryRegion); |
| Mockito.doReturn(regions).when(rss).getRegions(); |
| CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false); |
| cleaner.chore(); |
| secondaryRegion.refreshStoreFiles(); |
| assertPathListsEqual(primaryRegion.getStoreFileList(families), |
| secondaryRegion.getStoreFileList(families)); |
| assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); |
| |
| LOG.info("-- Verifying edits from secondary"); |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| |
| LOG.info("-- Replaying edits in secondary"); |
| |
| // Test case 4: replay some edits, ensure that memstore is dropped. |
| assertEquals(0, secondaryRegion.getMemStoreDataSize()); |
| putDataWithFlushes(primaryRegion, 400, 400, 0); |
| numRows = 400; |
| |
| reader = createWALReaderForPrimary(); |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); |
| if (flush == null) { |
| // replay the edits, skipping the flush markers themselves |
| replayEdit(secondaryRegion, entry); |
| } |
| } |
| |
| assertTrue(secondaryRegion.getMemStoreDataSize() > 0); |
| |
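| // refreshing picks up the newly flushed files, which cover the replayed edits, so the |
| // replayed memstore content should be dropped |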
| secondaryRegion.refreshStoreFiles(); |
| |
| assertEquals(0, secondaryRegion.getMemStoreDataSize()); |
| |
| LOG.info("-- Verifying edits from primary"); |
| verifyData(primaryRegion, 0, numRows, cq, families); |
| LOG.info("-- Verifying edits from secondary"); |
| verifyData(secondaryRegion, 0, numRows, cq, families); |
| } |
| |
| /** |
| * Paths can be qualified or not; this compares the two lists after stripping the scheme and |
| * authority from each entry via String->Path conversion. |
| */ |
| private void assertPathListsEqual(List<String> list1, List<String> list2) { |
| List<Path> l1 = new ArrayList<>(list1.size()); |
| for (String path : list1) { |
| l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); |
| } |
| List<Path> l2 = new ArrayList<>(list2.size()); |
| for (String path : list2) { |
| l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); |
| } |
| assertEquals(l1, l2); |
| } |
| |
| private void disableReads(HRegion region) { |
| region.setReadsEnabled(false); |
| try { |
| verifyData(region, 0, 1, cq, families); |
| fail("Should have failed with IOException"); |
| } catch(IOException ex) { |
| // expected |
| } |
| } |
| |
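| /** |
| * Applies the given Put to the region as a replayed edit with the supplied sequence id. |
| * SKIP_WAL keeps the replayed mutation from being written to this region's own WAL. |
| */ |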
| private void replay(HRegion region, Put put, long replaySeqId) throws IOException { |
| put.setDurability(Durability.SKIP_WAL); |
| MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); |
| region.batchReplay(new MutationReplay[] {mutation}, replaySeqId); |
| } |
| |
| /** |
| * Tests replaying a bulk load event marker from the primary region. Checks whether the bulk |
| * loaded files are picked up by the secondary region. |
| */ |
| @Test |
| public void testReplayBulkLoadEvent() throws IOException { |
| LOG.info("testReplayBulkLoadEvent starts"); |
| putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush |
| |
| // close the region and open again. |
| primaryRegion.close(); |
| primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); |
| |
| // bulk load a file into primary region |
| Random random = new Random(); |
| byte[] randomValues = new byte[20]; |
| random.nextBytes(randomValues); |
| Path testPath = TEST_UTIL.getDataTestDirOnTestFS(); |
| |
| List<Pair<byte[], String>> familyPaths = new ArrayList<>(); |
| int expectedLoadFileCount = 0; |
| for (byte[] family : families) { |
| familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues))); |
| expectedLoadFileCount++; |
| } |
| primaryRegion.bulkLoadHFiles(familyPaths, false, null); |
| |
| // now scan the WAL for the bulk load event marker |
| reader = createWALReaderForPrimary(); |
| |
| LOG.info("-- Replaying edits and region events in secondary"); |
| BulkLoadDescriptor bulkloadEvent = null; |
| while (true) { |
| WAL.Entry entry = reader.next(); |
| if (entry == null) { |
| break; |
| } |
| bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0)); |
| if (bulkloadEvent != null) { |
| break; |
| } |
| } |
| |
| // we should have 1 bulk load event |
| assertNotNull(bulkloadEvent); |
| assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount()); |
| |
| // replay the bulk load event |
| secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent); |
| |
| List<String> storeFileName = new ArrayList<>(); |
| for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) { |
| storeFileName.addAll(storeDesc.getStoreFileList()); |
| } |
| // assert that the bulk loaded files are picked up |
| for (HStore s : secondaryRegion.getStores()) { |
| for (HStoreFile sf : s.getStorefiles()) { |
| storeFileName.remove(sf.getPath().getName()); |
| } |
| } |
| assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty()); |
| |
| LOG.info("-- Verifying edits from secondary"); |
| for (byte[] family : families) { |
| assertGet(secondaryRegion, family, randomValues); |
| } |
| } |
| |
| @Test |
| public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException { |
| // tests replaying a flush commit marker whose flush file has already been compacted away |
| // on the primary and deleted from the archive directory; the replay should complete |
| // without throwing |
| secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder() |
| .setFlushSequenceNumber(Long.MAX_VALUE) |
| .setTableName(UnsafeByteOperations.unsafeWrap( |
| primaryRegion.getTableDescriptor().getTableName().getName())) |
| .setAction(FlushAction.COMMIT_FLUSH) |
| .setEncodedRegionName( |
| UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) |
| .setRegionName(UnsafeByteOperations.unsafeWrap( |
| primaryRegion.getRegionInfo().getRegionName())) |
| .addStoreFlushes(StoreFlushDescriptor.newBuilder() |
| .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) |
| .setStoreHomeDir("/store_home_dir") |
| .addFlushOutput("/foo/baz/123") |
| .build()) |
| .build()); |
| } |
| |
| @Test |
| public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException { |
| // tests replaying a compaction marker whose compaction output file has already been |
| // compacted away on the primary and deleted from the archive directory; the replay should |
| // complete without throwing |
| secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder() |
| .setTableName(UnsafeByteOperations.unsafeWrap( |
| primaryRegion.getTableDescriptor().getTableName().getName())) |
| .setEncodedRegionName( |
| UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) |
| .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) |
| .addCompactionInput("/123") |
| .addCompactionOutput("/456") |
| .setStoreHomeDir("/store_home_dir") |
| .setRegionName( |
| UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) |
| .build(), true, true, Long.MAX_VALUE); |
| } |
| |
| @Test |
| public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException { |
| // tests replaying a region open event marker whose region files have already been compacted |
| // away on the primary and deleted from the archive directory; the replay should complete |
| // without throwing |
| secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder() |
| .setTableName(UnsafeByteOperations.unsafeWrap( |
| primaryRegion.getTableDescriptor().getTableName().getName())) |
| .setEncodedRegionName( |
| UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) |
| .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) |
| .setEventType(EventType.REGION_OPEN) |
| .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1))) |
| .setLogSequenceNumber(Long.MAX_VALUE) |
| .addStores(StoreDescriptor.newBuilder() |
| .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) |
| .setStoreHomeDir("/store_home_dir") |
| .addStoreFile("/123") |
| .build()) |
| .build()); |
| } |
| |
| @Test |
| public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException { |
| // tests replaying a bulk load event marker whose bulk load files have already been compacted |
| // away on the primary and deleted from the archive directory; the replay should complete |
| // without throwing |
| secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder() |
| .setTableName(ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName())) |
| .setEncodedRegionName( |
| UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) |
| .setBulkloadSeqNum(Long.MAX_VALUE) |
| .addStores(StoreDescriptor.newBuilder() |
| .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) |
| .setStoreHomeDir("/store_home_dir") |
| .addStoreFile("/123") |
| .build()) |
| .build()); |
| } |
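| |
| /** |
| * The region open and bulk load tests above build identical StoreDescriptor protobufs |
| * pointing at files that no longer exist. A hypothetical helper like the sketch below could |
| * centralize that boilerplate; it is illustrative only and not referenced by the tests. |
| */ |
| private StoreDescriptor.Builder deletedFileStoreDescriptor(byte[] family) { |
| return StoreDescriptor.newBuilder() |
| .setFamilyName(UnsafeByteOperations.unsafeWrap(family)) |
| .setStoreHomeDir("/store_home_dir") |
| .addStoreFile("/123"); |
| } |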
| |
| private String createHFileForFamilies(Path testPath, byte[] family, |
| byte[] valueBytes) throws IOException { |
| HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration()); |
| // TODO We need a way to do this without creating files |
| Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString()); |
| try (FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile)) { |
| hFileFactory.withOutputStream(out); |
| hFileFactory.withFileContext(new HFileContextBuilder().build()); |
| try (HFile.Writer writer = hFileFactory.create()) { |
| writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) |
| .setRow(valueBytes) |
| .setFamily(family) |
| .setQualifier(valueBytes) |
| .setTimestamp(0L) |
| .setType(KeyValue.Type.Put.getCode()) |
| .setValue(valueBytes) |
| .build())); |
| } |
| } |
| return testFile.toString(); |
| } |
| |
| /** |
| * Puts a total of numRows + numRowsAfterFlush records, indexed with numeric row keys, |
| * flushing after every flushInterval records. Then puts numRowsAfterFlush more rows without a |
| * trailing flush. For example, putDataWithFlushes(region, 100, 300, 50) writes rows 0-299 in |
| * three flushed batches of 100, followed by rows 300-349 which stay in the memstore. |
| * @throws IOException on write or flush failure |
| */ |
| private void putDataWithFlushes(HRegion region, int flushInterval, |
| int numRows, int numRowsAfterFlush) throws IOException { |
| int start = 0; |
| for (; start < numRows; start += flushInterval) { |
| LOG.info("-- Writing some data to primary from " + start + " to " + (start+flushInterval)); |
| putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families); |
| LOG.info("-- Flushing primary, creating 3 files for 3 stores"); |
| region.flush(true); |
| } |
| LOG.info("-- Writing some more data to primary, not flushing"); |
| putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families); |
| } |
| |
| private void putDataByReplay(HRegion region, |
| int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { |
| for (int i = startRow; i < startRow + numRows; i++) { |
| Put put = new Put(Bytes.toBytes("" + i)); |
| put.setDurability(Durability.SKIP_WAL); |
| for (byte[] family : families) { |
| put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null); |
| } |
| replay(region, put, i+1); |
| } |
| } |
| |
| private static HRegion initHRegion(byte[] tableName, |
| String callingMethod, byte[]... families) throws IOException { |
| return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, |
| callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families); |
| } |
| |
| private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, |
| String callingMethod, Configuration conf, boolean isReadOnly, Durability durability, |
| WAL wal, byte[]... families) throws IOException { |
| return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf, |
| isReadOnly, durability, wal, families); |
| } |
| } |