blob: 7f33736ab05a4b2c63a748cc9f36ac27b5cc5e78 [file] [log] [blame] [view]
# 事件监听器
## 介绍
SeaTunnel提供了丰富的事件监听器功能,用于管理数据同步时的状态。此功能在需要监听任务运行状态时十分重要(`org.apache.seatunnel.api.event`)。本文档将指导您如何使用这些参数并有效地利用他们。
## 支持的引擎
> SeaTunnel Zeta<br/>
> Flink<br/>
> Spark<br/>
## API
事件(event)API的定义在 `org.apache.seatunnel.api.event`包中。
### Event Data API
- `org.apache.seatunnel.api.event.Event` - 事件数据的接口。
- `org.apache.seatunnel.api.event.EventType` - 事件数据的枚举值。
#### EventType 枚举说明
`EventType`枚举定义了系统中所有可能的事件类型,主要包括:
| 事件类型 | 说明 | 关联事件类 |
|--------------------------------|----------|-------------------------------|
| `JOB_STATUS` | 作业状态变更事件 | `JobStateEvent` |
| `SCHEMA_CHANGE_UPDATE_COLUMNS` | 表结构更新事件 | `AlterTableColumnsEvent` |
| `SCHEMA_CHANGE_ADD_COLUMN` | 表添加列事件 | `AlterTableAddColumnEvent` |
| `SCHEMA_CHANGE_DROP_COLUMN` | 表删除列事件 | `AlterTableDropColumnEvent` |
| `SCHEMA_CHANGE_MODIFY_COLUMN` | 表修改列事件 | `AlterTableModifyColumnEvent` |
| `READER_OPEN` | 读取器打开事件 | `ReaderOpenEvent` |
| `READER_CLOSE` | 读取器关闭事件 | `ReaderCloseEvent` |
| `WRITER_OPEN` | 写入器打开事件 | `WriterOpenEvent` |
| `WRITER_CLOSE` | 写入器关闭事件 | `WriterCloseEvent` |
> 注意:不同事件类型对应不同的事件数据结构,在自定义事件处理器时需通过`event.getEventType()`进行类型判断,以确保类型安全转换。
### Event Listener API
您可以自定义事件处理器,例如将事件发送到外部系统。
- `org.apache.seatunnel.api.event.EventHandler` - 事件处理器的接口,SPI将会自动从类路径中加载子类。
### Event Collect API
- `org.apache.seatunnel.api.source.SourceSplitEnumerator` - 在`SourceSplitEnumerator`加载事件监听器。
```java
package org.apache.seatunnel.api.source;
public interface SourceSplitEnumerator {
interface Context {
/**
* Get the {@link org.apache.seatunnel.api.event.EventListener} of this enumerator.
*
* @return
*/
EventListener getEventListener();
}
}
```
- `org.apache.seatunnel.api.source.SourceReader` - 在`SourceReader`加载事件监听器。
```java
package org.apache.seatunnel.api.source;
public interface SourceReader {
interface Context {
/**
* Get the {@link org.apache.seatunnel.api.event.EventListener} of this reader.
*
* @return
*/
EventListener getEventListener();
}
}
```
- `org.apache.seatunnel.api.sink.SinkWriter` - 在`SinkWriter`加载事件监听器。
```java
package org.apache.seatunnel.api.sink;
public interface SinkWriter {
interface Context {
/**
* Get the {@link org.apache.seatunnel.api.event.EventListener} of this writer.
*
* @return
*/
EventListener getEventListener();
}
}
```
## 设置监听器
您需要设置引擎配置以使用事件监听器功能。
### Zeta 引擎
配置样例(seatunnel.yaml):
```
seatunnel:
engine:
event-report-http:
url: "http://example.com:1024/event/report"
headers:
Content-Type: application/json
```
### Flink 引擎
您可以定义 `org.apache.seatunnel.api.event.EventHandler` 接口并添加到类路径,SPI会自动加载。
支持的flink版本: 1.14.0+
样例: `org.apache.seatunnel.api.event.LoggingEventHandler`
### Spark 引擎
您可以定义 `org.apache.seatunnel.api.event.EventHandler` 接口并添加到类路径,SPI会自动加载。
## 自定义事件处理器实现步骤
下面以 `JobStateEvent` 为例,介绍如何实现一个自定义事件处理器,您可以根据需要扩展此方法以处理其他类型的事件。
### 1. 添加依赖
在项目 `pom.xml` 中引入必要依赖:
```xml
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${seatunnel.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-engine-common</artifactId>
<version>${seatunnel.version}</version>
<scope>provided</scope>
</dependency>
```
> 注意:需将 `${seatunnel.version}` 替换为实际使用的 SeaTunnel 版本。
### 2. 实现事件处理器
自定义类实现 `org.apache.seatunnel.api.event.EventHandler` 接口,并重写 `handle` 方法,针对需要处理的事件类型进行业务逻辑处理。
**核心逻辑**:通过 `event.getEventType()` 过滤事件类型——由于 SeaTunnel 引擎会分发多种类型的事件,需显式判断事件类型,以确保仅处理目标事件。
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.event.EventHandler;
import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.job.JobStateEvent;
import org.apache.seatunnel.api.event.schema.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.event.source.ReaderOpenEvent;
import org.apache.seatunnel.api.event.sink.WriterCloseEvent;
/**
* 自定义多类型事件处理器示例,包含多种事件的处理逻辑
*/
@Slf4j
public class CustomMultiEventHandler implements EventHandler {
@Override
public void handle(Event event) {
// 根据事件类型进行不同处理
EventType eventType = event.getEventType();
switch (eventType) {
case JOB_STATUS:
handleJobStateEvent((JobStateEvent) event);
break;
case SCHEMA_CHANGE_ADD_COLUMN:
handleAddColumnEvent((AlterTableAddColumnEvent) event);
break;
case READER_OPEN:
handleReaderOpenEvent((ReaderOpenEvent) event);
break;
case WRITER_CLOSE:
handleWriterCloseEvent((WriterCloseEvent) event);
break;
// 可根据需要添加其他事件类型的处理
default:
// 忽略不处理的事件类型
log.debug("忽略未处理的事件类型: {}", eventType);
}
}
/**
* 处理作业状态事件
*/
private void handleJobStateEvent(JobStateEvent jobEvent) {
String jobId = jobEvent.getJobId();
String jobName = jobEvent.getJobName();
JobStatus status = jobEvent.getJobStatus();
long eventTime = jobEvent.getCreatedTime();
switch (status) {
case FAILED:
log.error("任务失败 | jobId: {}, jobName: {}, 时间: {}",
jobId, jobName, eventTime);
// 添加失败告警逻辑
sendAlert("任务失败", "jobId: " + jobId);
break;
case FINISHED:
log.info("任务完成 | jobId: {}, jobName: {}, 时间: {}",
jobId, jobName, eventTime);
break;
// 处理其他状态...
default:
log.info("任务状态变更 | jobId: {}, 状态: {}, 时间: {}",
jobId, status, eventTime);
}
}
/**
* 处理表添加列事件
*/
private void handleAddColumnEvent(AlterTableAddColumnEvent event) {
log.info("表添加列 | 表名: {}, 新增列: {}, 时间: {}",
event.getTableName(), event.getAddedColumns(), event.getEventTime());
// 处理表结构变更逻辑
}
/**
* 处理读取器打开事件
*/
private void handleReaderOpenEvent(ReaderOpenEvent event) {
log.info("读取器打开 | 插件ID: {}, 并行度: {}, 时间: {}",
event.getPluginId(), event.getParallelism(), event.getEventTime());
// 处理读取器初始化逻辑
}
/**
* 处理写入器关闭事件
*/
private void handleWriterCloseEvent(WriterCloseEvent event) {
log.info("写入器关闭 | 插件ID: {}, 处理记录数: {}, 时间: {}",
event.getPluginId(), event.getRecordCount(), event.getEventTime());
// 处理写入器资源清理逻辑
}
/**
* 发送告警通知
*/
private void sendAlert(String title, String content) {
// 实现告警逻辑(如调用HTTP接口、发送邮件等)
log.info("[告警] {}: {}", title, content);
}
}
```
### 3. 配置 SPI 加载
为使引擎自动发现并加载自定义处理器,需在项目资源目录中添加 SPI 配置文件:
1. 创建目录:`src/main/resources/META-INF/services/`
2. 新建文件:`org.apache.seatunnel.api.event.EventHandler`
3. 在文件中添加自定义处理器的全类名:
```
com.example.CustomMultiEventHandler
```
### 4. 部署与验证
- 将包含自定义处理器的 JAR 包放入 SeaTunnel 引擎的类路径(如 `lib/` 目录)
- 启动任务后,当对应事件发生时,处理器会自动触发并执行相应的处理逻辑
- 可通过日志输出验证处理器是否生效
### 注意事项
- 处理器逻辑应尽量轻量,避免阻塞事件处理线程
- 若需网络调用(如发送告警),建议使用异步方式实现,防止超时影响任务本身
- 不同引擎对事件的支持情况可能不同,例如 `JobStateEvent` 目前仅支持 Zeta 引擎
- 事件类型与事件类是一一对应的,转换时需确保类型匹配,避免 `ClassCastException`
- 可以根据业务需求,实现多个事件处理器分别处理不同类型的事件,也可以在一个处理器中处理多种事件类型
通过上述步骤,您可以灵活地监听和处理 SeaTunnel 中的各种事件,实现自定义的业务逻辑,如状态监控、告警通知、数据统计等功能。