blob: 1273db247cb0c0ae28b54c366129e4044b253a6b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.management.internal.cli.commands;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
import org.apache.geode.cache.configuration.CacheConfig;
import org.apache.geode.cache.configuration.DeclarableType;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.management.cli.ConverterHint;
import org.apache.geode.management.cli.SingleGfshCommand;
import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
import org.apache.geode.management.internal.cli.functions.CreateAsyncEventQueueFunction;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
import org.apache.geode.management.internal.cli.result.model.ResultModel;
import org.apache.geode.management.internal.security.ResourceOperation;
import org.apache.geode.security.ResourcePermission;
public class CreateAsyncEventQueueCommand extends SingleGfshCommand {
@CliCommand(value = CliStrings.CREATE_ASYNC_EVENT_QUEUE,
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__HELP)
@ResourceOperation(resource = ResourcePermission.Resource.CLUSTER,
operation = ResourcePermission.Operation.MANAGE, target = ResourcePermission.Target.DEPLOY)
public ResultModel createAsyncEventQueue(
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__ID, mandatory = true,
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__ID__HELP) String id,
@CliOption(key = {CliStrings.GROUP, CliStrings.GROUPS},
optionContext = ConverterHint.MEMBERGROUP,
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__GROUP__HELP) String[] groups,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__PARALLEL,
unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__PARALLEL__HELP) boolean parallel,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__ENABLEBATCHCONFLATION,
unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__ENABLEBATCHCONFLATION__HELP) boolean enableBatchConflation,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCH_SIZE,
unspecifiedDefaultValue = "100",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCH_SIZE__HELP) int batchSize,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL,
unspecifiedDefaultValue = AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL + "",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL__HELP) int batchTimeInterval,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__PERSISTENT,
unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__PERSISTENT__HELP) boolean persistent,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISK_STORE,
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISK_STORE__HELP) String diskStore,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS,
unspecifiedDefaultValue = "true", specifiedDefaultValue = "true",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS__HELP) boolean diskSynchronous,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__FORWARD_EXPIRATION_DESTROY,
unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__FORWARD_EXPIRATION_DESTROY__HELP) boolean forwardExpirationDestroy,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY,
unspecifiedDefaultValue = "100",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY__HELP) int maxQueueMemory,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISPATCHERTHREADS,
unspecifiedDefaultValue = "1",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISPATCHERTHREADS__HELP) int dispatcherThreads,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__ORDERPOLICY,
unspecifiedDefaultValue = "KEY",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__ORDERPOLICY__HELP) String orderPolicy,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__GATEWAYEVENTFILTER,
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__GATEWAYEVENTFILTER__HELP) String[] gatewayEventFilters,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__SUBSTITUTION_FILTER,
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__SUBSTITUTION_FILTER__HELP) String gatewaySubstitutionListener,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__LISTENER, mandatory = true,
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__LISTENER__HELP) String listener,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__LISTENER_PARAM_AND_VALUE,
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__LISTENER_PARAM_AND_VALUE__HELP) String[] listenerParamsAndValues,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__PAUSE_EVENT_PROCESSING,
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__PAUSE_EVENT_PROCESSING__HELP,
unspecifiedDefaultValue = "false",
specifiedDefaultValue = "true") boolean pauseEventProcessing) {
if (persistent) {
authorize(ResourcePermission.Resource.CLUSTER, ResourcePermission.Operation.WRITE,
ResourcePermission.Target.DISK);
}
Properties listenerProperties = new Properties();
if (listenerParamsAndValues != null) {
for (String listenerParamsAndValue : listenerParamsAndValues) {
final int hashPosition = listenerParamsAndValue.indexOf('#');
if (hashPosition == -1) {
listenerProperties.put(listenerParamsAndValue, "");
} else {
listenerProperties.put(listenerParamsAndValue.substring(0, hashPosition),
listenerParamsAndValue.substring(hashPosition + 1));
}
}
}
Set<DistributedMember> targetMembers = getMembers(groups, null);
CacheConfig.AsyncEventQueue config = new CacheConfig.AsyncEventQueue();
config.setAsyncEventListener(new DeclarableType(listener, listenerProperties));
config.setBatchSize(String.valueOf(batchSize));
config.setBatchTimeInterval(String.valueOf(batchTimeInterval));
config.setDiskStoreName(diskStore);
config.setDiskSynchronous(diskSynchronous);
config.setDispatcherThreads(String.valueOf(dispatcherThreads));
config.setEnableBatchConflation(enableBatchConflation);
config.setForwardExpirationDestroy(forwardExpirationDestroy);
if (gatewayEventFilters != null) {
config.getGatewayEventFilters().addAll(Arrays.stream(gatewayEventFilters)
.map(classname -> new DeclarableType((classname))).collect(Collectors.toList()));
}
if (gatewaySubstitutionListener != null) {
config.setGatewayEventSubstitutionFilter(new DeclarableType(gatewaySubstitutionListener));
}
config.setId(id);
config.setMaximumQueueMemory(String.valueOf(maxQueueMemory));
config.setOrderPolicy(orderPolicy);
config.setParallel(parallel);
config.setPersistent(persistent);
config.setPauseEventProcessing(pauseEventProcessing);
CreateAsyncEventQueueFunction function = new CreateAsyncEventQueueFunction();
List<CliFunctionResult> results = executeAndGetFunctionResult(function, config, targetMembers);
ResultModel commandResult = ResultModel.createMemberStatusResult(results);
commandResult.setConfigObject(config);
return commandResult;
}
@Override
public boolean updateConfigForGroup(String group, CacheConfig config, Object configObject) {
config.getAsyncEventQueues().add((CacheConfig.AsyncEventQueue) configObject);
return true;
}
}