As data management needs become increasingly refined, scheduled tasks play a crucial role. They are typically applied in the following scenarios:
In earlier versions of Apache Doris, meeting these requirements usually depended on external scheduling systems, such as scheduling logic in business code, third-party scheduling tools, or distributed scheduling platforms. However, these external systems may not support Doris's flexible scheduling strategies and resource management needs. In addition, a failure in an external scheduling system increases business risk and requires extra maintenance time and effort.
To address these issues, Apache Doris introduced the Job Scheduler feature in version 2.1, enabling autonomous task scheduling with precision down to the second.
This feature ensures data import completeness and consistency while allowing users to flexibly and conveniently adjust scheduling strategies. Reducing reliance on external systems also decreases system failure risks and maintenance costs, providing a more unified and reliable user experience.
Doris Job Scheduler is a task management system that runs tasks according to preset schedules, triggering predefined operations at specific times or intervals to automate task execution. Key features include:
Related Documentation: CREATE-JOB
A valid Job statement must include the following components:
```sql
CREATE JOB job_name
    ON SCHEDULE schedule
    [COMMENT 'string']
    DO execute_sql;

schedule: {
    AT timestamp
  | EVERY interval
      [STARTS timestamp]
      [ENDS timestamp]
}

interval:
    quantity { WEEK | DAY | HOUR | MINUTE }
```
```sql
CREATE JOB my_job
    ON SCHEDULE EVERY 1 MINUTE
    DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
```
This creates a job named “my_job” that executes every minute, importing data from db2.tbl2 into db1.tbl1.
Creating a One-Time Job:
```sql
CREATE JOB my_job
    ON SCHEDULE AT '2025-01-01 00:00:00'
    DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
```
This creates a job named “my_job” that executes once at 2025-01-01 00:00:00, importing data from db2.tbl2 into db1.tbl1.
Creating a Periodic Job Without End Time:
```sql
CREATE JOB my_job
    ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00'
    DO INSERT INTO db1.tbl1
       SELECT * FROM db2.tbl2 WHERE create_time >= days_add(now(), -1);
```
This creates a job named “my_job” that starts at 2025-01-01 00:00:00 and runs once a day with no end time. Each run imports the rows of db2.tbl2 created within the past day into db1.tbl1.
Creating a Periodic Job With End Time:
```sql
CREATE JOB my_job
    ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00' ENDS '2026-01-01 00:10:00'
    DO INSERT INTO db1.tbl1
       SELECT * FROM db2.tbl2 WHERE create_time >= days_add(now(), -1);
```
This creates a job named “my_job” that starts at 2025-01-01 00:00:00, runs once a day, and ends at 2026-01-01 00:10:00. Each run imports the rows of db2.tbl2 created within the past day into db1.tbl1.
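Each `EVERY ... STARTS ... ENDS ...` clause can be read as a rule that expands into concrete run times. The Java sketch below makes that expansion explicit. It is not Doris code: the names `ScheduleSketch`, `Unit`, and `fireTimes` are hypothetical, and it assumes that the first run happens exactly at `STARTS` and that `ENDS` is an inclusive upper bound.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not Doris code: expands an EVERY ... STARTS ... ENDS ... clause. */
public class ScheduleSketch {

    /** The interval units accepted by the CREATE JOB grammar. */
    public enum Unit {
        WEEK(Duration.ofDays(7)), DAY(Duration.ofDays(1)),
        HOUR(Duration.ofHours(1)), MINUTE(Duration.ofMinutes(1));

        final Duration length;

        Unit(Duration length) { this.length = length; }
    }

    /**
     * Returns the run times implied by EVERY quantity unit STARTS starts ENDS ends.
     * Assumptions: the first run is exactly at STARTS, ENDS is inclusive, and a null
     * ENDS (no end time) is capped at maxRuns so the returned list stays finite.
     */
    public static List<LocalDateTime> fireTimes(LocalDateTime starts, long quantity, Unit unit,
                                                LocalDateTime ends, int maxRuns) {
        Duration step = unit.length.multipliedBy(quantity);
        List<LocalDateTime> times = new ArrayList<>();
        for (LocalDateTime t = starts;
             (ends == null || !t.isAfter(ends)) && times.size() < maxRuns;
             t = t.plus(step)) {
            times.add(t);
        }
        return times;
    }

    public static void main(String[] args) {
        // EVERY 1 DAY STARTS '2025-01-01 00:00:00' ENDS '2026-01-01 00:10:00'
        List<LocalDateTime> bounded = fireTimes(LocalDateTime.of(2025, 1, 1, 0, 0), 1, Unit.DAY,
                LocalDateTime.of(2026, 1, 1, 0, 10), Integer.MAX_VALUE);
        System.out.println(bounded.size() + " runs, last at " + bounded.get(bounded.size() - 1));

        // EVERY 1 DAY STARTS '2025-01-01 00:00:00' with no ENDS: show only the first three runs
        System.out.println(fireTimes(LocalDateTime.of(2025, 1, 1, 0, 0), 1, Unit.DAY, null, 3));
    }
}
```

Under these assumptions, the job with an end time runs 366 times, the last run at 2026-01-01 00:00:00, because the following run (2026-01-02 00:00:00) falls after `ENDS`. A job without `ENDS` keeps producing runs indefinitely, which is why the sketch caps it with `maxRuns`.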
Using Job for Asynchronous Execution:
```sql
CREATE JOB my_job
    ON SCHEDULE AT current_timestamp
    DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
```
Because Doris creates a job synchronously but executes it asynchronously, defining a one-time job whose execution time is the current timestamp effectively runs the statement in the background. This is useful for statements you want to run asynchronously, such as `INSERT INTO ... SELECT`.
For instance, in an e-commerce scenario, users often need to extract business data from MySQL and synchronize it to Doris for data analysis, supporting precise marketing activities. The Job Scheduler, combined with Multi Catalog capabilities, can efficiently accomplish periodic data synchronization across data sources.
Create the source table in MySQL and insert some sample data:

```sql
CREATE TABLE IF NOT EXISTS user.activity (
    `user_id`         INT NOT NULL,
    `date`            DATE NOT NULL,
    `city`            VARCHAR(20),
    `age`             SMALLINT,
    `sex`             TINYINT,
    `last_visit_date` DATETIME DEFAULT '1970-01-01 00:00:00',
    `cost`            BIGINT DEFAULT '0',
    `max_dwell_time`  INT DEFAULT '0',
    `min_dwell_time`  INT DEFAULT '99999'
);

INSERT INTO user.activity VALUES
    (10000, '2017-10-01', 'Beijing', 20, 0, '2017-10-01 06:00:00', 20, 10, 10),
    (10000, '2017-10-01', 'Beijing', 20, 0, '2017-10-01 07:00:00', 15, 2, 2),
    (10001, '2017-10-01', 'Beijing', 30, 1, '2017-10-01 17:05:00', 2, 22, 22),
    (10002, '2017-10-02', 'Shanghai', 20, 1, '2017-10-02 12:59:00', 200, 5, 5),
    (10003, '2017-10-02', 'Guangzhou', 32, 0, '2017-10-02 11:20:00', 30, 11, 11),
    (10004, '2017-10-01', 'Shenzhen', 35, 0, '2017-10-01 10:00:00', 100, 3, 3),
    (10004, '2017-10-03', 'Shenzhen', 35, 0, '2017-10-03 10:20:00', 11, 6, 6);
```

After the insert, `user.activity` contains the following rows:
| user_id | date | city | age | sex | last_visit_date | cost | max_dwell_time | min_dwell_time |
|---|---|---|---|---|---|---|---|---|
| 10000 | 2017-10-01 | Beijing | 20 | 0 | 2017-10-01 06:00:00 | 20 | 10 | 10 |
| 10000 | 2017-10-01 | Beijing | 20 | 0 | 2017-10-01 07:00:00 | 15 | 2 | 2 |
| 10001 | 2017-10-01 | Beijing | 30 | 1 | 2017-10-01 17:05:00 | 2 | 22 | 22 |
| 10002 | 2017-10-02 | Shanghai | 20 | 1 | 2017-10-02 12:59:00 | 200 | 5 | 5 |
| 10003 | 2017-10-02 | Guangzhou | 32 | 0 | 2017-10-02 11:20:00 | 30 | 11 | 11 |
| 10004 | 2017-10-01 | Shenzhen | 35 | 0 | 2017-10-01 10:00:00 | 100 | 3 | 3 |
| 10004 | 2017-10-03 | Shenzhen | 35 | 0 | 2017-10-03 10:20:00 | 11 | 6 | 6 |
Example Workflow
Create the target table in Doris using the Aggregate Key model:

```sql
CREATE TABLE IF NOT EXISTS user_activity (
    `user_id`         LARGEINT NOT NULL COMMENT "User ID",
    `date`            DATE NOT NULL COMMENT "Data import date",
    `city`            VARCHAR(20) COMMENT "User city",
    `age`             SMALLINT COMMENT "User age",
    `sex`             TINYINT COMMENT "User gender",
    `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "Last visit date",
    `cost`            BIGINT SUM DEFAULT "0" COMMENT "Total spending",
    `max_dwell_time`  INT MAX DEFAULT "0" COMMENT "Max dwell time",
    `min_dwell_time`  INT MIN DEFAULT "99999" COMMENT "Min dwell time"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);
```
Create a JDBC catalog in Doris that connects to the MySQL `user` database:

```sql
CREATE CATALOG activity PROPERTIES (
    "type"         = "jdbc",
    "user"         = "root",
    "password"     = "123456",
    "jdbc_url"     = "jdbc:mysql://127.0.0.1:3306/user?useSSL=false",
    "driver_url"   = "mysql-connector-java-5.1.49.jar",
    "driver_class" = "com.mysql.jdbc.Driver"
);
```
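Load the existing data with a one-time job. The MySQL table is referenced through the catalog as `activity.user.activity` (catalog, database, table):

```sql
CREATE JOB one_time_load_job
    ON SCHEDULE AT '2024-08-10 03:00:00'
    DO INSERT INTO user_activity SELECT * FROM activity.user.activity;
```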
Then create a periodic job that synchronizes the previous day's data once a day:

```sql
CREATE JOB schedule_load
    ON SCHEDULE EVERY 1 DAY
    DO INSERT INTO user_activity
       SELECT * FROM activity.user.activity
       WHERE last_visit_date >= days_add(now(), -1);
```
Efficient scheduling often entails substantial resource consumption, especially when the scheduling precision is high. Traditional implementations based on Java's built-in scheduling facilities or other libraries can suffer from significant precision and memory-usage problems. To keep performance high while minimizing resource usage, Doris combines the TimingWheel algorithm with Disruptor to achieve second-level task scheduling.

Technical Details
The time wheel algorithm is implemented with Netty's HashedWheelTimer. The Job Manager periodically (every ten minutes by default) loads upcoming events into the time wheel. Disruptor provides a single-producer, multi-consumer model so that tasks are triggered efficiently without excessive resource usage. The time wheel only triggers tasks; it does not execute them. Tasks that must run immediately are submitted directly to the corresponding execution thread pool.
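The following Java sketch shows how these pieces can fit together. It is a simplification, not Doris source code: a hand-written hashed wheel stands in for Netty's `HashedWheelTimer`, a `LinkedBlockingQueue` drained by a two-thread pool stands in for the Disruptor ring buffer, and the `JobDef` type, the one-second tick, and the 60-bucket wheel are assumptions chosen for illustration. Each trigger is hashed into bucket `fireSec mod 60` together with the number of full revolutions it must still wait, so a tick only inspects one bucket. Every ten simulated minutes, `loadWindow` places the next window's triggers into the wheel, dropping a one-time definition once it is scheduled and advancing a periodic one to its next cycle.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Simplified sketch of the trigger path; not Doris source code. */
public class TimingWheelSketch {
    /** Hypothetical job definition; intervalSec == 0 marks a one-time (AT) job. */
    public static final class JobDef {
        final String name;
        final long intervalSec;
        long nextFireSec;
        public JobDef(String name, long firstFireSec, long intervalSec) {
            this.name = name; this.nextFireSec = firstFireSec; this.intervalSec = intervalSec;
        }
    }

    private static final class Entry {
        final String jobName;
        long rounds; // full wheel revolutions left before this trigger fires
        Entry(String jobName, long rounds) { this.jobName = jobName; this.rounds = rounds; }
    }

    private final List<LinkedList<Entry>> buckets = new ArrayList<>();
    private final BlockingQueue<String> triggered; // the wheel thread is the single producer
    private long now = 0;                          // one tick == one second

    public TimingWheelSketch(int ticksPerWheel, BlockingQueue<String> triggered) {
        for (int i = 0; i < ticksPerWheel; i++) buckets.add(new LinkedList<>());
        this.triggered = triggered;
    }

    /** Hashes a trigger into bucket (fireSec mod size); a trigger already due is handed off at once. */
    void schedule(String jobName, long fireSec) {
        long delay = fireSec - now;
        if (delay < 1) { triggered.offer(jobName); return; }
        int size = buckets.size();
        buckets.get((int) (fireSec % size)).add(new Entry(jobName, (delay - 1) / size));
    }

    /** Job Manager step: put every trigger in [now, now + windowSec) into the wheel. */
    public void loadWindow(List<JobDef> jobs, long windowSec) {
        for (Iterator<JobDef> it = jobs.iterator(); it.hasNext(); ) {
            JobDef job = it.next();
            while (job.nextFireSec < now + windowSec) {
                schedule(job.name, job.nextFireSec);
                if (job.intervalSec == 0) { it.remove(); break; } // one-time: drop the definition
                job.nextFireSec += job.intervalSec;              // periodic: advance one cycle
            }
        }
    }

    /** Advances one second and publishes (never executes) the triggers that are due. */
    public void tick() {
        now++;
        for (Iterator<Entry> it = buckets.get((int) (now % buckets.size())).iterator(); it.hasNext(); ) {
            Entry e = it.next();
            if (e.rounds > 0) { e.rounds--; continue; } // due on a later revolution
            it.remove();
            triggered.offer(e.jobName);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService consumers = Executors.newFixedThreadPool(2); // the multi-consumer side
        for (int i = 0; i < 2; i++) {
            consumers.submit(() -> {
                try {
                    while (true) System.out.println(Thread.currentThread().getName() + " runs " + queue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdownNow() ends the loop
                }
            });
        }
        TimingWheelSketch wheel = new TimingWheelSketch(60, queue); // 60 one-second buckets
        List<JobDef> jobs = new ArrayList<>(List.of(
                new JobDef("one_time_job", 90, 0),   // like AT: once, at t = 90 s
                new JobDef("every_minute", 0, 60))); // like EVERY 1 MINUTE, from t = 0 s
        for (int second = 0; second < 1200; second++) {         // 20 simulated minutes
            if (second % 600 == 0) wheel.loadWindow(jobs, 600); // ten-minute load window
            wheel.tick();
        }
        Thread.sleep(200); // let the consumers drain the queue before shutting down
        consumers.shutdownNow();
        consumers.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Over the 20 simulated minutes the wheel publishes 21 triggers (20 for `every_minute`, 1 for `one_time_job`), and the consumer threads print them; the wheel itself never runs a job. A trigger that is already due when its window is loaded, such as `every_minute` at t = 0 s and t = 600 s, goes to the queue immediately instead of entering a bucket. Because only one window is loaded at a time, each bucket holds just the near-term triggers that hash to it.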
For one-time events, the event definition is deleted once it has been scheduled. For periodic events, a system event in the time wheel periodically pulls the tasks for the next cycle. This keeps tasks from piling up in a single bucket, which reduces meaningless traversal and improves processing efficiency. For transactional tasks, the Job Scheduler uses strong association and callback mechanisms to ensure that execution results match expectations, maintaining data integrity and consistency.

Conclusion
Doris Job Scheduler is a powerful and flexible task scheduling tool essential for data processing. Beyond common scenarios like data lake analysis and internal ETL, it plays a crucial role in implementing asynchronous materialized views. Asynchronous materialized views store precomputed result sets, and their update frequency is closely tied to source table changes. Frequent updates to source table data necess