blob: 4f30c4adec155a6ce0c658faacb7b40bd49ff070 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
import org.apache.flink.connector.jdbc.utils.JdbcUtils;
import org.apache.flink.types.Row;
import javax.annotation.Nonnull;
import java.util.HashMap;
import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * A JDBC {@link Row} output format that supports batching records before writing them to the
 * database.
 *
 * <p>Instances are normally obtained through {@link #builder()}, which wires a simple
 * insert-into statement executor for the configured table and fields.
 *
 * @param <In> type of the records accepted by this output format
 */
@Internal
public class RowJdbcOutputFormat<In>
        extends JdbcOutputFormat<In, Row, JdbcBatchStatementExecutor<Row>> {

    /**
     * Creates the output format by delegating all arguments to {@link JdbcOutputFormat}.
     *
     * @param connectionProvider provider of the JDBC connection
     * @param executionOptions batching / retry options
     * @param statementExecutorFactory factory for the batch statement executor
     */
    public RowJdbcOutputFormat(
            @Nonnull JdbcConnectionProvider connectionProvider,
            @Nonnull JdbcExecutionOptions executionOptions,
            @Nonnull
                    StatementExecutorFactory<JdbcBatchStatementExecutor<Row>>
                            statementExecutorFactory) {
        super(connectionProvider, executionOptions, statementExecutorFactory);
    }

    /**
     * Creates a simple batch executor for the given SQL whose parameters are bound from {@link
     * Row} records using the provided SQL types array.
     */
    static JdbcBatchStatementExecutor<Row> createSimpleRowExecutor(String sql, int[] fieldTypes) {
        return JdbcBatchStatementExecutor.simple(sql, createRowJdbcStatementBuilder(fieldTypes));
    }

    /**
     * Creates a {@link JdbcStatementBuilder} for {@link Row} using the provided SQL types array.
     * Uses {@link JdbcUtils#setRecordToStatement}
     */
    static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] types) {
        return (st, record) -> setRecordToStatement(st, types, record);
    }

    /** Returns a new {@link Builder} for a Row based output format. */
    public static Builder builder() {
        return new Builder();
    }

    /** Builder for a {@link JdbcOutputFormat} using Row. */
    public static class Builder {
        private InternalJdbcConnectionOptions options;
        private final JdbcDmlOptions.JdbcDmlOptionsBuilder dmlOptionsBuilder =
                JdbcDmlOptions.builder();
        private final JdbcExecutionOptions.Builder executionOptionsBuilder =
                JdbcExecutionOptions.builder();

        /**
         * required, jdbc options. The table name and dialect of the options are also forwarded to
         * the DML options.
         *
         * @throws NullPointerException if {@code options} is null
         */
        public Builder setOptions(InternalJdbcConnectionOptions options) {
            // Validate here: the options are dereferenced right below, so the check in build()
            // alone would never see a null passed to this setter.
            this.options = checkNotNull(options, "No options supplied.");
            this.dmlOptionsBuilder
                    .withTableName(options.getTableName())
                    .withDialect(options.getDialect());
            return this;
        }

        /** required, field names of this jdbc sink. */
        public Builder setFieldNames(String[] fieldNames) {
            this.dmlOptionsBuilder.withFieldNames(fieldNames);
            return this;
        }

        /**
         * unique key fields, forwarded to the DML options.
         *
         * <p>NOTE(review): originally documented as "required, upsert unique keys", but {@link
         * #build()} only generates an insert-into statement from the table name and field names
         * and never reads the key fields itself - confirm whether they are still needed.
         */
        public Builder setKeyFields(String[] keyFields) {
            this.dmlOptionsBuilder.withKeyFields(keyFields);
            return this;
        }

        /** required, field types (SQL types array) of this jdbc sink. */
        public Builder setFieldTypes(int[] fieldTypes) {
            this.dmlOptionsBuilder.withFieldTypes(fieldTypes);
            return this;
        }

        /**
         * optional, flush max size (includes all append, upsert and delete records), over this
         * number of records, will flush data.
         */
        public Builder setFlushMaxSize(int flushMaxSize) {
            executionOptionsBuilder.withBatchSize(flushMaxSize);
            return this;
        }

        /**
         * optional, flush interval in milliseconds, over this time, asynchronous threads will
         * flush data.
         */
        public Builder setFlushIntervalMills(long flushIntervalMills) {
            executionOptionsBuilder.withBatchIntervalMs(flushIntervalMills);
            return this;
        }

        /** optional, max retry times for jdbc connector. */
        public Builder setMaxRetryTimes(int maxRetryTimes) {
            executionOptionsBuilder.withMaxRetries(maxRetryTimes);
            return this;
        }

        /**
         * Finalizes the configuration and checks validity.
         *
         * @return the configured {@link RowJdbcOutputFormat}, writing rows with the dialect's
         *     insert-into statement for the configured table and field names
         * @throws NullPointerException if no options were supplied
         */
        public JdbcOutputFormat<Row, Row, JdbcBatchStatementExecutor<Row>> build() {
            checkNotNull(options, "No options supplied.");
            final JdbcDmlOptions dml = this.dmlOptionsBuilder.build();
            // warn: don't close over builder fields - the executor factory lambda below only
            // captures the locals `sql` and `dml`.
            final String insertStatement =
                    options.getDialect()
                            .getInsertIntoStatement(dml.getTableName(), dml.getFieldNames());
            final String sql =
                    FieldNamedPreparedStatementImpl.parseNamedStatement(
                            insertStatement, new HashMap<>());
            return new RowJdbcOutputFormat<>(
                    new SimpleJdbcConnectionProvider(options),
                    executionOptionsBuilder.build(),
                    () -> createSimpleRowExecutor(sql, dml.getFieldTypes()));
        }
    }
}