blob: 61f8e1135f395d436af2b55e37e3aa49f56952f2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.sql.DataSource;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiConfiguration;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter;
import static java.sql.Connection.TRANSACTION_READ_COMMITTED;
/**
* JDBC-based IP finder.
* <h1 class="header">Configuration</h1>
* <h2 class="header">Mandatory</h2>
* <ul>
* <li>Data source (see {@link #setDataSource(DataSource)}).</li>
* </ul>
* <h2 class="header">Optional</h2>
* The following configuration parameters are optional:
* <ul>
* <li>Flag indicating whether DB schema should be initialized by Ignite (default behaviour) or
* was explicitly created by user (see {@link #setInitSchema(boolean)})</li>
* </ul>
* <p>
* The database will contain 1 table which will hold IP addresses.
*/
public class TcpDiscoveryJdbcIpFinder extends TcpDiscoveryIpFinderAdapter {
/** Grid logger. */
@LoggerResource
private IgniteLogger log;
/** Data source. */
private DataSource dataSrc;
/** Flag for schema initialization. */
private boolean initSchema = true;
/** Init guard. */
@GridToStringExclude
private final AtomicBoolean initGuard = new AtomicBoolean();
/** Init latch. */
@GridToStringExclude
private final CountDownLatch initLatch = new CountDownLatch(1);
/** Table name. */
private final String addrTableName;
/** Query to get addresses. */
private final String getAddrsQry;
/** Query to register address. */
private final String regAddrQry;
/** Query to unregister address. */
private final String unregAddrQry;
/** Query to create addresses table. */
private final String createAddrsTableQry;
/** Query to check database validity. */
private final String chkQry;
/**
* Constructor.
*/
public TcpDiscoveryJdbcIpFinder() {
this(new BasicJdbcIpFinderDialect());
}
/** @param jdbcDialect SQL dialect to use with {@link TcpDiscoveryJdbcIpFinder}. */
public TcpDiscoveryJdbcIpFinder(JdbcIpFinderDialect jdbcDialect) {
setShared(true);
this.addrTableName = jdbcDialect.tableName();
this.getAddrsQry = "select hostname, port from " + addrTableName;
this.regAddrQry = "insert into " + addrTableName + " values (?, ?)";
this.unregAddrQry = "delete from " + addrTableName + " where hostname = ? and port = ?";
this.createAddrsTableQry = "create table " + addrTableName + " (hostname VARCHAR(1024), port INT)";
this.chkQry = "select count(*) from " + addrTableName;
}
/** {@inheritDoc} */
@Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException {
init();
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
conn = dataSrc.getConnection();
conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED);
stmt = conn.prepareStatement(getAddrsQry);
rs = stmt.executeQuery();
Collection<InetSocketAddress> addrs = new LinkedList<>();
while (rs.next())
addrs.add(new InetSocketAddress(rs.getString(1), rs.getInt(2)));
return addrs;
}
catch (SQLException e) {
throw new IgniteSpiException("Failed to get registered addresses version.", e);
}
finally {
U.closeQuiet(rs);
U.closeQuiet(stmt);
U.closeQuiet(conn);
}
}
/** {@inheritDoc} */
@Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
assert !F.isEmpty(addrs);
init();
Connection conn = null;
PreparedStatement stmtUnreg = null;
PreparedStatement stmtReg = null;
boolean committed = false;
try {
conn = dataSrc.getConnection();
conn.setAutoCommit(false);
conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED);
stmtUnreg = conn.prepareStatement(unregAddrQry);
stmtReg = conn.prepareStatement(regAddrQry);
for (InetSocketAddress addr : addrs) {
stmtUnreg.setString(1, addr.getAddress().getHostAddress());
stmtUnreg.setInt(2, addr.getPort());
stmtUnreg.addBatch();
stmtReg.setString(1, addr.getAddress().getHostAddress());
stmtReg.setInt(2, addr.getPort());
stmtReg.addBatch();
}
stmtUnreg.executeBatch();
stmtUnreg.close();
stmtReg.executeBatch();
stmtReg.close();
conn.commit();
committed = true;
}
catch (SQLException e) {
U.rollbackConnectionQuiet(conn);
throw new IgniteSpiException("Failed to register addresses: " + addrs, e);
}
finally {
if (!committed)
U.rollbackConnectionQuiet(conn);
U.closeQuiet(stmtUnreg);
U.closeQuiet(stmtReg);
U.closeQuiet(conn);
}
}
/** {@inheritDoc} */
@Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
assert !F.isEmpty(addrs);
init();
Connection conn = null;
PreparedStatement stmt = null;
boolean committed = false;
try {
conn = dataSrc.getConnection();
conn.setAutoCommit(false);
conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED);
stmt = conn.prepareStatement(unregAddrQry);
for (InetSocketAddress addr : addrs) {
stmt.setString(1, addr.getAddress().getHostAddress());
stmt.setInt(2, addr.getPort());
stmt.addBatch();
}
stmt.executeBatch();
conn.commit();
committed = true;
}
catch (SQLException e) {
U.rollbackConnectionQuiet(conn);
throw new IgniteSpiException("Failed to unregister addresses: " + addrs, e);
}
finally {
if (!committed)
U.rollbackConnectionQuiet(conn);
U.closeQuiet(stmt);
U.closeQuiet(conn);
}
}
/**
* Sets data source.
* <p>
* Data source should be fully configured and ready-to-use.
*
* @param dataSrc Data source.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = false)
public TcpDiscoveryJdbcIpFinder setDataSource(DataSource dataSrc) {
this.dataSrc = dataSrc;
return this;
}
/**
* Flag indicating whether DB schema should be initialized by Ignite (default behaviour) or
* was explicitly created by user.
*
* @param initSchema {@code True} if DB schema should be initialized by Ignite (default behaviour),
* {code @false} if schema was explicitly created by user.
* @return {@code this} for chaining.
*/
@IgniteSpiConfiguration(optional = true)
public TcpDiscoveryJdbcIpFinder setInitSchema(boolean initSchema) {
this.initSchema = initSchema;
return this;
}
/**
* Checks configuration validity.
*
* @throws org.apache.ignite.spi.IgniteSpiException If any error occurs.
*/
private void init() throws IgniteSpiException {
if (initGuard.compareAndSet(false, true)) {
if (dataSrc == null)
throw new IgniteSpiException("Data source is null (you must configure it via setDataSource(..)" +
" configuration property)");
if (!initSchema) {
initLatch.countDown();
checkSchema();
return;
}
Connection conn = null;
boolean committed = false;
try {
conn = dataSrc.getConnection();
conn.setAutoCommit(false);
conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED);
DatabaseMetaData dbm = conn.getMetaData();
// Many JDBC implementations support an 'if not exists' clause
// in the create statement which will check and create atomically.
// However not all databases support it, for example Oracle,
// so we do not use it.
try (ResultSet tables = dbm.getTables(null, null, addrTableName, null)) {
if (!tables.next()) {
// Table does not exist
// Create tbl_addrs.
try (Statement stmt = conn.createStatement()) {
stmt.executeUpdate(createAddrsTableQry);
conn.commit();
}
catch (SQLException e) {
// Due to a race condition, the table may have been
// created since we tested above for its existence.
// We must ignore the exception if this is the
// cause.
// However different JDBC driver implementations may
// return different codes and messages in the
// exception, so the safest way to determine if this
// exception is to be ignored is to test again to
// see if the table has been created.
try (ResultSet tablesAgain = dbm.getTables(null, null, addrTableName, null)) {
if (!tablesAgain.next())
throw e;
}
}
}
}
committed = true;
if (log.isDebugEnabled())
log.debug("DB schema has been initialized.");
}
catch (SQLException e) {
U.rollbackConnectionQuiet(conn);
throw new IgniteSpiException("Failed to initialize DB schema.", e);
}
finally {
if (!committed)
U.rollbackConnectionQuiet(conn);
U.closeQuiet(conn);
initLatch.countDown();
}
}
else
checkSchema();
}
/**
* Checks correctness of existing DB schema.
*
* @throws org.apache.ignite.spi.IgniteSpiException If schema wasn't properly initialized.
*/
private void checkSchema() throws IgniteSpiException {
try {
U.await(initLatch);
}
catch (IgniteInterruptedCheckedException e) {
throw new IgniteSpiException("Thread has been interrupted.", e);
}
Connection conn = null;
Statement stmt = null;
try {
conn = dataSrc.getConnection();
conn.setTransactionIsolation(TRANSACTION_READ_COMMITTED);
// Check if tbl_addrs exists and database initialized properly.
stmt = conn.createStatement();
stmt.execute(chkQry);
}
catch (SQLException e) {
throw new IgniteSpiException("IP finder has not been properly initialized.", e);
}
finally {
U.closeQuiet(stmt);
U.closeQuiet(conn);
}
}
/** {@inheritDoc} */
@Override public TcpDiscoveryJdbcIpFinder setShared(boolean shared) {
super.setShared(shared);
return this;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryJdbcIpFinder.class, this);
}
}