blob: 604be0ee12abf2db353a871215b1fa99797623d3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog;
import static org.junit.Assert.assertTrue;
import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.util.PermitLimiter;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
public class TestDistributedLogBase {
static final Logger LOG = LoggerFactory.getLogger(TestDistributedLogBase.class);
// Num worker threads should be one, since the exec service is used for the ordered
// future pool in test cases, and setting to > 1 will therefore result in unordered
// write ops.
protected static DistributedLogConfiguration conf =
new DistributedLogConfiguration()
.setEnableReadAhead(true)
.setReadAheadMaxRecords(1000)
.setReadAheadBatchSize(10)
.setLockTimeout(1)
.setNumWorkerThreads(1)
.setReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis(20)
.setSchedulerShutdownTimeoutMs(0)
.setDLLedgerMetadataLayoutVersion(LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION);
protected ZooKeeper zkc;
protected static LocalDLMEmulator bkutil;
protected static ZooKeeperServerShim zks;
protected static String zkServers;
protected static int zkPort;
protected static int numBookies = 3;
protected static final List<File> tmpDirs = new ArrayList<File>();
@BeforeClass
public static void setupCluster() throws Exception {
File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog");
tmpDirs.add(zkTmpDir);
Pair<ZooKeeperServerShim, Integer> serverAndPort = LocalDLMEmulator.runZookeeperOnAnyPort(zkTmpDir);
zks = serverAndPort.getLeft();
zkPort = serverAndPort.getRight();
bkutil = LocalDLMEmulator.newBuilder()
.numBookies(numBookies)
.zkHost("127.0.0.1")
.zkPort(zkPort)
.serverConf(DLMTestUtil.loadTestBkConf())
.shouldStartZK(false)
.build();
bkutil.start();
zkServers = "127.0.0.1:" + zkPort;
}
@AfterClass
public static void teardownCluster() throws Exception {
bkutil.teardown();
zks.stop();
for (File dir : tmpDirs) {
FileUtils.deleteDirectory(dir);
}
}
@Before
public void setup() throws Exception {
try {
zkc = LocalDLMEmulator.connectZooKeeper("127.0.0.1", zkPort);
} catch (Exception ex) {
LOG.error("hit exception connecting to zookeeper at {}:{}", new Object[] { "127.0.0.1", zkPort, ex });
throw ex;
}
}
@After
public void teardown() throws Exception {
if (null != zkc) {
zkc.close();
}
}
protected LogRecord waitForNextRecord(LogReader reader) throws Exception {
LogRecord record = reader.readNext(false);
while (null == record) {
record = reader.readNext(false);
}
return record;
}
public URI createDLMURI(String path) throws Exception {
return DLMTestUtil.createDLMURI(zkPort, path);
}
protected void ensureURICreated(URI uri) throws Exception {
ensureURICreated(zkc, uri);
}
protected void ensureURICreated(ZooKeeper zkc, URI uri) throws Exception {
try {
zkc.create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) {
// ignore
}
}
public BKDistributedLogManager createNewDLM(DistributedLogConfiguration conf,
String name) throws Exception {
URI uri = createDLMURI("/" + name);
ensureURICreated(uri);
return new BKDistributedLogManager(
name,
conf,
uri,
null,
null,
null,
null,
null,
null,
new SettableFeatureProvider("", 0),
PermitLimiter.NULL_PERMIT_LIMITER,
NullStatsLogger.INSTANCE
);
}
public BKDistributedLogManager createNewDLM(DistributedLogConfiguration conf,
String name,
PermitLimiter writeLimiter)
throws Exception {
URI uri = createDLMURI("/" + name);
ensureURICreated(uri);
return new BKDistributedLogManager(
name,
conf,
uri,
null,
null,
null,
null,
null,
null,
new SettableFeatureProvider("", 0),
writeLimiter,
NullStatsLogger.INSTANCE
);
}
public DLMTestUtil.BKLogPartitionWriteHandlerAndClients createNewBKDLM(
DistributedLogConfiguration conf,
String path) throws Exception {
return DLMTestUtil.createNewBKDLM(conf, path, zkPort);
}
@SuppressWarnings("deprecation")
protected LogSegmentMetadataStore getLogSegmentMetadataStore(DistributedLogManagerFactory factory) {
DistributedLogNamespace namespace = factory.getNamespace();
assertTrue(namespace instanceof BKDistributedLogNamespace);
return ((BKDistributedLogNamespace) namespace).getWriterSegmentMetadataStore();
}
@SuppressWarnings("deprecation")
protected ZooKeeperClient getZooKeeperClient(DistributedLogManagerFactory factory) throws Exception {
DistributedLogNamespace namespace = factory.getNamespace();
assertTrue(namespace instanceof BKDistributedLogNamespace);
return ((BKDistributedLogNamespace) namespace).getSharedWriterZKCForDL();
}
@SuppressWarnings("deprecation")
protected BookKeeperClient getBookKeeperClient(DistributedLogManagerFactory factory) throws Exception {
DistributedLogNamespace namespace = factory.getNamespace();
assertTrue(namespace instanceof BKDistributedLogNamespace);
return ((BKDistributedLogNamespace) namespace).getReaderBKC();
}
protected LedgerHandle getLedgerHandle(BKLogSegmentWriter segmentWriter) {
LogSegmentEntryWriter entryWriter = segmentWriter.getEntryWriter();
assertTrue(entryWriter instanceof BKLogSegmentEntryWriter);
return ((BKLogSegmentEntryWriter) entryWriter).getLedgerHandle();
}
}