blob: 9748d7c6196ab126934ba7d2b82c617d19859474 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.protocol.protobuf.v1.operations;
import java.util.Arrays;
import java.util.List;
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.query.types.StructType;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.EncodedValue;
import org.apache.geode.internal.protocol.protobuf.v1.Failure;
import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.OQLQueryRequest;
import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.OQLQueryResponse;
import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.OQLQueryResponse.Builder;
import org.apache.geode.internal.protocol.protobuf.v1.Result;
import org.apache.geode.internal.protocol.protobuf.v1.Success;
import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
import org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
 * Protobuf operation handler that runs an OQL query and encodes its outcome into an
 * {@link OQLQueryResponse}.
 *
 * <p>
 * The response carries exactly one of three shapes, chosen from the object returned by the query:
 * <ul>
 * <li>a single value, when the result is not a {@link SelectResults};</li>
 * <li>a list of values, when the result is a {@link SelectResults} whose element type is not a
 * struct type;</li>
 * <li>a table (field names plus one row per struct), when the element type is a struct type.</li>
 * </ul>
 */
public class OqlQueryRequestOperationHandler
    implements ProtobufOperationHandler<OQLQueryRequest, OQLQueryResponse> {
  // Shared by all handler instances; never reassigned.
  private static final Logger logger = LogService.getLogger();

  /**
   * Decodes the bind parameters of {@code request}, runs its query string through the secure cache
   * of {@code messageExecutionContext}, and encodes the outcome.
   *
   * @param serializationService used to decode the bind parameters and encode the results
   * @param request the query request holding the query string and its encoded bind parameters
   * @param messageExecutionContext supplies the secure cache the query is executed against
   * @return a {@link Success} wrapping the encoded response, or a {@link Failure} built from the
   *         {@link QueryException} if the query throws one
   */
  @Override
  public Result<OQLQueryResponse> process(final ProtobufSerializationService serializationService,
      final OQLQueryRequest request, final MessageExecutionContext messageExecutionContext)
      throws InvalidExecutionContextException, ConnectionStateException, EncodingException,
      DecodingException {
    final String queryString = request.getQuery();
    final List<EncodedValue> encodedParameters = request.getBindParameterList();
    // Bind parameters are decoded outside the try block: a decoding problem is not a
    // QueryException and propagates to the caller instead of becoming a Failure.
    final Object[] bindParameters = decodeBindParameters(serializationService, encodedParameters);
    try {
      final Object results =
          messageExecutionContext.getSecureCache().query(queryString, bindParameters);
      return Success.of(encodeResults(serializationService, results));
    } catch (QueryException e) {
      // A trailing Throwable argument is logged as the exception, not as a message parameter.
      logger.info("Query failed: {}", queryString, e);
      return Failure.of(e);
    }
  }

  /**
   * Decodes every encoded bind parameter, preserving order.
   *
   * @param serializationService used to decode each value
   * @param encodedParameters the encoded bind parameters taken from the request
   * @return an array with one decoded object per encoded parameter
   */
  private Object[] decodeBindParameters(final ProtobufSerializationService serializationService,
      final List<EncodedValue> encodedParameters) {
    final Object[] bindParameters = new Object[encodedParameters.size()];
    for (int i = 0; i < bindParameters.length; i++) {
      bindParameters[i] = serializationService.decode(encodedParameters.get(i));
    }
    return bindParameters;
  }

  /**
   * Builds the response for a query result, selecting the single-value, list or table form.
   *
   * @param serializationService used to encode each value placed in the response
   * @param value the object returned by the query
   * @return the populated response
   */
  private OQLQueryResponse encodeResults(final ProtobufSerializationService serializationService,
      final Object value) throws EncodingException {
    final Builder builder = OQLQueryResponse.newBuilder();

    // The result is a single value
    if (!(value instanceof SelectResults)) {
      builder.setSingleResult(serializationService.encode(value));
      return builder.build();
    }

    final SelectResults<?> selectResults = (SelectResults<?>) value;

    // The result is a list of objects
    if (!selectResults.getCollectionType().getElementType().isStructType()) {
      final BasicTypes.EncodedValueList.Builder listResult =
          BasicTypes.EncodedValueList.newBuilder();
      for (Object element : selectResults) {
        listResult.addElement(serializationService.encode(element));
      }
      builder.setListResult(listResult);
      return builder.build();
    }

    // The result is a list of structs.
    // The cast relies on the isStructType() check above: the collection reports a struct element
    // type, so its elements are treated as Struct instances.
    @SuppressWarnings("unchecked")
    final SelectResults<Struct> structResults = (SelectResults<Struct>) selectResults;
    final StructType elementType =
        (StructType) structResults.getCollectionType().getElementType();

    final BasicTypes.Table.Builder tableResult = BasicTypes.Table.newBuilder();
    tableResult.addAllFieldName(Arrays.asList(elementType.getFieldNames()));
    for (Struct row : structResults) {
      tableResult.addRow(encodeStruct(serializationService, row));
    }
    builder.setTableResult(tableResult);
    return builder.build();
  }

  /**
   * Encodes the field values of one struct as a row, in the order returned by
   * {@link Struct#getFieldValues()}.
   *
   * @param serializationService used to encode each field value
   * @param row the struct to encode
   * @return a list builder holding one encoded element per field value
   */
  private BasicTypes.EncodedValueList.Builder encodeStruct(
      final ProtobufSerializationService serializationService, final Struct row)
      throws EncodingException {
    final BasicTypes.EncodedValueList.Builder structBuilder =
        BasicTypes.EncodedValueList.newBuilder();
    for (Object element : row.getFieldValues()) {
      structBuilder.addElement(serializationService.encode(element));
    }
    return structBuilder;
  }
}