blob: 98b9132ee3f38d1e3f16e6269b193b538966a81b [file] [log] [blame] [view]
Prompt:
What does this code do?
```java
package jdbc;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.sql.ResultSet;
import java.util.Objects;
public class ReadFormJdbcPartition {
// SLF4J logger for this example; the "Log Source Data" step in main writes each row through it.
private static final Logger LOG = LoggerFactory.getLogger(ReadFormJdbcPartition.class);
public static class SampleRow implements Serializable {
public static final String ID_COLUMN = "id";
public static final String MONTH_COLUMN = "month";
public static final String AMOUNT_COLUMN = "amount";
private int id;
private String month;
private String amount;
public SampleRow() {}
public SampleRow(int id, String month, String amount) {
this.id = id;
this.month = month;
this.amount = amount;
}
@Override
public String toString() {
return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
}
}
/**
 * {@link JdbcIO.RowMapper} that turns the current row of a {@link ResultSet} into an
 * {@code ExampleRow}.
 */
public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
/**
 * Builds an {@code ExampleRow} from the columns named by {@code ExampleRow.ID_COLUMN},
 * {@code ExampleRow.MONTH_COLUMN} and {@code ExampleRow.AMOUNT_COLUMN}.
 *
 * @param resultSet result set the column values are read from
 * @return a new {@code ExampleRow} holding the three column values
 * @throws Exception propagated from the {@code ResultSet} accessors
 */
@Override
public ExampleRow mapRow(ResultSet resultSet) throws Exception {
return new ExampleRow(
// The id column is read as a long and then narrowed to int.
// NOTE(review): the Long.valueOf boxing is unnecessary, and intValue() silently truncates
// ids outside the int range.
Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
resultSet.getString(ExampleRow.MONTH_COLUMN),
resultSet.getString(ExampleRow.AMOUNT_COLUMN));
}
}
/**
 * Pipeline options for this example: the table to read and the JDBC connection settings.
 *
 * <p>Every option except the driver class name is annotated {@code @Validation.Required}; the
 * driver class name instead carries a default value.
 */
public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
/** Returns the name of the table to read from (required). */
@Description("Table name to read from")
@Validation.Required
String getTableName();
/** Sets the name of the table to read from. */
void setTableName(String tableName);
/** Returns the JDBC URL of the source database (required). */
@Description("JDBC Source URL")
@Validation.Required
String getJdbcSourceUrl();
/** Sets the JDBC URL of the source database. */
void setJdbcSourceUrl(String jdbcUrl);
/** Returns the JDBC driver class name; defaults to {@code org.postgresql.Driver}. */
@Description("JDBC driver class name")
@Default.String("org.postgresql.Driver")
String getDriverClassName();
/** Sets the JDBC driver class name. */
void setDriverClassName(String driverClassName);
/** Returns the database username (required). */
@Description("Jdbc DB Username")
@Validation.Required
String getSourceUsername();
/** Sets the database username. */
void setSourceUsername(String username);
/** Returns the database password (required). */
@Description("Jdbc DB password")
@Validation.Required
String getSourcePassword();
/** Sets the database password. */
void setSourcePassword(String password);
}
/**
 * Entry point: builds and runs a pipeline that reads the configured table through JdbcIO and
 * logs every row.
 *
 * @param args command-line arguments, parsed into {@code ReadFromJdbcPartitionOptions}
 */
public static void main(String[] args) {
// Parse the command-line arguments into the options interface, with validation enabled.
ReadFromJdbcPartitionOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
// Connection settings for the source database: driver class, URL, username and password.
JdbcIO.DataSourceConfiguration config =
JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
.withUsername(options.getSourceUsername())
.withPassword(options.getSourcePassword());
Pipeline p = Pipeline.create(options);
// Read the table as ExampleRow objects, partitioned on the id column. A single partition is
// requested, with lower bound 0 and upper bound 1000 on the partition column.
p.apply(
"Read from Jdbc source",
JdbcIO.<ExampleRow>readWithPartitions()
.withDataSourceConfiguration(config)
.withRowMapper(new CreateExampleRow())
.withTable(options.getTableName())
.withNumPartitions(1)
.withPartitionColumn(ExampleRow.ID_COLUMN)
.withLowerBound(0L)
.withUpperBound(1000L))
// Log each row and emit it again unchanged.
.apply(
"Log Source Data",
ParDo.of(
new DoFn<ExampleRow, ExampleRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
// requireNonNull raises a NullPointerException if the element is null.
LOG.info(Objects.requireNonNull(c.element()).toString());
c.output(c.element());
}
}));
// Run the pipeline; the value returned by run() is not used.
p.run();
}
}
```
Response:
This code reads data from a database over Java Database Connectivity (JDBC) using the Apache Beam `JdbcIO` connector. Any database with a JDBC driver can be used; the driver class defaults to `org.postgresql.Driver`.
The `ExampleRow` class models the rows fetched from the JDBC source. It includes the fields `id`, `month`, and `amount`, and declares the constants `ID_COLUMN`, `MONTH_COLUMN`, and `AMOUNT_COLUMN`, which hold the names of the corresponding columns in the database table. Additionally, it overrides the `toString` method to provide a string representation of the `ExampleRow` object.
```java
public static class SampleRow implements Serializable {
public static final String ID_COLUMN = "id";
public static final String MONTH_COLUMN = "month";
public static final String AMOUNT_COLUMN = "amount";
private int id;
private String month;
private String amount;
public SampleRow() {}
public SampleRow(int id, String month, String amount) {
this.id = id;
this.month = month;
this.amount = amount;
}
@Override
public String toString() {
return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
}
}
```
The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method extracts column values from the `ResultSet` interface and creates an `ExampleRow` object.
```java
/**
 * {@link JdbcIO.RowMapper} that turns the current row of a {@link ResultSet} into an
 * {@code ExampleRow}.
 */
public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
/**
 * Builds an {@code ExampleRow} from the columns named by {@code ExampleRow.ID_COLUMN},
 * {@code ExampleRow.MONTH_COLUMN} and {@code ExampleRow.AMOUNT_COLUMN}.
 *
 * @param resultSet result set the column values are read from
 * @return a new {@code ExampleRow} holding the three column values
 * @throws Exception propagated from the {@code ResultSet} accessors
 */
@Override
public ExampleRow mapRow(ResultSet resultSet) throws Exception {
return new ExampleRow(
// The id column is read as a long and then narrowed to int.
// NOTE(review): the Long.valueOf boxing is unnecessary, and intValue() silently truncates
// ids outside the int range.
Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
resultSet.getString(ExampleRow.MONTH_COLUMN),
resultSet.getString(ExampleRow.AMOUNT_COLUMN));
}
}
```
The `ReadFromJdbcPartitionOptions` interface specifies the table name, JDBC source URL, driver class name, username, and password. The `@Description` annotation provides a description of the option. The `@Validation.Required` annotation marks the options that must be supplied, and the `@Default.String` annotation gives the driver class name the default value `org.postgresql.Driver`. Use the command-line arguments `--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and `--sourcePassword` to set these options when executing the pipeline.
```java
/**
 * Pipeline options for this example: the table to read and the JDBC connection settings.
 *
 * <p>Every option except the driver class name is annotated {@code @Validation.Required}; the
 * driver class name instead carries a default value.
 */
public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
/** Returns the name of the table to read from (required). */
@Description("Table name to read from")
@Validation.Required
String getTableName();
/** Sets the name of the table to read from. */
void setTableName(String tableName);
/** Returns the JDBC URL of the source database (required). */
@Description("JDBC Source URL")
@Validation.Required
String getJdbcSourceUrl();
/** Sets the JDBC URL of the source database. */
void setJdbcSourceUrl(String jdbcUrl);
/** Returns the JDBC driver class name; defaults to {@code org.postgresql.Driver}. */
@Description("JDBC driver class name")
@Default.String("org.postgresql.Driver")
String getDriverClassName();
/** Sets the JDBC driver class name. */
void setDriverClassName(String driverClassName);
/** Returns the database username (required). */
@Description("Jdbc DB Username")
@Validation.Required
String getSourceUsername();
/** Sets the database username. */
void setSourceUsername(String username);
/** Returns the database password (required). */
@Description("Jdbc DB password")
@Validation.Required
String getSourcePassword();
/** Sets the database password. */
void setSourcePassword(String password);
}
```
In the `main` method, the `PipelineOptionsFactory` class creates a `ReadFromJdbcPartitionOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options. The `DataSourceConfiguration` class represents a configuration object that encapsulates the necessary information for establishing the connection to the JDBC source.
```java
// Parse the command-line arguments into the options interface, with validation enabled.
ReadFromJdbcPartitionOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
// Connection settings for the source database: driver class, URL, username and password.
JdbcIO.DataSourceConfiguration config =
JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
.withUsername(options.getSourceUsername())
.withPassword(options.getSourcePassword());
Pipeline p = Pipeline.create(options);
```
To read data from the JDBC source in parallel, the code snippet uses the `JdbcIO.readWithPartitions` method. The `withNumPartitions` method determines how many partitions the data will be split into for parallel processing; this example sets it to `1`, so the table is read as a single partition. The `withPartitionColumn` method specifies the column to use for partitioning the data. The `withLowerBound` and `withUpperBound` methods specify the lower and upper bounds of the data to read. The `ParDo` transform that follows logs each row and emits it again unchanged.
```java
// Read the table as ExampleRow objects, partitioned on the id column. A single partition is
// requested, with lower bound 0 and upper bound 1000 on the partition column.
p.apply(
"Read from Jdbc source",
JdbcIO.<ExampleRow>readWithPartitions()
.withDataSourceConfiguration(config)
.withRowMapper(new CreateExampleRow())
.withTable(options.getTableName())
.withNumPartitions(1)
.withPartitionColumn(ExampleRow.ID_COLUMN)
.withLowerBound(0L)
.withUpperBound(1000L))
// Log each row and emit it again unchanged.
.apply(
"Log Source Data",
ParDo.of(
new DoFn<ExampleRow, ExampleRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
// requireNonNull raises a NullPointerException if the element is null.
LOG.info(Objects.requireNonNull(c.element()).toString());
c.output(c.element());
}
}));
```
Finally, the `run` method executes the pipeline.
```java
// Run the pipeline; the value returned by run() is not used.
p.run();
```