| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.twitter.distributedlog.service; |
| |
| import static com.google.common.base.Charsets.UTF_8; |
| import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import com.google.common.base.Optional; |
| import com.twitter.distributedlog.AsyncLogReader; |
| import com.twitter.distributedlog.DLMTestUtil; |
| import com.twitter.distributedlog.DLSN; |
| import com.twitter.distributedlog.DistributedLogManager; |
| import com.twitter.distributedlog.LogReader; |
| import com.twitter.distributedlog.LogRecord; |
| import com.twitter.distributedlog.LogRecordWithDLSN; |
| import com.twitter.distributedlog.TestZooKeeperClientBuilder; |
| import com.twitter.distributedlog.ZooKeeperClient; |
| import com.twitter.distributedlog.acl.AccessControlManager; |
| import com.twitter.distributedlog.annotations.DistributedLogAnnotations; |
| import com.twitter.distributedlog.client.routing.LocalRoutingService; |
| import com.twitter.distributedlog.exceptions.DLException; |
| import com.twitter.distributedlog.exceptions.LogNotFoundException; |
| import com.twitter.distributedlog.impl.acl.ZKAccessControl; |
| import com.twitter.distributedlog.impl.metadata.BKDLConfig; |
| import com.twitter.distributedlog.namespace.DistributedLogNamespace; |
| import com.twitter.distributedlog.service.stream.StreamManagerImpl; |
| import com.twitter.distributedlog.thrift.AccessControlEntry; |
| import com.twitter.distributedlog.thrift.service.BulkWriteResponse; |
| import com.twitter.distributedlog.thrift.service.HeartbeatOptions; |
| import com.twitter.distributedlog.thrift.service.StatusCode; |
| import com.twitter.distributedlog.thrift.service.WriteContext; |
| import com.twitter.distributedlog.util.FailpointUtils; |
| import com.twitter.distributedlog.util.FutureUtils; |
| import com.twitter.finagle.builder.ClientBuilder; |
| import com.twitter.finagle.thrift.ClientId$; |
| import com.twitter.util.Await; |
| import com.twitter.util.Duration; |
| import com.twitter.util.Future; |
| import com.twitter.util.Futures; |
| import java.net.SocketAddress; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import org.junit.Ignore; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TestName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Test Case for {@link DistributedLogServer}. |
| */ |
| public abstract class TestDistributedLogServerBase extends DistributedLogServerTestCase { |
| |
| private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class); |
| |
| @Rule |
| public TestName testName = new TestName(); |
| |
| protected TestDistributedLogServerBase(boolean clientSideRouting) { |
| super(clientSideRouting); |
| } |
| |
| /** |
| * {@link https://issues.apache.org/jira/browse/DL-27}. |
| */ |
| @DistributedLogAnnotations.FlakyTest |
| @Ignore |
| @Test(timeout = 60000) |
| public void testBasicWrite() throws Exception { |
| String name = "dlserver-basic-write"; |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| for (long i = 1; i <= 10; i++) { |
| logger.debug("Write entry {} to stream {}.", i, name); |
| Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes()))); |
| } |
| |
| HeartbeatOptions hbOptions = new HeartbeatOptions(); |
| hbOptions.setSendHeartBeatToReader(true); |
| // make sure the first log segment of each stream created |
| FutureUtils.result(dlClient.dlClient.heartbeat(name)); |
| |
| DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); |
| LogReader reader = dlm.getInputStream(1); |
| int numRead = 0; |
| LogRecord r = reader.readNext(false); |
| while (null != r) { |
| ++numRead; |
| int i = Integer.parseInt(new String(r.getPayload())); |
| assertEquals(numRead, i); |
| r = reader.readNext(false); |
| } |
| assertEquals(10, numRead); |
| reader.close(); |
| dlm.close(); |
| } |
| |
| /** |
| * Sanity check to make sure both checksum flag values work. |
| */ |
| @Test(timeout = 60000) |
| public void testChecksumFlag() throws Exception { |
| String name = "testChecksumFlag"; |
| LocalRoutingService routingService = LocalRoutingService.newBuilder().build(); |
| routingService.addHost(name, dlServer.getAddress()); |
| DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder() |
| .name(name) |
| .clientId(ClientId$.MODULE$.apply("test")) |
| .routingService(routingService) |
| .handshakeWithClientInfo(true) |
| .clientBuilder(ClientBuilder.get() |
| .hostConnectionLimit(1) |
| .connectionTimeout(Duration.fromSeconds(1)) |
| .requestTimeout(Duration.fromSeconds(60))) |
| .checksum(false); |
| DistributedLogClient dlClient = dlClientBuilder.build(); |
| Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes()))); |
| dlClient.close(); |
| |
| dlClient = dlClientBuilder.checksum(true).build(); |
| Await.result(dlClient.write(name, ByteBuffer.wrap(("2").getBytes()))); |
| dlClient.close(); |
| } |
| |
| private void runSimpleBulkWriteTest(int writeCount) throws Exception { |
| String name = String.format("dlserver-bulk-write-%d", writeCount); |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount); |
| for (long i = 1; i <= writeCount; i++) { |
| writes.add(ByteBuffer.wrap(("" + i).getBytes())); |
| } |
| |
| logger.debug("Write {} entries to stream {}.", writeCount, name); |
| List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); |
| assertEquals(futures.size(), writeCount); |
| for (Future<DLSN> future : futures) { |
| // No throw == pass. |
| DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); |
| } |
| |
| DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); |
| LogReader reader = dlm.getInputStream(1); |
| int numRead = 0; |
| LogRecord r = reader.readNext(false); |
| while (null != r) { |
| int i = Integer.parseInt(new String(r.getPayload())); |
| assertEquals(numRead + 1, i); |
| ++numRead; |
| r = reader.readNext(false); |
| } |
| assertEquals(writeCount, numRead); |
| reader.close(); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testBulkWrite() throws Exception { |
| runSimpleBulkWriteTest(100); |
| } |
| |
| @Test(timeout = 60000) |
| public void testBulkWriteSingleWrite() throws Exception { |
| runSimpleBulkWriteTest(1); |
| } |
| |
| @Test(timeout = 60000) |
| public void testBulkWriteEmptyList() throws Exception { |
| String name = String.format("dlserver-bulk-write-%d", 0); |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); |
| List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); |
| |
| assertEquals(0, futures.size()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testBulkWriteNullArg() throws Exception { |
| |
| String name = String.format("dlserver-bulk-write-%s", "null"); |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); |
| writes.add(null); |
| |
| try { |
| dlClient.dlClient.writeBulk(name, writes); |
| fail("should not have succeeded"); |
| } catch (NullPointerException npe) { |
| // expected |
| logger.info("Expected to catch NullPointException."); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testBulkWriteEmptyBuffer() throws Exception { |
| String name = String.format("dlserver-bulk-write-%s", "empty"); |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); |
| writes.add(ByteBuffer.wrap(("").getBytes())); |
| writes.add(ByteBuffer.wrap(("").getBytes())); |
| List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); |
| assertEquals(2, futures.size()); |
| for (Future<DLSN> future : futures) { |
| // No throw == pass |
| DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); |
| } |
| } |
| |
| void failDueToWrongException(Exception ex) { |
| logger.info("testBulkWritePartialFailure: ", ex); |
| fail(String.format("failed with wrong exception %s", ex.getClass().getName())); |
| } |
| |
| int validateAllFailedAsCancelled(List<Future<DLSN>> futures, int start, int finish) { |
| int failed = 0; |
| for (int i = start; i < finish; i++) { |
| Future<DLSN> future = futures.get(i); |
| try { |
| Await.result(future, Duration.fromSeconds(10)); |
| fail("future should have failed!"); |
| } catch (DLException cre) { |
| ++failed; |
| } catch (Exception ex) { |
| failDueToWrongException(ex); |
| } |
| } |
| return failed; |
| } |
| |
| void validateFailedAsLogRecordTooLong(Future<DLSN> future) { |
| try { |
| Await.result(future, Duration.fromSeconds(10)); |
| fail("should have failed"); |
| } catch (DLException dle) { |
| assertEquals(StatusCode.TOO_LARGE_RECORD, dle.getCode()); |
| } catch (Exception ex) { |
| failDueToWrongException(ex); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testBulkWritePartialFailure() throws Exception { |
| String name = String.format("dlserver-bulk-write-%s", "partial-failure"); |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| final int writeCount = 100; |
| |
| List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount * 2 + 1); |
| for (long i = 1; i <= writeCount; i++) { |
| writes.add(ByteBuffer.wrap(("" + i).getBytes())); |
| } |
| // Too big, will cause partial failure. |
| ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1); |
| writes.add(buf); |
| for (long i = 1; i <= writeCount; i++) { |
| writes.add(ByteBuffer.wrap(("" + i).getBytes())); |
| } |
| |
| // Count succeeded. |
| List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); |
| int succeeded = 0; |
| for (int i = 0; i < writeCount; i++) { |
| Future<DLSN> future = futures.get(i); |
| try { |
| Await.result(future, Duration.fromSeconds(10)); |
| ++succeeded; |
| } catch (Exception ex) { |
| failDueToWrongException(ex); |
| } |
| } |
| |
| validateFailedAsLogRecordTooLong(futures.get(writeCount)); |
| FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1))); |
| assertEquals(writeCount, succeeded); |
| } |
| |
| @Test(timeout = 60000) |
| public void testBulkWriteTotalFailureFirstWriteFailed() throws Exception { |
| String name = String.format("dlserver-bulk-write-%s", "first-write-failed"); |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| final int writeCount = 100; |
| List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1); |
| ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1); |
| writes.add(buf); |
| for (long i = 1; i <= writeCount; i++) { |
| writes.add(ByteBuffer.wrap(("" + i).getBytes())); |
| } |
| |
| List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); |
| validateFailedAsLogRecordTooLong(futures.get(0)); |
| FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1))); |
| } |
| |
| @Test(timeout = 60000) |
| public void testBulkWriteTotalFailureLostLock() throws Exception { |
| String name = String.format("dlserver-bulk-write-%s", "lost-lock"); |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| final int writeCount = 8; |
| List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1); |
| ByteBuffer buf = ByteBuffer.allocate(8); |
| writes.add(buf); |
| for (long i = 1; i <= writeCount; i++) { |
| writes.add(ByteBuffer.wrap(("" + i).getBytes())); |
| } |
| // Warm it up with a write. |
| Await.result(dlClient.dlClient.write(name, ByteBuffer.allocate(8))); |
| |
| // Failpoint a lost lock, make sure the failure gets promoted to an operation failure. |
| DistributedLogServiceImpl svcImpl = (DistributedLogServiceImpl) dlServer.dlServer.getLeft(); |
| try { |
| FailpointUtils.setFailpoint( |
| FailpointUtils.FailPointName.FP_WriteInternalLostLock, |
| FailpointUtils.FailPointActions.FailPointAction_Default |
| ); |
| Future<BulkWriteResponse> futures = svcImpl.writeBulkWithContext(name, writes, new WriteContext()); |
| assertEquals(StatusCode.LOCKING_EXCEPTION, Await.result(futures).header.code); |
| } finally { |
| FailpointUtils.removeFailpoint( |
| FailpointUtils.FailPointName.FP_WriteInternalLostLock |
| ); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testHeartbeat() throws Exception { |
| String name = "dlserver-heartbeat"; |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| for (long i = 1; i <= 10; i++) { |
| logger.debug("Send heartbeat {} to stream {}.", i, name); |
| dlClient.dlClient.check(name).get(); |
| } |
| |
| logger.debug("Write entry one to stream {}.", name); |
| dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes())).get(); |
| |
| Thread.sleep(1000); |
| |
| DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); |
| LogReader reader = dlm.getInputStream(DLSN.InitialDLSN); |
| int numRead = 0; |
| // eid=0 => control records |
| // other 9 heartbeats will not trigger writing any control records. |
| // eid=1 => user entry |
| long startEntryId = 1; |
| LogRecordWithDLSN r = reader.readNext(false); |
| while (null != r) { |
| int i = Integer.parseInt(new String(r.getPayload())); |
| assertEquals(numRead + 1, i); |
| assertEquals(r.getDlsn().compareTo(new DLSN(1, startEntryId, 0)), 0); |
| ++numRead; |
| ++startEntryId; |
| r = reader.readNext(false); |
| } |
| assertEquals(1, numRead); |
| } |
| |
| @Test(timeout = 60000) |
| public void testFenceWrite() throws Exception { |
| String name = "dlserver-fence-write"; |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| for (long i = 1; i <= 10; i++) { |
| logger.debug("Write entry {} to stream {}.", i, name); |
| dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get(); |
| } |
| |
| Thread.sleep(1000); |
| |
| logger.info("Fencing stream {}.", name); |
| DLMTestUtil.fenceStream(conf, getUri(), name); |
| logger.info("Fenced stream {}.", name); |
| |
| for (long i = 11; i <= 20; i++) { |
| logger.debug("Write entry {} to stream {}.", i, name); |
| dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get(); |
| } |
| |
| DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); |
| LogReader reader = dlm.getInputStream(1); |
| int numRead = 0; |
| LogRecord r = reader.readNext(false); |
| while (null != r) { |
| int i = Integer.parseInt(new String(r.getPayload())); |
| assertEquals(numRead + 1, i); |
| ++numRead; |
| r = reader.readNext(false); |
| } |
| assertEquals(20, numRead); |
| reader.close(); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testDeleteStream() throws Exception { |
| String name = "dlserver-delete-stream"; |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| long txid = 101; |
| for (long i = 1; i <= 10; i++) { |
| long curTxId = txid++; |
| logger.debug("Write entry {} to stream {}.", curTxId, name); |
| dlClient.dlClient.write(name, |
| ByteBuffer.wrap(("" + curTxId).getBytes())).get(); |
| } |
| |
| checkStream(1, 1, 1, name, dlServer.getAddress(), true, true); |
| |
| dlClient.dlClient.delete(name).get(); |
| |
| checkStream(0, 0, 0, name, dlServer.getAddress(), false, false); |
| |
| Thread.sleep(1000); |
| |
| DistributedLogManager dlm101 = DLMTestUtil.createNewDLM(name, conf, getUri()); |
| AsyncLogReader reader101 = FutureUtils.result(dlm101.openAsyncLogReader(DLSN.InitialDLSN)); |
| try { |
| FutureUtils.result(reader101.readNext()); |
| fail("Should fail with LogNotFoundException since the stream is deleted"); |
| } catch (LogNotFoundException lnfe) { |
| // expected |
| } |
| FutureUtils.result(reader101.asyncClose()); |
| dlm101.close(); |
| |
| txid = 201; |
| for (long i = 1; i <= 10; i++) { |
| long curTxId = txid++; |
| logger.debug("Write entry {} to stream {}.", curTxId, name); |
| DLSN dlsn = dlClient.dlClient.write(name, |
| ByteBuffer.wrap(("" + curTxId).getBytes())).get(); |
| } |
| Thread.sleep(1000); |
| |
| DistributedLogManager dlm201 = DLMTestUtil.createNewDLM(name, conf, getUri()); |
| LogReader reader201 = dlm201.getInputStream(1); |
| int numRead = 0; |
| int curTxId = 201; |
| LogRecord r = reader201.readNext(false); |
| while (null != r) { |
| int i = Integer.parseInt(new String(r.getPayload())); |
| assertEquals(curTxId++, i); |
| ++numRead; |
| r = reader201.readNext(false); |
| } |
| assertEquals(10, numRead); |
| reader201.close(); |
| dlm201.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testCreateStream() throws Exception { |
| try { |
| setupNoAdHocCluster(); |
| final String name = "dlserver-create-stream"; |
| |
| noAdHocClient.routingService.addHost("dlserver-create-stream", noAdHocServer.getAddress()); |
| assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); |
| assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); |
| |
| long txid = 101; |
| for (long i = 1; i <= 10; i++) { |
| long curTxId = txid++; |
| logger.debug("Write entry {} to stream {}.", curTxId, name); |
| noAdHocClient.dlClient.write(name, |
| ByteBuffer.wrap(("" + curTxId).getBytes())).get(); |
| } |
| |
| assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); |
| } finally { |
| tearDownNoAdHocCluster(); |
| } |
| } |
| |
| /** |
| * This tests that create has touch like behavior in that trying to create the stream twice, simply does nothing. |
| */ |
| @Test(timeout = 60000) |
| public void testCreateStreamTwice() throws Exception { |
| try { |
| setupNoAdHocCluster(); |
| final String name = "dlserver-create-stream-twice"; |
| |
| noAdHocClient.routingService.addHost("dlserver-create-stream-twice", noAdHocServer.getAddress()); |
| assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); |
| assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); |
| |
| long txid = 101; |
| for (long i = 1; i <= 10; i++) { |
| long curTxId = txid++; |
| logger.debug("Write entry {} to stream {}.", curTxId, name); |
| noAdHocClient.dlClient.write(name, |
| ByteBuffer.wrap(("" + curTxId).getBytes())).get(); |
| } |
| |
| assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); |
| |
| // create again |
| assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); |
| assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); |
| } finally { |
| tearDownNoAdHocCluster(); |
| } |
| } |
| |
| |
| |
| @Test(timeout = 60000) |
| public void testTruncateStream() throws Exception { |
| String name = "dlserver-truncate-stream"; |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| long txid = 1; |
| Map<Long, DLSN> txid2DLSN = new HashMap<Long, DLSN>(); |
| for (int s = 1; s <= 2; s++) { |
| for (long i = 1; i <= 10; i++) { |
| long curTxId = txid++; |
| logger.debug("Write entry {} to stream {}.", curTxId, name); |
| DLSN dlsn = dlClient.dlClient.write(name, |
| ByteBuffer.wrap(("" + curTxId).getBytes())).get(); |
| txid2DLSN.put(curTxId, dlsn); |
| } |
| if (s == 1) { |
| dlClient.dlClient.release(name).get(); |
| } |
| } |
| |
| DLSN dlsnToDelete = txid2DLSN.get(11L); |
| dlClient.dlClient.truncate(name, dlsnToDelete).get(); |
| |
| DistributedLogManager readDLM = DLMTestUtil.createNewDLM(name, conf, getUri()); |
| LogReader reader = readDLM.getInputStream(1); |
| int numRead = 0; |
| int curTxId = 11; |
| LogRecord r = reader.readNext(false); |
| while (null != r) { |
| int i = Integer.parseInt(new String(r.getPayload())); |
| assertEquals(curTxId++, i); |
| ++numRead; |
| r = reader.readNext(false); |
| } |
| assertEquals(10, numRead); |
| reader.close(); |
| readDLM.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testRequestDenied() throws Exception { |
| String name = "request-denied"; |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| AccessControlEntry ace = new AccessControlEntry(); |
| ace.setDenyWrite(true); |
| ZooKeeperClient zkc = TestZooKeeperClientBuilder |
| .newBuilder() |
| .uri(getUri()) |
| .connectionTimeoutMs(60000) |
| .sessionTimeoutMs(60000) |
| .build(); |
| DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace(); |
| BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri()); |
| String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name; |
| ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath); |
| accessControl.create(zkc); |
| |
| AccessControlManager acm = dlNamespace.createAccessControlManager(); |
| while (acm.allowWrite(name)) { |
| Thread.sleep(100); |
| } |
| |
| try { |
| Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); |
| fail("Should fail with request denied exception"); |
| } catch (DLException dle) { |
| assertEquals(StatusCode.REQUEST_DENIED, dle.getCode()); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testNoneStreamNameRegex() throws Exception { |
| String streamNamePrefix = "none-stream-name-regex-"; |
| int numStreams = 5; |
| Set<String> streams = new HashSet<String>(); |
| |
| for (int i = 0; i < numStreams; i++) { |
| streams.add(streamNamePrefix + i); |
| } |
| testStreamNameRegex(streams, ".*", streams); |
| } |
| |
| @Test(timeout = 60000) |
| public void testStreamNameRegex() throws Exception { |
| String streamNamePrefix = "stream-name-regex-"; |
| int numStreams = 5; |
| Set<String> streams = new HashSet<String>(); |
| Set<String> expectedStreams = new HashSet<String>(); |
| String streamNameRegex = streamNamePrefix + "1"; |
| |
| for (int i = 0; i < numStreams; i++) { |
| streams.add(streamNamePrefix + i); |
| } |
| expectedStreams.add(streamNamePrefix + "1"); |
| |
| testStreamNameRegex(streams, streamNameRegex, expectedStreams); |
| } |
| |
| private void testStreamNameRegex(Set<String> streams, String streamNameRegex, |
| Set<String> expectedStreams) |
| throws Exception { |
| for (String streamName : streams) { |
| dlClient.routingService.addHost(streamName, dlServer.getAddress()); |
| Await.result(dlClient.dlClient.write(streamName, |
| ByteBuffer.wrap(streamName.getBytes(UTF_8)))); |
| } |
| |
| DLClient client = createDistributedLogClient( |
| "test-stream-name-regex", |
| streamNameRegex, |
| Optional.<String>absent()); |
| try { |
| client.routingService.addHost("unknown", dlServer.getAddress()); |
| client.handshake(); |
| Map<SocketAddress, Set<String>> distribution = |
| client.dlClient.getStreamOwnershipDistribution(); |
| assertEquals(1, distribution.size()); |
| Set<String> cachedStreams = distribution.values().iterator().next(); |
| assertNotNull(cachedStreams); |
| assertEquals(expectedStreams.size(), cachedStreams.size()); |
| |
| for (String streamName : cachedStreams) { |
| assertTrue(expectedStreams.contains(streamName)); |
| } |
| } finally { |
| client.shutdown(); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testReleaseStream() throws Exception { |
| String name = "dlserver-release-stream"; |
| |
| dlClient.routingService.addHost(name, dlServer.getAddress()); |
| |
| Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); |
| checkStream(1, 1, 1, name, dlServer.getAddress(), true, true); |
| |
| // release the stream |
| Await.result(dlClient.dlClient.release(name)); |
| checkStream(0, 0, 0, name, dlServer.getAddress(), false, false); |
| } |
| |
| protected void checkStream(int expectedNumProxiesInClient, int expectedClientCacheSize, int expectedServerCacheSize, |
| String name, SocketAddress owner, boolean existedInServer, boolean existedInClient) { |
| Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution(); |
| assertEquals(expectedNumProxiesInClient, distribution.size()); |
| |
| if (expectedNumProxiesInClient > 0) { |
| Map.Entry<SocketAddress, Set<String>> localEntry = |
| distribution.entrySet().iterator().next(); |
| assertEquals(owner, localEntry.getKey()); |
| assertEquals(expectedClientCacheSize, localEntry.getValue().size()); |
| assertEquals(existedInClient, localEntry.getValue().contains(name)); |
| } |
| |
| |
| StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager(); |
| Set<String> cachedStreams = streamManager.getCachedStreams().keySet(); |
| Set<String> acquiredStreams = streamManager.getCachedStreams().keySet(); |
| |
| assertEquals(expectedServerCacheSize, cachedStreams.size()); |
| assertEquals(existedInServer, cachedStreams.contains(name)); |
| assertEquals(expectedServerCacheSize, acquiredStreams.size()); |
| assertEquals(existedInServer, acquiredStreams.contains(name)); |
| } |
| |
| } |