blob: 50ceeab5c9f229c1241e21b91a36a7c293dce0a2 [file] [log] [blame] [view]
Prompt:
Write a sample Java code snippet that writes data to a Google Spanner table using Apache Beam.
Response:
Your Apache Beam pipeline can write data to a Spanner table using the Apache Beam SpannerIO connector.
Here is an example of how to use the SpannerIO connector to accomplish this:
```java
package spanner;
import com.google.cloud.spanner.Mutation;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
// Pipeline to write data to a Spanner table.
public class WriteSpannerTable {
private static final Logger LOG = LoggerFactory.getLogger(WriteSpannerTable.class);
// Pipeline options for writing to Spanner tables.
public interface WriteSpannerTableOptions extends DataflowPipelineOptions {
@Description("Spanner Instance ID")
@Default.String("spanner-instance")
String getInstanceId();
void setInstanceId(String value);
@Description("Spanner Database Name")
@Default.String("spanner-db")
String getDatabaseName();
void setDatabaseName(String value);
@Description("Spanner Table Name")
@Default.String("spanner-table")
String getTableName();
void setTableName(String value);
@Nullable
@Description("Spanner Project ID")
String getSpannerProjectId();
void setSpannerProjectId(String value);
}
public static void main(String[] args) {
// Parse the pipeline options from the command line.
WriteSpannerTableOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteSpannerTableOptions.class);
// Create the pipeline.
Pipeline p = Pipeline.create(options);
// Sample data to write to the Spanner table.
final List<String> elements = Arrays.asList(
"1, January, $1000",
"2, February, $2000",
"3, March , $3000",
"4, April, $4000",
);
String table = options.getTableName();
// Get the project ID from the pipeline options or the default project.
String project = (options.getSpannerProjectId() == null) ? options.getProject() : options.getSpannerProjectId();
p
// Create a PCollection of strings from the sample data.
.apply(Create.of(elements))
// Convert the strings to Spanner mutations.
.apply("To Spanner Mutation", ParDo.of(new DoFn<String, Mutation>() {
@ProcessElement
public void processElement(ProcessContext c) {
String[] records = c.element().split(", ");
Long recordId = Long.parseLong(records[0]);
// Create a Spanner mutation for each record.
Mutation mutation = Mutation.newInsertOrUpdateBuilder(table)
.set("id").to(recordId)
.set("month").to(records[1])
.set("amount").to(records[2])
.build();
c.output(mutation);
}
})
)
// Write the mutations to the Spanner table.
.apply(SpannerIO.write()
.withInstanceId(options.getInstanceId())
.withDatabaseId(options.getDatabaseName())
.withProjectId(project)
);
// Execute the pipeline.
p.run();
}
}
```
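This code snippet uses the pipeline options pattern to parse command-line arguments such as `--instanceId`, `--databaseName`, `--tableName`, and the optional `--spannerProjectId`. If the project ID is omitted, the pipeline's GCP project is used. The target table must already exist with `id`, `month`, and `amount` columns, because each record is written as an insert-or-update mutation on those columns.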