blob: a19ea673e094ce04f473576bcb4887a66fa4a8d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.experimental.driver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.google.protobuf.ProtocolStringList;
import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.EncodedValue;
import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.Table;
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message;
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase;
import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.OQLQueryRequest;
import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.OQLQueryResponse;
class ProtobufQueryService implements QueryService {
private final ProtobufChannel channel;
private final ValueEncoder valueEncoder;
public ProtobufQueryService(ProtobufChannel channel, ValueEncoder valueEncoder) {
this.channel = channel;
this.valueEncoder = valueEncoder;
}
@Override
public <T> Query<T> newQuery(final String queryString) {
return new ProtobufQuery<>(queryString);
}
class ProtobufQuery<T> implements Query<T> {
private final String queryString;
public ProtobufQuery(final String queryString) {
this.queryString = queryString;
}
@Override
public List<T> execute(final Object... bindParameters) throws IOException {
List<EncodedValue> encodedParameters = Arrays.stream(bindParameters)
.map(valueEncoder::encodeValue).collect(Collectors.toList());
Message request = Message.newBuilder().setOqlQueryRequest(
OQLQueryRequest.newBuilder().addAllBindParameter(encodedParameters).setQuery(queryString))
.build();
final OQLQueryResponse response =
channel.sendRequest(request, MessageTypeCase.OQLQUERYRESPONSE).getOqlQueryResponse();
switch (response.getResultCase()) {
case SINGLERESULT:
return parseSingleResult(response);
case LISTRESULT:
return parseListResult(response);
case TABLERESULT:
@SuppressWarnings("unchecked")
final List<T> tableResult = (List<T>) parseTableResult(response);
return tableResult;
default:
throw new RuntimeException("Unexpected response: " + response);
}
}
private List<Map<String, Object>> parseTableResult(final OQLQueryResponse response) {
final Table table = response.getTableResult();
final ProtocolStringList fieldNames = table.getFieldNameList();
List<Map<String, Object>> results = new ArrayList<>();
for (BasicTypes.EncodedValueList row : table.getRowList()) {
final List<Object> decodedRow = row.getElementList().stream().map(valueEncoder::decodeValue)
.collect(Collectors.toList());
Map<String, Object> rowMap = new LinkedHashMap<>(decodedRow.size());
for (int i = 0; i < decodedRow.size(); i++) {
rowMap.put(fieldNames.get(i), decodedRow.get(i));
}
results.add(rowMap);
}
return results;
}
private List<T> parseListResult(final OQLQueryResponse response) {
return response.getListResult().getElementList().stream()
.map(valueEncoder::<T>decodeValue).collect(Collectors.toList());
}
private List<T> parseSingleResult(final OQLQueryResponse response) {
return Collections.singletonList(valueEncoder.decodeValue(response.getSingleResult()));
}
}
}