I/O connectors enable reading from and writing to external data sources. Beam provides 51+ Java I/O connectors and several Python connectors.
Java connectors live under `sdks/java/io/`:

| Category | Connectors |
|---|---|
| Cloud Storage | google-cloud-platform (BigQuery, Bigtable, Spanner, Pub/Sub, GCS), amazon-web-services2, azure, azure-cosmos |
| Databases | jdbc, mongodb, cassandra, hbase, redis, neo4j, clickhouse, influxdb, singlestore, elasticsearch |
| Messaging | kafka, pulsar, rabbitmq, amqp, jms, mqtt, solace |
| File Formats | parquet, csv, json, xml, thrift, iceberg |
| Other | snowflake, splunk, cdap, debezium, hadoop-format, kudu, solr, tika |

Run the unit tests for a single connector:

```bash
./gradlew :sdks:java:io:kafka:test
./gradlew :sdks:java:io:jdbc:test
```
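
Run the integration tests for a connector:

```bash
./gradlew :sdks:java:io:google-cloud-platform:integrationTest
```

GCP integration tests take the project and a temp location as Gradle properties:

```bash
./gradlew :sdks:java:io:google-cloud-platform:integrationTest \
  -PgcpProject=<project> \
  -PgcpTempRoot=gs://<bucket>/path
```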
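
Pipeline options are passed through the `beamTestPipelineOptions` system property as a JSON array of arguments:

```bash
./gradlew :sdks:java:io:jdbc:integrationTest \
  -DbeamTestPipelineOptions='["--runner=TestDirectRunner"]'
```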
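
The `-DbeamTestPipelineOptions` value above is a JSON array in which each element is one `--name=value` argument. When that value is assembled by a script or CI job, quoting mistakes are easy to make. The following is a minimal sketch of building it from a list of arguments; `BeamTestOptions` and its methods are hypothetical names introduced here for illustration and are not part of Beam.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper (not a Beam class): formats pipeline arguments as a JSON array. */
final class BeamTestOptions {
  private BeamTestOptions() {}

  /**
   * Wraps one argument in double quotes, escaping backslashes and double quotes.
   * Control characters are not handled; this is enough for typical option strings.
   */
  static String quote(String arg) {
    return "\"" + arg.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  /** Joins the quoted arguments into a compact JSON array. */
  static String toJsonArray(List<String> args) {
    return args.stream()
        .map(BeamTestOptions::quote)
        .collect(Collectors.joining(",", "[", "]"));
  }

  public static void main(String[] args) {
    String json = toJsonArray(List.of("--runner=TestDirectRunner"));
    // Single quotes keep the shell from interpreting the JSON.
    System.out.println("-DbeamTestPipelineOptions='" + json + "'");
  }
}
```

Running `main` prints the same flag used in the JDBC command above. Additional options can be appended to the list as further `--name=value` strings.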
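
Integration test utilities are located in the `it/` directory:

- `it/common/` - Common test utilities
- `it/google-cloud-platform/` - GCP-specific test infrastructure
- `it/jdbc/` - JDBC test infrastructure
- `it/kafka/` - Kafka test infrastructure
- `it/testcontainers/` - Testcontainers support

A typical I/O integration test writes data with one pipeline, then reads it back and verifies it with a second:

```java
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

// MyIO, testData, destination, and expectedData are placeholders for the connector under test.
@RunWith(JUnit4.class)
public class MyIOIT {
  @Rule public TestPipeline readPipeline = TestPipeline.create();
  @Rule public TestPipeline writePipeline = TestPipeline.create();

  @Test
  public void testWriteAndRead() {
    // Write data
    writePipeline.apply(Create.of(testData))
        .apply(MyIO.write().to(destination));
    writePipeline.run().waitUntilFinish();

    // Read and verify
    PCollection<String> results = readPipeline.apply(MyIO.read().from(destination));
    PAssert.that(results).containsInAnyOrder(expectedData);
    readPipeline.run().waitUntilFinish();
  }
}
```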

Declare a `TestPipeline` as a JUnit rule:

```java
@Rule public TestPipeline pipeline = TestPipeline.create();
```

`TestPipeline` reads its pipeline options from the `beamTestPipelineOptions` system property.

BigQuery:

```java
// Read
pipeline.apply(BigQueryIO.readTableRows().from("project:dataset.table"));

// Write
data.apply(BigQueryIO.writeTableRows()
    .to("project:dataset.table")
    .withSchema(schema)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));
```

Pub/Sub:

```java
// Read
pipeline.apply(PubsubIO.readStrings().fromTopic("projects/project/topics/topic"));

// Write
data.apply(PubsubIO.writeStrings().to("projects/project/topics/topic"));
```

TextIO:

```java
// Read
pipeline.apply(TextIO.read().from("gs://bucket/path/*.txt"));

// Write
data.apply(TextIO.write().to("gs://bucket/output").withSuffix(".txt"));
```

Kafka:

```java
// Read
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("localhost:9092")
    .withTopic("topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class));

// Write
data.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("localhost:9092")
    .withTopic("topic")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class));
```
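
JDBC:

```java
// Read as Beam Rows; readRows() derives the schema from the query result,
// whereas JdbcIO.<T>read() would additionally require withRowMapper(...)
pipeline.apply(JdbcIO.readRows()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
        .create("org.postgresql.Driver", "jdbc:postgresql://host/db"))
    .withQuery("SELECT * FROM table"));

// Write; an explicit statement needs a PreparedStatementSetter to bind each element.
// config is a JdbcIO.DataSourceConfiguration built as in the read example;
// the field names "id" and "name" are placeholders.
data.apply(JdbcIO.<Row>write()
    .withDataSourceConfiguration(config)
    .withStatement("INSERT INTO table VALUES (?, ?)")
    .withPreparedStatementSetter((row, statement) -> {
      statement.setInt(1, row.getInt32("id"));
      statement.setString(2, row.getString("name"));
    }));
```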

Python connectors live under `sdks/python/apache_beam/io/`:

- `textio` - Text files
- `fileio` - General file operations
- `avroio` - Avro files
- `parquetio` - Parquet files
- `gcp/` - GCP connectors (BigQuery, Pub/Sub, Datastore, etc.)

Beam supports using I/O connectors from one SDK in another via the expansion service.

```bash
# Start Java expansion service
./gradlew :sdks:java:io:expansion-service:runExpansionService
```

Key components: