// blob: 877c436da8eaa3e091c4262214707d3b4fefb738 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.util;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.execute.CursorFetchPlan;
import org.apache.phoenix.iterate.CursorResultIterator;
import org.apache.phoenix.parse.CloseStatement;
import org.apache.phoenix.parse.DeclareCursorStatement;
import org.apache.phoenix.parse.OpenStatement;
import org.apache.phoenix.schema.tuple.Tuple;
/**
 * Static registry of declared cursors, keyed by cursor name, together with the
 * per-cursor state needed to serve DECLARE / OPEN / FETCH / CLOSE requests.
 *
 * <p>NOTE(review): the registry is a plain {@link HashMap} and only
 * {@code CursorWrapper#openCursor} is {@code synchronized}; callers that share
 * cursors across threads should confirm whether external synchronization is
 * expected.
 */
public final class CursorUtil {

    /** All currently declared cursors, keyed by cursor name. */
    private static final Map<String, CursorWrapper> mapCursorIDQuery =
            new HashMap<String, CursorWrapper>();

    /**
     * State kept for a single declared cursor: its defining SQL and query plan,
     * the fetch plan wrapping that query plan, and the scan position recorded
     * after each fetched row.
     */
    private static class CursorWrapper {
        private final String cursorName;
        private final String selectSQL;
        private final QueryPlan queryPlan;
        private final CursorFetchPlan fetchPlan;
        private final boolean isAggregate;

        private boolean isOpen = false;
        // Key taken from the "next" tuple; used as the scan start row on the following fetch.
        private ImmutableBytesWritable row;
        // Key taken from the tuple passed as rowValues in the last update.
        private ImmutableBytesWritable previousRow;
        private Scan scan;
        private boolean moreValues = true;
        private boolean isReversed;

        private CursorWrapper(String cursorName, String selectSQL, QueryPlan queryPlan) {
            this.cursorName = cursorName;
            this.selectSQL = selectSQL;
            this.queryPlan = queryPlan;
            this.fetchPlan = new CursorFetchPlan(queryPlan, cursorName);
            this.isAggregate = fetchPlan.isAggregate();
        }

        /**
         * Marks the cursor open, capturing the scan from the query plan's context and
         * whether the plan's ORDER BY equals {@code OrderBy.REV_ROW_KEY_ORDER_BY}.
         * Does nothing if the cursor is already open.
         *
         * @param conn currently unused
         */
        private synchronized void openCursor(Connection conn) throws SQLException {
            if (isOpen) {
                return;
            }
            this.scan = this.queryPlan.getContext().getScan();
            this.isReversed = OrderBy.REV_ROW_KEY_ORDER_BY.equals(this.queryPlan.getOrderBy());
            this.isOpen = true;
        }

        /**
         * Marks the cursor closed, closes the underlying cursor iterator and removes
         * this cursor from the registry.
         */
        private void closeCursor() throws SQLException {
            isOpen = false;
            ((CursorResultIterator) fetchPlan.iterator()).closeCursor();
            //TODO: Determine if the cursor should be removed from the HashMap at this point.
            //Semantically it makes sense that something which is 'Closed' one should be able to 'Open' again.
            mapCursorIDQuery.remove(this.cursorName);
        }

        /**
         * Prepares and returns the fetch plan for the next FETCH call.
         *
         * @param isNext currently unused
         * @param fetchSize fetch size handed to the cursor's result iterator
         * @return the fetch plan of this cursor
         * @throws SQLException if the cursor is not open
         */
        private QueryPlan getFetchPlan(boolean isNext, int fetchSize) throws SQLException {
            if (!isOpen) {
                throw new SQLException("Fetch call on closed cursor '" + this.cursorName + "'!");
            }
            ((CursorResultIterator) fetchPlan.iterator()).setFetchSize(fetchSize);
            // For non-aggregate cursors, resume the scan from the last recorded row key, if any.
            if (!isAggregate && row != null) {
                scan.setStartRow(row.get());
            }
            return this.fetchPlan;
        }

        /**
         * Updates the "more values" flag and the recorded row keys from a pair of tuples.
         *
         * <p>For a non-reversed cursor there are more values when {@code nextRowValues}
         * is non-null; for a reversed cursor, when {@code rowValues} is non-null. When
         * there are no more values the recorded keys are left untouched.
         *
         * @param rowValues tuple whose key is stored in {@code previousRow}; may be null
         * @param nextRowValues tuple whose key is stored in {@code row}; may be null
         */
        private void updateLastScanRow(Tuple rowValues, Tuple nextRowValues) {
            this.moreValues = isReversed ? rowValues != null : nextRowValues != null;
            if (!moreValues) {
                return;
            }
            if (row == null) {
                row = new ImmutableBytesWritable();
            }
            if (previousRow == null) {
                previousRow = new ImmutableBytesWritable();
            }
            if (nextRowValues != null) {
                nextRowValues.getKey(row);
            }
            if (rowValues != null) {
                rowValues.getKey(previousRow);
            }
        }

        /** @return the flag last computed by {@link #updateLastScanRow}; initially true. */
        private boolean moreValues() {
            return moreValues;
        }

        /**
         * @return the SELECT statement this cursor was declared with
         * @throws SQLException if the cursor is not open
         */
        private String getFetchSQL() throws SQLException {
            if (!isOpen) {
                throw new SQLException("Fetch call on closed cursor '" + this.cursorName + "'!");
            }
            return selectSQL;
        }
    }

    /**
     * Private constructor; this class only exposes static methods.
     */
    private CursorUtil() {
    }

    /**
     * Looks up a declared cursor by name.
     *
     * @throws SQLException if no cursor with that name is declared
     */
    private static CursorWrapper getDeclaredCursor(String cursorName) throws SQLException {
        CursorWrapper cursor = mapCursorIDQuery.get(cursorName);
        if (cursor == null) {
            throw new SQLException("Cursor " + cursorName + " not declared.");
        }
        return cursor;
    }

    /**
     * Registers a new cursor under the name carried by the statement.
     *
     * @param stmt DeclareCursorStatement instance intending to declare a new cursor.
     * @param queryPlan query plan of the cursor's SELECT statement
     * @return always true when the cursor was declared
     * @throws SQLException if a cursor with the same identifier already exists
     */
    public static boolean declareCursor(DeclareCursorStatement stmt, QueryPlan queryPlan) throws SQLException {
        String cursorName = stmt.getCursorName();
        if (mapCursorIDQuery.containsKey(cursorName)) {
            throw new SQLException("Can't declare cursor " + cursorName + ", cursor identifier already in use.");
        }
        mapCursorIDQuery.put(cursorName, new CursorWrapper(cursorName, stmt.getQuerySQL(), queryPlan));
        return true;
    }

    /**
     * Opens a previously declared cursor; opening an already open cursor is a no-op.
     *
     * @return always true when the cursor exists
     * @throws SQLException if the cursor is not declared
     */
    public static boolean openCursor(OpenStatement stmt, Connection conn) throws SQLException {
        getDeclaredCursor(stmt.getCursorName()).openCursor(conn);
        return true;
    }

    /**
     * Closes the named cursor and removes it from the registry. Closing a cursor
     * that is not declared is silently ignored.
     */
    public static void closeCursor(CloseStatement stmt) throws SQLException {
        CursorWrapper cursor = mapCursorIDQuery.get(stmt.getCursorName());
        if (cursor != null) {
            cursor.closeCursor();
        }
    }

    /**
     * Returns the fetch plan of the named cursor, configured with the given fetch size.
     *
     * @param isNext currently unused by the cursor implementation
     * @throws SQLException if the cursor is not declared or not open
     */
    public static QueryPlan getFetchPlan(String cursorName, boolean isNext, int fetchSize) throws SQLException {
        return getDeclaredCursor(cursorName).getFetchPlan(isNext, fetchSize);
    }

    /**
     * Returns the SELECT statement the named cursor was declared with.
     *
     * @throws SQLException if the cursor is not declared or not open
     */
    public static String getFetchSQL(String cursorName) throws SQLException {
        return getDeclaredCursor(cursorName).getFetchSQL();
    }

    /**
     * Records the latest row / look-ahead row pair for the named cursor.
     *
     * @throws SQLException if the cursor is not declared (previously this surfaced
     *         as a NullPointerException)
     */
    public static void updateCursor(String cursorName, Tuple rowValues, Tuple nextRowValues) throws SQLException {
        getDeclaredCursor(cursorName).updateLastScanRow(rowValues, nextRowValues);
    }

    /** @return true if a cursor with the given name is currently declared */
    public static boolean cursorDeclared(String cursorName) {
        return mapCursorIDQuery.containsKey(cursorName);
    }

    /**
     * @return whether the named cursor has more values; false if no such cursor is
     *         declared (for example after it has been closed), instead of failing
     *         with a NullPointerException
     */
    public static boolean moreValues(String cursorName) {
        CursorWrapper cursor = mapCursorIDQuery.get(cursorName);
        return cursor != null && cursor.moreValues();
    }
}