| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.test.functional; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Random; |
| |
| import org.apache.accumulo.core.client.Accumulo; |
| import org.apache.accumulo.core.client.AccumuloClient; |
| import org.apache.accumulo.core.client.BatchScanner; |
| import org.apache.accumulo.core.client.admin.InstanceOperations; |
| import org.apache.accumulo.core.client.admin.NewTableConfiguration; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Range; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.harness.AccumuloClusterHarness; |
| import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; |
| import org.apache.accumulo.test.TestIngest; |
| import org.apache.accumulo.test.TestIngest.IngestParams; |
| import org.apache.accumulo.test.VerifyIngest; |
| import org.apache.hadoop.conf.Configuration; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| |
| /** |
| * A functional test that exercises hitting the max open file limit on a tablet server. This test |
| * assumes there are one or two tablet servers. |
| */ |
| @SuppressWarnings("removal") |
| public class MaxOpenIT extends AccumuloClusterHarness { |
| |
| @Override |
| public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { |
| Map<String,String> conf = cfg.getSiteConfig(); |
| conf.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "4"); |
| conf.put(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), "1"); |
| conf.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "2"); |
| cfg.setSiteConfig(conf); |
| } |
| |
| @Override |
| protected int defaultTimeoutSeconds() { |
| return 3 * 60; |
| } |
| |
| private String scanMaxOpenFiles, majcConcurrent, majcThreadMaxOpen; |
| |
| @Before |
| public void alterConfig() throws Exception { |
| try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { |
| InstanceOperations iops = client.instanceOperations(); |
| Map<String,String> sysConfig = iops.getSystemConfiguration(); |
| scanMaxOpenFiles = sysConfig.get(Property.TSERV_SCAN_MAX_OPENFILES.getKey()); |
| majcConcurrent = sysConfig.get(Property.TSERV_MAJC_MAXCONCURRENT.getKey()); |
| majcThreadMaxOpen = sysConfig.get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey()); |
| } |
| } |
| |
| @After |
| public void restoreConfig() throws Exception { |
| try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { |
| InstanceOperations iops = client.instanceOperations(); |
| if (scanMaxOpenFiles != null) { |
| iops.setProperty(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), scanMaxOpenFiles); |
| } |
| if (majcConcurrent != null) { |
| iops.setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), majcConcurrent); |
| } |
| if (majcThreadMaxOpen != null) { |
| iops.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), majcThreadMaxOpen); |
| } |
| } |
| } |
| |
| private static final int NUM_TABLETS = 16; |
| private static final int NUM_TO_INGEST = 10000; |
| |
| @Test |
| public void run() throws Exception { |
| try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { |
| final String tableName = getUniqueNames(1)[0]; |
| HashMap<String,String> props = new HashMap<>(); |
| props.put(Property.TABLE_MAJC_RATIO.getKey(), "10"); |
| NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props) |
| .withSplits(TestIngest.getSplitPoints(0, NUM_TO_INGEST, NUM_TABLETS)); |
| c.tableOperations().create(tableName, ntc); |
| |
| // the following loop should create three tablets in each map file |
| for (int i = 0; i < 3; i++) { |
| IngestParams params = new IngestParams(getClientProps(), tableName, NUM_TO_INGEST); |
| params.timestamp = i; |
| params.dataSize = 50; |
| params.rows = NUM_TO_INGEST; |
| params.cols = 1; |
| params.random = i; |
| TestIngest.ingest(c, params); |
| |
| c.tableOperations().flush(tableName, null, null, true); |
| FunctionalTestUtils.checkRFiles(c, tableName, NUM_TABLETS, NUM_TABLETS, i + 1, i + 1); |
| } |
| |
| List<Range> ranges = new ArrayList<>(NUM_TO_INGEST); |
| |
| for (int i = 0; i < NUM_TO_INGEST; i++) { |
| ranges.add(new Range(TestIngest.generateRow(i, 0))); |
| } |
| |
| long time1 = batchScan(c, tableName, ranges, 1); |
| // run it again, now that stuff is cached on the client and sever |
| time1 = batchScan(c, tableName, ranges, 1); |
| long time2 = batchScan(c, tableName, ranges, NUM_TABLETS); |
| |
| System.out.printf("Single thread scan time %6.2f %n", time1 / 1000.0); |
| System.out.printf("Multiple thread scan time %6.2f %n", time2 / 1000.0); |
| } |
| } |
| |
| @SuppressFBWarnings(value = "PREDICTABLE_RANDOM", |
| justification = "predictable random is okay for testing") |
| private long batchScan(AccumuloClient c, String tableName, List<Range> ranges, int threads) |
| throws Exception { |
| try (BatchScanner bs = c.createBatchScanner(tableName, TestIngest.AUTHS, threads)) { |
| |
| bs.setRanges(ranges); |
| |
| int count = 0; |
| |
| long t1 = System.currentTimeMillis(); |
| |
| byte[] rval = new byte[50]; |
| Random random = new Random(); |
| |
| for (Entry<Key,Value> entry : bs) { |
| count++; |
| int row = VerifyIngest.getRow(entry.getKey()); |
| int col = VerifyIngest.getCol(entry.getKey()); |
| |
| if (row < 0 || row >= NUM_TO_INGEST) { |
| throw new Exception("unexcepted row " + row); |
| } |
| |
| rval = TestIngest.genRandomValue(random, rval, 2, row, col); |
| |
| if (entry.getValue().compareTo(rval) != 0) { |
| throw new Exception("unexcepted value row=" + row + " col=" + col); |
| } |
| } |
| |
| long t2 = System.currentTimeMillis(); |
| |
| if (count != NUM_TO_INGEST) { |
| throw new Exception("Batch Scan did not return expected number of values " + count); |
| } |
| |
| return t2 - t1; |
| } |
| } |
| |
| } |