/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.clickhouse;
import com.google.auto.value.AutoValue;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
import org.apache.beam.sdk.io.clickhouse.TableSchema.DefaultType;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.LogicalTypes;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;
/**
* An IO to write to ClickHouse.
*
* <h3>Writing to ClickHouse</h3>
*
 * <p>To write to ClickHouse, use {@link ClickHouseIO#write(String, String)}, which writes elements
 * of the input {@link PCollection}. The table you are going to insert into must already exist in
 * your ClickHouse cluster.
*
 * <pre>{@code
 * pipeline
 *     .apply(...)
 *     .apply(
 *         ClickHouseIO.<POJO>write("jdbc:clickhouse://localhost:8123/default", "my_table"));
 * }</pre>
*
 * <p>Optionally, you can provide connection settings: for instance, specify the insert block size
 * with {@link Write#withMaxInsertBlockSize(long)}, or configure the number of retries with {@link
 * Write#withMaxRetries(int)}.
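 *
 * <p>For example, a write with explicitly tuned batching and retries might look like the
 * following sketch (the values are illustrative, not recommendations):
 *
 * <pre>{@code
 * ClickHouseIO.<POJO>write("jdbc:clickhouse://localhost:8123/default", "my_table")
 *     .withMaxInsertBlockSize(65536L)
 *     .withMaxRetries(10)
 *     .withInitialBackoff(Duration.standardSeconds(5));
 * }</pre>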
*
* <h4>Deduplication</h4>
*
 * Deduplication is performed by ClickHouse when inserting into a <a
 * href="https://clickhouse.yandex/docs/en/single/#data-replication">ReplicatedMergeTree</a> table
 * or a <a href="https://clickhouse.yandex/docs/en/single/#distributed">Distributed</a> table on
 * top of ReplicatedMergeTree. Without replication, inserting into a regular MergeTree table can
 * produce duplicates if an insert fails and is then successfully retried. However, each block is
 * inserted atomically, and you can configure the block size with {@link
 * Write#withMaxInsertBlockSize(long)}.
*
* <p>Deduplication is performed using checksums of inserted blocks.
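 *
 * <p>For example, a sketch of a write configured for a replicated setup (the quorum value is
 * illustrative):
 *
 * <pre>{@code
 * ClickHouseIO.<Row>write(jdbcUrl, "my_table")
 *     .withInsertQuorum(2L)
 *     .withInsertDeduplicate(true);
 * }</pre>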
*
* <h4>Mapping between Beam and ClickHouse types</h4>
*
* <table summary="Type mapping">
* <tr><th>ClickHouse</th> <th>Beam</th></tr>
* <tr><td>{@link TableSchema.TypeName#FLOAT32}</td> <td>{@link Schema.TypeName#FLOAT}</td></tr>
* <tr><td>{@link TableSchema.TypeName#FLOAT64}</td> <td>{@link Schema.TypeName#DOUBLE}</td></tr>
* <tr><td>{@link TableSchema.TypeName#FIXEDSTRING}</td> <td>{@link LogicalTypes.FixedBytes}</td></tr>
* <tr><td>{@link TableSchema.TypeName#INT8}</td> <td>{@link Schema.TypeName#BYTE}</td></tr>
* <tr><td>{@link TableSchema.TypeName#INT16}</td> <td>{@link Schema.TypeName#INT16}</td></tr>
* <tr><td>{@link TableSchema.TypeName#INT32}</td> <td>{@link Schema.TypeName#INT32}</td></tr>
* <tr><td>{@link TableSchema.TypeName#INT64}</td> <td>{@link Schema.TypeName#INT64}</td></tr>
* <tr><td>{@link TableSchema.TypeName#STRING}</td> <td>{@link Schema.TypeName#STRING}</td></tr>
* <tr><td>{@link TableSchema.TypeName#UINT8}</td> <td>{@link Schema.TypeName#INT16}</td></tr>
* <tr><td>{@link TableSchema.TypeName#UINT16}</td> <td>{@link Schema.TypeName#INT32}</td></tr>
* <tr><td>{@link TableSchema.TypeName#UINT32}</td> <td>{@link Schema.TypeName#INT64}</td></tr>
* <tr><td>{@link TableSchema.TypeName#UINT64}</td> <td>{@link Schema.TypeName#INT64}</td></tr>
* <tr><td>{@link TableSchema.TypeName#DATE}</td> <td>{@link Schema.TypeName#DATETIME}</td></tr>
* <tr><td>{@link TableSchema.TypeName#DATETIME}</td> <td>{@link Schema.TypeName#DATETIME}</td></tr>
* <tr><td>{@link TableSchema.TypeName#ARRAY}</td> <td>{@link Schema.TypeName#ARRAY}</td></tr>
* <tr><td>{@link TableSchema.TypeName#ENUM8}</td> <td>{@link Schema.TypeName#STRING}</td></tr>
* <tr><td>{@link TableSchema.TypeName#ENUM16}</td> <td>{@link Schema.TypeName#STRING}</td></tr>
* </table>
*
 * Nullable row columns are supported through the {@code Nullable} type in ClickHouse.
*
* <p>Nested rows should be unnested using {@link org.apache.beam.sdk.schemas.transforms.Unnest}.
* Type casting should be done using {@link org.apache.beam.sdk.schemas.transforms.Cast} before
* {@link ClickHouseIO}.
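 *
 * <p>For example, a sketch of casting input rows to a matching schema before writing, where
 * {@code targetSchema} is assumed to match the destination table:
 *
 * <pre>{@code
 * PCollection<Row> rows = ...;
 * rows
 *     .apply(Cast.widening(targetSchema))
 *     .apply(ClickHouseIO.<Row>write(jdbcUrl, "my_table"));
 * }</pre>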
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class ClickHouseIO {
public static final long DEFAULT_MAX_INSERT_BLOCK_SIZE = 1000000;
public static final int DEFAULT_MAX_RETRIES = 5;
public static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000);
public static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(5);
public static <T> Write<T> write(String jdbcUrl, String table) {
return new AutoValue_ClickHouseIO_Write.Builder<T>()
.jdbcUrl(jdbcUrl)
.table(table)
.properties(new Properties())
.maxInsertBlockSize(DEFAULT_MAX_INSERT_BLOCK_SIZE)
.initialBackoff(DEFAULT_INITIAL_BACKOFF)
.maxRetries(DEFAULT_MAX_RETRIES)
.maxCumulativeBackoff(DEFAULT_MAX_CUMULATIVE_BACKOFF)
.build()
.withInsertDeduplicate(true)
.withInsertDistributedSync(true);
}
/** A {@link PTransform} to write to ClickHouse. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
public abstract String jdbcUrl();
public abstract String table();
public abstract Properties properties();
public abstract long maxInsertBlockSize();
public abstract int maxRetries();
public abstract Duration maxCumulativeBackoff();
public abstract Duration initialBackoff();
@Nullable
public abstract Boolean insertDistributedSync();
@Nullable
public abstract Long insertQuorum();
@Nullable
public abstract Boolean insertDeduplicate();
abstract Builder<T> toBuilder();
@Override
public PDone expand(PCollection<T> input) {
TableSchema tableSchema = getTableSchema(jdbcUrl(), table());
Properties properties = properties();
set(properties, ClickHouseQueryParam.MAX_INSERT_BLOCK_SIZE, maxInsertBlockSize());
set(properties, ClickHouseQueryParam.INSERT_QUORUM, insertQuorum());
set(properties, "insert_distributed_sync", insertDistributedSync());
set(properties, "insert_deduplication", insertDeduplicate());
WriteFn<T> fn =
new AutoValue_ClickHouseIO_WriteFn.Builder<T>()
.jdbcUrl(jdbcUrl())
.table(table())
.maxInsertBlockSize(maxInsertBlockSize())
.schema(tableSchema)
.properties(properties)
.initialBackoff(initialBackoff())
.maxCumulativeBackoff(maxCumulativeBackoff())
.maxRetries(maxRetries())
.build();
input.apply(ParDo.of(fn));
return PDone.in(input.getPipeline());
}
/**
 * The maximum block size for insertion: rows are buffered and inserted in blocks of at most
 * this many rows.
*
* @param value number of rows
* @return a {@link PTransform} writing data to ClickHouse
* @see <a href="https://clickhouse.yandex/docs/en/single/#max_insert_block_size">ClickHouse
* documentation</a>
*/
public Write<T> withMaxInsertBlockSize(long value) {
return toBuilder().maxInsertBlockSize(value).build();
}
/**
 * If enabled, an INSERT query into a Distributed table waits until the data has been sent to
 * all nodes in the cluster.
*
* @param value true to enable, null for server default
* @return a {@link PTransform} writing data to ClickHouse
*/
public Write<T> withInsertDistributedSync(@Nullable Boolean value) {
return toBuilder().insertDistributedSync(value).build();
}
/**
 * For INSERT queries into a replicated table, wait until the data has been written to the
 * specified number of replicas, and linearize the addition of the data. 0 means disabled.
 *
 * <p>This setting is disabled in the default server configuration.
*
* @param value number of replicas, 0 for disabling, null for server default
* @return a {@link PTransform} writing data to ClickHouse
* @see <a href="https://clickhouse.yandex/docs/en/single/#insert_quorum">ClickHouse
* documentation</a>
*/
public Write<T> withInsertQuorum(@Nullable Long value) {
return toBuilder().insertQuorum(value).build();
}
/**
 * For INSERT queries into a replicated table, specifies that deduplication of inserted blocks
 * should be performed.
 *
 * <p>Enabled by default. It shouldn't be disabled unless your input contains duplicate blocks
 * that you don't want to be deduplicated.
*
* @param value true to enable, null for server default
* @return a {@link PTransform} writing data to ClickHouse
*/
public Write<T> withInsertDeduplicate(Boolean value) {
return toBuilder().insertDeduplicate(value).build();
}
/**
* Maximum number of retries per insert.
*
* <p>See {@link FluentBackoff#withMaxRetries}.
*
* @param value maximum number of retries
* @return a {@link PTransform} writing data to ClickHouse
*/
public Write<T> withMaxRetries(int value) {
return toBuilder().maxRetries(value).build();
}
/**
* Limits total time spent in backoff.
*
* <p>See {@link FluentBackoff#withMaxCumulativeBackoff}.
*
* @param value maximum duration
* @return a {@link PTransform} writing data to ClickHouse
*/
public Write<T> withMaxCumulativeBackoff(Duration value) {
return toBuilder().maxCumulativeBackoff(value).build();
}
/**
* Set initial backoff duration.
*
* <p>See {@link FluentBackoff#withInitialBackoff}.
*
* @param value initial duration
* @return a {@link PTransform} writing data to ClickHouse
*/
public Write<T> withInitialBackoff(Duration value) {
return toBuilder().initialBackoff(value).build();
}
/** Builder for {@link Write}. */
@AutoValue.Builder
abstract static class Builder<T> {
public abstract Builder<T> jdbcUrl(String jdbcUrl);
public abstract Builder<T> table(String table);
public abstract Builder<T> maxInsertBlockSize(long maxInsertBlockSize);
public abstract Builder<T> insertDistributedSync(Boolean insertDistributedSync);
public abstract Builder<T> insertQuorum(Long insertQuorum);
public abstract Builder<T> insertDeduplicate(Boolean insertDeduplicate);
public abstract Builder<T> properties(Properties properties);
public abstract Builder<T> maxRetries(int maxRetries);
public abstract Builder<T> maxCumulativeBackoff(Duration maxCumulativeBackoff);
public abstract Builder<T> initialBackoff(Duration initialBackoff);
public abstract Write<T> build();
}
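/** Sets a typed connection property if the value is non-null, validating the value's type. */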
private static void set(Properties properties, ClickHouseQueryParam param, Object value) {
if (value != null) {
Preconditions.checkArgument(
param.getClazz().isInstance(value),
"Unexpected value '"
+ value
+ "' for "
+ param.getKey()
+ " got "
+ value.getClass().getName()
+ ", expected "
+ param.getClazz().getName());
properties.put(param, value);
}
}
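/** Sets a string-keyed connection property if the value is non-null. */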
private static void set(Properties properties, String param, Object value) {
if (value != null) {
properties.put(param, value);
}
}
}
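/**
 * A {@link DoFn} that buffers input rows and writes them to ClickHouse in blocks of at most
 * {@code maxInsertBlockSize()} rows, flushing any remainder at the end of each bundle.
 */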
@AutoValue
abstract static class WriteFn<T> extends DoFn<T, Void> {
private static final Logger LOG = LoggerFactory.getLogger(WriteFn.class);
private static final String RETRY_ATTEMPT_LOG =
"Error writing to ClickHouse. Retry attempt[%d]";
private ClickHouseConnection connection;
private FluentBackoff retryBackoff;
private final List<Row> buffer = new ArrayList<>();
private final Distribution batchSize = Metrics.distribution(Write.class, "batch_size");
private final Counter retries = Metrics.counter(Write.class, "retries");
// TODO: This should be the same as resolved so that Beam knows which fields
// are being accessed. Currently Beam only supports wildcard descriptors.
// Once BEAM-4457 is fixed, fix this.
@FieldAccess("filterFields")
final FieldAccessDescriptor fieldAccessDescriptor = FieldAccessDescriptor.withAllFields();
public abstract String jdbcUrl();
public abstract String table();
public abstract long maxInsertBlockSize();
public abstract int maxRetries();
public abstract Duration maxCumulativeBackoff();
public abstract Duration initialBackoff();
public abstract TableSchema schema();
public abstract Properties properties();
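/**
 * Builds the header of the INSERT statement, e.g. {@code INSERT INTO "table" ("col1", "col2")}.
 * MATERIALIZED and ALIAS columns are excluded, since ClickHouse doesn't accept explicit values
 * for them.
 */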
@VisibleForTesting
static String insertSql(TableSchema schema, String table) {
String columnsStr =
schema.columns().stream()
.filter(x -> !x.materializedOrAlias())
.map(x -> quoteIdentifier(x.name()))
.collect(Collectors.joining(", "));
return "INSERT INTO " + quoteIdentifier(table) + " (" + columnsStr + ")";
}
@Setup
public void setup() throws SQLException {
connection = new ClickHouseDataSource(jdbcUrl(), properties()).getConnection();
retryBackoff =
FluentBackoff.DEFAULT
.withMaxRetries(maxRetries())
.withMaxCumulativeBackoff(maxCumulativeBackoff())
.withInitialBackoff(initialBackoff());
}
@Teardown
public void tearDown() throws Exception {
connection.close();
}
@StartBundle
public void startBundle() {
buffer.clear();
}
@FinishBundle
public void finishBundle() throws Exception {
flush();
}
@ProcessElement
public void processElement(@FieldAccess("filterFields") @Element Row input) throws Exception {
buffer.add(input);
if (buffer.size() >= maxInsertBlockSize()) {
flush();
}
}
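/**
 * Writes the buffered rows to ClickHouse as a single RowBinary stream, retrying with
 * exponential backoff on {@link SQLException} until the retry budget is exhausted.
 */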
private void flush() throws Exception {
BackOff backOff = retryBackoff.backoff();
int attempt = 0;
if (buffer.isEmpty()) {
return;
}
batchSize.update(buffer.size());
while (true) {
try (ClickHouseStatement statement = connection.createStatement()) {
statement.sendRowBinaryStream(
insertSql(schema(), table()),
stream -> {
for (Row row : buffer) {
ClickHouseWriter.writeRow(stream, schema(), row);
}
});
buffer.clear();
break;
} catch (SQLException e) {
if (!BackOffUtils.next(Sleeper.DEFAULT, backOff)) {
throw e;
} else {
retries.inc();
LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), e);
attempt++;
}
}
}
}
@AutoValue.Builder
abstract static class Builder<T> {
public abstract Builder<T> jdbcUrl(String jdbcUrl);
public abstract Builder<T> table(String table);
public abstract Builder<T> maxInsertBlockSize(long maxInsertBlockSize);
public abstract Builder<T> schema(TableSchema schema);
public abstract Builder<T> properties(Properties properties);
public abstract Builder<T> maxRetries(int maxRetries);
public abstract Builder<T> maxCumulativeBackoff(Duration maxCumulativeBackoff);
public abstract Builder<T> initialBackoff(Duration initialBackoff);
public abstract WriteFn<T> build();
}
}
/**
* Returns {@link TableSchema} for a given table.
*
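 * <p>The schema is read by executing {@code DESCRIBE TABLE} over JDBC. For example:
 *
 * <pre>{@code
 * TableSchema schema =
 *     ClickHouseIO.getTableSchema("jdbc:clickhouse://localhost:8123/default", "my_table");
 * }</pre>
 *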
* @param jdbcUrl jdbc connection url
* @param table table name
* @return table schema
*/
public static TableSchema getTableSchema(String jdbcUrl, String table) {
List<TableSchema.Column> columns = new ArrayList<>();
try (ClickHouseConnection connection = new ClickHouseDataSource(jdbcUrl).getConnection();
Statement statement = connection.createStatement()) {
ResultSet rs = null; // try-finally is used because findbugs doesn't like try-with-resource
try {
rs = statement.executeQuery("DESCRIBE TABLE " + quoteIdentifier(table));
while (rs.next()) {
String name = rs.getString("name");
String type = rs.getString("type");
String defaultTypeStr = rs.getString("default_type");
String defaultExpression = rs.getString("default_expression");
ColumnType columnType = ColumnType.parse(type);
DefaultType defaultType = DefaultType.parse(defaultTypeStr).orElse(null);
Object defaultValue;
if (DefaultType.DEFAULT.equals(defaultType)
&& !Strings.isNullOrEmpty(defaultExpression)) {
defaultValue = ColumnType.parseDefaultExpression(columnType, defaultExpression);
} else {
defaultValue = null;
}
columns.add(TableSchema.Column.of(name, columnType, defaultType, defaultValue));
}
} finally {
if (rs != null) {
rs.close();
}
}
return TableSchema.of(columns.toArray(new TableSchema.Column[0]));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
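/** Wraps an identifier in double quotes, escaping any embedded double quotes with a backslash. */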
static String quoteIdentifier(String identifier) {
String backslash = "\\\\";
String quote = "\"";
return quote + identifier.replaceAll(quote, backslash + quote) + quote;
}
}