blob: d77490e1b8210ae358acbb3a2c88b96f5b78e8a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Cleanup;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.loadbalance.AntiAffinityNamespaceGroupTest;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.AntiAffinityGroupPolicyFilter;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.testng.annotations.Test;
/**
 * Variant of {@link AntiAffinityNamespaceGroupTest} that plugs in {@link ExtensibleLoadManagerImpl}
 * as the load manager under test.
 *
 * <p>The protected methods below supply load-manager-specific behavior; they are presumably the
 * hooks invoked by the inherited test cases (NOTE(review): the parent class is not visible from
 * this file — confirm the signatures still match). This class also adds one test of its own for
 * {@link AntiAffinityGroupPolicyFilter}.
 */
@Test(groups = "broker")
public class AntiAffinityNamespaceGroupExtensionTest extends AntiAffinityNamespaceGroupTest {

    /** Bundle range string appended to a namespace name to build a bundle identifier. */
    final String bundle = "0x00000000_0xffffffff";

    /** Suffix appended to a namespace name for the namespace that gets an anti-affinity group. */
    final String nsSuffix = "-antiaffinity-enabled";

    /**
     * Creates the container used to record bundle ownership.
     *
     * @return a new, empty, mutable set of (bundle name, {@link ServiceUnitStateData}) entries
     */
    protected Object getBundleOwnershipData() {
        return new HashSet<Map.Entry<String, ServiceUnitStateData>>();
    }

    /**
     * @return the fully-qualified class name of {@link ExtensibleLoadManagerImpl}
     */
    protected String getLoadManagerClassName() {
        return ExtensibleLoadManagerImpl.class.getName();
    }

    /**
     * Asks the load manager to assign the given service unit and returns the chosen broker's URL.
     *
     * @param serviceUnit the service unit to assign
     * @param loadManager the load manager; must be an {@link ExtensibleLoadManagerImpl}
     * @return the Pulsar service URL of the broker returned by the assignment
     * @throws RuntimeException wrapping any failure, including an empty assignment result
     */
    protected String selectBroker(ServiceUnitId serviceUnit, Object loadManager) {
        try {
            // First get() waits for the future, second get() unwraps the assignment result.
            return ((ExtensibleLoadManagerImpl) loadManager).assign(Optional.empty(), serviceUnit).get()
                    .get().getPulsarServiceUrl();
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Records that {@code broker} owns {@code assignedBundleName} in the ownership data set.
     *
     * @param ownershipData      the set created by {@link #getBundleOwnershipData()}
     * @param broker             the broker to record as the owner
     * @param namespace          unused by this implementation
     * @param assignedBundleName the bundle name used as the entry key
     */
    protected void selectBrokerForNamespace(
            Object ownershipData,
            String broker, String namespace, String assignedBundleName) {
        // The parameter is typed as Object; the element type cannot be checked at runtime.
        @SuppressWarnings("unchecked")
        Set<Map.Entry<String, ServiceUnitStateData>> ownershipDataSet =
                (Set<Map.Entry<String, ServiceUnitStateData>>) ownershipData;
        // NOTE(review): the trailing 1 is presumably a version id — confirm against ServiceUnitStateData.
        ownershipDataSet.add(
                new AbstractMap.SimpleEntry<>(
                        assignedBundleName,
                        new ServiceUnitStateData(Owned, broker, 1)));
    }

    /**
     * Intentionally does nothing for this load manager.
     *
     * @param namespace ignored
     * @param bundle    ignored
     */
    protected void verifyLoadSheddingWithAntiAffinityNamespace(String namespace, String bundle) {
        // No-op
    }

    /**
     * Checks, via reflection, whether the load manager's anti-affinity helper holds any
     * broker-to-failure-domain mapping.
     *
     * @param loadManager the load manager declaring an {@code antiAffinityGroupPolicyHelper} field
     * @return {@code true} if the helper's {@code brokerToFailureDomainMap} is not empty
     * @throws Exception if either field cannot be read
     */
    protected boolean isLoadManagerUpdatedDomainCache(Object loadManager) throws Exception {
        var antiAffinityGroupPolicyHelper =
                (AntiAffinityGroupPolicyHelper)
                        FieldUtils.readDeclaredField(
                                loadManager, "antiAffinityGroupPolicyHelper", true);
        // Only this cast is unchecked: the map's type arguments are erased at runtime.
        @SuppressWarnings("unchecked")
        var brokerToFailureDomainMap = (Map<String, String>)
                FieldUtils.readDeclaredField(antiAffinityGroupPolicyHelper,
                        "brokerToFailureDomainMap", true);
        return !brokerToFailureDomainMap.isEmpty();
    }

    /**
     * Verifies {@link AntiAffinityGroupPolicyFilter}: a bundle of a namespace that was given no
     * anti-affinity group keeps every available broker, while a bundle of the namespace with an
     * anti-affinity group loses the broker that currently owns that bundle.
     */
    @Test
    public void testAntiAffinityGroupPolicyFilter()
            throws IllegalAccessException, ExecutionException, InterruptedException,
            TimeoutException, PulsarAdminException, PulsarClientException {
        final String namespace = "my-tenant/test/my-ns-filter";
        final String namespaceAntiAffinityGroup = "my-antiaffinity-filter";
        final String antiAffinityEnabledNameSpace = namespace + nsSuffix;

        admin.namespaces().createNamespace(antiAffinityEnabledNameSpace);
        admin.namespaces().setNamespaceAntiAffinityGroup(antiAffinityEnabledNameSpace, namespaceAntiAffinityGroup);

        @Cleanup
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar.getSafeWebServiceAddress()).build();
        // Creating a producer on a topic in the namespace; the owner lookup further down relies on
        // the namespace bundle having an owner by then.
        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer().topic(
                        "persistent://" + antiAffinityEnabledNameSpace + "/my-topic1")
                .create();
        pulsar.getBrokerService().updateRates();

        // The collaborators are private fields of the load manager, hence the reflective reads.
        var brokerRegistry =
                (BrokerRegistry)
                        FieldUtils.readDeclaredField(
                                primaryLoadManager, "brokerRegistry", true);
        var antiAffinityGroupPolicyFilter =
                (AntiAffinityGroupPolicyFilter)
                        FieldUtils.readDeclaredField(
                                primaryLoadManager, "antiAffinityGroupPolicyFilter", true);
        var context = ((ExtensibleLoadManagerImpl) primaryLoadManager).getContext();
        var brokers = brokerRegistry
                .getAvailableBrokerLookupDataAsync().get(5, TimeUnit.SECONDS);

        // Case 1: bundle of the namespace that was not assigned an anti-affinity group.
        ServiceUnitId namespaceBundle = mock(ServiceUnitId.class);
        doReturn(namespace + "/" + bundle).when(namespaceBundle).toString();
        // Copy taken before filtering so the expectation does not alias the map passed to the filter.
        var expected = new HashMap<>(brokers);
        // Bounded wait, like the other futures in this test, so a stuck filter fails instead of hanging.
        var actual = antiAffinityGroupPolicyFilter.filterAsync(
                brokers, namespaceBundle, context).get(5, TimeUnit.SECONDS);
        assertEquals(actual, expected);

        // Case 2: bundle of the namespace with the anti-affinity group; its current owner is excluded.
        doReturn(antiAffinityEnabledNameSpace + "/" + bundle).when(namespaceBundle).toString();
        var serviceUnitStateChannel = (ServiceUnitStateChannel)
                FieldUtils.readDeclaredField(
                        primaryLoadManager, "serviceUnitStateChannel", true);
        var srcBroker = serviceUnitStateChannel.getOwnerAsync(namespaceBundle.toString())
                .get(5, TimeUnit.SECONDS).get();
        expected.remove(srcBroker);
        actual = antiAffinityGroupPolicyFilter.filterAsync(
                brokers, namespaceBundle, context).get(5, TimeUnit.SECONDS);
        assertEquals(actual, expected);
    }
}