blob: bbabbb24968748ca3c2e8ce7ce0379561fa7a414 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.impl.metadata;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogConstants;
import com.twitter.distributedlog.TestZooKeeperClientBuilder;
import com.twitter.distributedlog.impl.metadata.BKDLConfig;
import com.twitter.distributedlog.impl.metadata.ZkMetadataResolver;
import com.twitter.distributedlog.metadata.DLMetadata;
import com.twitter.distributedlog.util.Utils;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.ZooKeeperClientBuilder;
import com.twitter.distributedlog.ZooKeeperClusterTestCase;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestZkMetadataResolver extends ZooKeeperClusterTestCase {
private static final BKDLConfig bkdlConfig = new BKDLConfig("127.0.0.1:7000", "ledgers");
private static final BKDLConfig bkdlConfig2 = new BKDLConfig("127.0.0.1:7000", "ledgers2");
private ZooKeeperClient zkc;
private ZkMetadataResolver resolver;
@Before
public void setup() throws Exception {
zkc = TestZooKeeperClientBuilder.newBuilder()
.uri(createURI("/"))
.sessionTimeoutMs(10000)
.build();
resolver = new ZkMetadataResolver(zkc);
}
@After
public void tearDown() throws Exception {
zkc.close();
}
private URI createURI(String path) {
return URI.create("distributedlog://127.0.0.1:" + zkPort + path);
}
@Test(timeout = 60000)
public void testResolveFailures() throws Exception {
// resolve unexisted path
try {
resolver.resolve(createURI("/unexisted/path"));
fail("Should fail if no metadata resolved.");
} catch (IOException e) {
// expected
}
// resolve existed unbound path
Utils.zkCreateFullPathOptimistic(zkc, "/existed/path", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
try {
resolver.resolve(createURI("/existed/path"));
fail("Should fail if no metadata resolved.");
} catch (IOException e) {
// expected
}
}
@Test(timeout = 60000)
public void testResolve() throws Exception {
DLMetadata dlMetadata = DLMetadata.create(bkdlConfig);
dlMetadata.create(createURI("/messaging/distributedlog-testresolve"));
DLMetadata dlMetadata2 = DLMetadata.create(bkdlConfig2);
dlMetadata2.create(createURI("/messaging/distributedlog-testresolve/child"));
assertEquals(dlMetadata,
resolver.resolve(createURI("/messaging/distributedlog-testresolve")));
assertEquals(dlMetadata2,
resolver.resolve(createURI("/messaging/distributedlog-testresolve/child")));
assertEquals(dlMetadata2,
resolver.resolve(createURI("/messaging/distributedlog-testresolve/child/unknown")));
Utils.zkCreateFullPathOptimistic(zkc, "/messaging/distributedlog-testresolve/child/child2", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertEquals(dlMetadata2,
resolver.resolve(createURI("/messaging/distributedlog-testresolve/child/child2")));
}
@Test(timeout = 60000)
public void testEncodeRegionID() throws Exception {
DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
URI uri = createURI("/messaging/distributedlog-testencoderegionid/dl1");
DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers"));
meta1.create(uri);
BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri);
BKDLConfig.propagateConfiguration(read1, dlConf);
assertFalse(dlConf.getEncodeRegionIDInLogSegmentMetadata());
BKDLConfig.clearCachedDLConfigs();
DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers").setEncodeRegionID(true));
meta2.update(uri);
BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri);
BKDLConfig.propagateConfiguration(read2, dlConf);
assertTrue(dlConf.getEncodeRegionIDInLogSegmentMetadata());
BKDLConfig.clearCachedDLConfigs();
DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers").setEncodeRegionID(false));
meta3.update(uri);
BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri);
BKDLConfig.propagateConfiguration(read3, dlConf);
assertFalse(dlConf.getEncodeRegionIDInLogSegmentMetadata());
BKDLConfig.clearCachedDLConfigs();
}
@Test(timeout = 60000)
public void testFirstLogSegmentSequenceNumber() throws Exception {
DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
URI uri = createURI("/messaging/distributedlog-testfirstledgerseqno/dl1");
DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers"));
meta1.create(uri);
BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri);
BKDLConfig.propagateConfiguration(read1, dlConf);
assertEquals(DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO, dlConf.getFirstLogSegmentSequenceNumber());
BKDLConfig.clearCachedDLConfigs();
DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")
.setFirstLogSegmentSeqNo(9999L));
meta2.update(uri);
BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri);
BKDLConfig.propagateConfiguration(read2, dlConf);
assertEquals(9999L, dlConf.getFirstLogSegmentSequenceNumber());
BKDLConfig.clearCachedDLConfigs();
DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")
.setFirstLogSegmentSeqNo(99L));
meta3.update(uri);
BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri);
BKDLConfig.propagateConfiguration(read3, dlConf);
assertEquals(99L, dlConf.getFirstLogSegmentSequenceNumber());
BKDLConfig.clearCachedDLConfigs();
}
@Test(timeout = 60000)
public void testFederatedNamespace() throws Exception {
DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
URI uri = createURI("/messaging/distributedlog-testfederatednamespace/dl1");
DLMetadata meta1 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers"));
meta1.create(uri);
BKDLConfig read1 = BKDLConfig.resolveDLConfig(zkc, uri);
BKDLConfig.propagateConfiguration(read1, dlConf);
assertTrue(dlConf.getCreateStreamIfNotExists());
BKDLConfig.clearCachedDLConfigs();
DLMetadata meta2 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")
.setFederatedNamespace(true));
meta2.update(uri);
BKDLConfig read2 = BKDLConfig.resolveDLConfig(zkc, uri);
BKDLConfig.propagateConfiguration(read2, dlConf);
assertFalse(dlConf.getCreateStreamIfNotExists());
BKDLConfig.clearCachedDLConfigs();
DLMetadata meta3 = DLMetadata.create(new BKDLConfig("127.0.0.1:7000", "ledgers")
.setFederatedNamespace(false));
meta3.update(uri);
BKDLConfig read3 = BKDLConfig.resolveDLConfig(zkc, uri);
BKDLConfig.propagateConfiguration(read3, dlConf);
// if it is non-federated namespace, it won't change the create stream behavior.
assertFalse(dlConf.getCreateStreamIfNotExists());
BKDLConfig.clearCachedDLConfigs();
}
}