blob: 5a9423796fd38bda87d88e542c9bd89ae0767671 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.sql;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
class StatementImpl implements Statement {
private final ConnectionImpl connection;
private boolean closed;
private String currentSQL;
private ResultSetImpl currentResultSet;
private SQLWarning currentWarning;
private int maxRows;
StatementImpl(ConnectionImpl connection) {
this.connection = connection;
}
private void checkClosed() throws SQLException {
if(isClosed()) {
throw new SQLException("Statement is closed.");
}
}
private ResultSet executeQueryImpl(String sql) throws SQLException {
try {
if(this.currentResultSet != null) {
this.currentResultSet.close();
this.currentResultSet = null;
}
if(maxRows > 0 && !containsLimit(sql)) {
sql = sql + " limit "+Integer.toString(maxRows);
}
closed = false; // If closed reopen so Statement can be reused.
this.currentResultSet = new ResultSetImpl(this, constructStream(sql));
return this.currentResultSet;
} catch(Exception e) {
throw new SQLException(e);
}
}
protected SolrStream constructStream(String sql) throws IOException {
try {
ZkStateReader zkStateReader = this.connection.getClient().getZkStateReader();
Slice[] slices = CloudSolrStream.getSlices(this.connection.getCollection(), zkStateReader, true);
List<Replica> shuffler = new ArrayList<>();
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
shuffler.add(replica);
}
}
Collections.shuffle(shuffler, new Random());
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, "/sql");
params.set("stmt", sql);
for(String propertyName : this.connection.getProperties().stringPropertyNames()) {
params.set(propertyName, this.connection.getProperties().getProperty(propertyName));
}
Replica rep = shuffler.get(0);
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
String url = zkProps.getCoreUrl();
return new SolrStream(url, params);
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
return this.executeQueryImpl(sql);
}
@Override
public int executeUpdate(String sql) throws SQLException {
return 0;
}
@Override
public void close() throws SQLException {
if(closed) {
return;
}
this.closed = true;
if(this.currentResultSet != null) {
this.currentResultSet.close();
}
}
@Override
public int getMaxFieldSize() throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public void setMaxFieldSize(int max) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public int getMaxRows() throws SQLException {
return this.maxRows;
}
@Override
public void setMaxRows(int max) throws SQLException {
this.maxRows = max;
}
@Override
public void setEscapeProcessing(boolean enable) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public int getQueryTimeout() throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public void setQueryTimeout(int seconds) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public void cancel() throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public SQLWarning getWarnings() throws SQLException {
checkClosed();
return this.currentWarning;
}
@Override
public void clearWarnings() throws SQLException {
checkClosed();
this.currentWarning = null;
}
@Override
public void setCursorName(String name) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public boolean execute(String sql) throws SQLException {
if(this.currentResultSet != null) {
this.currentResultSet.close();
this.currentResultSet = null;
}
// TODO Add logic when update statements are added to JDBC.
this.currentSQL = sql;
return true;
}
@Override
public ResultSet getResultSet() throws SQLException {
return this.executeQueryImpl(this.currentSQL);
}
@Override
public int getUpdateCount() throws SQLException {
checkClosed();
// TODO Add logic when update statements are added to JDBC.
return -1;
}
@Override
public boolean getMoreResults() throws SQLException {
checkClosed();
// Currently multiple result sets are not possible yet
this.currentResultSet.close();
return false;
}
@Override
public void setFetchDirection(int direction) throws SQLException {
checkClosed();
if(direction != ResultSet.FETCH_FORWARD) {
throw new SQLException("Direction must be ResultSet.FETCH_FORWARD currently");
}
}
@Override
public int getFetchDirection() throws SQLException {
checkClosed();
return ResultSet.FETCH_FORWARD;
}
@Override
public void setFetchSize(int rows) throws SQLException {
checkClosed();
if(rows < 0) {
throw new SQLException("Rows must be >= 0");
}
}
@Override
public int getFetchSize() throws SQLException {
checkClosed();
return 0;
}
@Override
public int getResultSetConcurrency() throws SQLException {
checkClosed();
return ResultSet.CONCUR_READ_ONLY;
}
@Override
public int getResultSetType() throws SQLException {
checkClosed();
return ResultSet.TYPE_FORWARD_ONLY;
}
@Override
public void addBatch(String sql) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public void clearBatch() throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public int[] executeBatch() throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public Connection getConnection() throws SQLException {
return this.connection;
}
@Override
public boolean getMoreResults(int current) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public ResultSet getGeneratedKeys() throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public boolean execute(String sql, String[] columnNames) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public int getResultSetHoldability() throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public boolean isClosed() throws SQLException {
return closed;
}
@Override
public void setPoolable(boolean poolable) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public boolean isPoolable() throws SQLException {
return true;
}
@Override
public void closeOnCompletion() throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public boolean isCloseOnCompletion() throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
throw new UnsupportedOperationException();
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
throw new UnsupportedOperationException();
}
private boolean containsLimit(String sql) {
String[] tokens = sql.split("\\s+");
String secondToLastToken = tokens[tokens.length-2];
return ("limit").equalsIgnoreCase(secondToLastToken);
}
}