blob: d2d61a91f6292f42e787044755006823b1798860 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.client.routing;
import static org.junit.Assert.assertEquals;
import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
import com.twitter.finagle.Address;
import com.twitter.finagle.Addresses;
import com.twitter.finagle.addr.WeightedAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test Case for {@link RoutingService}.
*/
@RunWith(Parameterized.class)
public class TestRoutingService {
static final Logger LOG = LoggerFactory.getLogger(TestRoutingService.class);
@Parameterized.Parameters
public static Collection<Object[]> configs() {
ArrayList<Object[]> list = new ArrayList<Object[]>();
for (int i = 0; i <= 1; i++) {
for (int j = 0; j <= 1; j++) {
for (int k = 0; k <= 1; k++) {
list.add(new Boolean[] {i == 1, j == 1, k == 1});
}
}
}
return list;
}
private final boolean consistentHash;
private final boolean weightedAddresses;
private final boolean asyncResolution;
public TestRoutingService(boolean consistentHash, boolean weightedAddresses, boolean asyncResolution) {
this.consistentHash = consistentHash;
this.weightedAddresses = weightedAddresses;
this.asyncResolution = asyncResolution;
}
private List<Address> getAddresses(boolean weightedAddresses) {
ArrayList<Address> addresses = new ArrayList<Address>();
addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.1", 3181)));
addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.2", 3181)));
addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.3", 3181)));
addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.4", 3181)));
addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.5", 3181)));
addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.6", 3181)));
addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.7", 3181)));
if (weightedAddresses) {
ArrayList<Address> wAddresses = new ArrayList<Address>();
for (Address address: addresses) {
wAddresses.add(WeightedAddress.apply(address, 1.0));
}
return wAddresses;
} else {
return addresses;
}
}
private void testRoutingServiceHelper(boolean consistentHash,
boolean weightedAddresses,
boolean asyncResolution)
throws Exception {
ExecutorService executorService = null;
final List<Address> addresses = getAddresses(weightedAddresses);
final TestName name = new TestName();
RoutingService routingService;
if (consistentHash) {
routingService = ConsistentHashRoutingService.newBuilder()
.serverSet(new NameServerSet(name))
.resolveFromName(true)
.numReplicas(997)
.build();
} else {
routingService = ServerSetRoutingService.newServerSetRoutingServiceBuilder()
.serverSetWatcher(new TwitterServerSetWatcher(new NameServerSet(name), true)).build();
}
if (asyncResolution) {
executorService = Executors.newSingleThreadExecutor();
executorService.submit(new Runnable() {
@Override
public void run() {
name.changeAddrs(addresses);
}
});
} else {
name.changeAddrs(addresses);
}
routingService.startService();
HashSet<SocketAddress> mapping = new HashSet<SocketAddress>();
for (int i = 0; i < 1000; i++) {
for (int j = 0; j < 5; j++) {
String stream = "TestStream-" + i + "-" + j;
mapping.add(routingService.getHost(stream,
RoutingService.RoutingContext.of(new DefaultRegionResolver())));
}
}
assertEquals(mapping.size(), addresses.size());
if (null != executorService) {
executorService.shutdown();
}
}
@Test(timeout = 5000)
public void testRoutingService() throws Exception {
testRoutingServiceHelper(this.consistentHash, this.weightedAddresses, this.asyncResolution);
}
}