blob: 2bbe319c6f96efe9070b686543680fc18c2fd7f1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.jdbc;
import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
/**
 * A simple abstract class for a JDBC sink.
 *
 * <p>Records handed to {@link #write(Record)} are buffered in memory and written to the
 * configured table in batches. A batch is flushed on a single-threaded scheduled executor,
 * either periodically (every {@code timeoutMs} milliseconds) or as soon as the buffer holds
 * at least {@code batchSize} records. Each batch is executed inside one JDBC transaction:
 * on success the transaction is committed and every record is acknowledged, on failure the
 * transaction is rolled back and every record of the batch is failed.
 *
 * <p>The operation applied to a record is selected by its {@link #ACTION} property
 * ({@link #INSERT}, {@link #UPDATE} or {@link #DELETE}); a record without that property is
 * inserted.
 *
 * @param <T> the type of the record value consumed by the sink
 */
@Slf4j
public abstract class JdbcAbstractSink<T> implements Sink<T> {
    // ----- Runtime fields
    private JdbcSinkConfig jdbcSinkConfig;
    @Getter
    private Connection connection;
    private String jdbcUrl;
    private String tableName;
    private JdbcUtils.TableId tableId;
    private PreparedStatement insertStatement;
    // Only created when nonKey columns are configured; null otherwise.
    private PreparedStatement updateStatement;
    // Only created when key columns are configured; null otherwise.
    private PreparedStatement deleteStatement;

    /** Name of the record property that selects the operation to perform. */
    protected static final String ACTION = "ACTION";
    protected static final String INSERT = "INSERT";
    protected static final String UPDATE = "UPDATE";
    protected static final String DELETE = "DELETE";

    protected JdbcUtils.TableDefinition tableDefinition;

    // for flush
    // Buffer filled by write(); guarded by the monitor of this object.
    private List<Record<T>> incomingList;
    // Batch currently being flushed; only touched by the thread that owns isFlushing.
    private List<Record<T>> swapList;
    private AtomicBoolean isFlushing;
    private int batchSize;
    private ScheduledExecutorService flushExecutor;

    /**
     * Loads the sink configuration, opens the JDBC connection with auto-commit disabled,
     * prepares the statements and starts the periodic flush task.
     *
     * @param config      the raw sink configuration
     * @param sinkContext the sink context (not used by this class)
     * @throws IllegalArgumentException if no JDBC URL is configured
     * @throws Exception                if the driver cannot be loaded, the connection cannot
     *                                  be opened or the statements cannot be prepared
     */
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        jdbcSinkConfig = JdbcSinkConfig.load(config);

        jdbcUrl = jdbcSinkConfig.getJdbcUrl();
        if (jdbcSinkConfig.getJdbcUrl() == null) {
            throw new IllegalArgumentException("Required jdbc Url not set.");
        }

        Properties properties = new Properties();
        String username = jdbcSinkConfig.getUserName();
        String password = jdbcSinkConfig.getPassword();
        if (username != null) {
            properties.setProperty("user", username);
        }
        if (password != null) {
            properties.setProperty("password", password);
        }

        Class.forName(JdbcUtils.getDriverClassName(jdbcSinkConfig.getJdbcUrl()));
        connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), properties);
        // Batches are committed or rolled back explicitly in flush().
        connection.setAutoCommit(false);
        log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit());

        tableName = jdbcSinkConfig.getTableName();
        tableId = JdbcUtils.getTableId(connection, tableName);
        // Init PreparedStatement include insert, delete, update
        initStatement();

        int timeoutMs = jdbcSinkConfig.getTimeoutMs();
        batchSize = jdbcSinkConfig.getBatchSize();
        incomingList = Lists.newArrayList();
        swapList = Lists.newArrayList();
        isFlushing = new AtomicBoolean(false);

        flushExecutor = Executors.newScheduledThreadPool(1);
        flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Resolves the table definition from the configured comma-separated {@code key} and
     * {@code nonKey} column lists and prepares the statements. The insert statement is always
     * prepared; the update statement only when nonKey columns are configured and the delete
     * statement only when key columns are configured.
     */
    private void initStatement() throws Exception {
        List<String> keyList = Lists.newArrayList();
        String key = jdbcSinkConfig.getKey();
        if (key != null && !key.isEmpty()) {
            keyList = Arrays.asList(key.split(","));
        }
        List<String> nonKeyList = Lists.newArrayList();
        String nonKey = jdbcSinkConfig.getNonKey();
        if (nonKey != null && !nonKey.isEmpty()) {
            nonKeyList = Arrays.asList(nonKey.split(","));
        }

        tableDefinition = JdbcUtils.getTableDefinition(connection, tableId, keyList, nonKeyList);
        insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));
        if (!nonKeyList.isEmpty()) {
            updateStatement = JdbcUtils.buildUpdateStatement(connection, JdbcUtils.buildUpdateSql(tableDefinition));
        }
        if (!keyList.isEmpty()) {
            deleteStatement = JdbcUtils.buildDeleteStatement(connection, JdbcUtils.buildDeleteSql(tableDefinition));
        }
    }

    /**
     * Stops the flush task, commits any open transaction and closes the connection.
     *
     * <p>Safe to call when {@link #open(Map, SinkContext)} failed part-way: resources that
     * were never created are skipped. Records that are still buffered and have not been
     * flushed are neither acknowledged nor failed by this method.
     *
     * @throws Exception if waiting for the flush task is interrupted, or committing or
     *                   closing the connection fails
     */
    @Override
    public void close() throws Exception {
        try {
            stopFlushExecutor();
        } finally {
            // Close the connection even when waiting for the executor was interrupted.
            closeConnection();
        }
        log.info("Closed jdbc connection: {}", jdbcUrl);
    }

    // Stops scheduling flushes and waits (bounded) for an in-flight flush to finish.
    private void stopFlushExecutor() throws InterruptedException {
        if (flushExecutor == null) {
            return;
        }
        flushExecutor.shutdown();
        // shutdown() does not wait; without this the connection could be closed while a
        // flush is still executing statements on it.
        long waitMs = 2L * Math.max(jdbcSinkConfig.getTimeoutMs(), 0);
        if (!flushExecutor.awaitTermination(waitMs, TimeUnit.MILLISECONDS)) {
            log.warn("Flush executor did not terminate within {} ms", waitMs);
        }
    }

    // Commits pending work (when auto-commit is off) and closes the connection, if any.
    private void closeConnection() throws Exception {
        if (connection == null) {
            return;
        }
        try {
            if (!connection.getAutoCommit()) {
                connection.commit();
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Buffers the record and triggers an immediate flush once the buffer holds at least
     * {@code batchSize} records.
     *
     * @param record the record to write; it is acknowledged or failed later by the flush
     * @throws Exception if the flush cannot be scheduled
     */
    @Override
    public void write(Record<T> record) throws Exception {
        int number;
        synchronized (this) {
            incomingList.add(record);
            number = incomingList.size();
        }
        // ">=" rather than "==": while a flush is running the buffer can grow past batchSize,
        // and an exact match would then never trigger again until the periodic flush.
        // A non-positive batchSize leaves flushing to the periodic task only.
        if (batchSize > 0 && number >= batchSize) {
            flushExecutor.schedule(this::flush, 0, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Binds the values of a record to a PreparedStatement.
     *
     * @param statement the prepared statement selected for {@code action}
     * @param message   the record whose values are bound
     * @param action    one of {@link #INSERT}, {@link #UPDATE} or {@link #DELETE}
     * @throws Exception if the values cannot be bound
     */
    public abstract void bindValue(
        PreparedStatement statement,
        Record<T> message, String action) throws Exception;

    /**
     * Flushes the buffered records unless another flush is already running.
     *
     * <p>This method never throws: an exception escaping a task scheduled with
     * {@code scheduleAtFixedRate} would cancel all subsequent periodic flushes.
     */
    private void flush() {
        // if not in flushing state, do flush, else return;
        if (!isFlushing.compareAndSet(false, true)) {
            if (log.isDebugEnabled()) {
                int pending;
                synchronized (this) {
                    pending = incomingList.size();
                }
                log.debug("Already in flushing state, will not flush, queue size: {}", pending);
            }
            return;
        }
        try {
            flushPending();
        } catch (RuntimeException e) {
            log.error("Unexpected error while flushing", e);
        } finally {
            // Always release the batch and the flag so that later flushes can run.
            swapList.clear();
            isFlushing.set(false);
        }
    }

    // Takes the buffered records as one batch and writes them in a single transaction.
    // Must only be called by the thread that holds the isFlushing flag.
    private void flushPending() {
        final List<Record<T>> batch;
        synchronized (this) {
            if (incomingList.isEmpty()) {
                return;
            }
            // Swap the buffers so that write() can keep appending while the batch is written.
            swapList.clear();
            batch = incomingList;
            incomingList = swapList;
            swapList = batch;
        }

        if (log.isDebugEnabled()) {
            log.debug("Starting flush, queue size: {}", batch.size());
        }

        int count = 0;
        try {
            // bind each record value
            for (Record<T> record : batch) {
                String action = record.getProperties().get(ACTION);
                if (action == null) {
                    action = INSERT;
                }
                PreparedStatement statement = statementFor(action);
                bindValue(statement, record, action);
                count += 1;
                statement.execute();
            }
            connection.commit();
            batch.forEach(Record::ack);
        } catch (Exception e) {
            log.error("Got exception ", e);
            // Discard the statements already executed for this batch; otherwise they would be
            // committed by the next successful flush although their records were failed.
            rollbackQuietly();
            batch.forEach(Record::fail);
        }

        if (batch.size() != count) {
            log.error("Update count {} not match total number of records {}", count, batch.size());
        }
        // finish flush
        if (log.isDebugEnabled()) {
            log.debug("Finish flush, queue size: {}", batch.size());
        }
    }

    // Returns the prepared statement for the action, or throws if the action is unknown or
    // its statement was not prepared because the required columns are not configured.
    private PreparedStatement statementFor(String action) {
        switch (action) {
            case DELETE:
                return requireStatement(deleteStatement, action, "key");
            case UPDATE:
                return requireStatement(updateStatement, action, "nonKey");
            case INSERT:
                return insertStatement;
            default:
                String msg = String.format("Unsupported action %s, can be one of %s, or not set which indicate %s",
                    action, Arrays.asList(INSERT, UPDATE, DELETE), INSERT);
                throw new IllegalArgumentException(msg);
        }
    }

    // Fails with a descriptive message instead of handing a null statement to bindValue().
    private static PreparedStatement requireStatement(PreparedStatement statement, String action,
                                                      String requiredConfig) {
        if (statement == null) {
            throw new IllegalStateException(String.format(
                "Action %s is not supported because no %s columns are configured", action, requiredConfig));
        }
        return statement;
    }

    // Rolls back the current transaction; a rollback failure is logged, not propagated.
    private void rollbackQuietly() {
        try {
            connection.rollback();
        } catch (Exception rollbackError) {
            log.error("Failed to roll back transaction after flush failure", rollbackError);
        }
    }
}