blob: bb1699f63eaae9b7b35c05e1a0af920edc9cb318 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.management.internal.beans;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.query.QueryInvalidException;
import org.apache.geode.cache.query.internal.CompiledValue;
import org.apache.geode.cache.query.internal.QCompiler;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.management.DistributedRegionMXBean;
import org.apache.geode.management.internal.ManagementConstants;
import org.apache.geode.management.internal.SystemManagementService;
import org.apache.geode.management.internal.cli.CliUtil;
/**
* this is used by DistributedSystemBridge.queryData() call. It calls QueryDataFunction on each
* member in the input
*/
public class DataQueryEngine {
private static final Logger logger = LogService.getLogger();
// these numbers represents function argument index
private static final int DISPLAY_MEMBERWISE = 0;
private static final int QUERY = 1;
private static final int REGION = 2;
private static final int LIMIT = 3;
private static final int QUERY_RESULTSET_LIMIT = 4;
private static final int QUERY_COLLECTIONS_DEPTH = 5;
private SystemManagementService service;
private InternalCache cache;
public DataQueryEngine(SystemManagementService service, InternalCache cache) {
this.service = service;
this.cache = cache;
}
public String queryForJsonResult(final String query, final int limit,
final int queryResultSetLimit, final int queryCollectionsDepth)
throws Exception {
return (String) queryData(query, null, limit, false, queryResultSetLimit,
queryCollectionsDepth);
}
public String queryForJsonResult(final String query, String members, final int limit,
final int queryResultSetLimit, final int queryCollectionsDepth)
throws Exception {
return (String) queryData(query, members, limit, false, queryResultSetLimit,
queryCollectionsDepth);
}
public byte[] queryForCompressedResult(final String query, final int limit,
final int queryResultSetLimit, final int queryCollectionsDepth)
throws Exception {
return (byte[]) queryData(query, null, limit, true, queryResultSetLimit, queryCollectionsDepth);
}
public Object queryData(final String query, final String members, final int limit,
final boolean zipResult, final int queryResultSetLimit, final int queryCollectionsDepth)
throws Exception {
if (query == null || query.isEmpty()) {
return new JsonisedErrorMessage("Query is either empty or Null")
.toString();
}
Set<DistributedMember> inputMembers = null;
if (StringUtils.isNotBlank(members)) {
inputMembers = CliUtil.findMembers(null, members.split(","), cache);
if (inputMembers.size() == 0) {
return new JsonisedErrorMessage(
String.format("Query is invalid due to invalid member : %s", members)).toString();
}
}
try {
Set<String> regionsInQuery = compileQuery(query);
// Validate region existence
if (regionsInQuery.size() > 0) {
for (String regionPath : regionsInQuery) {
DistributedRegionMXBean regionMBean = service.getDistributedRegionMXBean(regionPath);
if (regionMBean == null) {
return new JsonisedErrorMessage(
String.format("Cannot find regions %s in any of the members", regionPath))
.toString();
} else {
Set<DistributedMember> associatedMembers =
CliUtil.getRegionAssociatedMembers(regionPath, cache, true);
if (inputMembers != null && inputMembers.size() > 0) {
if (!associatedMembers.containsAll(inputMembers)) {
return new JsonisedErrorMessage(
String.format("Cannot find regions %s in specified members", regionPath))
.toString();
}
}
}
}
} else {
return new JsonisedErrorMessage(String.format("Query is invalid due to error : %s",
"Region mentioned in query probably missing /")).toString();
}
// Validate
if (regionsInQuery.size() > 1 && inputMembers == null) {
for (String regionPath : regionsInQuery) {
DistributedRegionMXBean regionMBean = service.getDistributedRegionMXBean(regionPath);
if (regionMBean.getRegionType().equals(DataPolicy.PARTITION.toString())
|| regionMBean.getRegionType().equals(DataPolicy.PERSISTENT_PARTITION.toString())) {
return new JsonisedErrorMessage(
"Join operation can only be executed on targeted members, please give member input")
.toString();
}
}
}
String randomRegion = regionsInQuery.iterator().next();
// get the first available member
Set<DistributedMember> associatedMembers =
CliUtil.getQueryRegionsAssociatedMembers(regionsInQuery, cache, false);
if (associatedMembers != null && associatedMembers.size() > 0) {
Object[] functionArgs = new Object[6];
if (inputMembers != null && inputMembers.size() > 0) {// on input
// members
functionArgs[DISPLAY_MEMBERWISE] = true;
functionArgs[QUERY] = query;
functionArgs[REGION] = randomRegion;
functionArgs[LIMIT] = limit;
functionArgs[QUERY_RESULTSET_LIMIT] = queryResultSetLimit;
functionArgs[QUERY_COLLECTIONS_DEPTH] = queryCollectionsDepth;
return callFunction(functionArgs, inputMembers, zipResult);
} else { // Query on any random member
functionArgs[DISPLAY_MEMBERWISE] = false;
functionArgs[QUERY] = query;
functionArgs[REGION] = randomRegion;
functionArgs[LIMIT] = limit;
functionArgs[QUERY_RESULTSET_LIMIT] = queryResultSetLimit;
functionArgs[QUERY_COLLECTIONS_DEPTH] = queryCollectionsDepth;
return callFunction(functionArgs, associatedMembers, zipResult);
}
} else {
return new JsonisedErrorMessage(String
.format("Cannot find regions %s in any of the members", regionsInQuery.toString()))
.toString();
}
} catch (QueryInvalidException qe) {
return new JsonisedErrorMessage(
String.format("Query is invalid due to error : %s", qe.getMessage()))
.toString();
}
}
private static Object callFunction(final Object functionArgs,
final Set<DistributedMember> members, final boolean zipResult) throws Exception {
try {
if (members.size() == 1) {
DistributedMember member = members.iterator().next();
ResultCollector collector = FunctionService.onMember(member).setArguments(functionArgs)
.execute(ManagementConstants.QUERY_DATA_FUNCTION);
List list = (List) collector.getResult();
Object object = null;
if (list.size() > 0) {
object = list.get(0);
}
if (object instanceof Throwable) {
throw (Throwable) object;
}
byte[] result = (byte[]) object;
if (zipResult) { // The result is already compressed
return result;
} else {
Object[] functionArgsList = (Object[]) functionArgs;
boolean showMember = (Boolean) functionArgsList[DISPLAY_MEMBERWISE];
if (showMember) {// Added to show a single member similar to multiple
// member.
// Note , if no member is selected this is the code path executed. A
// random associated member is chosen.
List<String> decompressedList = new ArrayList<>();
decompressedList.add(BeanUtilFuncs.decompress(result));
return wrapResult(decompressedList.toString());
}
return BeanUtilFuncs.decompress(result);
}
} else { // More than 1 Member
ResultCollector coll = FunctionService.onMembers(members).setArguments(functionArgs)
.execute(ManagementConstants.QUERY_DATA_FUNCTION);
List list = (List) coll.getResult();
Object object = list.get(0);
if (object instanceof Throwable) {
throw (Throwable) object;
}
Iterator<byte[]> it = list.iterator();
List<String> decompressedList = new ArrayList<>();
while (it.hasNext()) {
String decompressedStr;
decompressedStr = BeanUtilFuncs.decompress(it.next());
decompressedList.add(decompressedStr);
}
if (zipResult) {
return BeanUtilFuncs.compress(wrapResult(decompressedList.toString()));
} else {
return wrapResult(decompressedList.toString());
}
}
} catch (FunctionException fe) {
throw new Exception(
String.format("Query could not be executed due to : %s", fe.getMessage()));
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable e) {
SystemFailure.checkFailure();
throw new Exception(String.format("Query could not be executed due to : %s", e.getMessage()));
}
}
private static String wrapResult(final String str) {
StringWriter w = new StringWriter();
synchronized (w.getBuffer()) {
w.write("{\"result\":");
w.write(str);
w.write("}");
return w.toString();
}
}
private Set<String> compileQuery(final String query)
throws QueryInvalidException {
QCompiler compiler = new QCompiler();
Set<String> regionsInQuery;
try {
CompiledValue compiledQuery = compiler.compileQuery(query);
Set<String> regions = new HashSet<>();
compiledQuery.getRegionsInQuery(regions, null);
regionsInQuery = Collections.unmodifiableSet(regions);
return regionsInQuery;
} catch (QueryInvalidException qe) {
logger.error("{} Failed, Error {}", query, qe.getMessage(), qe);
throw qe;
}
}
static class JsonisedErrorMessage {
private final String message;
public JsonisedErrorMessage(final String errorMessage) {
message = errorMessage;
}
@Override
public String toString() {
return String.format("{\"message\":\"%s\"}", message);
}
}
}