| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.tool; |
| |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.anyBoolean; |
| import static org.mockito.ArgumentMatchers.anyList; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.spy; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Deque; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.stream.IntStream; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.HBaseClassTestRule; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.TableExistsException; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Admin; |
| import org.apache.hadoop.hbase.client.AsyncClusterConnection; |
| import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; |
| import org.apache.hadoop.hbase.client.Connection; |
| import org.apache.hadoop.hbase.client.RegionInfo; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.ResultScanner; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.client.TableDescriptor; |
| import org.apache.hadoop.hbase.client.TableDescriptorBuilder; |
| import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; |
| import org.apache.hadoop.hbase.regionserver.HRegionServer; |
| import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; |
| import org.apache.hadoop.hbase.testclassification.LargeTests; |
| import org.apache.hadoop.hbase.testclassification.MiscTests; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.CommonFSUtils; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.ClassRule; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.rules.TestName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; |
| |
| import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; |
| |
| /** |
| * Test cases for the atomic load error handling of the bulk load functionality. |
| */ |
| @Category({ MiscTests.class, LargeTests.class }) |
| public class TestBulkLoadHFilesSplitRecovery { |
| |
| @ClassRule |
| public static final HBaseClassTestRule CLASS_RULE = |
| HBaseClassTestRule.forClass(TestBulkLoadHFilesSplitRecovery.class); |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class); |
| |
| static HBaseTestingUtility util; |
| // used by secure subclass |
| static boolean useSecure = false; |
| |
| final static int NUM_CFS = 10; |
| final static byte[] QUAL = Bytes.toBytes("qual"); |
| final static int ROWCOUNT = 100; |
| |
| private final static byte[][] families = new byte[NUM_CFS][]; |
| |
| @Rule |
| public TestName name = new TestName(); |
| |
| static { |
| for (int i = 0; i < NUM_CFS; i++) { |
| families[i] = Bytes.toBytes(family(i)); |
| } |
| } |
| |
| static byte[] rowkey(int i) { |
| return Bytes.toBytes(String.format("row_%08d", i)); |
| } |
| |
| static String family(int i) { |
| return String.format("family_%04d", i); |
| } |
| |
| static byte[] value(int i) { |
| return Bytes.toBytes(String.format("%010d", i)); |
| } |
| |
| public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException { |
| byte[] val = value(value); |
| for (int i = 0; i < NUM_CFS; i++) { |
| Path testIn = new Path(dir, family(i)); |
| |
| TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i), |
| Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT); |
| } |
| } |
| |
| private TableDescriptor createTableDesc(TableName name, int cfs) { |
| TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); |
| IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i))) |
| .forEachOrdered(builder::setColumnFamily); |
| return builder.build(); |
| } |
| |
| /** |
| * Creates a table with given table name and specified number of column families if the table does |
| * not already exist. |
| */ |
| private void setupTable(final Connection connection, TableName table, int cfs) |
| throws IOException { |
| try { |
| LOG.info("Creating table " + table); |
| try (Admin admin = connection.getAdmin()) { |
| admin.createTable(createTableDesc(table, cfs)); |
| } |
| } catch (TableExistsException tee) { |
| LOG.info("Table " + table + " already exists"); |
| } |
| } |
| |
| /** |
| * Creates a table with given table name,specified number of column families<br> |
| * and splitkeys if the table does not already exist. |
| * @param table |
| * @param cfs |
| * @param SPLIT_KEYS |
| */ |
| private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS) |
| throws IOException { |
| try { |
| LOG.info("Creating table " + table); |
| util.createTable(createTableDesc(table, cfs), SPLIT_KEYS); |
| } catch (TableExistsException tee) { |
| LOG.info("Table " + table + " already exists"); |
| } |
| } |
| |
| private Path buildBulkFiles(TableName table, int value) throws Exception { |
| Path dir = util.getDataTestDirOnTestFS(table.getNameAsString()); |
| Path bulk1 = new Path(dir, table.getNameAsString() + value); |
| FileSystem fs = util.getTestFileSystem(); |
| buildHFiles(fs, bulk1, value); |
| return bulk1; |
| } |
| |
| /** |
| * Populate table with known values. |
| */ |
| private void populateTable(final Connection connection, TableName table, int value) |
| throws Exception { |
| // create HFiles for different column families |
| Path dir = buildBulkFiles(table, value); |
| BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(table, dir); |
| } |
| |
| /** |
| * Split the known table in half. (this is hard coded for this test suite) |
| */ |
| private void forceSplit(TableName table) { |
| try { |
| // need to call regions server to by synchronous but isn't visible. |
| HRegionServer hrs = util.getRSForFirstRegionInTable(table); |
| |
| for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { |
| if (hri.getTable().equals(table)) { |
| util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2)); |
| // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2)); |
| } |
| } |
| |
| // verify that split completed. |
| int regions; |
| do { |
| regions = 0; |
| for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { |
| if (hri.getTable().equals(table)) { |
| regions++; |
| } |
| } |
| if (regions != 2) { |
| LOG.info("Taking some time to complete split..."); |
| Thread.sleep(250); |
| } |
| } while (regions != 2); |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| @BeforeClass |
| public static void setupCluster() throws Exception { |
| util = new HBaseTestingUtility(); |
| util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); |
| util.startMiniCluster(1); |
| } |
| |
| @AfterClass |
| public static void teardownCluster() throws Exception { |
| util.shutdownMiniCluster(); |
| } |
| |
| /** |
| * Checks that all columns have the expected value and that there is the expected number of rows. |
| * @throws IOException |
| */ |
| void assertExpectedTable(TableName table, int count, int value) throws IOException { |
| TableDescriptor htd = util.getAdmin().getDescriptor(table); |
| assertNotNull(htd); |
| try (Table t = util.getConnection().getTable(table); |
| ResultScanner sr = t.getScanner(new Scan())) { |
| int i = 0; |
| for (Result r; (r = sr.next()) != null;) { |
| r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream()) |
| .forEach(v -> assertArrayEquals(value(value), v)); |
| i++; |
| } |
| assertEquals(count, i); |
| } catch (IOException e) { |
| fail("Failed due to exception"); |
| } |
| } |
| |
| private static <T> CompletableFuture<T> failedFuture(Throwable error) { |
| CompletableFuture<T> future = new CompletableFuture<>(); |
| future.completeExceptionally(error); |
| return future; |
| } |
| |
| private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) { |
| AsyncClusterConnection errConn = spy(conn); |
| doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn) |
| .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(), |
| anyBoolean()); |
| return errConn; |
| } |
| |
| /** |
| * Test that shows that exception thrown from the RS side will result in an exception on the |
| * LIHFile client. |
| */ |
| @Test(expected = IOException.class) |
| public void testBulkLoadPhaseFailure() throws Exception { |
| final TableName table = TableName.valueOf(name.getMethodName()); |
| final AtomicInteger attemptedCalls = new AtomicInteger(); |
| Configuration conf = new Configuration(util.getConfiguration()); |
| conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); |
| BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) { |
| |
| @Override |
| protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName, |
| Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, |
| boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { |
| AsyncClusterConnection c = |
| attemptedCalls.incrementAndGet() == 1 ? mockAndInjectError(conn) : conn; |
| super.bulkLoadPhase(c, tableName, queue, regionGroups, copyFiles, item2RegionMap); |
| } |
| }; |
| Path dir = buildBulkFiles(table, 1); |
| loader.bulkLoad(table, dir); |
| } |
| |
| /** |
| * Test that shows that exception thrown from the RS side will result in the expected number of |
| * retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when |
| * ${@link BulkLoadHFiles#RETRY_ON_IO_EXCEPTION} is set |
| */ |
| @Test |
| public void testRetryOnIOException() throws Exception { |
| TableName table = TableName.valueOf(name.getMethodName()); |
| AtomicInteger calls = new AtomicInteger(0); |
| setupTable(util.getConnection(), table, 10); |
| Configuration conf = new Configuration(util.getConfiguration()); |
| conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); |
| conf.setBoolean(BulkLoadHFiles.RETRY_ON_IO_EXCEPTION, true); |
| BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) { |
| |
| @Override |
| protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName, |
| Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, |
| boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { |
| if (calls.get() < conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, |
| HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) { |
| calls.incrementAndGet(); |
| super.bulkLoadPhase(mockAndInjectError(conn), tableName, queue, regionGroups, copyFiles, |
| item2RegionMap); |
| } else { |
| super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap); |
| } |
| } |
| }; |
| Path dir = buildBulkFiles(table, 1); |
| loader.bulkLoad(table, dir); |
| assertEquals(calls.get(), 2); |
| } |
| |
| /** |
| * This test exercises the path where there is a split after initial validation but before the |
| * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a |
| * split just before the atomic region load. |
| */ |
| @Test |
| public void testSplitWhileBulkLoadPhase() throws Exception { |
| final TableName table = TableName.valueOf(name.getMethodName()); |
| setupTable(util.getConnection(), table, 10); |
| populateTable(util.getConnection(), table, 1); |
| assertExpectedTable(table, ROWCOUNT, 1); |
| |
| // Now let's cause trouble. This will occur after checks and cause bulk |
| // files to fail when attempt to atomically import. This is recoverable. |
| final AtomicInteger attemptedCalls = new AtomicInteger(); |
| BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) { |
| |
| @Override |
| protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName, |
| Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, |
| boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { |
| int i = attemptedCalls.incrementAndGet(); |
| if (i == 1) { |
| // On first attempt force a split. |
| forceSplit(table); |
| } |
| super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap); |
| } |
| }; |
| |
| // create HFiles for different column families |
| Path dir = buildBulkFiles(table, 2); |
| loader.bulkLoad(table, dir); |
| |
| // check that data was loaded |
| // The three expected attempts are 1) failure because need to split, 2) |
| // load of split top 3) load of split bottom |
| assertEquals(3, attemptedCalls.get()); |
| assertExpectedTable(table, ROWCOUNT, 2); |
| } |
| |
| /** |
| * This test splits a table and attempts to bulk load. The bulk import files should be split |
| * before atomically importing. |
| */ |
| @Test |
| public void testGroupOrSplitPresplit() throws Exception { |
| final TableName table = TableName.valueOf(name.getMethodName()); |
| setupTable(util.getConnection(), table, 10); |
| populateTable(util.getConnection(), table, 1); |
| assertExpectedTable(util.getConnection(), table, ROWCOUNT, 1); |
| forceSplit(table); |
| |
| final AtomicInteger countedLqis = new AtomicInteger(); |
| BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) { |
| |
| @Override |
| protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn, |
| TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item, |
| List<Pair<byte[], byte[]>> startEndKeys) throws IOException { |
| Pair<List<LoadQueueItem>, String> lqis = |
| super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys); |
| if (lqis != null && lqis.getFirst() != null) { |
| countedLqis.addAndGet(lqis.getFirst().size()); |
| } |
| return lqis; |
| } |
| }; |
| |
| // create HFiles for different column families |
| Path dir = buildBulkFiles(table, 2); |
| loader.bulkLoad(table, dir); |
| assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2); |
| assertEquals(20, countedLqis.get()); |
| } |
| |
| @Test |
| public void testCorrectSplitPoint() throws Exception { |
| final TableName table = TableName.valueOf(name.getMethodName()); |
| byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"), |
| Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"), |
| Bytes.toBytes("row_00000050"), Bytes.toBytes("row_00000060"), |
| Bytes.toBytes("row_00000070") }; |
| setupTableWithSplitkeys(table, NUM_CFS, SPLIT_KEYS); |
| |
| final AtomicInteger bulkloadRpcTimes = new AtomicInteger(); |
| BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) { |
| |
| @Override |
| protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName, |
| Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, |
| boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { |
| bulkloadRpcTimes.addAndGet(1); |
| super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap); |
| } |
| }; |
| |
| Path dir = buildBulkFiles(table, 1); |
| loader.bulkLoad(table, dir); |
| // before HBASE-25281 we need invoke bulkload rpc 8 times |
| assertEquals(4, bulkloadRpcTimes.get()); |
| } |
| |
| /** |
| * This test creates a table with many small regions. The bulk load files would be splitted |
| * multiple times before all of them can be loaded successfully. |
| */ |
| @Test |
| public void testSplitTmpFileCleanUp() throws Exception { |
| final TableName table = TableName.valueOf(name.getMethodName()); |
| byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"), |
| Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"), |
| Bytes.toBytes("row_00000050") }; |
| setupTableWithSplitkeys(table, 10, SPLIT_KEYS); |
| |
| BulkLoadHFiles loader = BulkLoadHFiles.create(util.getConfiguration()); |
| |
| // create HFiles |
| Path dir = buildBulkFiles(table, 2); |
| loader.bulkLoad(table, dir); |
| // family path |
| Path tmpPath = new Path(dir, family(0)); |
| // TMP_DIR under family path |
| tmpPath = new Path(tmpPath, BulkLoadHFilesTool.TMP_DIR); |
| FileSystem fs = dir.getFileSystem(util.getConfiguration()); |
| // HFiles have been splitted, there is TMP_DIR |
| assertTrue(fs.exists(tmpPath)); |
| // TMP_DIR should have been cleaned-up |
| assertNull(BulkLoadHFilesTool.TMP_DIR + " should be empty.", |
| CommonFSUtils.listStatus(fs, tmpPath)); |
| assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2); |
| } |
| |
| /** |
| * This simulates an remote exception which should cause LIHF to exit with an exception. |
| */ |
| @Test(expected = IOException.class) |
| public void testGroupOrSplitFailure() throws Exception { |
| final TableName tableName = TableName.valueOf(name.getMethodName()); |
| setupTable(util.getConnection(), tableName, 10); |
| BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) { |
| |
| private int i = 0; |
| |
| @Override |
| protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn, |
| TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item, |
| List<Pair<byte[], byte[]>> startEndKeys) throws IOException { |
| i++; |
| |
| if (i == 5) { |
| throw new IOException("failure"); |
| } |
| return super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys); |
| } |
| }; |
| |
| // create HFiles for different column families |
| Path dir = buildBulkFiles(tableName, 1); |
| loader.bulkLoad(tableName, dir); |
| } |
| |
| /** |
| * We are testing a split after initial validation but before the atomic bulk load call. |
| * We cannot use presplitting to test this path, so we actually inject a |
| * split just before the atomic region load. However, we will pass null item2RegionMap |
| * and that should not affect the bulk load behavior. |
| */ |
| @Test |
| public void testSplitWhileBulkLoadPhaseWithoutItemMap() throws Exception { |
| final TableName table = TableName.valueOf(name.getMethodName()); |
| setupTable(util.getConnection(), table, 10); |
| populateTable(util.getConnection(), table, 1); |
| assertExpectedTable(table, ROWCOUNT, 1); |
| |
| // Now let's cause trouble. This will occur after checks and cause bulk |
| // files to fail when attempt to atomically import. This is recoverable. |
| final AtomicInteger attemptedCalls = new AtomicInteger(); |
| BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) { |
| |
| @Override |
| protected void bulkLoadPhase(final AsyncClusterConnection conn, final TableName tableName, |
| final Deque<LoadQueueItem> queue, final Multimap<ByteBuffer, LoadQueueItem> regionGroups, |
| final boolean copyFiles, |
| final Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { |
| |
| int i = attemptedCalls.incrementAndGet(); |
| if (i == 1) { |
| // On first attempt force a split. |
| forceSplit(table); |
| } |
| |
| // Passing item2RegionMap null |
| // In the absence of LoadQueueItem, bulk load should work as expected |
| super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null); |
| } |
| |
| }; |
| |
| // create HFiles for different column families |
| Path dir = buildBulkFiles(table, 2); |
| loader.bulkLoad(table, dir); |
| |
| // check that data was loaded |
| // The three expected attempts are 1) failure because need to split, 2) |
| // load of split top 3) load of split bottom |
| assertEquals(3, attemptedCalls.get()); |
| assertExpectedTable(table, ROWCOUNT, 2); |
| } |
| |
| |
| /** |
| * Checks that all columns have the expected value and that there is the expected number of rows. |
| */ |
| void assertExpectedTable(final Connection connection, TableName table, int count, int value) |
| throws IOException { |
| TableDescriptor htd = util.getAdmin().getDescriptor(table); |
| assertNotNull(htd); |
| try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) { |
| int i = 0; |
| for (Result r; (r = sr.next()) != null;) { |
| r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream()) |
| .forEach(v -> assertArrayEquals(value(value), v)); |
| i++; |
| } |
| assertEquals(count, i); |
| } catch (IOException e) { |
| fail("Failed due to exception"); |
| } |
| } |
| } |