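GeaFlow supports user-defined connectors, which are registered through the Java SPI (Service Provider Interface) mechanism.
To define one, implement the TableConnector interface through one of its two sub-interfaces: TableReadableConnector for reading data and TableWritableConnector for writing data. A connector that implements both sub-interfaces supports both reads and writes.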
/**
 * The interface for table connector.
 */
public interface TableConnector {

    /**
     * Return table connector type.
     */
    String getType();
}

/**
 * A readable table connector.
 */
public interface TableReadableConnector extends TableConnector {

    TableSource createSource(Configuration conf);
}

/**
 * A writable table connector.
 */
public interface TableWritableConnector extends TableConnector {

    /**
     * Create the {@link TableSink} for the table connector.
     */
    TableSink createSink(Configuration conf);
}
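As a rough illustration of how the two sub-interfaces combine, the sketch below declares simplified stand-ins for the interfaces above (the `Configuration` parameter is dropped so the example compiles on its own) and checks a connector's capabilities with `instanceof`. `MemoryTableConnector`, `PrintTableConnector`, and `ConnectorCapabilities` are hypothetical names used only for this example; they are not part of GeaFlow.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the GeaFlow interfaces (assumption: no Configuration argument).
interface TableSource {}
interface TableSink {}

interface TableConnector {
    String getType();
}

interface TableReadableConnector extends TableConnector {
    TableSource createSource();
}

interface TableWritableConnector extends TableConnector {
    TableSink createSink();
}

// Hypothetical connector that implements both sub-interfaces, so it can read and write.
class MemoryTableConnector implements TableReadableConnector, TableWritableConnector {
    @Override public String getType() { return "MEMORY"; }
    @Override public TableSource createSource() { return new TableSource() {}; }
    @Override public TableSink createSink() { return new TableSink() {}; }
}

// Hypothetical write-only connector, for contrast.
class PrintTableConnector implements TableWritableConnector {
    @Override public String getType() { return "PRINT"; }
    @Override public TableSink createSink() { return new TableSink() {}; }
}

// Hypothetical helper that reports which operations a connector supports.
final class ConnectorCapabilities {
    static List<String> describe(TableConnector connector) {
        List<String> capabilities = new ArrayList<>();
        if (connector instanceof TableReadableConnector) {
            capabilities.add("read");
        }
        if (connector instanceof TableWritableConnector) {
            capabilities.add("write");
        }
        return capabilities;
    }
}
```

Calling `ConnectorCapabilities.describe(new MemoryTableConnector())` yields both capabilities, while the write-only connector reports only `write`.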
The TableSource interface is used to read data from a connector.
/**
 * Interface for table source.
 */
public interface TableSource extends Serializable {

    /**
     * The init method for compile time.
     */
    void init(Configuration tableConf, TableSchema tableSchema);

    /**
     * The init method for runtime.
     */
    void open(RuntimeContext context);

    /**
     * List all the partitions for the source.
     */
    List<Partition> listPartitions();

    /**
     * Returns the {@link TableDeserializer} for the source to convert data read from
     * the source to {@link Row}.
     */
    <IN> TableDeserializer<IN> getDeserializer(Configuration conf);

    /**
     * Fetch data for the partition from the start offset. If the windowSize is -1, it represents an
     * all-window which will read all the data from the source; otherwise it returns data for one
     * window of the given size.
     */
    <T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset, long windowSize) throws IOException;

    /**
     * The close callback invoked when the job finishes execution.
     */
    void close();
}
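The windowSize contract of `fetch` can be sketched with a small in-memory source. This is a minimal illustration, not GeaFlow code: `FetchResult` and `InMemorySource` are hypothetical stand-ins for `FetchData` and `TableSource`, offsets are plain `long` values, there is a single implicit partition, and the window size is assumed to be counted in rows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for FetchData: the rows read plus the offset to resume from.
final class FetchResult {
    final List<String> rows;
    final long nextOffset;
    final boolean finished;

    FetchResult(List<String> rows, long nextOffset, boolean finished) {
        this.rows = rows;
        this.nextOffset = nextOffset;
        this.finished = finished;
    }
}

// Hypothetical in-memory source with one partition.
final class InMemorySource {
    private final List<String> data;

    InMemorySource(List<String> data) {
        this.data = new ArrayList<>(data);
    }

    // windowSize == -1 reads everything from startOffset; a positive value caps the row count.
    FetchResult fetch(long startOffset, long windowSize) {
        if (windowSize != -1 && windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be -1 or positive");
        }
        int from = (int) Math.min(Math.max(startOffset, 0L), (long) data.size());
        int to = windowSize == -1
            ? data.size()
            : (int) Math.min((long) from + windowSize, (long) data.size());
        List<String> rows = new ArrayList<>(data.subList(from, to));
        return new FetchResult(rows, to, to >= data.size());
    }
}

// Hypothetical driver that keeps fetching until the source reports it is finished.
final class FetchLoop {
    static List<List<String>> readInWindows(InMemorySource source, long windowSize) {
        List<List<String>> windows = new ArrayList<>();
        long offset = 0;
        while (true) {
            FetchResult result = source.fetch(offset, windowSize);
            if (!result.rows.isEmpty()) {
                windows.add(result.rows);
            }
            offset = result.nextOffset;
            if (result.finished) {
                break;
            }
        }
        return windows;
    }
}
```

With five rows and a window size of 2, the driver collects three windows (two full, one partial); with -1 it collects a single window holding all rows.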
The TableSink interface is used to write data to a connector.
/**
 * Interface for table sink.
 */
public interface TableSink extends Serializable {

    /**
     * The init method for compile time.
     */
    void init(Configuration tableConf, StructType schema);

    /**
     * The init method for runtime.
     */
    void open(RuntimeContext context);

    /**
     * The write method for writing a row to the table.
     */
    void write(Row row) throws IOException;

    /**
     * The finish callback invoked when each window finishes.
     */
    void finish() throws IOException;

    /**
     * The close callback invoked when the job finishes execution.
     */
    void close();
}
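The callback order suggested by the comments above (open at runtime, write per row, finish once per window, close at the end of the job) can be sketched with a sink that buffers rows and flushes one batch per window. `BufferingSink` and `SinkLifecycle` are hypothetical names, rows are plain strings, and the exact order in which GeaFlow invokes the callbacks is an assumption inferred from those comments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink that buffers rows and flushes one batch when a window finishes.
final class BufferingSink {
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private boolean opened;

    // Runtime initialization; a real sink would acquire connections or files here.
    void open() {
        opened = true;
    }

    // Called once per row; only buffers, so a failed window leaves nothing half-written.
    void write(String row) {
        if (!opened) {
            throw new IllegalStateException("write() called before open()");
        }
        buffer.add(row);
    }

    // Called once per window; flushes the buffered rows as a single batch.
    void finish() {
        if (!buffer.isEmpty()) {
            flushed.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // Called when the job ends; drops unflushed rows and releases resources.
    void close() {
        buffer.clear();
        opened = false;
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }
}

// Hypothetical driver that plays the role of the framework.
final class SinkLifecycle {
    static List<List<String>> run(List<List<String>> windows) {
        BufferingSink sink = new BufferingSink();
        sink.open();
        try {
            for (List<String> window : windows) {
                for (String row : window) {
                    sink.write(row);
                }
                sink.finish();
            }
        } finally {
            sink.close();
        }
        return sink.flushedBatches();
    }
}
```

Buffering until `finish()` is one possible design for sinks that benefit from batched writes; a sink such as a console logger can write each row immediately and leave `finish()` empty.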
Below is an example of a table connector that writes to the console.
public class ConsoleTableConnector implements TableWritableConnector {

    @Override
    public String getType() {
        return "CONSOLE";
    }

    @Override
    public TableSink createSink(Configuration conf) {
        return new ConsoleTableSink();
    }
}

public class ConsoleTableSink implements TableSink {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsoleTableSink.class);

    private boolean skip;

    @Override
    public void init(Configuration tableConf, StructType schema) {
        skip = tableConf.getBoolean(ConsoleConfigKeys.GEAFLOW_DSL_CONSOLE_SKIP);
    }

    @Override
    public void open(RuntimeContext context) {
    }

    @Override
    public void write(Row row) {
        if (!skip) {
            LOGGER.info(row.toString());
        }
    }

    @Override
    public void finish() {
    }

    @Override
    public void close() {
    }
}
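After implementing ConsoleTableConnector, add its fully qualified class name to the file resources/META-INF/services/com.antgroup.gryphon.dsl.connector.api.TableConnector. This file lists the fully qualified names of all connector classes that implement the TableConnector interface, so that GeaFlow can discover them at startup and register them as available connectors.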
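For readers unfamiliar with Java SPI, the sketch below shows the general discovery pattern using the standard `java.util.ServiceLoader`. It is not GeaFlow's actual registration code: the `TableConnector` interface is a simplified stand-in, `ConnectorRegistry` is a hypothetical name, and the case-insensitive lookup by type is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.ServiceLoader;

// Simplified stand-in for the GeaFlow interface.
interface TableConnector {
    String getType();
}

// Stand-in connector. For real SPI loading the class must be public, have a public
// no-argument constructor, and be listed by fully qualified name in the services file.
class ConsoleTableConnector implements TableConnector {
    @Override
    public String getType() {
        return "CONSOLE";
    }
}

// Hypothetical registry keyed by connector type.
final class ConnectorRegistry {
    private final Map<String, TableConnector> byType = new HashMap<>();

    // Discovers every provider listed under META-INF/services/<interface name> on the classpath.
    static ConnectorRegistry fromServiceLoader() {
        ConnectorRegistry registry = new ConnectorRegistry();
        for (TableConnector connector : ServiceLoader.load(TableConnector.class)) {
            registry.register(connector);
        }
        return registry;
    }

    void register(TableConnector connector) {
        byType.put(normalize(connector.getType()), connector);
    }

    // Returns the connector for the given type, or null if none is registered.
    TableConnector find(String type) {
        return byType.get(normalize(type));
    }

    // Assumption: type names are matched case-insensitively.
    private static String normalize(String type) {
        return type.toUpperCase(Locale.ROOT);
    }
}
```

If no services file lists a provider, `ServiceLoader.load` simply yields nothing and the registry stays empty, which is the usual symptom of a missing or misnamed file under META-INF/services.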
The following DSL script reads from a file table and writes the rows to a table backed by the console connector:

CREATE TABLE file_source (
    id BIGINT,
    name VARCHAR,
    age INT
) WITH (
    type='file',
    geaflow.dsl.file.path = '/path/to/file'
);

CREATE TABLE console_sink (
    id BIGINT,
    name VARCHAR,
    age INT
) WITH (
    type='console'
);

INSERT INTO console_sink
SELECT * FROM file_source;