| --- |
| { |
| "title": "Fluent Bit Doris Output Plugin", |
| "language": "zh-CN" |
| } |
| --- |
| |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| |
| [Fluent Bit](https://fluentbit.io/) 是一个快速的日志处理器和转发器,它支持自定义输出插件将数据写入存储系统,Fluent Bit Doris Output Plugin 是输出到 Doris 的插件。 |
| |
| Fluent Bit Doris Output Plugin 调用 [Doris Stream Load](../data-operate/import/stream-load-manual.md) HTTP 接口将数据实时写入 Doris,提供多线程并发,失败重试,自定义 Stream Load 格式和参数,输出写入速度等能力。 |
| |
| 使用 Fluent Bit Doris Output Plugin 主要有三个步骤: |
| 1. 下载或编译包含 Doris Output Plugin 的 Fluent Bit 二进制程序 |
| 2. 配置 Fluent Bit 输出地址和其他参数 |
| 3. 启动 Fluent Bit 将数据实时写入 Doris |
| |
| ## 安装(alpha 版本) |
| |
| ### 从官网下载 |
| |
| https://github.com/joker-star-l/fluent-bit/releases/download/v3.1.9-alpha/fluent-bit |
| |
| ### 从源码编译 |
| |
| 克隆 https://github.com/joker-star-l/fluent-bit 的 dev 分支,在 build/ 目录下执行 |
| |
| ``` |
| cmake -DFLB_RELEASE=ON .. |
| make |
| ``` |
| |
| 编译产物为 build/bin/fluent-bit。 |
| |
| ## 参数配置 |
| |
| Fluent Bit Doris output plugin 的配置如下: |
| |
| 配置 | 说明 |
| --- | --- |
| `host` | Stream Load HTTP host |
| `port` | Stream Load HTTP port |
| `user` | Doris 用户名,该用户需要有doris对应库表的导入权限 |
| `password` | Doris 用户的密码 |
| `database` | 要写入的 Doris 库名 |
| `table` | 要写入的 Doris 表名 |
| `label_prefix` | Doris Stream Load Label 前缀,最终生成的 Label 为 *{label_prefix}\_{timestamp}\_{uuid}* ,默认值是 fluentbit, 如果设置为 false 则不会添加 Label |
| `time_key` | 数据中要添加的时间戳列的名称,默认值是 date, 如果设置为 false 则不会添加该列 |
| `header` | Doris Stream Load 的 header 参数,可以设置多个 |
| `log_request` | 日志中是否输出 Doris Stream Load 请求和响应元数据,用于排查问题,默认为 true |
| `log_progress_interval` | 日志中输出速度的时间间隔,单位是秒,默认为 10,设置为 0 可以关闭这种日志 |
| `retry_limit` | Doris Stream Load 请求失败重试次数,默认为 1, 如果设置为 false 则不限制重试次数 |
| `workers` | 执行 Doris Stream Load 的 worker 数量,默认为 2 |
| |
| ## 使用示例 |
| |
| ### TEXT 日志采集示例 |
| |
| 该示例以 Doris FE 的日志为例展示 TEXT 日志采集。 |
| |
| **1. 数据** |
| |
| FE 日志文件一般位于 Doris 安装目录下的 `fe/log/fe.log` 文件,是典型的 Java 程序日志,包括时间戳,日志级别,线程名,代码位置,日志内容等字段。不仅有正常的日志,还有带 `stacktrace` 的异常日志,`stacktrace` 是跨行的,日志采集存储需要把主日志和 `stacktrace` 组合成一条日志。 |
| |
| ```java |
| 2024-07-08 21:18:01,432 INFO (Statistics Job Appender|61) [StatisticsJobAppender.runAfterCatalogReady():70] Stats table not available, skip |
| 2024-07-08 21:18:53,710 WARN (STATS_FETCH-0|208) [StmtExecutor.executeInternalQuery():3332] Failed to run internal SQL: OriginStatement{originStmt='SELECT * FROM __internal_schema.column_statistics WHERE part_id is NULL ORDER BY update_time DESC LIMIT 500000', idx=0} |
| org.apache.doris.common.UserException: errCode = 2, detailMessage = tablet 10031 has no queryable replicas. err: replica 10032's backend 10008 does not exist or not alive |
| at org.apache.doris.planner.OlapScanNode.addScanRangeLocations(OlapScanNode.java:931) ~[doris-fe.jar:1.2-SNAPSHOT] |
| at org.apache.doris.planner.OlapScanNode.computeTabletInfo(OlapScanNode.java:1197) ~[doris-fe.jar:1.2-SNAPSHOT] |
| ``` |
| |
| **2. 建表** |
| |
| 表结构包括日志的产生时间,采集时间,主机名,日志文件路径,日志类型,日志级别,线程名,代码位置,日志内容等字段。 |
| |
| ```sql |
| CREATE TABLE `doris_log` ( |
| `log_time` datetime NULL COMMENT 'log content time', |
| `collect_time` datetime NULL COMMENT 'log agent collect time', |
| `host` text NULL COMMENT 'hostname or ip', |
| `path` text NULL COMMENT 'log file path', |
| `type` text NULL COMMENT 'log type', |
| `level` text NULL COMMENT 'log level', |
| `thread` text NULL COMMENT 'log thread', |
| `position` text NULL COMMENT 'log code position', |
| `message` text NULL COMMENT 'log message', |
| INDEX idx_host (`host`) USING INVERTED COMMENT '', |
| INDEX idx_path (`path`) USING INVERTED COMMENT '', |
| INDEX idx_type (`type`) USING INVERTED COMMENT '', |
| INDEX idx_level (`level`) USING INVERTED COMMENT '', |
| INDEX idx_thread (`thread`) USING INVERTED COMMENT '', |
| INDEX idx_position (`position`) USING INVERTED COMMENT '', |
| INDEX idx_message (`message`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT '' |
| ) ENGINE=OLAP |
| DUPLICATE KEY(`log_time`) |
| COMMENT 'OLAP' |
| PARTITION BY RANGE(`log_time`) () |
| DISTRIBUTED BY RANDOM BUCKETS 10 |
| PROPERTIES ( |
| "replication_num" = "1", |
| "dynamic_partition.enable" = "true", |
| "dynamic_partition.time_unit" = "DAY", |
| "dynamic_partition.start" = "-7", |
| "dynamic_partition.end" = "1", |
| "dynamic_partition.prefix" = "p", |
| "dynamic_partition.buckets" = "10", |
| "dynamic_partition.create_history_partition" = "true", |
| "compaction_policy" = "time_series" |
| ); |
| ``` |
| |
| **3. 配置** |
| |
| Fluent Bit 日志采集的配置文件如下,`doris_log.conf` 用于定义 ETL 的各个部分组件,`parsers.conf` 用于定义不同的日志解析器。 |
| |
| doris_log.conf: |
| |
| ```java |
| # config for Fluent Bit service |
| [SERVICE] |
| log_level info |
| # parsers file |
| parsers_file parsers.conf |
| |
| # use input tail |
| [INPUT] |
| name tail |
| path /path/to/your/log |
| # add log file name to the record, key is 'path' |
| path_key path |
| # set multiline parser |
| multiline.parser multiline_java |
| |
| # parse log |
| [FILTER] |
| match * |
| name parser |
| key_name log |
| parser fe_log |
| reserve_data true |
| |
| # add host info |
| [FILTER] |
| name sysinfo |
| match * |
| # add hostname to the record, key is 'host' |
| hostname_key host |
| |
| # output to doris |
| [OUTPUT] |
| name doris |
| match * |
| host fehost |
| port feport |
| user your_username |
| password your_password |
| database your_db |
| table your_table |
| # add 'collect_time' to the record |
| time_key collect_time |
| # 'collect_time' is timestamp, change it to datatime |
| header columns collect_time=from_unixtime(collect_time) |
| log_request true |
| log_progress_interval 10 |
| ``` |
| |
| parsers.conf: |
| |
| ```java |
| [MULTILINE_PARSER] |
| name multiline_java |
| type regex |
| flush_timeout 1000 |
| # Regex rules for multiline parsing |
| # --------------------------------- |
| # |
| # configuration hints: |
| # |
| # - first state always has the name: start_state |
| # - every field in the rule must be inside double quotes |
| # |
| # rules | state name | regex pattern | next state name |
| # --------|----------------|---------------|----------------- |
| rule "start_state" "/(^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2})(.*)/" "cont" |
| rule "cont" "/(^(?![0-9]{4}-[0-9]{2}-[0-9]{2}))(.*)/" "cont" |
| |
| |
| [PARSER] |
| name fe_log |
| format regex |
| # parse and add 'log_time', 'level', 'thread', 'position', 'message' to the record |
| regex ^(?<log_time>[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (?<level>[^ ]+) \((?<thread>[^\)]+)\) \[(?<position>[^\]]+)\] (?<message>(\n|.)*)\n$ |
| ``` |
| |
| **4. 运行 Fluent Bit** |
| |
| ```java |
| fluent-bit -c doris_log.conf |
| |
| # log stream load response |
| |
| [2024/10/31 18:39:55] [ info] [output:doris:doris.1] 127.0.0.1:8040, HTTP status=200 |
| { |
| "TxnId": 32155, |
| "Label": "fluentbit_1730371195_91cca1aa-c15f-45d2-b503-fe7d2e839c2a", |
| "Comment": "", |
| "TwoPhaseCommit": "false", |
| "Status": "Success", |
| "Message": "OK", |
| "NumberTotalRows": 1, |
| "NumberLoadedRows": 1, |
| "NumberFilteredRows": 0, |
| "NumberUnselectedRows": 0, |
| "LoadBytes": 836, |
| "LoadTimeMs": 298, |
| "BeginTxnTimeMs": 0, |
| "StreamLoadPutTimeMs": 3, |
| "ReadDataTimeMs": 0, |
| "WriteDataTimeMs": 268, |
| "CommitAndPublishTimeMs": 25 |
| } |
| |
| # log speed info |
| |
| [2024/10/31 18:40:13] [ info] [output:doris:doris.1] total 0 MB 2 ROWS, total speed 0 MB/s 0 R/s, last 10 seconds speed 0 MB/s 0 R/s |
| ``` |
| |
| ### JSON 日志采集示例 |
| |
| 该样例以 github events archive 的数据为例展示 JSON 日志采集。 |
| |
| **1. 数据** |
| |
| github events archive 是 github 用户操作事件的归档数据,格式是 JSON,可以从 https://www.gharchive.org/ 下载,比如下载 2024年1月1日15点的数据。 |
| |
| ```shell |
| wget https://data.gharchive.org/2024-01-01-15.json.gz |
| ``` |
| |
| 下面是一条数据样例,实际一条数据一行,这里为了方便展示进行了格式化。 |
| |
| ```jason |
| { |
| "id": "37066529221", |
| "type": "PushEvent", |
| "actor": { |
| "id": 46139131, |
| "login": "Bard89", |
| "display_login": "Bard89", |
| "gravatar_id": "", |
| "url": "https://api.github.com/users/Bard89", |
| "avatar_url": "https://avatars.githubusercontent.com/u/46139131?" |
| }, |
| "repo": { |
| "id": 780125623, |
| "name": "Bard89/talk-to-me", |
| "url": "https://api.github.com/repos/Bard89/talk-to-me" |
| }, |
| "payload": { |
| "repository_id": 780125623, |
| "push_id": 17799451992, |
| "size": 1, |
| "distinct_size": 1, |
| "ref": "refs/heads/add_mvcs", |
| "head": "f03baa2de66f88f5f1754ce3fa30972667f87e81", |
| "before": "85e6544ede4ae3f132fe2f5f1ce0ce35a3169d21" |
| }, |
| "public": true, |
| "created_at": "2024-04-01T23:00:00Z" |
| } |
| ``` |
| |
| **2. 建表** |
| |
| ```sql |
| CREATE TABLE github_events |
| ( |
| `created_at` DATETIME, |
| `id` BIGINT, |
| `type` TEXT, |
| `public` BOOLEAN, |
| `actor` VARIANT, |
| `repo` VARIANT, |
| `payload` TEXT, |
| INDEX `idx_id` (`id`) USING INVERTED, |
| INDEX `idx_type` (`type`) USING INVERTED, |
| INDEX `idx_actor` (`actor`) USING INVERTED, |
| INDEX `idx_host` (`repo`) USING INVERTED, |
| INDEX `idx_payload` (`payload`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") |
| ) |
| ENGINE = OLAP |
| DUPLICATE KEY(`created_at`) |
| PARTITION BY RANGE(`created_at`) () |
| DISTRIBUTED BY RANDOM BUCKETS 10 |
| PROPERTIES ( |
| "replication_num" = "1", |
| "compaction_policy" = "time_series", |
| "enable_single_replica_compaction" = "true", |
| "dynamic_partition.enable" = "true", |
| "dynamic_partition.create_history_partition" = "true", |
| "dynamic_partition.time_unit" = "DAY", |
| "dynamic_partition.start" = "-30", |
| "dynamic_partition.end" = "1", |
| "dynamic_partition.prefix" = "p", |
| "dynamic_partition.buckets" = "10", |
| "dynamic_partition.replication_num" = "1" |
| ); |
| ``` |
| |
| **3. 配置** |
| |
| 和之前 TEXT 日志采集相比,该配置文件没有使用 FILTER,因为不需要额外的处理转换。 |
| |
| github_events.conf: |
| |
| ``` |
| [SERVICE] |
| log_level info |
| parsers_file github_parsers.conf |
| |
| [INPUT] |
| name tail |
| parser github |
| path /path/to/your/log |
| |
| [OUTPUT] |
| name doris |
| match * |
| host fehost |
| port feport |
| user your_username |
| password your_password |
| database your_db |
| table your_table |
| time_key false |
| log_request true |
| log_progress_interval 10 |
| ``` |
| |
| github_parsers.conf: |
| ``` |
| [PARSER] |
| name github |
| format json |
| ``` |
| |
| **4. 运行 Fluent Bit** |
| |
| ``` |
| fluent-bit -c github_events.conf |
| ``` |