blob: 1baef24ae3fee42348c02019dbd05cb7559f15a2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_RUNTIME_DISK_IO_MGR_STRESS_H
#define IMPALA_RUNTIME_DISK_IO_MGR_STRESS_H
#include <memory>
#include <vector>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/thread.hpp>
#include "common/object-pool.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/mem-tracker.h"
#include "runtime/thread-resource-mgr.h"
namespace impala {
namespace io {
/// Test utility to stress the disk io mgr. It allows for a configurable
/// number of clients. The clients continuously issue work to the io mgr and
/// asynchronously get cancelled. The stress test can be run forever or for
/// a fixed duration. The unit test runs this for a fixed duration.
class DiskIoMgrStress {
public:
DiskIoMgrStress(int num_disks, int num_threads_per_disk, int num_clients,
bool includes_cancellation);
~DiskIoMgrStress();
/// Run the test for 'sec'. If 0, run forever
void Run(int sec);
static constexpr float ABORT_CHANCE = .10f;
static const int MIN_READ_LEN = 1;
static const int MAX_READ_LEN = 20;
static const int MIN_FILE_LEN = 10;
static const int MAX_FILE_LEN = 1024;
// Make sure this is between MIN/MAX FILE_LEN to test more cases
static const int MIN_READ_BUFFER_SIZE = 64;
static const int MAX_READ_BUFFER_SIZE = 128;
// Maximum bytes to allocate per scan range.
static const int MAX_BUFFER_BYTES_PER_SCAN_RANGE = MAX_READ_BUFFER_SIZE * 3;
static const int CANCEL_READER_PERIOD_MS = 20;
private:
struct Client;
struct File {
std::string filename;
std::string data; // the data in the file, used to validate
};
/// Files used for testing. These are created at startup and recycled
/// during the test
std::vector<File> files_;
/// Root mem tracker.
MemTracker mem_tracker_;
/// io manager
boost::scoped_ptr<DiskIoMgr> io_mgr_;
/// Thread group for reader threads
boost::thread_group readers_;
/// Array of clients
int num_clients_;
std::unique_ptr<Client[]> clients_;
/// Client MemTrackers, one per client.
std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_;
/// Buffer pool clients, one per client.
std::unique_ptr<BufferPool::ClientHandle[]> buffer_pool_clients_;
/// If true, tests cancelling readers
bool includes_cancellation_;
/// Flag to signal that client reader threads should exit
volatile bool shutdown_;
/// Helper to initialize a new reader client, registering a new reader with the
/// io mgr and initializing the scan ranges
void NewClient(int i);
/// Thread running the reader. When the current reader is done (either normally
/// or cancelled), it picks up a new reader
void ClientThread(int client_id);
/// Possibly cancels a random reader.
void CancelRandomReader();
static std::string GenerateRandomData();
};
}
}
#endif