| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.geode.management.internal.cli.commands; |
| |
| import static org.apache.geode.management.internal.cli.i18n.CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL__HELP; |
| import static org.apache.geode.management.internal.cli.i18n.CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCH_SIZE__HELP; |
| import static org.apache.geode.management.internal.cli.i18n.CliStrings.CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY__HELP; |
| import static org.apache.geode.management.internal.cli.i18n.CliStrings.IFEXISTS; |
| import static org.apache.geode.management.internal.cli.i18n.CliStrings.IFEXISTS_HELP; |
| |
| import java.io.IOException; |
| import java.util.List; |
| import java.util.Set; |
| |
| import javax.xml.parsers.ParserConfigurationException; |
| import javax.xml.transform.TransformerException; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.springframework.shell.core.annotation.CliCommand; |
| import org.springframework.shell.core.annotation.CliOption; |
| import org.xml.sax.SAXException; |
| |
| import org.apache.geode.cache.configuration.CacheConfig; |
| import org.apache.geode.cache.configuration.CacheElement; |
| import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService; |
| import org.apache.geode.management.cli.CliMetaData; |
| import org.apache.geode.management.cli.SingleGfshCommand; |
| import org.apache.geode.management.cli.UpdateAllConfigurationGroupsMarker; |
| import org.apache.geode.management.internal.cli.AbstractCliAroundInterceptor; |
| import org.apache.geode.management.internal.cli.GfshParseResult; |
| import org.apache.geode.management.internal.cli.result.model.ResultModel; |
| import org.apache.geode.management.internal.exceptions.EntityNotFoundException; |
| import org.apache.geode.management.internal.security.ResourceOperation; |
| import org.apache.geode.security.ResourcePermission; |
| |
| /** |
| * this command currently only updates the cluster configuration. Requires server restart to pick up |
| * the changes. |
| */ |
| public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements |
| UpdateAllConfigurationGroupsMarker { |
| |
| public static final String GROUP_STATUS_SECTION = "group-status"; |
| static final String COMMAND_NAME = "alter async-event-queue"; |
| static final String ID = "id"; |
| static final String BATCH_SIZE = "batch-size"; |
| static final String BATCH_TIME_INTERVAL = "batch-time-interval"; |
| static final String MAX_QUEUE_MEMORY = "max-queue-memory"; |
| static final String MAXIMUM_QUEUE_MEMORY = "maximum-queue-memory"; |
| |
| static final String COMMAND_HELP = |
| "alter attributes of async-event-queue, needs rolling restart for new attributes to take effect. "; |
| static final String ID_HELP = "Id of the async event queue to be changed."; |
| static final String BATCH_SIZE_HELP = CREATE_ASYNC_EVENT_QUEUE__BATCH_SIZE__HELP; |
| static final String BATCH_TIME_INTERVAL_HELP = CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL__HELP; |
| static final String MAXIMUM_QUEUE_MEMORY_HELP = |
| CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY__HELP; |
| static final String PAUSE_EVENT_PROCESSING = "pause-event-processing"; |
| static final String PAUSE_EVENT_PROCESSING_HELP = |
| "Pause event processing when the async event queue is created"; |
| |
| @CliCommand(value = COMMAND_NAME, help = COMMAND_HELP) |
| @CliMetaData( |
| interceptor = "org.apache.geode.management.internal.cli.commands.AlterAsyncEventQueueCommand$Interceptor") |
| @ResourceOperation(resource = ResourcePermission.Resource.CLUSTER, |
| operation = ResourcePermission.Operation.MANAGE, target = ResourcePermission.Target.DEPLOY) |
| public ResultModel execute(@CliOption(key = ID, mandatory = true, help = ID_HELP) String id, |
| @CliOption(key = BATCH_SIZE, help = BATCH_SIZE_HELP) Integer batchSize, |
| @CliOption(key = BATCH_TIME_INTERVAL, |
| help = BATCH_TIME_INTERVAL_HELP) Integer batchTimeInterval, |
| @CliOption(key = MAX_QUEUE_MEMORY, help = MAXIMUM_QUEUE_MEMORY_HELP) Integer maxQueueMemory, |
| @CliOption(key = IFEXISTS, help = IFEXISTS_HELP, specifiedDefaultValue = "true", |
| unspecifiedDefaultValue = "false") boolean ifExists, |
| @CliOption(key = PAUSE_EVENT_PROCESSING, help = PAUSE_EVENT_PROCESSING_HELP, |
| specifiedDefaultValue = "true", |
| unspecifiedDefaultValue = "false") boolean pauseEventProcessing) |
| throws IOException, SAXException, ParserConfigurationException, TransformerException, |
| EntityNotFoundException { |
| |
| // need not check if any running servers has this async-event-queue. A server with this queue id |
| // may be shutdown, but we still need to update Cluster Configuration. |
| if (getConfigurationPersistenceService() == null) { |
| return ResultModel.createError("Cluster Configuration Service is not available. " |
| + "Please connect to a locator with running Cluster Configuration Service."); |
| } |
| |
| if (findAEQ(id) == null) { |
| String message = String.format("Can not find an async event queue with id '%s'.", id); |
| throw new EntityNotFoundException(message, ifExists); |
| } |
| |
| CacheConfig.AsyncEventQueue aeqConfiguration = new CacheConfig.AsyncEventQueue(); |
| aeqConfiguration.setId(id); |
| aeqConfiguration.setPauseEventProcessing(pauseEventProcessing); |
| |
| if (batchSize != null) { |
| aeqConfiguration.setBatchSize(batchSize + ""); |
| } |
| if (batchTimeInterval != null) { |
| aeqConfiguration.setBatchTimeInterval(batchTimeInterval + ""); |
| } |
| if (maxQueueMemory != null) { |
| aeqConfiguration.setMaximumQueueMemory(maxQueueMemory + ""); |
| } |
| |
| ResultModel result = new ResultModel(); |
| result.addInfo().addLine("Please restart the servers to apply any changed configuration"); |
| result.setConfigObject(aeqConfiguration); |
| |
| return result; |
| } |
| |
| CacheConfig.AsyncEventQueue findAEQ(String aeqId) { |
| CacheConfig.AsyncEventQueue queue = null; |
| InternalConfigurationPersistenceService ccService = |
| (InternalConfigurationPersistenceService) this.getConfigurationPersistenceService(); |
| if (ccService == null) { |
| return null; |
| } |
| |
| Set<String> groups = ccService.getGroups(); |
| for (String group : groups) { |
| queue = |
| CacheElement.findElement(ccService.getCacheConfig(group).getAsyncEventQueues(), aeqId); |
| if (queue != null) { |
| return queue; |
| } |
| } |
| return queue; |
| } |
| |
| @Override |
| public boolean updateConfigForGroup(String group, CacheConfig config, Object configObject) { |
| |
| boolean aeqConfigsHaveBeenUpdated = false; |
| |
| List<CacheConfig.AsyncEventQueue> queues = config.getAsyncEventQueues(); |
| if (queues.isEmpty()) { |
| return false; |
| } |
| |
| CacheConfig.AsyncEventQueue aeqConfiguration = |
| ((CacheConfig.AsyncEventQueue) configObject); |
| |
| String aeqId = aeqConfiguration.getId(); |
| |
| for (CacheConfig.AsyncEventQueue queue : queues) { |
| if (aeqId.equals(queue.getId())) { |
| if (StringUtils.isNotBlank(aeqConfiguration.getBatchSize())) { |
| queue.setBatchSize(aeqConfiguration.getBatchSize()); |
| } |
| |
| if (StringUtils.isNotBlank(aeqConfiguration.getBatchTimeInterval())) { |
| queue.setBatchTimeInterval(aeqConfiguration.getBatchTimeInterval()); |
| } |
| |
| if (StringUtils.isNotBlank(aeqConfiguration.getMaximumQueueMemory())) { |
| queue.setMaximumQueueMemory(aeqConfiguration.getMaximumQueueMemory()); |
| } |
| if (aeqConfiguration.isPauseEventProcessing() != null) { |
| queue.setPauseEventProcessing(aeqConfiguration.isPauseEventProcessing()); |
| } |
| aeqConfigsHaveBeenUpdated = true; |
| } |
| |
| } |
| return aeqConfigsHaveBeenUpdated; |
| |
| } |
| |
| public static class Interceptor extends AbstractCliAroundInterceptor { |
| @Override |
| public ResultModel preExecution(GfshParseResult parseResult) { |
| Object batchSize = parseResult.getParamValue(BATCH_SIZE); |
| Object batchTimeInterval = parseResult.getParamValue(BATCH_TIME_INTERVAL); |
| Object maxQueueMemory = parseResult.getParamValue(MAX_QUEUE_MEMORY); |
| |
| if (batchSize == null && batchTimeInterval == null && maxQueueMemory == null) { |
| return ResultModel.createError("need to specify at least one option to modify."); |
| } |
| return new ResultModel(); |
| } |
| } |
| } |