@zhangyue19921010
@leesf
JIRA: https://issues.apache.org/jira/browse/HUDI-3963
New option which use Lock-Free Message Queue called Disruptor as inner message queue to improve hoodie writing performance and optimize writing efficiency.
Disruptor linked: https://lmax-exchange.github.io/disruptor/user-guide/index.html#_introduction
Based on master branch, hoodie consumes upstream data (Kafka or S3 files) into the lake is a standard production-consumption model. Currently, hoodie uses LinkedBlockingQueue
as a inner message queue between Producer and Consumer.
However, this lock model may become the bottleneck of application throughput when data volume is much larger. What's worse is that even if we increase the number of the executors, it is still difficult to improve the throughput.
In other words, users may encounter throughput bottlenecks when writing data into hudi in some scenarios, for example the schema is relatively simple, but the volume of data is pretty large or users observed insufficient data throughput and low cpu usage, etc.
This RFC is to solve the performance bottleneck problem caused by locking in some large data volume scenarios
This RFC provides a new option which use Lock-Free Message Queue called Disruptor as inner message queue to The advantages are that:
This RFC mainly does two things: One is to do the code abstraction about hoodie consuming upstream data and writing into hudi format. The other thing is to implement disruptor based producer, inner message queue executor and message handler based on this new abstraction.
Firstly, briefly introduce code abstraction(take [based-master]
as current logic/option, and [rfc-new]
for new option provided by this rfc)
HoodieMessageQueue
: Hold the inner message queue, control the initialization of the inner message queue, control its life cycle, and provide a unified insert api, speed limit, memory control and other enrich functions. The current implementations are as follows:BoundedInMemoryQueue
which hold a LinkedBlockingQueue
as inner message queue.DisruptorMessageQueue
which hold a lock free ringbuffer called disruptor as inner message queue.HoodieProducer
: Controls the producer behaviors and life cycle of hoodie reading upstream data and writing it into the inner message queue. The current implementations are as follows:BoundedInMemoryQueueProducer
Producer for BoundedInMemoryQueue
IteratorBasedQueueProducer
Iterator based producer which pulls entry from iterator and produces items into the LinkedBlockingQueue
FunctionBasedQueueProducer
Buffer producer which allows custom functions to insert entries to the LinkedBlockingQueue
DisruptorBasedProducer
Producer for DisruptorMessageQueue
IteratorBasedDisruptorProducer
Iterator based producer which pulls entry from iterator and produces items into the DisruptorMessageQueue
FunctionBasedDisruptorQueueProducer
Buffer producer which allows custom functions to insert entries to the DisruptorMessageQueue
HoodieConsumer
Control hoodie to read the data from inner message queue and write them as hudi data files, and execute callback function. The current implementations are as follows:BoundedInMemoryQueueConsumer
Consume entries directly from LinkedBlockingQueue
and execute callback function.DisruptorMessageHandler
which hold the same BoundedInMemoryQueueConsumer
instant mentioned before. Use DisruptorMessageHandler
extracts each record in disruptor then using BoundedInMemoryQueueConsumer
writing hudi data file.HoodieExecutor
: Executor which orchestrates concurrent producers and consumers communicating through a inner message queue. The current implementations are as follows:BoundedInMemoryExecutor
takes as input the size limit, queue producer(s), consumer and transformer and exposes API to orchestrate concurrent execution of these actors communicating through a central LinkedBlockingQueue.DisruptorExecutor
Control the initialization, life cycle of the disruptor, and coordinate the work of the producer, consumer, and ringbuffer related to the disruptor, etc.Secondly, This rfc implements disruptor related producers, message handlers and executor which use this lock-free message queue based on the above abstraction. Some compents are introduced in the first part. In this phase, we discuss how to use disruptor in hoodie writing stages.
The Disruptor is a library that provides a concurrent ring buffer data structure. It is designed to provide a low-latency, high-throughput work queue in asynchronous event processing architectures.
We use the Disruptor multi-producer single-consumer working model:
DisruptorPublisher
to register producers into Disruptor and control the produce behaviors including life cycle.DisruptorMessageHandler
to register consumers into Disruptor and write consumption data from disruptor to hudi data file. For example we will clear out the event after processing it to avoid unnecessary memory and GC pressureHoodieDisruptorEvent
as the carrier of the hoodie messageHoodieDisruptorEventFactory
: Pre-populate all the hoodie events to fill the RingBuffer. We can use HoodieDisruptorEventFactory
to create HoodieDisruptorEvent
storing the data for sharing during exchange or parallel coordination of an event.Finally, let me introduce the new parameters:
hoodie.write.executor.type
: Choose the type of executor to use, which orchestrates concurrent producers and consumers communicating through a inner message queue. Default value is BOUNDED_IN_MEMORY_EXECUTOR
which used a bounded in-memory queue LinkedBlockingQueue
. Also users could use DISRUPTOR_EXECUTOR
, which use disruptor as a lock free message queue to gain better writing performance. Although DISRUPTOR_EXECUTOR
is still an experimental feature.hoodie.write.buffer.size
: The size of the Disruptor Executor ring buffer, must be power of 2. Also the default/recommended value is 1024.hoodie.write.wait.strategy
: Used for disruptor wait strategy. The Wait Strategy determines how a consumer will wait for events to be placed into the Disruptor by a producer. More details are available in followed table about being optionally lock-free.Alternative Wait Strategies
The default WaitStrategy used by the Disruptor is the BlockingWaitStrategy
. Internally the BlockingWaitStrategy
uses a typical lock and condition variable to handle thread wake-up. The BlockingWaitStrategy is the slowest of the available wait strategies, but is the most conservative with the respect to CPU usage and will give the most consistent behaviour across the widest variety of deployment options.
Knowledge of the deployed system can allow for additional performance by choosing a more appropriate wait strategy:
SleepingWaitStrategy
:
Like the BlockingWaitStrategy
the SleepingWaitStrategy
it attempts to be conservative with CPU usage by using a simple busy wait loop. The difference is that the SleepingWaitStrategy
uses a call to LockSupport.parkNanos(1)
in the middle of the loop. On a typical Linux system this will pause the thread for around 60µs.
This has the benefits that the producing thread does not need to take any action other increment the appropriate counter and that it does not require the cost of signalling a condition variable. However, the mean latency of moving the event between the producer and consumer threads will be higher.
It works best in situations where low latency is not required, but a low impact on the producing thread is desired. A common use case is for asynchronous logging.
YieldingWaitStrategy
The YieldingWaitStrategy
is one of two WaitStrategies that can be use in low-latency systems. It is designed for cases where there is the option to burn CPU cycles with the goal of improving latency.
The YieldingWaitStrategy
will busy spin, waiting for the sequence to increment to the appropriate value. Inside the body of the loop Thread#yield()
will be called allowing other queued threads to run.
This is the recommended wait strategy when you need very high performance, and the number of EventHandler
threads is lower than the total number of logical cores, e.g. you have hyper-threading enabled.
BusySpinWaitStrategy
The BusySpinWaitStrategy
is the highest performing WaitStrategy. Like the YieldingWaitStrategy
, it can be used in low-latency systems, but puts the highest constraints on the deployment environment.
This wait strategy should only be used if the number of EventHandler
threads is lower than the number of physical cores on the box, e.g. hyper-threading should be disabled.
Default executor is BOUNDED_IN_MEMORY_EXECUTOR
which use a bounded in-memory queue using LinkedBlockingQueue
same as master.
So there is no impact on existing users.
TestDisruptorMessageQueue
and TestDisruptorExecutionInSpark
to guard above logic, also validate data correctness.BoundInMemoryExecutorBenchmark
benchmark with BoundInMemoryExecutor(based-master) and DisruptorExecutor(new option)For now, this DisruptorExecutor is supported for spark insert and spark bulk insert operations as an experimental feature. So that there're also several further steps need to be done:
DirectExecutor
which use no inner message queue and read messages from iterator directly ,writing into hudi(remove the producer/consumer at all).