blob: de999b7388a6f38b9006eab4a8a5712c5eedfd7d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.mysql;
import java.net.SocketTimeoutException;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.http.client.utils.URIBuilder;
import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.store.BackendSession.AbstractBackendSession;
import org.apache.hugegraph.backend.store.BackendSessionPool;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
public class MysqlSessions extends BackendSessionPool {
private static final Logger LOG = Log.logger(MysqlSessions.class);
private static final String JDBC_PREFIX = "jdbc:";
private static final int DROP_DB_TIMEOUT = 10000;
private HugeConfig config;
private String database;
private volatile boolean opened;
public MysqlSessions(HugeConfig config, String database, String store) {
super(config, database + "/" + store);
this.config = config;
this.database = database;
this.opened = false;
}
@Override
public HugeConfig config() {
return this.config;
}
public String database() {
return this.database;
}
public String escapedDatabase() {
return MysqlUtil.escapeString(this.database());
}
/**
* Try connect with specified database, will not reconnect if failed
* @throws SQLException if a database access error occurs
*/
@Override
public synchronized void open() throws Exception {
try (Connection conn = this.open(false)) {
this.opened = true;
}
}
@Override
protected boolean opened() {
return this.opened;
}
@Override
protected void doClose() {
// pass
}
@Override
public Session session() {
return (Session) super.getOrNewSession();
}
@Override
protected Session newSession() {
return new Session();
}
public void createDatabase() {
// Create database with non-database-session
LOG.debug("Create database: {}", this.database());
String sql = this.buildCreateDatabase(this.database());
try (Connection conn = this.openWithoutDB(0)) {
conn.createStatement().execute(sql);
} catch (SQLException e) {
if (!e.getMessage().endsWith("already exists")) {
throw new BackendException("Failed to create database '%s'", e,
this.database());
}
// Ignore exception if database already exists
}
}
public void dropDatabase() {
LOG.debug("Drop database: {}", this.database());
String sql = this.buildDropDatabase(this.database());
try (Connection conn = this.openWithoutDB(DROP_DB_TIMEOUT)) {
conn.createStatement().execute(sql);
} catch (SQLException e) {
if (e.getCause() instanceof SocketTimeoutException) {
LOG.warn("Drop database '{}' timeout", this.database());
} else {
throw new BackendException("Failed to drop database '%s'", e,
this.database());
}
}
}
public boolean existsDatabase() {
try (Connection conn = this.openWithoutDB(0);
ResultSet result = conn.getMetaData().getCatalogs()) {
while (result.next()) {
String dbName = result.getString(1);
if (dbName.equals(this.database())) {
return true;
}
}
} catch (Exception e) {
throw new BackendException("Failed to obtain database info", e);
}
return false;
}
public boolean existsTable(String table) {
String sql = this.buildExistsTable(table);
try (Connection conn = this.openWithDB(0);
ResultSet result = conn.createStatement().executeQuery(sql)) {
return result.next();
} catch (Exception e) {
throw new BackendException("Failed to obtain table info", e);
}
}
public void resetConnections() {
// Close the under layer connections owned by each thread
this.forceResetSessions();
}
protected String buildCreateDatabase(String database) {
return String.format("CREATE DATABASE IF NOT EXISTS %s " +
"DEFAULT CHARSET utf8 COLLATE utf8_bin;",
database);
}
protected String buildDropDatabase(String database) {
return String.format("DROP DATABASE IF EXISTS %s;", database);
}
protected String buildExistsTable(String table) {
return String.format("SELECT * FROM information_schema.tables " +
"WHERE table_schema = '%s' " +
"AND table_name = '%s' LIMIT 1;",
this.escapedDatabase(),
MysqlUtil.escapeString(table));
}
/**
* Connect DB without specified database
*/
protected Connection openWithoutDB(int timeout) {
String url = this.buildUri(false, false, false, timeout);
try {
return this.connect(url);
} catch (SQLException e) {
throw new BackendException("Failed to access %s", e, url);
}
}
/**
* Connect DB with specified database, but won't auto reconnect
*/
protected Connection openWithDB(int timeout) {
String url = this.buildUri(false, true, false, timeout);
try {
return this.connect(url);
} catch (SQLException e) {
throw new BackendException("Failed to access %s", e, url);
}
}
/**
* Connect DB with specified database
*/
private Connection open(boolean autoReconnect) throws SQLException {
String url = this.buildUri(true, true, autoReconnect, null);
return this.connect(url);
}
protected String buildUri(boolean withConnParams, boolean withDB,
boolean autoReconnect, Integer timeout) {
String url = this.buildUrlPrefix(withDB);
boolean forcedAutoReconnect = this.config.get(
MysqlOptions.JDBC_FORCED_AUTO_RECONNECT);
int maxTimes = this.config.get(MysqlOptions.JDBC_RECONNECT_MAX_TIMES);
int interval = this.config.get(MysqlOptions.JDBC_RECONNECT_INTERVAL);
String sslMode = this.config.get(MysqlOptions.JDBC_SSL_MODE);
E.checkArgument(url.startsWith(JDBC_PREFIX),
"The url must start with '%s': '%s'",
JDBC_PREFIX, url);
String urlWithoutJdbc = url.substring(JDBC_PREFIX.length());
URIBuilder builder;
try {
builder = this.newConnectionURIBuilder(urlWithoutJdbc);
} catch (URISyntaxException e) {
throw new BackendException("Invalid url '%s'", e, url);
}
if (forcedAutoReconnect) {
autoReconnect = true;
}
if (withConnParams || forcedAutoReconnect) {
builder.setParameter("characterEncoding", "utf-8")
.setParameter("rewriteBatchedStatements", "true")
.setParameter("useServerPrepStmts", "false")
.setParameter("autoReconnect", String.valueOf(autoReconnect))
.setParameter("maxReconnects", String.valueOf(maxTimes))
.setParameter("initialTimeout", String.valueOf(interval));
}
if (timeout != null) {
builder.setParameter("socketTimeout", String.valueOf(timeout));
}
builder.setParameter("useSSL", sslMode);
return JDBC_PREFIX + builder.toString();
}
protected String buildUrlPrefix(boolean withDB) {
String url = this.config.get(MysqlOptions.JDBC_URL);
if (!url.endsWith("/")) {
url = String.format("%s/", url);
}
String database = withDB ? this.database() : this.connectDatabase();
return String.format("%s%s", url, database);
}
protected String connectDatabase() {
return Strings.EMPTY;
}
protected URIBuilder newConnectionURIBuilder(String url)
throws URISyntaxException {
return new URIBuilder(url);
}
private Connection connect(String url) throws SQLException {
LOG.info("Connect to the jdbc url: '{}'", url);
String driverName = this.config.get(MysqlOptions.JDBC_DRIVER);
String username = this.config.get(MysqlOptions.JDBC_USERNAME);
String password = this.config.get(MysqlOptions.JDBC_PASSWORD);
try {
// Register JDBC driver
Class.forName(driverName);
} catch (ClassNotFoundException e) {
throw new BackendException("Failed to register JDBC driver. Class '%s' not found. Please check if the MySQL driver package is available.",
driverName);
}
return DriverManager.getConnection(url, username, password);
}
public class Session extends AbstractBackendSession {
private Connection conn;
private Map<String, PreparedStatement> statements;
private int count;
public Session() {
this.conn = null;
this.statements = new HashMap<>();
this.count = 0;
}
public HugeConfig config() {
return MysqlSessions.this.config();
}
@Override
public void open() {
try {
this.doOpen();
} catch (SQLException e) {
throw new BackendException("Failed to open connection", e);
}
}
private void tryOpen() {
try {
this.doOpen();
} catch (SQLException ignored) {
// Ignore
}
}
private void doOpen() throws SQLException {
this.opened = true;
if (this.conn != null && !this.conn.isClosed()) {
return;
}
this.conn = MysqlSessions.this.open(true);
}
@Override
public void close() {
assert this.closeable();
if (this.conn == null) {
return;
}
this.opened = false;
this.doClose();
}
private void doClose() {
SQLException exception = null;
for (PreparedStatement statement : this.statements.values()) {
try {
statement.close();
} catch (SQLException e) {
exception = e;
}
}
this.statements.clear();
try {
this.conn.close();
} catch (SQLException e) {
exception = e;
} finally {
this.conn = null;
}
if (exception != null) {
throw new BackendException("Failed to close connection",
exception);
}
}
@Override
public boolean opened() {
if (this.opened && this.conn == null) {
// Reconnect if the connection is reset
tryOpen();
}
return this.opened && this.conn != null;
}
@Override
public boolean closed() {
if (!this.opened || this.conn == null) {
return true;
}
try {
return this.conn.isClosed();
} catch (SQLException ignored) {
// Assume closed here
return true;
}
}
public void clear() {
this.count = 0;
SQLException exception = null;
for (PreparedStatement statement : this.statements.values()) {
try {
statement.clearBatch();
} catch (SQLException e) {
exception = e;
}
}
if (exception != null) {
/*
* Will throw exception when the database connection error,
* we clear statements because clearBatch() failed
*/
this.statements = new HashMap<>();
}
}
public void begin() throws SQLException {
this.conn.setAutoCommit(false);
}
public void end() throws SQLException {
this.conn.setAutoCommit(true);
}
public void endAndLog() {
try {
this.conn.setAutoCommit(true);
} catch (SQLException e) {
LOG.warn("Failed to set connection to auto-commit status", e);
}
}
@Override
public Integer commit() {
int updated = 0;
try {
for (PreparedStatement statement : this.statements.values()) {
updated += IntStream.of(statement.executeBatch()).sum();
}
this.conn.commit();
this.clear();
} catch (SQLException e) {
throw new BackendException("Failed to commit", e);
}
/*
* Can't call endAndLog() in `finally` block here.
* Because If commit already failed with an exception,
* then rollback() should be called. Besides rollback() can only
* be called when autocommit=false and rollback() will always set
* autocommit=true. Therefore only commit successfully should set
* autocommit=true here
*/
this.endAndLog();
return updated;
}
@Override
public void rollback() {
this.clear();
try {
this.conn.rollback();
} catch (SQLException e) {
throw new BackendException("Failed to rollback", e);
} finally {
this.endAndLog();
}
}
@Override
public boolean hasChanges() {
return this.count > 0;
}
@Override
public void reconnectIfNeeded() {
if (!this.opened) {
return;
}
if (this.conn == null) {
tryOpen();
}
try {
this.execute("SELECT 1;");
} catch (SQLException ignored) {
// pass
}
}
@Override
public void reset() {
// NOTE: this method may be called by other threads
if (this.conn == null) {
return;
}
try {
this.doClose();
} catch (Throwable e) {
LOG.warn("Failed to reset connection", e);
}
}
public ResultSetWrapper select(String sql) throws SQLException {
assert this.conn.getAutoCommit();
Statement statement = this.conn.createStatement();
try {
ResultSet rs = statement.executeQuery(sql);
return new ResultSetWrapper(rs, statement);
} catch (SQLException e) {
statement.close();
throw e;
}
}
public boolean execute(String sql) throws SQLException {
/*
* commit() or rollback() failed to set connection to auto-commit
* status in prior transaction. Manually set to auto-commit here.
*/
if (!this.conn.getAutoCommit()) {
this.end();
}
try (Statement statement = this.conn.createStatement()) {
return statement.execute(sql);
}
}
public void add(PreparedStatement statement) {
try {
// Add a row to statement
statement.addBatch();
this.count++;
} catch (SQLException e) {
throw new BackendException("Failed to add statement '%s' " +
"to batch", e, statement);
}
}
public PreparedStatement prepareStatement(String sqlTemplate)
throws SQLException {
PreparedStatement statement = this.statements.get(sqlTemplate);
if (statement == null) {
statement = this.conn.prepareStatement(sqlTemplate);
this.statements.putIfAbsent(sqlTemplate, statement);
}
return statement;
}
}
}