blob: 6b126947e042bafe300625e2d4d65616febd0b4f [file] [log] [blame] [view]
Prompt:
Write a sample Java code snippet that writes data to a Google Bigtable table using Apache Beam.
Response:
Your Apache Beam pipeline can write data to a Bigtable table using the Apache Beam `BigtableIO` connector.
Here is an example of how to use the `BigtableIO` connector to accomplish this:
```java
package bigtable;
import com.google.bigtable.v2.Mutation;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import com.google.protobuf.ByteString;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import java.math.BigInteger;
import java.util.stream.IntStream;
/**
 * Example pipeline that writes sample data to a Bigtable table using {@link BigtableIO}.
 *
 * <p>The pipeline creates the integers 2..999, turns each one into a {@code SetCell} mutation whose
 * column family is {@code "Prime"} or {@code "Composite"}, and writes the mutations to the row
 * {@code "numbers"} of the configured table.
 *
 * <p>NOTE: the target table presumably needs to already contain the column families
 * {@code "Prime"} and {@code "Composite"}; this pipeline does not create them.
 */
public class BigTableWriteTable {

  /** Pipeline options for writing to a Bigtable table. */
  public interface BigTableWriteTableOptions extends DataflowPipelineOptions {
    @Description("Bigtable Instance ID")
    @Default.String("bigtable-instance")
    String getInstanceId();

    void setInstanceId(String value);

    @Description("Bigtable Table Name")
    @Default.String("bigtable-table")
    String getTableName();

    void setTableName(String value);

    @Nullable
    @Description("Bigtable Project ID")
    String getBigTableProject();

    void setBigTableProject(String value);
  }

  /**
   * Builds and runs the pipeline.
   *
   * @param args command-line arguments, parsed into {@link BigTableWriteTableOptions}
   */
  public static void main(String[] args) {
    // Parse the pipeline options from the command line.
    BigTableWriteTableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigTableWriteTableOptions.class);

    // Use the dedicated Bigtable project ID when it is set; otherwise fall back to the
    // pipeline's own project ID.
    String project =
        (options.getBigTableProject() == null)
            ? options.getProject()
            : options.getBigTableProject();

    // Create the pipeline.
    Pipeline p = Pipeline.create(options);

    // Sample data to write to Bigtable: the integers 2 (inclusive) to 1000 (exclusive).
    int[] rangeIntegers = IntStream.range(2, 1000).toArray();
    Iterable<Integer> elements = Ints.asList(rangeIntegers);

    p
        // Create a PCollection from the sample data.
        .apply(Create.of(elements))
        // Turn the elements of each bundle into one row of mutations.
        .apply("Group in Mutations", ParDo.of(new CreateMutations()))
        // Write the mutations to the Bigtable table.
        .apply(
            BigtableIO.write()
                .withInstanceId(options.getInstanceId())
                .withProjectId(project)
                .withTableId(options.getTableName()));

    // Execute the pipeline.
    p.run();
  }

  /**
   * Converts each integer of a bundle into a {@code SetCell} mutation and, when the bundle
   * finishes, emits all of them as a single entry keyed by the row {@code "numbers"}.
   */
  public static class CreateMutations extends DoFn<Integer, KV<ByteString, Iterable<Mutation>>> {

    /**
     * Certainty passed to {@link BigInteger#isProbablePrime(int)}. A certainty of 1 only
     * guarantees a probability above 50% that a reported prime really is prime, so composites
     * could end up in the "Prime" family; 100 makes a misclassification practically impossible.
     */
    private static final int PRIME_CERTAINTY = 100;

    /** Mutations collected for the current bundle; re-created in {@link #startBundle()}. */
    public transient ImmutableList.Builder<Mutation> mutations;

    /** Starts a fresh list of mutations for the bundle. */
    @StartBundle
    public void startBundle() {
      this.mutations = ImmutableList.builder();
    }

    /**
     * Builds the mutation for one element and adds it to the bundle's list.
     *
     * @param c context providing the current element
     */
    @ProcessElement
    public void processElement(ProcessContext c) {
      Integer element = c.element();
      String isPrime =
          BigInteger.valueOf(element).isProbablePrime(PRIME_CERTAINTY) ? "Prime" : "Composite";

      // Set the cell value for the element.
      Mutation.SetCell setCell =
          Mutation.SetCell.newBuilder()
              // The column family is the prime status.
              .setFamilyName(isPrime)
              // The column qualifier is the element value.
              .setColumnQualifier(toByteString(element.toString()))
              // The cell value is the element value with a "value-" prefix.
              .setValue(toByteString("value-" + element))
              // The timestamp is the current time, converted from milliseconds to microseconds.
              .setTimestampMicros(Instant.now().getMillis() * 1000)
              .build();

      // Add the mutation to the list of mutations.
      this.mutations.add(Mutation.newBuilder().setSetCell(setCell).build());
    }

    /**
     * Emits the collected mutations as one row in the global window.
     *
     * @param c context used to output the row
     */
    @FinishBundle
    public void finishBundle(FinishBundleContext c) {
      ImmutableList<Mutation> rowMutations = this.mutations.build();
      if (rowMutations.isEmpty()) {
        // Nothing was processed in this bundle; do not emit a row without any cells.
        return;
      }
      KV<ByteString, Iterable<Mutation>> row = KV.of(toByteString("numbers"), rowMutations);
      c.output(row, Instant.now(), GlobalWindow.INSTANCE);
    }

    /** Converts a string to a UTF-8 encoded {@link ByteString}. */
    private static ByteString toByteString(String value) {
      return ByteString.copyFromUtf8(value);
    }
  }
}
```
This code snippet uses the pipeline options pattern to parse command-line arguments into the `BigTableWriteTableOptions` interface.