// blob: be6b7b9fc6f7eef0f729e4aa05afc459531e7a38 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.management.internal.cli.commands;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.shell.core.annotation.CliCommand;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.management.cli.GfshCommand;
import org.apache.geode.management.internal.cli.domain.AsyncEventQueueDetails;
import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
import org.apache.geode.management.internal.cli.functions.ListAsyncEventQueuesFunction;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
import org.apache.geode.management.internal.cli.result.model.ResultModel;
import org.apache.geode.management.internal.cli.result.model.TabularResultModel;
import org.apache.geode.management.internal.security.ResourceOperation;
import org.apache.geode.security.ResourcePermission;
/**
 * Gfsh command that runs {@link ListAsyncEventQueuesFunction} on every normal member and renders
 * the returned {@link AsyncEventQueueDetails} as a table, followed by a table of per-member errors
 * when any member reported a failure.
 */
public class ListAsyncEventQueuesCommand extends GfshCommand {
  private static final String[] DETAILS_OUTPUT_COLUMNS =
      {"Member", "ID", "Batch Size", "Persistent", "Disk Store", "Max Memory", "Listener",
          "Created with paused event processing", "Currently Paused"};
  private static final String ASYNC_EVENT_QUEUES_TABLE_SECTION = "Async Event Queues";
  private static final String MEMBER_ERRORS_TABLE_SECTION = "Member Errors";

  /** Charset Jackson uses when serializing to an {@code OutputStream}. */
  private static final String JSON_CHARSET = "UTF-8";

  /** Shared mapper; it is never reconfigured after construction, so one instance is reused. */
  private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

  /**
   * Lists the async event queues defined on all normal members.
   *
   * @return an info result when there are no members or no queues; otherwise a result holding the
   *         queue table. In either of the latter cases a "Member Errors" table is appended when at
   *         least one member's function result was unsuccessful.
   */
  @CliCommand(value = CliStrings.LIST_ASYNC_EVENT_QUEUES,
      help = CliStrings.LIST_ASYNC_EVENT_QUEUES__HELP)
  @ResourceOperation(resource = ResourcePermission.Resource.CLUSTER,
      operation = ResourcePermission.Operation.READ)
  public ResultModel listAsyncEventQueues() {
    Set<DistributedMember> targetMembers = getAllNormalMembers();
    if (targetMembers.isEmpty()) {
      return ResultModel.createInfo(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
    }

    // Each (successful) member returns a list of AsyncEventQueueDetails.
    List<CliFunctionResult> results = executeAndGetFunctionResult(
        new ListAsyncEventQueuesFunction(), new Object[] {}, targetMembers);
    ResultModel result = buildAsyncEventQueueInfo(results);

    // Report any explicit errors as well. The error table is created lazily so that it only
    // appears in the output when at least one member actually failed.
    TabularResultModel errors = null;
    for (CliFunctionResult memberResult : results) {
      if (memberResult.isSuccessful()) {
        continue;
      }
      if (errors == null) {
        errors = result.addTable(MEMBER_ERRORS_TABLE_SECTION);
        errors.setColumnHeader("Member", "Error");
      }
      errors.addRow(memberResult.getMemberIdOrName(), memberResult.getStatusMessage());
    }
    return result;
  }

  /**
   * @return An info result containing the table of AsyncEventQueueDetails.
   *         If no details are found, returns an info result message indicating so.
   */
  private ResultModel buildAsyncEventQueueInfo(List<CliFunctionResult> results) {
    boolean noQueuesReported = results.stream()
        .filter(CliFunctionResult::isSuccessful)
        .map(ListAsyncEventQueuesCommand::queueDetailsOf)
        .allMatch(List::isEmpty);
    if (noQueuesReported) {
      return ResultModel.createInfo(CliStrings.LIST_ASYNC_EVENT_QUEUES__NO_QUEUES_FOUND_MESSAGE);
    }

    ResultModel result = new ResultModel();
    TabularResultModel detailsTable = result.addTable(ASYNC_EVENT_QUEUES_TABLE_SECTION);
    detailsTable.setColumnHeader(DETAILS_OUTPUT_COLUMNS);

    for (CliFunctionResult memberResult : results) {
      if (!memberResult.isSuccessful()) {
        continue;
      }
      String memberName = memberResult.getMemberIdOrName();
      // One row per queue; the value order must match DETAILS_OUTPUT_COLUMNS.
      for (AsyncEventQueueDetails entry : queueDetailsOf(memberResult)) {
        detailsTable.addRow(memberName, entry.getId(),
            String.valueOf(entry.getBatchSize()), String.valueOf(entry.isPersistent()),
            String.valueOf(entry.getDiskStoreName()), String.valueOf(entry.getMaxQueueMemory()),
            getListenerEntry(entry), String.valueOf(entry.isCreatedWithPausedEventProcessing()),
            String.valueOf(entry.isPausedEventProcessing()));
      }
    }
    return result;
  }

  /**
   * Views the payload of a function result as the list of queue details this command expects
   * successful results to carry.
   *
   * @param functionResult a result for which {@code isSuccessful()} is true
   * @return the result object cast to a list of AsyncEventQueueDetails
   */
  @SuppressWarnings("unchecked") // the result object is declared as Object; see the cast below
  private static List<AsyncEventQueueDetails> queueDetailsOf(CliFunctionResult functionResult) {
    return (List<AsyncEventQueueDetails>) functionResult.getResultObject();
  }

  /**
   * @return The class of the entry's listener. If the listener is parameterized, these parameters
   *         are appended in a json format.
   */
  private String getListenerEntry(AsyncEventQueueDetails entry) {
    return entry.getListener() + propertiesToString(entry.getListenerProperties());
  }

  /**
   * @param props the properties to render; may be null
   * @return A json format of the properties, or the empty string if the properties are null or
   *         empty. If serialization fails, a description of the failure is returned instead.
   */
  static String propertiesToString(Properties props) {
    if (props == null || props.isEmpty()) {
      return "";
    }
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
      JSON_MAPPER.writeValue(baos, props);
      // Decode with the charset the bytes were written in rather than the platform default,
      // otherwise non-ASCII property values are garbled on non-UTF-8 JVMs.
      return baos.toString(JSON_CHARSET);
    } catch (IOException e) {
      // Never return null here: the caller concatenates the value onto the listener name.
      String message = e.getMessage();
      return message != null ? message : e.toString();
    }
  }
}