blob: 58b5b2a470f6bf4535b2c793a28790c540aa4692 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.service;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.client.DistributedLogClientImpl;
import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
import org.apache.distributedlog.client.routing.LocalRoutingService;
import org.apache.distributedlog.client.routing.RegionsRoutingService;
import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
import org.apache.distributedlog.service.stream.StreamManager;
import org.apache.distributedlog.service.stream.StreamManagerImpl;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Duration;
import java.net.SocketAddress;
import java.net.URI;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
/**
 * Base test case for distributedlog servers.
 *
 * <p>The class starts one shared {@link DistributedLogCluster} per test class
 * ({@link #setupCluster()}) and, around every test method, one {@link DLServer}
 * on port 7001 together with a {@link DLClient} that talks to it
 * ({@link #setup()} / {@link #teardown()}).
 *
 * <p>A second "no ad hoc" cluster/server/client triple, built from a configuration
 * with {@code setCreateStreamIfNotExists(false)}, is available on demand through
 * {@link #setupNoAdHocCluster()} and {@link #tearDownNoAdHocCluster()}; those two
 * methods carry no JUnit annotation, so subclasses have to call them explicitly.
 */
public abstract class DistributedLogServerTestCase {

    /**
     * Configuration used by the default cluster and by servers created through
     * {@link #createDistributedLogServer(int)}: lock timeout 10, output buffer
     * size 0 and a periodic flush frequency of 10 milliseconds.
     */
    protected static DistributedLogConfiguration conf =
        new DistributedLogConfiguration().setLockTimeout(10)
            .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);

    /**
     * Same settings as {@link #conf}, except that
     * {@code setCreateStreamIfNotExists(false)} is applied as well.
     */
    protected static DistributedLogConfiguration noAdHocConf =
        new DistributedLogConfiguration().setLockTimeout(10).setCreateStreamIfNotExists(false)
            .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);

    /** Cluster shared by all tests of the class; started in {@link #setupCluster()}. */
    protected static DistributedLogCluster dlCluster;

    /** Cluster built from {@link #noAdHocConf}; only set by {@link #setupNoAdHocCluster()}. */
    protected static DistributedLogCluster noAdHocCluster;

    /**
     * A distributedlog client wrapper for testing.
     *
     * <p>Exposes the {@link LocalRoutingService} and the builder that were used to
     * create the client so tests can inspect or manipulate them.
     */
    protected static class DLClient {
        public final LocalRoutingService routingService;
        public DistributedLogClientBuilder dlClientBuilder;
        public final DistributedLogClientImpl dlClient;

        /**
         * Builds a client with a fresh local routing service.
         *
         * @param name used both as the client name and as the finagle client id
         * @param streamNameRegex stream name regex handed to the client builder
         * @param serverSideRoutingFinagleName when present, passed to the builder via
         *        {@code serverRoutingServiceFinagleNameStr}
         */
        protected DLClient(String name,
                           String streamNameRegex,
                           Optional<String> serverSideRoutingFinagleName) {
            routingService = LocalRoutingService.newBuilder().build();
            dlClientBuilder = DistributedLogClientBuilder.newBuilder()
                .name(name)
                .clientId(ClientId$.MODULE$.apply(name))
                .routingService(routingService)
                .streamNameRegex(streamNameRegex)
                .handshakeWithClientInfo(true)
                .clientBuilder(ClientBuilder.get()
                    .hostConnectionLimit(1)
                    .connectionTimeout(Duration.fromSeconds(1))
                    .requestTimeout(Duration.fromSeconds(60)));
            if (serverSideRoutingFinagleName.isPresent()) {
                dlClientBuilder =
                    dlClientBuilder.serverRoutingServiceFinagleNameStr(serverSideRoutingFinagleName.get());
            }
            dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
        }

        /** Delegates to {@code handshake()} on the wrapped client. */
        public void handshake() {
            dlClient.handshake();
        }

        /** Closes the wrapped client. */
        public void shutdown() {
            dlClient.close();
        }
    }

    /**
     * A distributedlog client wrapper that talks to two regions.
     *
     * <p>The client routes through a {@link RegionsRoutingService} composed of a
     * "local" and a "remote" {@link LocalRoutingService}, both exposed as fields.
     */
    protected static class TwoRegionDLClient {
        public final LocalRoutingService localRoutingService;
        public final LocalRoutingService remoteRoutingService;
        public final DistributedLogClientBuilder dlClientBuilder;
        public final DistributedLogClientImpl dlClient;

        /**
         * Builds a two-region client.
         *
         * @param name used both as the client name and as the finagle client id
         * @param regionMap address-to-region mapping given to the {@link DefaultRegionResolver}
         */
        protected TwoRegionDLClient(String name, Map<SocketAddress, String> regionMap) {
            localRoutingService = new LocalRoutingService();
            remoteRoutingService = new LocalRoutingService();
            RegionsRoutingService regionsRoutingService =
                RegionsRoutingService.of(new DefaultRegionResolver(regionMap),
                    localRoutingService, remoteRoutingService);
            dlClientBuilder = DistributedLogClientBuilder.newBuilder()
                .name(name)
                .clientId(ClientId$.MODULE$.apply(name))
                .routingService(regionsRoutingService)
                .streamNameRegex(".*")
                .handshakeWithClientInfo(true)
                .maxRedirects(2)
                .clientBuilder(ClientBuilder.get()
                    .hostConnectionLimit(1)
                    .connectionTimeout(Duration.fromSeconds(1))
                    .requestTimeout(Duration.fromSeconds(10)));
            dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
        }

        /** Closes the wrapped client. */
        public void shutdown() {
            dlClient.close();
        }
    }

    /** When {@code false}, clients are given a server-side routing finagle name. */
    private final boolean clientSideRouting;

    protected DLServer dlServer;
    protected DLClient dlClient;
    protected DLServer noAdHocServer;
    protected DLClient noAdHocClient;

    /**
     * Builds (but does not start) a cluster configured with three bookies, its own
     * ZooKeeper at {@code 127.0.0.1}, no proxy, and the test bookkeeper configuration.
     *
     * @param conf distributedlog configuration for the cluster
     * @return the built cluster
     * @throws Exception if the builder fails
     */
    public static DistributedLogCluster createCluster(DistributedLogConfiguration conf) throws Exception {
        return DistributedLogCluster.newBuilder()
            .numBookies(3)
            .shouldStartZK(true)
            .zkServers("127.0.0.1")
            .shouldStartProxy(false)
            .dlConf(conf)
            .bkConf(DLMTestUtil.loadTestBkConf())
            .build();
    }

    /**
     * Creates and starts the shared cluster once per test class.
     *
     * @throws Exception if the cluster cannot be created or started
     */
    @BeforeClass
    public static void setupCluster() throws Exception {
        dlCluster = createCluster(conf);
        dlCluster.start();
    }

    /**
     * Creates and starts the "no ad hoc" cluster, a server on port 7002 and a client
     * named {@code no-ad-hoc-client}. Not invoked automatically.
     *
     * <p>NOTE(review): every call overwrites the static {@link #noAdHocCluster}
     * without stopping a previously created one; only the last instance is stopped
     * by {@link #teardownCluster()}. Confirm callers invoke this at most once per class.
     *
     * @throws Exception if any component fails to start
     */
    public void setupNoAdHocCluster() throws Exception {
        noAdHocCluster = createCluster(noAdHocConf);
        noAdHocCluster.start();
        noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 7002, false);
        noAdHocClient = createDistributedLogClient("no-ad-hoc-client",
            serverSideRoutingNameFor(noAdHocServer));
    }

    /**
     * Shuts down the "no ad hoc" client and server. The cluster itself is left
     * running; it is stopped in {@link #teardownCluster()}.
     *
     * @throws Exception if a shutdown call fails
     */
    public void tearDownNoAdHocCluster() throws Exception {
        try {
            if (null != noAdHocClient) {
                noAdHocClient.shutdown();
            }
        } finally {
            // Always release the server (and its port) even if the client shutdown failed.
            if (null != noAdHocServer) {
                noAdHocServer.shutdown();
            }
        }
    }

    /**
     * Stops the shared cluster and, if it was created, the "no ad hoc" cluster.
     *
     * @throws Exception if stopping a cluster fails
     */
    @AfterClass
    public static void teardownCluster() throws Exception {
        try {
            if (null != dlCluster) {
                dlCluster.stop();
            }
        } finally {
            // Stop the second cluster regardless of how stopping the first one went.
            if (null != noAdHocCluster) {
                noAdHocCluster.stop();
            }
        }
    }

    /**
     * @return the URI of the shared cluster
     */
    protected static URI getUri() {
        return dlCluster.getUri();
    }

    /**
     * @param clientSideRouting {@code true} to let clients route on their own;
     *        {@code false} to configure them with a server-side routing finagle name
     */
    protected DistributedLogServerTestCase(boolean clientSideRouting) {
        this.clientSideRouting = clientSideRouting;
    }

    /**
     * Starts a server on port 7001 and a client named {@code test} before each test.
     *
     * @throws Exception if the server or client cannot be created
     */
    @Before
    public void setup() throws Exception {
        dlServer = createDistributedLogServer(7001);
        dlClient = createDistributedLogClient("test", serverSideRoutingNameFor(dlServer));
    }

    /**
     * Shuts down the per-test client and server.
     *
     * @throws Exception if a shutdown call fails
     */
    @After
    public void teardown() throws Exception {
        try {
            if (null != dlClient) {
                dlClient.shutdown();
            }
        } finally {
            // The server listens on a fixed port; leaking it would break the next test.
            if (null != dlServer) {
                dlServer.shutdown();
            }
        }
    }

    /**
     * Computes the optional server-side routing finagle name for a client.
     *
     * @param server server whose address is embedded in the name
     * @return absent when client-side routing is enabled, otherwise
     *         {@code "inet!"} followed by the server address rendered by {@code DLSocketAddress}
     */
    private Optional<String> serverSideRoutingNameFor(DLServer server) {
        if (clientSideRouting) {
            return Optional.absent();
        }
        return Optional.of("inet!" + DLSocketAddress.toString(server.getAddress()));
    }

    /**
     * Creates a server against the shared cluster using the default {@link #conf}.
     *
     * @param port port for the server
     * @return the new server
     * @throws Exception if the server cannot be created
     */
    protected DLServer createDistributedLogServer(int port) throws Exception {
        // NOTE(review): meaning of the trailing 'false' flag is defined by DLServer - confirm there.
        return new DLServer(conf, dlCluster.getUri(), port, false);
    }

    /**
     * Creates a server against the shared cluster using the given configuration.
     *
     * @param conf configuration for the server (shadows the static {@link #conf})
     * @param port port for the server
     * @return the new server
     * @throws Exception if the server cannot be created
     */
    protected DLServer createDistributedLogServer(DistributedLogConfiguration conf, int port)
            throws Exception {
        return new DLServer(conf, dlCluster.getUri(), port, false);
    }

    /**
     * Creates a client whose stream name regex is {@code ".*"}.
     *
     * @param clientName client name
     * @param serverSideRoutingFinagleName optional server-side routing finagle name
     * @return the new client wrapper
     * @throws Exception if the client cannot be created
     */
    protected DLClient createDistributedLogClient(String clientName,
                                                  Optional<String> serverSideRoutingFinagleName)
            throws Exception {
        return createDistributedLogClient(clientName, ".*", serverSideRoutingFinagleName);
    }

    /**
     * Creates a client with an explicit stream name regex.
     *
     * @param clientName client name
     * @param streamNameRegex stream name regex handed to the client builder
     * @param serverSideRoutingFinagleName optional server-side routing finagle name
     * @return the new client wrapper
     * @throws Exception if the client cannot be created
     */
    protected DLClient createDistributedLogClient(String clientName,
                                                  String streamNameRegex,
                                                  Optional<String> serverSideRoutingFinagleName)
            throws Exception {
        return new DLClient(clientName, streamNameRegex, serverSideRoutingFinagleName);
    }

    /**
     * Creates a client that routes across two regions.
     *
     * @param clientName client name
     * @param regionMap address-to-region mapping
     * @return the new two-region client wrapper
     * @throws Exception if the client cannot be created
     */
    protected TwoRegionDLClient createTwoRegionDLClient(String clientName,
                                                        Map<SocketAddress, String> regionMap)
            throws Exception {
        return new TwoRegionDLClient(clientName, regionMap);
    }

    /**
     * Asserts that the server reports exactly {@code numExpectedStreams} cached
     * streams and the same number of acquired streams.
     *
     * @param numExpectedStreams expected count for both cached and acquired streams
     * @param dlServer server to inspect
     */
    protected static void checkStreams(int numExpectedStreams, DLServer dlServer) {
        StreamManager streamManager = dlServer.dlServer.getKey().getStreamManager();
        assertEquals(numExpectedStreams, streamManager.numCached());
        assertEquals(numExpectedStreams, streamManager.numAcquired());
    }

    /**
     * Asserts that the server's cached and acquired stream sets both have the same
     * size as {@code streams} and contain every element of it.
     *
     * @param streams expected stream names
     * @param dlServer server to inspect
     */
    protected static void checkStreams(Set<String> streams, DLServer dlServer) {
        StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
        Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
        Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet();
        assertEquals(streams.size(), cachedStreams.size());
        assertEquals(streams.size(), acquiredStreams.size());
        // Equal sizes plus "nothing expected is missing" implies the sets are equal.
        assertTrue(Sets.difference(streams, cachedStreams).isEmpty());
        assertTrue(Sets.difference(streams, acquiredStreams).isEmpty());
    }

    /**
     * Asserts the client-side ownership view and the server-side cached/acquired
     * stream sets for a single stream.
     *
     * <p>On the client side only the first entry of the ownership distribution is
     * examined, and only when {@code expectedNumProxiesInClient > 0}.
     *
     * @param name stream name to look for
     * @param dlClient client whose ownership distribution is inspected
     * @param dlServer server whose stream manager is inspected
     * @param expectedNumProxiesInClient expected number of entries in the client distribution
     * @param expectedClientCacheSize expected number of streams in the first distribution entry
     * @param expectedServerCacheSize expected size of both the cached and acquired stream sets
     * @param existedInServer whether {@code name} should be cached and acquired on the server
     * @param existedInClient whether {@code name} should appear in the first distribution entry
     */
    protected static void checkStream(String name, DLClient dlClient, DLServer dlServer,
                                      int expectedNumProxiesInClient, int expectedClientCacheSize,
                                      int expectedServerCacheSize, boolean existedInServer, boolean existedInClient) {
        Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution();
        assertEquals(expectedNumProxiesInClient, distribution.size());
        if (expectedNumProxiesInClient > 0) {
            Map.Entry<SocketAddress, Set<String>> localEntry =
                distribution.entrySet().iterator().next();
            assertEquals(dlServer.getAddress(), localEntry.getKey());
            assertEquals(expectedClientCacheSize, localEntry.getValue().size());
            assertEquals(existedInClient, localEntry.getValue().contains(name));
        }
        StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
        Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
        // Read the acquired set from getAcquiredStreams(); reading the cached set here
        // would make the "acquired" assertions below a repeat of the cached ones.
        Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet();
        assertEquals(expectedServerCacheSize, cachedStreams.size());
        assertEquals(existedInServer, cachedStreams.contains(name));
        assertEquals(expectedServerCacheSize, acquiredStreams.size());
        assertEquals(existedInServer, acquiredStreams.contains(name));
    }

    /**
     * @param dlClient client wrapper to query
     * @return the stream ownership distribution reported by the wrapped client
     */
    protected static Map<SocketAddress, Set<String>> getStreamOwnershipDistribution(DLClient dlClient) {
        return dlClient.dlClient.getStreamOwnershipDistribution();
    }

    /**
     * Flattens an ownership distribution into the union of all stream names.
     *
     * @param distribution mapping from address to the streams listed under it
     * @return a new mutable set containing every stream name found in any value
     */
    protected static Set<String> getAllStreamsFromDistribution(Map<SocketAddress, Set<String>> distribution) {
        Set<String> allStreams = new HashSet<String>();
        for (Set<String> streams : distribution.values()) {
            allStreams.addAll(streams);
        }
        return allStreams;
    }
}