blob: 10d6400d38d49bd9f58cfd843906d1fec034d20b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.management.plugin.controller.v6_1.category;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.qpid.server.management.plugin.controller.ConverterHelper;
import org.apache.qpid.server.management.plugin.controller.LegacyConfiguredObject;
import org.apache.qpid.server.management.plugin.controller.LegacyManagementController;
import org.apache.qpid.server.management.plugin.controller.TypeController;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ConfiguredObject;
public class QueueController extends DestinationController
{
public static final String TYPE = "Queue";
QueueController(final LegacyManagementController legacyManagementController,
final Set<TypeController> typeControllers)
{
super(legacyManagementController,
TYPE,
new String[]{VirtualHostController.TYPE},
"standard",
typeControllers);
}
@Override
protected LegacyConfiguredObject convertNextVersionLegacyConfiguredObject(final LegacyConfiguredObject object)
{
return new LegacyQueue(getManagementController(), object);
}
@Override
public Map<String, Object> convertAttributesToNextVersion(final ConfiguredObject<?> root,
final List<String> path,
final Map<String, Object> attributes)
{
Map<String, Object> converted = new LinkedHashMap<>(attributes);
Object queueFlowControlSizeBytes = converted.remove("queueFlowControlSizeBytes");
Object queueFlowResumeSizeBytes = converted.remove("queueFlowResumeSizeBytes");
if (queueFlowControlSizeBytes != null)
{
long queueFlowControlSizeBytesValue = ConverterHelper.toLong(queueFlowControlSizeBytes);
if (queueFlowControlSizeBytesValue > 0)
{
if (queueFlowResumeSizeBytes != null)
{
long queueFlowResumeSizeBytesValue = ConverterHelper.toLong(queueFlowResumeSizeBytes);
double ratio = ((double) queueFlowResumeSizeBytesValue)
/ ((double) queueFlowControlSizeBytesValue);
String flowResumeLimit = String.format("%.2f", ratio * 100.0);
Object context = converted.get("context");
Map<String, String> contextMap;
if (context instanceof Map)
{
contextMap = (Map) context;
}
else
{
contextMap = new LinkedHashMap<>();
converted.put("context", contextMap);
}
contextMap.put("queue.queueFlowResumeLimit", flowResumeLimit);
}
converted.put("overflowPolicy", "PRODUCER_FLOW_CONTROL");
converted.put("maximumQueueDepthBytes", queueFlowControlSizeBytes);
}
}
if (converted.containsKey("messageGroupKey"))
{
if (converted.containsKey("messageGroupSharedGroups")
&& ConverterHelper.toBoolean(converted.remove("messageGroupSharedGroups")))
{
converted.put("messageGroupType", "SHARED_GROUPS");
}
else
{
converted.put("messageGroupType", "STANDARD");
}
Object oldMessageGroupKey = converted.remove("messageGroupKey");
if (!"JMSXGroupId".equals(oldMessageGroupKey))
{
converted.put("messageGroupKeyOverride", oldMessageGroupKey);
}
}
else
{
converted.put("messageGroupType", "NONE");
}
return super.convertAttributesToNextVersion(root, path, converted);
}
static class LegacyQueue extends LegacyDestination
{
static final String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes";
static final String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes";
static final String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups";
static final String MESSAGE_GROUP_KEY = "messageGroupKey";
Set<String> MOVED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
QUEUE_FLOW_RESUME_SIZE_BYTES,
QUEUE_FLOW_CONTROL_SIZE_BYTES,
MESSAGE_GROUP_SHARED_GROUPS,
MESSAGE_GROUP_KEY,
ALTERNATE_EXCHANGE)));
LegacyQueue(final LegacyManagementController managementController,
final LegacyConfiguredObject nextVersionQueue)
{
super(managementController, nextVersionQueue, QueueController.TYPE);
}
@Override
public Collection<String> getAttributeNames()
{
return Stream.concat(super.getAttributeNames().stream(),
MOVED_ATTRIBUTES.stream()).collect(Collectors.toSet());
}
@Override
@SuppressWarnings("unchecked")
public Collection<LegacyConfiguredObject> getChildren(final String category)
{
if (BindingController.TYPE.equals(category))
{
String queueName = (String) getAttribute(NAME);
Collection<LegacyConfiguredObject> exchanges =
getNextVersionLegacyConfiguredObject().getParent(VirtualHostController.TYPE)
.getChildren(ExchangeController.TYPE);
List<LegacyConfiguredObject> bindingObjects = new ArrayList<>();
for (LegacyConfiguredObject exchange : exchanges)
{
Object bindings = exchange.getAttribute("bindings");
if (bindings instanceof Collection)
{
Collection<?> exchangeBindings = (Collection<?>) bindings;
exchangeBindings.stream()
.map(Binding.class::cast)
.filter(i -> i.getDestination().equals(queueName))
.map(i -> new BindingController.LegacyBinding(getManagementController(),
exchange,
getNextVersionLegacyConfiguredObject(),
i.getBindingKey(),
i.getArguments()))
.forEach(bindingObjects::add);
}
}
return bindingObjects;
}
return super.getChildren(category);
}
protected Object getAttributeInternal(final String name, boolean isActual)
{
if (QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
{
final Object overflowPolicy = getAttribute("overflowPolicy", isActual);
if (overflowPolicy != null)
{
final Object maximumQueueDepthBytes = getAttribute("maximumQueueDepthBytes", isActual);
if ("PRODUCER_FLOW_CONTROL".equals(String.valueOf(overflowPolicy))
&& maximumQueueDepthBytes != null)
{
return maximumQueueDepthBytes;
}
}
return null;
}
else if (QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
{
final Object overflowPolicy = getAttribute("overflowPolicy", isActual);
final Object maximumQueueDepthBytes = getAttribute("maximumQueueDepthBytes", isActual);
if ("PRODUCER_FLOW_CONTROL".equals(String.valueOf(overflowPolicy)) && maximumQueueDepthBytes != null)
{
final long queueFlowControlSizeBytesValue = ConverterHelper.toLong(maximumQueueDepthBytes);
if (queueFlowControlSizeBytesValue > 0)
{
int queueFlowResumeLimit = 80;
@SuppressWarnings("unchecked")
Map<String, String> context = (Map<String, String>) super.getAttribute(CONTEXT);
if (context != null)
{
queueFlowResumeLimit = ConverterHelper.toInt(context.get("queue.queueFlowResumeLimit"));
if (queueFlowResumeLimit == 0)
{
queueFlowResumeLimit = 80;
}
}
return Math.round(queueFlowControlSizeBytesValue * ((double) queueFlowResumeLimit) / 100.0);
}
}
return null;
}
else if (MESSAGE_GROUP_SHARED_GROUPS.equals(name))
{
Object messageGroupType = getAttribute("messageGroupType", isActual);
if (messageGroupType != null)
{
String type = String.valueOf(messageGroupType);
return "SHARED_GROUPS".equals(type);
}
return null;
}
else if (MESSAGE_GROUP_KEY.equals(name))
{
Object messageGroupKeyOverride = getAttribute("messageGroupKeyOverride", isActual);
Object messageGroupType = getAttribute("messageGroupType", isActual);
if (messageGroupType != null)
{
return messageGroupKeyOverride == null ? "JMSXGroupId" : messageGroupKeyOverride;
}
return null;
}
return super.getAttributeInternal(name, isActual);
}
@Override
public boolean isSecureAttribute(final String name)
{
if (MOVED_ATTRIBUTES.contains(name))
{
return false;
}
return super.isSecureAttribute(name);
}
@Override
public boolean isOversizedAttribute(final String name)
{
if (MOVED_ATTRIBUTES.contains(name))
{
return false;
}
return super.isOversizedAttribute(name);
}
}
}