Vector is a high-performance observability data pipeline written in Rust, specifically designed for collecting, transforming, and routing logs, metrics, and traces. To better support the Doris ecosystem, we have developed a dedicated Doris Sink component for Vector, enabling efficient data ingestion from various data sources into Doris for analysis.
wget https://apache-doris-releases.oss-cn-beijing.aliyuncs.com/extension/vector-x86_64-unknown-linux-gnu.tar.gz
cd ${Vector_HOME}
## Choose the appropriate option based on your deployment environment. Multiple options are available in the Makefile.
make package-x86_64-unknown-linux-gnu
Doris Sink supports extensive configuration options to meet data writing requirements in different scenarios:
| Parameter | Type | Default | Description |
|---|---|---|---|
| type | string | - | Fixed as doris |
| inputs | array | - | List of upstream data source names |
| endpoints | array<string> | - | Doris FE HTTP/HTTPS addresses, must include protocol and port, e.g., ["https://fe1:8030"] |
| database | string/template | - | Target database name, supports templates |
| table | string/template | - | Target table name, supports templates |
| label_prefix | string | "vector" | Stream Load label prefix; the final label format is {label_prefix}_{database}_{table}_{timestamp}_{uuid} |
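Taken together, these basic parameters are enough to define a working sink. Below is a minimal sketch; the upstream name parse_log, the FE address, and the database/table names are placeholders to be replaced with values from your own deployment.

[sinks.doris]
type = "doris"
inputs = ["parse_log"]                  # placeholder upstream source/transform name
endpoints = ["http://fe_host:8030"]     # Doris FE HTTP address, protocol and port required
database = "log_db"                     # placeholder target database
table = "app_log"                       # placeholder target table
label_prefix = "vector"                 # final label: {label_prefix}_{database}_{table}_{timestamp}_{uuid}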
| Parameter | Type | Default | Description |
|---|---|---|---|
| auth.strategy | string | "basic" | Authentication strategy; Doris currently only supports Basic Auth |
| auth.user | string | - | Doris username |
| auth.password | string | - | Doris password; can be supplied via environment variables or a secret management system |
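Since Vector expands ${VAR} references in its configuration from the environment, the password does not have to be hard-coded. A minimal sketch, assuming an environment variable named DORIS_PASSWORD:

[sinks.doris.auth]
strategy = "basic"               # Doris only supports Basic Auth
user = "root"
password = "${DORIS_PASSWORD}"   # assumed environment variable, expanded by Vector at startup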
| Parameter | Type | Default | Description |
|---|---|---|---|
| request.concurrency | string/integer | "adaptive" | Concurrency strategy; supports "adaptive", "none" (serial), or a positive integer concurrency limit |
| request.timeout_secs | integer | 60 | Timeout for a single Stream Load request (seconds) |
| request.rate_limit_duration_secs | integer | 1 | Rate limit time window (seconds) |
| request.rate_limit_num | integer | i64::MAX | Number of requests allowed per time window; the default is effectively unlimited |
| request.retry_attempts | integer | usize::MAX | Maximum retry attempts for the Tower middleware; the default means unlimited retries |
| request.retry_initial_backoff_secs | integer | 1 | Wait time before the first retry (seconds); subsequent retries use Fibonacci backoff |
| request.retry_max_duration_secs | integer | 30 | Maximum wait time for a single retry backoff (seconds) |
| request.retry_jitter_mode | string | "full" | Retry jitter mode, supports full or none |
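As an illustration, the sketch below replaces adaptive concurrency with a fixed limit and bounds the retry behavior; the numbers are examples only and should be tuned to the load your Doris cluster can absorb.

[sinks.doris.request]
concurrency = 8                  # fixed number of in-flight Stream Load requests
timeout_secs = 60                # per-request timeout
rate_limit_duration_secs = 1
rate_limit_num = 100             # at most 100 requests per 1-second window
retry_attempts = 10              # stop retrying a request after 10 attempts
retry_initial_backoff_secs = 1   # Fibonacci backoff starts at 1 second
retry_max_duration_secs = 30     # cap any single backoff at 30 seconds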
Adaptive Concurrency (request.adaptive_concurrency, only effective when request.concurrency = "adaptive")
| Parameter | Type | Default | Description |
|---|---|---|---|
| request.adaptive_concurrency.initial_concurrency | integer | 1 | Initial value for adaptive concurrency |
| request.adaptive_concurrency.max_concurrency_limit | integer | 200 | Upper limit for adaptive concurrency, to prevent overload |
| request.adaptive_concurrency.decrease_ratio | float | 0.9 | Reduction ratio applied when a slowdown is detected |
| request.adaptive_concurrency.ewma_alpha | float | 0.4 | Exponential moving average weight for RTT measurements |
| request.adaptive_concurrency.rtt_deviation_scale | float | 2.5 | RTT deviation amplification factor, used to ignore normal fluctuations |
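When request.concurrency stays at "adaptive", these parameters control how fast concurrency ramps up and how aggressively it backs off. A sketch with somewhat more conservative values than the defaults:

[sinks.doris.request]
concurrency = "adaptive"

[sinks.doris.request.adaptive_concurrency]
initial_concurrency = 2         # start with 2 concurrent requests
max_concurrency_limit = 100     # never exceed 100 concurrent requests
decrease_ratio = 0.8            # shrink concurrency faster when Doris slows down
ewma_alpha = 0.4                # weight given to the newest RTT sample
rtt_deviation_scale = 2.5       # tolerate normal RTT jitter before reacting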
Doris Sink uses the encoding block to control event serialization behavior, defaulting to NDJSON (newline-delimited JSON):
| Parameter | Type | Default | Description |
|---|---|---|---|
| encoding.codec | string | "json" | Serialization codec; options include json, text, csv, etc. |
| encoding.timestamp_format | string | - | Timestamp output format; supports rfc3339, unix, etc. |
| encoding.only_fields / encoding.except_fields | array<string> | - | Field whitelist or blacklist |
| encoding.framing.method | string | auto-inferred | Set when a custom framing format is needed, e.g., newline_delimited, character_delimited |
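For example, to keep the default NDJSON output while emitting RFC 3339 timestamps and excluding a field that has no corresponding table column, the encoding block could look like this sketch (the excluded field name is illustrative):

[sinks.doris.encoding]
codec = "json"
timestamp_format = "rfc3339"
except_fields = ["source_type"]   # drop a field that has no corresponding table column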
headers is a key-value mapping that is passed through directly as HTTP headers on the Doris Stream Load request. All parameters supported by Stream Load headers can be used. Common settings are listed below (all values must be strings):
| Parameter | Type | Default | Description |
|---|---|---|---|
| headers.format | string | "json" | Data format; supports json, csv, parquet, etc. |
| headers.read_json_by_line | string | "true" | Whether to read JSON line by line (NDJSON) |
| headers.strip_outer_array | string | "false" | Whether to strip the outermost JSON array |
| headers.column_separator | string | - | CSV column separator (effective when format = csv) |
| headers.columns | string | - | Column mapping/order for CSV/JSON, e.g., timestamp,client_ip,status_code |
| headers.where | string | - | Stream Load where filter condition |
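Because the map is forwarded verbatim, any Stream Load header can be set here. The sketch below switches to CSV format with an explicit column mapping and a server-side filter; the column names and condition are illustrative only, and all values are strings.

[sinks.doris.headers]
format = "csv"
column_separator = ","
columns = "timestamp,client_ip,status_code"   # example column mapping
where = "status_code >= 400"                  # example filter: only load error responses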
| Parameter | Type | Default | Description |
|---|---|---|---|
| batch.max_bytes | integer | 10485760 | Maximum bytes per batch (10 MB) |
| batch.max_events | integer/null | null | Maximum events per batch; unlimited by default, so batching is primarily controlled by byte count |
| batch.timeout_secs | float | 1 | Maximum wait time before a batch is flushed (seconds) |
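For example, to flush a batch when roughly 50 MB or 100,000 events have accumulated, or after 5 seconds at the latest, whichever comes first:

[sinks.doris.batch]
max_bytes = 52428800   # ~50 MB per Stream Load request
max_events = 100000    # cap on events per batch
timeout_secs = 5       # flush at least every 5 seconds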
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_retries | integer | -1 | Maximum retries at the sink level; -1 means unlimited |
| log_request | boolean | false | Whether to log each Stream Load request and response (enable as needed in production) |
| compression | - | Not supported | - |
| distribution.retry_initial_backoff_secs | integer | 1 | Initial backoff for endpoint health check recovery (seconds) |
| distribution.retry_max_duration_secs | integer | 3600 | Maximum health check backoff duration (seconds) |
| tls.verify_certificate | boolean | true | Enable/disable upstream certificate verification |
| tls.verify_hostname | boolean | true | Enable/disable hostname verification |
| tls.ca_file / tls.crt_file / tls.key_file / tls.key_pass / tls.alpn_protocols / tls.server_name | various | - | Standard Vector TLS client options for custom CAs, mutual authentication, or SNI |
| acknowledgements.enabled | boolean | false | Enable end-to-end acknowledgements for use with sources that support them |
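These options sit at the sink level next to the basic parameters. The sketch below enables request logging, bounded retries, end-to-end acknowledgements, and a custom CA for HTTPS FE endpoints; the certificate path is a placeholder.

[sinks.doris]
# ... type, inputs, endpoints, database, table as in the basic example ...
max_retries = 5      # give up after 5 sink-level retries instead of retrying forever
log_request = true   # print every Stream Load request and response

[sinks.doris.acknowledgements]
enabled = true       # propagate delivery acknowledgements back to the source

[sinks.doris.tls]
verify_certificate = true
verify_hostname = true
ca_file = "/etc/vector/doris-ca.pem"   # placeholder path to a custom CA certificate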
This example demonstrates TEXT log collection, using Doris FE logs as the data source.
1. Data
FE log files are typically located at fe/log/fe.log under the Doris installation directory. They are typical Java application logs containing fields such as timestamp, log level, thread name, code position, and log message. In addition to regular log lines, there are exception logs whose stack traces span multiple lines; collection and storage must merge the main log line and its stack trace into a single log entry (see the multiline excerpt after the sample below).
2024-07-08 21:18:01,432 INFO (Statistics Job Appender|61) [StatisticsJobAppender.runAfterCatalogReady():70] Stats table not available, skip
2024-07-08 21:18:53,710 WARN (STATS_FETCH-0|208) [StmtExecutor.executeInternalQuery():3332] Failed to run internal SQL: OriginStatement{originStmt='SELECT * FROM __internal_schema.column_statistics WHERE part_id is NULL ORDER BY update_time DESC LIMIT 500000', idx=0}
org.apache.doris.common.UserException: errCode = 2, detailMessage = tablet 10031 has no queryable replicas. err: replica 10032's backend 10008 does not exist or not alive
at org.apache.doris.planner.OlapScanNode.addScanRangeLocations(OlapScanNode.java:931) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.planner.OlapScanNode.computeTabletInfo(OlapScanNode.java:1197) ~[doris-fe.jar:1.2-SNAPSHOT]
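In Vector, this merging happens in the file source via its multiline options, as shown in the full configuration in step 3 below: a line that starts with a timestamp opens a new event, and any other line (such as a stack trace line) is appended to the previous one. The relevant excerpt:

[sources.fe_log_input]
type = "file"
include = ["/path/fe/log/fe.log"]
# Lines beginning with "YYYY-MM-DD HH:MM:SS,mmm" start a new log event;
# lines that do not match are merged into the previous event (stack traces).
multiline.start_pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}"
multiline.mode = "halt_before"
multiline.condition_pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}"
multiline.timeout_ms = 10000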
2. Create Table
The table structure includes fields for log generation time, collection time, hostname, log file path, log type, log level, thread name, code position, and log message.
CREATE TABLE `doris_log`
(
  `log_time` datetime NULL COMMENT 'log content time',
  `collect_time` datetime NULL COMMENT 'log agent collect time',
  `host` text NULL COMMENT 'hostname or ip',
  `path` text NULL COMMENT 'log file path',
  `type` text NULL COMMENT 'log type',
  `level` text NULL COMMENT 'log level',
  `thread` text NULL COMMENT 'log thread',
  `position` text NULL COMMENT 'log code position',
  `message` text NULL COMMENT 'log message',
  INDEX idx_host (`host`) USING INVERTED COMMENT '',
  INDEX idx_path (`path`) USING INVERTED COMMENT '',
  INDEX idx_type (`type`) USING INVERTED COMMENT '',
  INDEX idx_level (`level`) USING INVERTED COMMENT '',
  INDEX idx_thread (`thread`) USING INVERTED COMMENT '',
  INDEX idx_position (`position`) USING INVERTED COMMENT '',
  INDEX idx_message (`message`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT ''
)
ENGINE=OLAP
DUPLICATE KEY(`log_time`)
COMMENT 'OLAP'
PARTITION BY RANGE(`log_time`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
  "replication_num" = "1",
  "dynamic_partition.enable" = "true",
  "dynamic_partition.time_unit" = "DAY",
  "dynamic_partition.start" = "-7",
  "dynamic_partition.end" = "1",
  "dynamic_partition.prefix" = "p",
  "dynamic_partition.buckets" = "10",
  "dynamic_partition.create_history_partition" = "true",
  "compaction_policy" = "time_series"
);
3. Vector Configuration
# ==================== Sources ====================
[sources.fe_log_input]
type = "file"
include = ["/path/fe/log/fe.log"]
start_at_beginning = true
max_line_bytes = 102400
ignore_older_secs = 0
fingerprint.strategy = "device_and_inode"

# Multi-line log handling - corresponds to Logstash's multiline codec
# Lines starting with a timestamp are new logs, other lines are merged with the previous line (handling stack traces)
multiline.start_pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}"
multiline.mode = "halt_before"
multiline.condition_pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}"
multiline.timeout_ms = 10000

# ==================== Transforms ====================
# Parse the log content with a regex (equivalent to Logstash's grok filter)
[transforms.parse_log]
inputs = ["fe_log_input"]
type = "remap"
source = '''
# Add type field (corresponds to Logstash's add_field)
.type = "fe.log"

# Add collect_time (corresponds to Logstash's @timestamp)
# Use Asia/Shanghai timezone, consistent with log_time
.collect_time = format_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S", timezone: "Asia/Shanghai")

# Parse log format: 2024-01-01 12:00:00,123 INFO (thread-name) [position] message
# Use (?s) to enable DOTALL mode, allowing .* to match newlines (handling multi-line logs)
parsed, err = parse_regex(.message, r'(?s)^(?P<log_time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P<level>[A-Z]+) \((?P<thread>[^\)]+)\) \[(?P<position>[^\]]+)\] (?P<content>.*)')

# Extract parsed fields
if err == null {
    .log_time = parsed.log_time
    .level = parsed.level
    .thread = parsed.thread
    .position = parsed.position
    # Keep the complete original message (including multi-line stack traces)
} else {
    # If parsing fails, set default values to avoid NULL (avoid partition errors)
    .log_time = .collect_time
    .level = "UNKNOWN"
    .thread = ""
    .position = ""
}

# Extract host and path (Vector adds these metadata fields automatically)
.host = .host
.path = .file
'''

# ==================== Sinks ====================
[sinks.doris]
inputs = ["parse_log"]
type = "doris"
endpoints = ["http://fe_ip:http_port"]
database = "log_db"
table = "doris_log"
label_prefix = "vector_fe_log"
log_request = true

[sinks.doris.auth]
user = "root"
password = ""
strategy = "basic"

[sinks.doris.encoding]
codec = "json"

[sinks.doris.framing]
method = "newline_delimited"

[sinks.doris.request]
concurrency = 10

[sinks.doris.headers]
format = "json"
read_json_by_line = "true"
load_to_single_tablet = "true"

[sinks.doris.batch]
max_events = 10000
timeout_secs = 3
max_bytes = 100000000
4. Run Vector
${VECTOR_HOME}/bin/vector --config vector_fe_log.toml

# When log_request is true, the log will output the request parameters and response results of each Stream Load
2025-11-19T10:14:40.822071Z INFO sink{component_kind="sink" component_id=doris component_type=doris}:request{request_id=82}: vector::sinks::doris::service: Doris stream load response received. status_code=200 OK stream_load_status=Successful response={
    "TxnId": 169721,
    "Label": "vector_fe_log_log_db_doris_log_1763547280791_e2e619ee-4067-4fe8-974e-9f35f0d4e48e",
    "Comment": "",
    "TwoPhaseCommit": "false",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 10,
    "NumberLoadedRows": 10,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 7301,
    "LoadTimeMs": 30,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 1,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 8,
    "ReceiveDataTimeMs": 2,
    "CommitAndPublishTimeMs": 18
} internal_log_rate_limit=true
This example demonstrates JSON log collection using GitHub events archive data.
1. Data
The GitHub events archive contains archived GitHub user activity events in JSON format. It can be downloaded from https://www.gharchive.org/; for example, the following command downloads the data for 15:00 on January 1, 2024.
wget https://data.gharchive.org/2024-01-01-15.json.gz
Below is a sample data entry. The actual data is one entry per line; formatting is added here for display purposes.
{
"id": "37066529221",
"type": "PushEvent",
"actor": {
"id": 46139131,
"login": "Bard89",
"display_login": "Bard89",
"gravatar_id": "",
"url": "https://api.github.com/users/Bard89",
"avatar_url": "https://avatars.githubusercontent.com/u/46139131?"
},
"repo": {
"id": 780125623,
"name": "Bard89/talk-to-me",
"url": "https://api.github.com/repos/Bard89/talk-to-me"
},
"payload": {
"repository_id": 780125623,
"push_id": 17799451992,
"size": 1,
"distinct_size": 1,
"ref": "refs/heads/add_mvcs",
"head": "f03baa2de66f88f5f1754ce3fa30972667f87e81",
"before": "85e6544ede4ae3f132fe2f5f1ce0ce35a3169d21"
},
"public": true,
"created_at": "2024-04-01T23:00:00Z"
}
2. Create Doris Table
CREATE DATABASE log_db;
USE log_db;
CREATE TABLE github_events
(
`created_at` DATETIME,
`id` BIGINT,
`type` TEXT,
`public` BOOLEAN,
`actor` VARIANT,
`repo` VARIANT,
`payload` TEXT,
INDEX `idx_id` (`id`) USING INVERTED,
INDEX `idx_type` (`type`) USING INVERTED,
INDEX `idx_actor` (`actor`) USING INVERTED,
INDEX `idx_host` (`repo`) USING INVERTED,
INDEX `idx_payload` (`payload`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
)
ENGINE = OLAP
DUPLICATE KEY(`created_at`)
PARTITION BY RANGE(`created_at`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"inverted_index_storage_format"= "v2",
"compaction_policy" = "time_series",
"enable_single_replica_compaction" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.replication_num" = "1"
);
3. Vector Configuration
# ==================== Sources ====================
[sources.github_events_reload]
type = "file"
include = ["/path/2024-01-01-15.json"]
read_from = "beginning"
ignore_checkpoints = true
max_line_bytes = 10485760
ignore_older_secs = 0
line_delimiter = "\n"
fingerprint.strategy = "device_and_inode"

# ==================== Transforms ====================
# Parse JSON format GitHub Events data; the VARIANT type can directly store nested objects
[transforms.parse_json]
inputs = ["github_events_reload"]
type = "remap"
source = '''
# Parse JSON data (each line is a complete JSON object)
. = parse_json!(.message)

# Convert payload field to JSON string (TEXT type)
.payload = encode_json(.payload)

# Keep only the fields needed for the table
. = {
    "created_at": .created_at,
    "id": .id,
    "type": .type,
    "public": .public,
    "actor": .actor,
    "repo": .repo,
    "payload": .payload
}
'''

# ==================== Sinks ====================
[sinks.doris]
inputs = ["parse_json"]
type = "doris"
endpoints = ["http://fe_ip:http_port"]
database = "log_db"
table = "github_events"
label_prefix = "vector_github_events"
log_request = true

[sinks.doris.auth]
user = "root"
password = ""
strategy = "basic"

[sinks.doris.encoding]
codec = "json"

[sinks.doris.framing]
method = "newline_delimited"

[sinks.doris.request]
concurrency = 10

[sinks.doris.headers]
format = "json"
read_json_by_line = "true"
load_to_single_tablet = "true"

[sinks.doris.batch]
max_events = 10000
timeout_secs = 3
max_bytes = 100000000
Use the following command to start the Vector service:
vector --config vector_config.toml