Prompt: Write a sample Java code snippet that writes data to a Google Bigtable table using Apache Beam.
Response: Your Apache Beam pipeline can write data to a Bigtable table using Apache Beam's `BigtableIO` connector.
Here is an example of how to use the `BigtableIO` connector to do this:
package bigtable; import com.google.bigtable.v2.Mutation; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; import com.google.protobuf.ByteString; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.values.KV; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import java.math.BigInteger; import java.util.stream.IntStream; // Pipeline to write data to Bigtable table public class BigTableWriteTable { // Pipeline options for writing to BigTable table public interface BigTableWriteTableOptions extends DataflowPipelineOptions { @Description("Bigtable Instance ID") @Default.String("bigtable-instance") String getInstanceId(); void setInstanceId(String value); @Description("Bigtable Table Name") @Default.String("bigtable-table") String getTableName(); void setTableName(String value); @Nullable @Description("Bigtable Project ID") String getBigTableProject(); void setBigTableProject(String value); } public static void main(String[] args) { // Parse the pipeline options from the command line. BigTableWriteTableOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BigTableWriteTableOptions.class); // Set the project ID to the Bigtable project ID if it is not null, otherwise set it to the project ID String project = (options.getBigTableProject() == null) ? 
options.getProject() : options.getBigTableProject(); // Create the pipeline Pipeline p = Pipeline.create(options); // Sample data to write to Bigtable int[] rangeIntegers = IntStream.range(2, 1000).toArray(); Iterable<Integer> elements = Ints.asList(rangeIntegers); p // Create a PCollection from the sample data .apply(Create.of(elements)) // Apply a ParDo transformation to create mutations .apply("Group in Mutations", ParDo.of(new CreateMutations())) // Write the mutations to Bigtable table .apply(BigtableIO.write() .withInstanceId(options.getInstanceId()) .withProjectId(project) .withTableId(options.getTableName())); // Execute the pipeline. p.run(); } // Class to create mutations for each element in the PCollection public static class CreateMutations extends DoFn<Integer, KV<ByteString, Iterable<Mutation>>> { public ImmutableList.Builder<Mutation> mutations; @ProcessElement public void processElement(ProcessContext c) { BigInteger b = new BigInteger(c.element().toString()); String isPrime = b.isProbablePrime(1) ? 
"Prime" : "Composite"; // Set the cell value for the element Mutation.SetCell setCell = Mutation.SetCell.newBuilder() // Set the family name to the prime status .setFamilyName(isPrime) // Set the column qualifier to the element value .setColumnQualifier(toByteString(c.element().toString())) // Set the cell value to the element value .setValue(toByteString("value-" + c.element())) // Set the timestamp to the current time .setTimestampMicros(Instant.now().getMillis() * 1000) .build(); // Add the mutation to the list of mutations this.mutations.add(Mutation.newBuilder().setSetCell(setCell).build()); } // Initialize the list of mutations @StartBundle public void startBundle() { this.mutations = ImmutableList.builder(); } // Finish the bundle and output the mutations @FinishBundle public void finishBundle(FinishBundleContext c) { KV<ByteString, Iterable<Mutation>> row = KV.of(toByteString("numbers"), this.mutations.build()); c.output(row, Instant.now(), GlobalWindow.INSTANCE); } // Convert a string to a ByteString private static ByteString toByteString(String value) { return ByteString.copyFromUtf8(value); } } }
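This code snippet uses Beam's pipeline options pattern to parse command-line arguments: the Bigtable instance ID, the table name, and an optional Bigtable project ID that defaults to the pipeline's project. Before running it, make sure the target table already exists and contains the column families `Prime` and `Composite`. Each cell is written to one of those two families.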