| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to you under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.calcite.avatica.remote; |
| |
| import org.apache.calcite.avatica.AvaticaParameter; |
| import org.apache.calcite.avatica.AvaticaSeverity; |
| import org.apache.calcite.avatica.ColumnMetaData; |
| import org.apache.calcite.avatica.ColumnMetaData.ArrayType; |
| import org.apache.calcite.avatica.ColumnMetaData.Rep; |
| import org.apache.calcite.avatica.ColumnMetaData.ScalarType; |
| import org.apache.calcite.avatica.ConnectionPropertiesImpl; |
| import org.apache.calcite.avatica.Meta; |
| import org.apache.calcite.avatica.Meta.Frame; |
| import org.apache.calcite.avatica.Meta.Signature; |
| import org.apache.calcite.avatica.Meta.Style; |
| import org.apache.calcite.avatica.MetaImpl; |
| import org.apache.calcite.avatica.QueryState; |
| import org.apache.calcite.avatica.remote.Service.CatalogsRequest; |
| import org.apache.calcite.avatica.remote.Service.CloseConnectionRequest; |
| import org.apache.calcite.avatica.remote.Service.CloseConnectionResponse; |
| import org.apache.calcite.avatica.remote.Service.CloseStatementRequest; |
| import org.apache.calcite.avatica.remote.Service.CloseStatementResponse; |
| import org.apache.calcite.avatica.remote.Service.ColumnsRequest; |
| import org.apache.calcite.avatica.remote.Service.CommitRequest; |
| import org.apache.calcite.avatica.remote.Service.CommitResponse; |
| import org.apache.calcite.avatica.remote.Service.ConnectionSyncRequest; |
| import org.apache.calcite.avatica.remote.Service.ConnectionSyncResponse; |
| import org.apache.calcite.avatica.remote.Service.CreateStatementRequest; |
| import org.apache.calcite.avatica.remote.Service.CreateStatementResponse; |
| import org.apache.calcite.avatica.remote.Service.DatabasePropertyRequest; |
| import org.apache.calcite.avatica.remote.Service.DatabasePropertyResponse; |
| import org.apache.calcite.avatica.remote.Service.ErrorResponse; |
| import org.apache.calcite.avatica.remote.Service.ExecuteBatchResponse; |
| import org.apache.calcite.avatica.remote.Service.ExecuteRequest; |
| import org.apache.calcite.avatica.remote.Service.ExecuteResponse; |
| import org.apache.calcite.avatica.remote.Service.FetchRequest; |
| import org.apache.calcite.avatica.remote.Service.FetchResponse; |
| import org.apache.calcite.avatica.remote.Service.OpenConnectionRequest; |
| import org.apache.calcite.avatica.remote.Service.OpenConnectionResponse; |
| import org.apache.calcite.avatica.remote.Service.PrepareAndExecuteBatchRequest; |
| import org.apache.calcite.avatica.remote.Service.PrepareAndExecuteRequest; |
| import org.apache.calcite.avatica.remote.Service.PrepareRequest; |
| import org.apache.calcite.avatica.remote.Service.PrepareResponse; |
| import org.apache.calcite.avatica.remote.Service.Request; |
| import org.apache.calcite.avatica.remote.Service.Response; |
| import org.apache.calcite.avatica.remote.Service.ResultSetResponse; |
| import org.apache.calcite.avatica.remote.Service.RollbackRequest; |
| import org.apache.calcite.avatica.remote.Service.RollbackResponse; |
| import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; |
| import org.apache.calcite.avatica.remote.Service.SchemasRequest; |
| import org.apache.calcite.avatica.remote.Service.SyncResultsRequest; |
| import org.apache.calcite.avatica.remote.Service.SyncResultsResponse; |
| import org.apache.calcite.avatica.remote.Service.TableTypesRequest; |
| import org.apache.calcite.avatica.remote.Service.TablesRequest; |
| import org.apache.calcite.avatica.remote.Service.TypeInfoRequest; |
| |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.junit.runners.Parameterized.Parameters; |
| |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.sql.DatabaseMetaData; |
| import java.sql.Types; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| |
| import static org.junit.Assert.assertEquals; |
| |
| /** |
| * Tests serialization of requests and response objects. |
| * |
| * @param <T> The object class being tested |
| */ |
| @RunWith(Parameterized.class) |
| public class ProtobufTranslationImplTest<T> { |
| |
| /** |
| * Simple function definition that acts as an identity. |
| * |
| * @param <A> Argument type |
| */ |
| private interface IdentityFunction<A> { |
| A apply(A obj) throws IOException; |
| } |
| |
| /** |
| * Identity function that accepts a request, serializes it to protobuf, and converts it back. |
| */ |
| private static class RequestFunc implements IdentityFunction<Request> { |
| private final ProtobufTranslation translation; |
| |
| private RequestFunc(ProtobufTranslation translation) { |
| this.translation = translation; |
| } |
| |
| public Request apply(Request request) throws IOException { |
| // Serialize and then re-parse the request |
| return translation.parseRequest(translation.serializeRequest(request)); |
| } |
| } |
| |
| /** |
| * Identity function that accepts a response, serializes it to protobuf, and converts it back. |
| */ |
| private static class ResponseFunc implements IdentityFunction<Response> { |
| private final ProtobufTranslation translation; |
| |
| private ResponseFunc(ProtobufTranslation translation) { |
| this.translation = translation; |
| } |
| |
| public Response apply(Response response) throws IOException { |
| // Serialize and then re-pare the response |
| return translation.parseResponse(translation.serializeResponse(response)); |
| } |
| } |
| |
| @Parameters |
| public static List<Object[]> parameters() { |
| List<Object[]> params = new ArrayList<>(); |
| |
| // The impl we're testing |
| ProtobufTranslationImpl translation = new ProtobufTranslationImpl(); |
| |
| // Identity transformation for Requests |
| RequestFunc requestFunc = new RequestFunc(translation); |
| // Identity transformation for Responses |
| ResponseFunc responseFunc = new ResponseFunc(translation); |
| |
| List<Request> requests = getRequests(); |
| List<Request> requestsWithNulls = getRequestsWithNulls(); |
| List<Response> responses = getResponses(); |
| |
| // Requests |
| for (Request request : requests) { |
| params.add(new Object[] {request, requestFunc}); |
| } |
| |
| // Requests with nulls in parameters |
| for (Request request : requestsWithNulls) { |
| params.add(new Object[] {request, requestFunc}); |
| } |
| |
| // Responses |
| for (Response response : responses) { |
| params.add(new Object[] {response, responseFunc}); |
| } |
| |
| return params; |
| } |
| |
| /** |
| * Generates a collection of Requests whose serialization will be tested. |
| */ |
| private static List<Request> getRequests() { |
| LinkedList<Request> requests = new LinkedList<>(); |
| |
| requests.add(new CatalogsRequest()); |
| requests.add(new DatabasePropertyRequest()); |
| requests.add(new SchemasRequest("connectionId", "catalog", "schemaPattern")); |
| requests.add( |
| new TablesRequest("connectionId", "catalog", "schemaPattern", "tableNamePattern", |
| Arrays.asList("STRING", "BOOLEAN", "INT"))); |
| requests.add(new TableTypesRequest()); |
| requests.add( |
| new ColumnsRequest("connectionId", "catalog", "schemaPattern", "tableNamePattern", |
| "columnNamePattern")); |
| requests.add(new TypeInfoRequest()); |
| requests.add( |
| new PrepareAndExecuteRequest("connectionId", Integer.MAX_VALUE, "sql", |
| Long.MAX_VALUE)); |
| requests.add(new PrepareRequest("connectionId", "sql", Long.MAX_VALUE)); |
| |
| List<TypedValue> paramValues = |
| Arrays.asList(TypedValue.create(Rep.BOOLEAN.name(), Boolean.TRUE), |
| TypedValue.create(Rep.STRING.name(), "string")); |
| FetchRequest fetchRequest = new FetchRequest("connectionId", Integer.MAX_VALUE, |
| Long.MAX_VALUE, Integer.MAX_VALUE); |
| requests.add(fetchRequest); |
| |
| requests.add(new CreateStatementRequest("connectionId")); |
| requests.add(new CloseStatementRequest("connectionId", Integer.MAX_VALUE)); |
| Map<String, String> info = new HashMap<>(); |
| info.put("param1", "value1"); |
| info.put("param2", "value2"); |
| requests.add(new OpenConnectionRequest("connectionId", info)); |
| requests.add(new CloseConnectionRequest("connectionId")); |
| requests.add( |
| new ConnectionSyncRequest("connectionId", |
| new ConnectionPropertiesImpl(Boolean.FALSE, Boolean.FALSE, |
| Integer.MAX_VALUE, "catalog", "schema"))); |
| |
| requests.add(new SyncResultsRequest("connectionId", 12345, getSqlQueryState(), 150)); |
| requests.add(new SyncResultsRequest("connectionId2", 54321, getMetadataQueryState1(), 0)); |
| requests.add(new SyncResultsRequest("connectionId3", 5, getMetadataQueryState2(), 10)); |
| |
| requests.add(new CommitRequest("connectionId")); |
| requests.add(new RollbackRequest("connectionId")); |
| |
| // ExecuteBatchRequest omitted because of the special protobuf conversion it does |
| |
| List<String> commands = Arrays.asList("command1", "command2", "command3"); |
| requests.add(new PrepareAndExecuteBatchRequest("connectionId", 12345, commands)); |
| |
| |
| List<ColumnMetaData> columns = Collections.emptyList(); |
| List<AvaticaParameter> params = Collections.emptyList(); |
| Meta.CursorFactory cursorFactory = Meta.CursorFactory.create(Style.LIST, Object.class, |
| Collections.<String>emptyList()); |
| Signature signature = Signature.create(columns, "sql", params, cursorFactory, |
| Meta.StatementType.SELECT); |
| Meta.StatementHandle handle = new Meta.StatementHandle("1234", 1, signature); |
| requests.add(new ExecuteRequest(handle, Arrays.<TypedValue>asList((TypedValue) null), 10)); |
| requests.add(new ExecuteRequest(handle, Arrays.asList(TypedValue.EXPLICIT_NULL), 10)); |
| |
| return requests; |
| } |
| |
| private static QueryState getSqlQueryState() { |
| return new QueryState("SELECT * from TABLE"); |
| } |
| |
| private static QueryState getMetadataQueryState1() { |
| return new QueryState(MetaDataOperation.GET_COLUMNS, new Object[] { |
| "", |
| null, |
| "%", |
| "%" |
| }); |
| } |
| |
| private static QueryState getMetadataQueryState2() { |
| return new QueryState(MetaDataOperation.GET_CATALOGS, new Object[0]); |
| } |
| |
| private static List<Request> getRequestsWithNulls() { |
| LinkedList<Request> requests = new LinkedList<>(); |
| |
| // We're pretty fast and loose on what can be null. |
| requests.add(new SchemasRequest(null, null, null)); |
| // Repeated fields default to an empty list |
| requests.add(new TablesRequest(null, null, null, null, Collections.<String>emptyList())); |
| requests.add(new ColumnsRequest(null, null, null, null, null)); |
| requests.add(new PrepareAndExecuteRequest(null, 0, null, 0)); |
| requests.add(new PrepareRequest(null, null, 0)); |
| requests.add(new CreateStatementRequest(null)); |
| requests.add(new CloseStatementRequest(null, 0)); |
| requests.add(new OpenConnectionRequest(null, null)); |
| requests.add(new CloseConnectionRequest(null)); |
| requests.add(new ConnectionSyncRequest(null, null)); |
| |
| return requests; |
| } |
| |
| private static ColumnMetaData getArrayColumnMetaData(ScalarType componentType, int index, |
| String name) { |
| ArrayType arrayType = ColumnMetaData.array(componentType, "Array", Rep.ARRAY); |
| return new ColumnMetaData( |
| index, false, true, false, false, DatabaseMetaData.columnNullable, |
| true, -1, name, name, null, |
| 0, 0, null, null, arrayType, true, false, false, |
| "ARRAY"); |
| } |
| |
| /** |
| * Generates a collection of Responses whose serialization will be tested. |
| */ |
| private static List<Response> getResponses() { |
| final RpcMetadataResponse rpcMetadata = new RpcMetadataResponse("localhost:8765"); |
| LinkedList<Response> responses = new LinkedList<>(); |
| |
| // Nested classes (Signature, ColumnMetaData, CursorFactory, etc) are implicitly getting tested) |
| |
| // Stub out the metadata for a row |
| ScalarType arrayComponentType = ColumnMetaData.scalar(Types.INTEGER, "integer", Rep.INTEGER); |
| ColumnMetaData arrayColumnMetaData = getArrayColumnMetaData(arrayComponentType, 2, "counts"); |
| List<ColumnMetaData> columns = |
| Arrays.asList(MetaImpl.columnMetaData("str", 0, String.class, true), |
| MetaImpl.columnMetaData("count", 1, Integer.class, true), |
| arrayColumnMetaData); |
| List<AvaticaParameter> params = |
| Arrays.asList( |
| new AvaticaParameter(false, 10, 0, Types.VARCHAR, "VARCHAR", |
| String.class.getName(), "str")); |
| Meta.CursorFactory cursorFactory = Meta.CursorFactory.create(Style.LIST, Object.class, |
| Arrays.asList("str", "count", "counts")); |
| // The row values |
| List<Object> rows = new ArrayList<>(); |
| rows.add(new Object[] {"str_value1", 50, Arrays.asList(1, 2, 3)}); |
| rows.add(new Object[] {"str_value2", 100, Arrays.asList(1)}); |
| |
| // Create the signature and frame using the metadata and values |
| Signature signature = Signature.create(columns, "sql", params, cursorFactory, |
| Meta.StatementType.SELECT); |
| Frame frame = Frame.create(Integer.MAX_VALUE, true, rows); |
| |
| // And then create a ResultSetResponse |
| ResultSetResponse results1 = new ResultSetResponse("connectionId", Integer.MAX_VALUE, true, |
| signature, frame, Long.MAX_VALUE, rpcMetadata); |
| responses.add(results1); |
| |
| responses.add(new CloseStatementResponse(rpcMetadata)); |
| |
| ConnectionPropertiesImpl connProps = new ConnectionPropertiesImpl(false, true, |
| Integer.MAX_VALUE, "catalog", "schema"); |
| responses.add(new ConnectionSyncResponse(connProps, rpcMetadata)); |
| |
| responses.add(new OpenConnectionResponse(rpcMetadata)); |
| responses.add(new CloseConnectionResponse(rpcMetadata)); |
| |
| responses.add(new CreateStatementResponse("connectionId", Integer.MAX_VALUE, rpcMetadata)); |
| |
| Map<Meta.DatabaseProperty, Object> propertyMap = new HashMap<>(); |
| for (Meta.DatabaseProperty prop : Meta.DatabaseProperty.values()) { |
| propertyMap.put(prop, prop.defaultValue); |
| } |
| responses.add(new DatabasePropertyResponse(propertyMap, rpcMetadata)); |
| |
| responses.add( |
| new ExecuteResponse(Arrays.asList(results1, results1, results1), false, rpcMetadata)); |
| responses.add(new FetchResponse(frame, false, false, rpcMetadata)); |
| responses.add(new FetchResponse(frame, true, true, rpcMetadata)); |
| responses.add(new FetchResponse(frame, false, true, rpcMetadata)); |
| responses.add( |
| new PrepareResponse( |
| new Meta.StatementHandle("connectionId", Integer.MAX_VALUE, signature), |
| rpcMetadata)); |
| |
| StringWriter sw = new StringWriter(); |
| new Exception().printStackTrace(new PrintWriter(sw)); |
| responses.add( |
| new ErrorResponse(Collections.singletonList(sw.toString()), "Test Error Message", |
| ErrorResponse.UNKNOWN_ERROR_CODE, ErrorResponse.UNKNOWN_SQL_STATE, |
| AvaticaSeverity.WARNING, rpcMetadata)); |
| |
| // No more results, statement not missing |
| responses.add(new SyncResultsResponse(false, false, rpcMetadata)); |
| // Missing statement, no results |
| responses.add(new SyncResultsResponse(false, true, rpcMetadata)); |
| // More results, no missing statement |
| responses.add(new SyncResultsResponse(true, false, rpcMetadata)); |
| |
| // Some tests to make sure ErrorResponse doesn't fail. |
| responses.add(new ErrorResponse((List<String>) null, null, 0, null, null, null)); |
| responses.add( |
| new ErrorResponse(Arrays.asList("stacktrace1", "stacktrace2"), null, 0, null, null, null)); |
| |
| responses.add(new CommitResponse()); |
| responses.add(new RollbackResponse()); |
| |
| long[] updateCounts = new long[]{1, 0, 1, 1}; |
| responses.add( |
| new ExecuteBatchResponse("connectionId", 12345, updateCounts, false, rpcMetadata)); |
| |
| return responses; |
| } |
| |
| private final T object; |
| private final IdentityFunction<T> function; |
| |
| public ProtobufTranslationImplTest(T object, IdentityFunction<T> func) { |
| this.object = object; |
| this.function = func; |
| } |
| |
| @Test |
| public void testSerialization() throws Exception { |
| // Function acts as opposite sides of the transport. |
| // An object (a request or response) starts on one side |
| // of the transport, serialized, "sent" over the transport |
| // and then reconstituted. The object on either side should |
| // be equivalent. |
| assertEquals(object, this.function.apply(object)); |
| } |
| } |
| |
| // End ProtobufTranslationImplTest.java |