blob: fd7c3ae2bc1d50e9e118edd4d055808543fa43ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao.DETECT_NEW_PARTITION_SUFFIX;
import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao.NEW_PARTITION_PREFIX;
import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao.STREAM_PARTITION_PREFIX;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.protobuf.ByteString;
import javax.annotation.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Data access object for managing the state of the metadata Bigtable table.
 *
 * <p>The metadata table is shared across many Beam jobs. Each Beam job uses a specific prefix to
 * identify itself, and that prefix is prepended to every row key this DAO reads or writes.
 *
 * <p>Row keys built by this class take one of the following forms:
 *
 * <ul>
 *   <li>{@code changeStreamNamePrefix + NEW_PARTITION_PREFIX + serialized partition}: partitions
 *       produced by splits and merges that are waiting to be streamed.
 *   <li>{@code changeStreamNamePrefix + STREAM_PARTITION_PREFIX + serialized partition}: partitions
 *       that are currently being streamed.
 *   <li>{@code changeStreamNamePrefix + DETECT_NEW_PARTITION_SUFFIX}: the DetectNewPartition row,
 *       where this class writes the metadata table version.
 * </ul>
 */
@SuppressWarnings({"UnusedVariable", "UnusedMethod"})
public class MetadataTableDao {
  // Not referenced anywhere in this class at the moment; the class-level "UnusedVariable"
  // suppression exists for it.
  private static final Logger LOG = LoggerFactory.getLogger(MetadataTableDao.class);
  private final BigtableDataClient dataClient;
  private final String tableId;
  private final ByteString changeStreamNamePrefix;

  /**
   * Creates a DAO bound to a single metadata table and a single change stream name prefix.
   *
   * @param dataClient client used for every read and mutation issued by this DAO
   * @param tableId id of the metadata table
   * @param changeStreamNamePrefix prefix prepended to every row key belonging to this Beam job
   */
  public MetadataTableDao(
      BigtableDataClient dataClient, String tableId, ByteString changeStreamNamePrefix) {
    this.dataClient = dataClient;
    this.tableId = tableId;
    this.changeStreamNamePrefix = changeStreamNamePrefix;
  }

  /**
   * Returns the prefix that is prepended to every row belonging to this Beam job.
   *
   * @return the prefix that is prepended to every row belonging to this Beam job.
   */
  public ByteString getChangeStreamNamePrefix() {
    return changeStreamNamePrefix;
  }

  /**
   * Return new partition row key prefix concatenated with change stream name.
   *
   * @return new partition row key prefix concatenated with change stream name.
   */
  private ByteString getFullNewPartitionPrefix() {
    return changeStreamNamePrefix.concat(NEW_PARTITION_PREFIX);
  }

  /**
   * Return stream partition row key prefix concatenated with change stream name.
   *
   * @return stream partition row key prefix concatenated with change stream name.
   */
  private ByteString getFullStreamPartitionPrefix() {
    return changeStreamNamePrefix.concat(STREAM_PARTITION_PREFIX);
  }

  /**
   * Return detect new partition row key concatenated with change stream name.
   *
   * @return detect new partition row key concatenated with change stream name.
   */
  private ByteString getFullDetectNewPartition() {
    return changeStreamNamePrefix.concat(DETECT_NEW_PARTITION_SUFFIX);
  }

  /**
   * Convert partition to a Stream Partition row key to query for metadata of partitions that are
   * currently being streamed.
   *
   * @param partition convert to row key
   * @return row key to insert to Cloud Bigtable: the full Stream Partition prefix followed by
   *     {@code Range.ByteStringRange.toByteString(partition)}.
   */
  public ByteString convertPartitionToStreamPartitionRowKey(Range.ByteStringRange partition) {
    return getFullStreamPartitionPrefix().concat(Range.ByteStringRange.toByteString(partition));
  }

  /**
   * Convert partition to a New Partition row key to query for partitions ready to be streamed as
   * the result of splits and merges.
   *
   * @param partition convert to row key
   * @return row key to insert to Cloud Bigtable: the full New Partition prefix followed by {@code
   *     Range.ByteStringRange.toByteString(partition)}.
   */
  public ByteString convertPartitionToNewPartitionRowKey(Range.ByteStringRange partition) {
    return getFullNewPartitionPrefix().concat(Range.ByteStringRange.toByteString(partition));
  }

  /**
   * Reads every New Partition row belonging to this Beam job. These rows describe partitions that
   * were created by splits and merges and are waiting to be streamed.
   *
   * <p>Only the latest cell of each column is returned.
   *
   * @return stream of all the new partitions resulting from splits and merges waiting to be
   *     streamed.
   */
  public ServerStream<Row> readNewPartitions() {
    // It's important that we limit to the latest value per column because it's possible to write to
    // the same column multiple times. We don't want to read and send duplicate tokens to the
    // server.
    Query query =
        Query.create(tableId)
            .prefix(getFullNewPartitionPrefix())
            .filter(Filters.FILTERS.limit().cellsPerColumn(1));
    return dataClient.readRows(query);
  }

  /**
   * After a split or merge from a close stream, write the new partition's information to the
   * metadata table.
   *
   * <p>The partition carried by {@code changeStreamContinuationToken} determines the New Partition
   * row that is written.
   *
   * @param changeStreamContinuationToken the token that can be used to pick up from where the
   *     parent left off
   * @param parentPartition the parent that stopped and split or merged
   * @param lowWatermark the low watermark of the parent stream
   */
  public void writeNewPartition(
      ChangeStreamContinuationToken changeStreamContinuationToken,
      Range.ByteStringRange parentPartition,
      Instant lowWatermark) {
    writeNewPartition(
        changeStreamContinuationToken.getPartition(),
        changeStreamContinuationToken.toByteString(),
        Range.ByteStringRange.toByteString(parentPartition),
        lowWatermark);
  }

  /**
   * After a split or merge from a close stream, write the new partition's information to the
   * metadata table.
   *
   * <p>Cell layout of the New Partition row for {@code newPartition}:
   *
   * <ul>
   *   <li>{@code CF_INITIAL_TOKEN}: the serialized continuation token is the column qualifier;
   *       the cell value is {@code 1}.
   *   <li>{@code CF_PARENT_PARTITIONS}: the serialized parent partition is the column qualifier;
   *       the cell value is {@code 1}.
   *   <li>{@code CF_PARENT_LOW_WATERMARKS}: the serialized parent partition is the column
   *       qualifier; the cell value is the parent's low watermark in epoch milliseconds, written
   *       as a UTF-8 decimal string.
   * </ul>
   *
   * <p>NOTE(review): keying these columns by qualifier presumably lets several parents (e.g. in a
   * merge) each add their own column to the same row; confirm against the reader side.
   *
   * @param newPartition the new partition
   * @param newPartitionContinuationToken continuation token for the new partition
   * @param parentPartition the parent that stopped
   * @param lowWatermark low watermark of the parent
   */
  private void writeNewPartition(
      Range.ByteStringRange newPartition,
      ByteString newPartitionContinuationToken,
      ByteString parentPartition,
      Instant lowWatermark) {
    ByteString rowKey = convertPartitionToNewPartitionRowKey(newPartition);
    RowMutation rowMutation =
        RowMutation.create(tableId, rowKey)
            .setCell(MetadataTableAdminDao.CF_INITIAL_TOKEN, newPartitionContinuationToken, 1)
            .setCell(MetadataTableAdminDao.CF_PARENT_PARTITIONS, parentPartition, 1)
            .setCell(
                MetadataTableAdminDao.CF_PARENT_LOW_WATERMARKS,
                parentPartition,
                ByteString.copyFromUtf8(Long.toString(lowWatermark.getMillis())));
    dataClient.mutateRow(rowMutation);
  }

  /**
   * Writes the watermark and, when present, the continuation token to the given row.
   *
   * <p>{@code rowKey} is used exactly as given. This helper does NOT add any prefix, so callers
   * must pass the fully prefixed row key, for example one produced by {@link
   * #convertPartitionToStreamPartitionRowKey}.
   *
   * @param rowKey full row key of the row to update
   * @param watermark watermark value to set for the cell, written as its epoch milliseconds
   * @param currentToken continuation token to set for the cell; when {@code null}, no continuation
   *     token cell is written by this mutation
   */
  private void writeToMdTableWatermarkHelper(
      ByteString rowKey, Instant watermark, @Nullable ChangeStreamContinuationToken currentToken) {
    RowMutation rowMutation =
        RowMutation.create(tableId, rowKey)
            .setCell(
                MetadataTableAdminDao.CF_WATERMARK,
                MetadataTableAdminDao.QUALIFIER_DEFAULT,
                watermark.getMillis());
    if (currentToken != null) {
      // Only the token string is stored here, not the serialized token with its partition.
      rowMutation.setCell(
          MetadataTableAdminDao.CF_CONTINUATION_TOKEN,
          MetadataTableAdminDao.QUALIFIER_DEFAULT,
          currentToken.getToken());
    }
    dataClient.mutateRow(rowMutation);
  }

  /**
   * Update the metadata for the Stream Partition row key represented by the partition.
   *
   * @param partition forms the row key of the row to update
   * @param watermark watermark value to set for the cell
   * @param currentToken continuation token to set for the cell; may be {@code null}, in which case
   *     only the watermark is written
   */
  public void updateWatermark(
      Range.ByteStringRange partition,
      Instant watermark,
      @Nullable ChangeStreamContinuationToken currentToken) {
    writeToMdTableWatermarkHelper(
        convertPartitionToStreamPartitionRowKey(partition), watermark, currentToken);
  }

  /**
   * Delete the Stream Partition row represented by the partition. This represents that the
   * partition will no longer be streamed.
   *
   * @param partition forms the row key of the row to delete
   */
  public void deleteStreamPartitionRow(Range.ByteStringRange partition) {
    ByteString rowKey = convertPartitionToStreamPartitionRowKey(partition);
    RowMutation rowMutation = RowMutation.create(tableId, rowKey).deleteRow();
    dataClient.mutateRow(rowMutation);
  }

  /**
   * Set the version number for DetectNewPartition. This value can be checked later to verify that
   * the existing metadata table is compatible with current beam connector code.
   *
   * <p>Writes {@code CURRENT_METADATA_TABLE_VERSION} to the {@code CF_VERSION} column (default
   * qualifier) of this job's DetectNewPartition row.
   */
  public void writeDetectNewPartitionVersion() {
    RowMutation rowMutation =
        RowMutation.create(tableId, getFullDetectNewPartition())
            .setCell(
                MetadataTableAdminDao.CF_VERSION,
                MetadataTableAdminDao.QUALIFIER_DEFAULT,
                MetadataTableAdminDao.CURRENT_METADATA_TABLE_VERSION);
    dataClient.mutateRow(rowMutation);
  }
}