blob: df7ed4740c613b12a148d2cc149131d3772bad2d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.avatica.remote;
import org.apache.calcite.avatica.AvaticaUtils;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.Meta.ExecuteBatchResult;
import org.apache.calcite.avatica.MissingResultsException;
import org.apache.calcite.avatica.NoSuchStatementException;
import org.apache.calcite.avatica.metrics.MetricsSystem;
import org.apache.calcite.avatica.metrics.Timer;
import org.apache.calcite.avatica.metrics.Timer.Context;
import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import static org.apache.calcite.avatica.remote.MetricsHelper.concat;
/**
* Implementation of {@link Service} that talks to a local {@link Meta}.
*/
public class LocalService implements Service {
final Meta meta;
final MetricsSystem metrics;
private final Timer executeTimer;
private final Timer commitTimer;
private final Timer prepareTimer;
private final Timer prepareAndExecuteTimer;
private final Timer connectionSyncTimer;
private RpcMetadataResponse serverLevelRpcMetadata;
public LocalService(Meta meta) {
this(meta, NoopMetricsSystem.getInstance());
}
public LocalService(Meta meta, MetricsSystem metrics) {
this.meta = meta;
this.metrics = Objects.requireNonNull(metrics);
this.executeTimer = this.metrics.getTimer(name("Execute"));
this.commitTimer = this.metrics.getTimer(name("Commit"));
this.prepareTimer = this.metrics.getTimer(name("Prepare"));
this.prepareAndExecuteTimer = this.metrics.getTimer(name("PrepareAndExecute"));
this.connectionSyncTimer = this.metrics.getTimer(name("ConnectionSync"));
}
private static String name(String timer) {
return concat(LocalService.class, timer);
}
@Override public void setRpcMetadata(RpcMetadataResponse serverLevelRpcMetadata) {
this.serverLevelRpcMetadata = Objects.requireNonNull(serverLevelRpcMetadata);
}
private static <E> List<E> list(Iterable<E> iterable) {
if (iterable instanceof List) {
return (List<E>) iterable;
}
final List<E> rowList = new ArrayList<>();
for (E row : iterable) {
rowList.add(row);
}
return rowList;
}
/** Converts a result set (not serializable) into a serializable response. */
public ResultSetResponse toResponse(Meta.MetaResultSet resultSet) {
if (resultSet.updateCount != -1) {
return new ResultSetResponse(resultSet.connectionId,
resultSet.statementId, resultSet.ownStatement, null, null,
resultSet.updateCount, serverLevelRpcMetadata);
}
Meta.Signature signature = resultSet.signature;
// TODO Revise modification of CursorFactory see:
// https://issues.apache.org/jira/browse/CALCITE-4567
Meta.CursorFactory cursorFactory = resultSet.signature.cursorFactory;
Meta.Frame frame = null;
int updateCount = -1;
final List<Object> list;
if (resultSet.firstFrame != null) {
list = list(resultSet.firstFrame.rows);
if (list.isEmpty()) {
cursorFactory = Meta.CursorFactory.LIST;
} else {
switch (cursorFactory.style) {
case ARRAY:
cursorFactory = Meta.CursorFactory.LIST;
break;
case MAP:
case LIST:
break;
case RECORD:
cursorFactory = Meta.CursorFactory.map(cursorFactory.fieldNames);
break;
default:
throw new IllegalStateException("Unknown cursor factory style: "
+ cursorFactory.style);
}
}
final boolean done = resultSet.firstFrame.done;
frame = new Meta.Frame(0, done, list);
updateCount = -1;
if (signature.statementType != null) {
if (signature.statementType.canUpdate()) {
frame = null;
updateCount = ((Number) ((List) list.get(0)).get(0)).intValue();
}
}
} else {
cursorFactory = Meta.CursorFactory.LIST;
}
if (cursorFactory != resultSet.signature.cursorFactory) {
signature = signature.setCursorFactory(cursorFactory);
}
return new ResultSetResponse(resultSet.connectionId, resultSet.statementId,
resultSet.ownStatement, signature, frame, updateCount, serverLevelRpcMetadata);
}
public ResultSetResponse apply(CatalogsRequest request) {
final Meta.ConnectionHandle ch =
new Meta.ConnectionHandle(request.connectionId);
final Meta.MetaResultSet resultSet = meta.getCatalogs(ch);
return toResponse(resultSet);
}
public ResultSetResponse apply(SchemasRequest request) {
final Meta.ConnectionHandle ch =
new Meta.ConnectionHandle(request.connectionId);
final Meta.MetaResultSet resultSet =
meta.getSchemas(ch, request.catalog, Meta.Pat.of(request.schemaPattern));
return toResponse(resultSet);
}
public ResultSetResponse apply(TablesRequest request) {
final Meta.ConnectionHandle ch =
new Meta.ConnectionHandle(request.connectionId);
final Meta.MetaResultSet resultSet =
meta.getTables(ch,
request.catalog,
Meta.Pat.of(request.schemaPattern),
Meta.Pat.of(request.tableNamePattern),
request.typeList);
return toResponse(resultSet);
}
public ResultSetResponse apply(TableTypesRequest request) {
final Meta.ConnectionHandle ch =
new Meta.ConnectionHandle(request.connectionId);
final Meta.MetaResultSet resultSet = meta.getTableTypes(ch);
return toResponse(resultSet);
}
public ResultSetResponse apply(TypeInfoRequest request) {
final Meta.ConnectionHandle ch =
new Meta.ConnectionHandle(request.connectionId);
final Meta.MetaResultSet resultSet = meta.getTypeInfo(ch);
return toResponse(resultSet);
}
public ResultSetResponse apply(ColumnsRequest request) {
final Meta.ConnectionHandle ch =
new Meta.ConnectionHandle(request.connectionId);
final Meta.MetaResultSet resultSet =
meta.getColumns(ch,
request.catalog,
Meta.Pat.of(request.schemaPattern),
Meta.Pat.of(request.tableNamePattern),
Meta.Pat.of(request.columnNamePattern));
return toResponse(resultSet);
}
public PrepareResponse apply(PrepareRequest request) {
try (final Context ignore = prepareTimer.start()) {
final Meta.ConnectionHandle ch =
new Meta.ConnectionHandle(request.connectionId);
final Meta.StatementHandle h =
meta.prepare(ch, request.sql, request.maxRowCount);
return new PrepareResponse(h, serverLevelRpcMetadata);
}
}
public ExecuteResponse apply(PrepareAndExecuteRequest request) {
try (final Context ignore = prepareAndExecuteTimer.start()) {
final Meta.StatementHandle sh =
new Meta.StatementHandle(request.connectionId, request.statementId, null);
try {
final Meta.ExecuteResult executeResult =
meta.prepareAndExecute(sh, request.sql, request.maxRowCount,
request.maxRowsInFirstFrame, new Meta.PrepareCallback() {
@Override public Object getMonitor() {
return LocalService.class;
}
@Override public void clear() {
}
@Override public void assign(Meta.Signature signature,
Meta.Frame firstFrame, long updateCount) {
}
@Override public void execute() {
}
});
final List<ResultSetResponse> results = new ArrayList<>();
for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) {
results.add(toResponse(metaResultSet));
}
return new ExecuteResponse(results, false, serverLevelRpcMetadata);
} catch (NoSuchStatementException e) {
// The Statement doesn't exist anymore, bubble up this information
return new ExecuteResponse(null, true, serverLevelRpcMetadata);
}
}
}
public FetchResponse apply(FetchRequest request) {
final Meta.StatementHandle h = new Meta.StatementHandle(
request.connectionId, request.statementId, null);
try {
final Meta.Frame frame =
meta.fetch(h,
request.offset,
request.fetchMaxRowCount);
return new FetchResponse(frame, false, false, serverLevelRpcMetadata);
} catch (NullPointerException | NoSuchStatementException e) {
// The Statement doesn't exist anymore, bubble up this information
return new FetchResponse(null, true, true, serverLevelRpcMetadata);
} catch (MissingResultsException e) {
return new FetchResponse(null, false, true, serverLevelRpcMetadata);
}
}
public ExecuteResponse apply(ExecuteRequest request) {
try (final Context ignore = executeTimer.start()) {
try {
final Meta.ExecuteResult executeResult = meta.execute(request.statementHandle,
request.parameterValues, AvaticaUtils.toSaturatedInt(request.maxRowCount));
final List<ResultSetResponse> results = new ArrayList<>(executeResult.resultSets.size());
for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) {
results.add(toResponse(metaResultSet));
}
return new ExecuteResponse(results, false, serverLevelRpcMetadata);
} catch (NoSuchStatementException e) {
return new ExecuteResponse(null, true, serverLevelRpcMetadata);
}
}
}
public CreateStatementResponse apply(CreateStatementRequest request) {
final Meta.ConnectionHandle ch =
new Meta.ConnectionHandle(request.connectionId);
final Meta.StatementHandle h = meta.createStatement(ch);
return new CreateStatementResponse(h.connectionId, h.id, serverLevelRpcMetadata);
}
public CloseStatementResponse apply(CloseStatementRequest request) {
final Meta.StatementHandle h = new Meta.StatementHandle(
request.connectionId, request.statementId, null);
meta.closeStatement(h);
return new CloseStatementResponse(serverLevelRpcMetadata);
}
public OpenConnectionResponse apply(OpenConnectionRequest request) {
final Meta.ConnectionHandle ch =
new Meta.ConnectionHandle(request.connectionId);
meta.openConnection(ch, request.info);
return new OpenConnectionResponse(serverLevelRpcMetadata);
}
public CloseConnectionResponse apply(CloseConnectionRequest request) {
final Meta.ConnectionHandle ch =
new Meta.ConnectionHandle(request.connectionId);
meta.closeConnection(ch);
return new CloseConnectionResponse(serverLevelRpcMetadata);
}
public ConnectionSyncResponse apply(ConnectionSyncRequest request) {
try (final Context ignore = connectionSyncTimer.start()) {
final Meta.ConnectionHandle ch =
new Meta.ConnectionHandle(request.connectionId);
final Meta.ConnectionProperties connProps =
meta.connectionSync(ch, request.connProps);
return new ConnectionSyncResponse(connProps, serverLevelRpcMetadata);
}
}
public DatabasePropertyResponse apply(DatabasePropertyRequest request) {
final Meta.ConnectionHandle ch =
new Meta.ConnectionHandle(request.connectionId);
return new DatabasePropertyResponse(meta.getDatabaseProperties(ch), serverLevelRpcMetadata);
}
public SyncResultsResponse apply(SyncResultsRequest request) {
final Meta.StatementHandle h = new Meta.StatementHandle(
request.connectionId, request.statementId, null);
SyncResultsResponse response;
try {
// Set success on the cached statement
response = new SyncResultsResponse(meta.syncResults(h, request.state, request.offset), false,
serverLevelRpcMetadata);
} catch (NoSuchStatementException e) {
// Tried to sync results on a statement which wasn't cached
response = new SyncResultsResponse(false, true, serverLevelRpcMetadata);
}
return response;
}
public CommitResponse apply(CommitRequest request) {
try (final Context ignore = commitTimer.start()) {
meta.commit(new Meta.ConnectionHandle(request.connectionId));
// If commit() errors, let the ErrorResponse be sent back via an uncaught Exception.
return new CommitResponse();
}
}
public RollbackResponse apply(RollbackRequest request) {
meta.rollback(new Meta.ConnectionHandle(request.connectionId));
// If rollback() errors, let the ErrorResponse be sent back via an uncaught Exception.
return new RollbackResponse();
}
public ExecuteBatchResponse apply(PrepareAndExecuteBatchRequest request) {
final Meta.StatementHandle h = new Meta.StatementHandle(request.connectionId,
request.statementId, null);
try {
ExecuteBatchResult result = meta.prepareAndExecuteBatch(h, request.sqlCommands);
return new ExecuteBatchResponse(request.connectionId, request.statementId,
result.updateCounts, false, serverLevelRpcMetadata);
} catch (NoSuchStatementException e) {
return new ExecuteBatchResponse(request.connectionId, request.statementId, null, true,
serverLevelRpcMetadata);
}
}
public ExecuteBatchResponse apply(ExecuteBatchRequest request) {
final Meta.StatementHandle h = new Meta.StatementHandle(request.connectionId,
request.statementId, null);
try {
ExecuteBatchResult result;
if (request.hasProtoUpdateBatches() && meta instanceof ProtobufMeta) {
result = ((ProtobufMeta) meta).executeBatchProtobuf(h, request.getProtoUpdateBatches());
} else {
result = meta.executeBatch(h, request.parameterValues);
}
return new ExecuteBatchResponse(request.connectionId, request.statementId,
result.updateCounts, false, serverLevelRpcMetadata);
} catch (NoSuchStatementException e) {
return new ExecuteBatchResponse(request.connectionId, request.statementId, null, true,
serverLevelRpcMetadata);
}
}
}
// End LocalService.java