blob: c83ca8b4667147d2c3fb5b7c3855d1130f75285f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.testutil;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simple connection pooling implementation which opens a given number of connections
* to HS2 and reuses the connections to submit the queries. It exposes a autocloseable
* HiveJdbcClient class which can be use to submit hive queries. When the close is
* called on the HiveJdbcClient it releases the connections back to the pool so that it
* can reused by another query
*/
public class HiveJdbcClientPool implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(ImpalaJdbcClient.class);
private static AtomicInteger clientIdGenerator = new AtomicInteger(0);
private final static String HIVE_SERVER2_DRIVER_NAME =
"org.apache.hive.jdbc.HiveDriver";
private final int poolSize_;
private final BlockingQueue<HiveJdbcClient> freeClients_;
private final long timeoutInSeconds_;
private final static int DEFAULT_PORT_NUMBER = 11050;
public class HiveJdbcClient implements AutoCloseable {
private final Connection conn_;
private Statement stmt_;
private final int clientId;
private HiveJdbcClient(String connString) throws SQLException {
conn_ = DriverManager.getConnection(connString);
stmt_ = conn_.createStatement();
clientId = clientIdGenerator.getAndIncrement();
}
public int getClientId() { return clientId; }
@Override
public void close() throws SQLException {
if (stmt_ != null) {
stmt_.close();
}
freeClients_.add(this);
}
private void validateConnection() throws SQLException {
Preconditions.checkNotNull(conn_,"Connection not initialized.");
Preconditions.checkState(!conn_.isClosed(), "Connection is not open");
Preconditions.checkNotNull(stmt_);
// Re-open if the statement if it has been closed.
if (stmt_.isClosed()) {
stmt_ = conn_.createStatement();
}
}
/*
* Executes the given query and returns the ResultSet. Will re-open the Statement
* if needed.
*/
public ResultSet execQuery(String query) throws SQLException {
validateConnection();
LOG.debug("Executing: " + query);
return stmt_.executeQuery(query);
}
/**
* Executes a given query and returns true if the query is successful
*/
public boolean executeSql(String sql) throws SQLException {
validateConnection();
LOG.debug("Executing sql : " + sql);
return stmt_.execute(sql);
}
}
public HiveJdbcClient getClient() throws TimeoutException, InterruptedException {
try {
HiveJdbcClient client = freeClients_.poll(timeoutInSeconds_, TimeUnit.SECONDS);
if (client == null) {
throw new TimeoutException("Timed out while waiting to get a "
+ "new client. Consider increasing the pool size");
}
return client;
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting to a Hive JDBC client", e);
throw e;
}
}
private HiveJdbcClientPool(int poolsize, long timeoutInSeconds)
throws ClassNotFoundException, SQLException {
Preconditions.checkArgument(poolsize > 0);
this.poolSize_ = poolsize;
this.timeoutInSeconds_ = timeoutInSeconds;
this.freeClients_ = new LinkedBlockingQueue<>(poolsize);
LOG.info("Using JDBC Driver Name: " + HIVE_SERVER2_DRIVER_NAME);
// Make sure the driver can be found, throws a ClassNotFoundException if
// it is not available.
Class.forName(HIVE_SERVER2_DRIVER_NAME);
String connString = String.format(TestUtils.HS2_CONNECTION_TEMPLATE,
DEFAULT_PORT_NUMBER,
"default");
LOG.info("Using connection string: " + connString);
for (int i = 0; i < poolSize_; i++) {
freeClients_.add(new HiveJdbcClient(connString));
}
}
public static synchronized HiveJdbcClientPool create(int poolSize)
throws SQLException, ClassNotFoundException {
return new HiveJdbcClientPool(poolSize, 5 * 60);
}
/*
* Closes the internal Statement and Connection objects. If they are already closed
* this is a no-op.
*/
@Override
public void close() {
int closedCount = poolSize_;
while (closedCount > 0) {
try {
HiveJdbcClient client = freeClients_.poll(5 * 60, TimeUnit.SECONDS);
if (client.stmt_ != null) { client.stmt_.close(); }
if (client.conn_ != null) { client.conn_.close(); }
closedCount--;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}