| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.twitter.distributedlog.service; |
| |
| import static com.google.common.base.Charsets.UTF_8; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import com.google.common.collect.Lists; |
| import com.twitter.distributedlog.DLSN; |
| import com.twitter.distributedlog.DistributedLogConfiguration; |
| import com.twitter.distributedlog.TestDistributedLogBase; |
| import com.twitter.distributedlog.acl.DefaultAccessControlManager; |
| import com.twitter.distributedlog.client.routing.LocalRoutingService; |
| import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; |
| import com.twitter.distributedlog.exceptions.StreamUnavailableException; |
| import com.twitter.distributedlog.service.config.NullStreamConfigProvider; |
| import com.twitter.distributedlog.service.config.ServerConfiguration; |
| import com.twitter.distributedlog.service.placement.EqualLoadAppraiser; |
| import com.twitter.distributedlog.service.stream.Stream; |
| import com.twitter.distributedlog.service.stream.StreamImpl; |
| import com.twitter.distributedlog.service.stream.StreamImpl.StreamStatus; |
| import com.twitter.distributedlog.service.stream.StreamManagerImpl; |
| import com.twitter.distributedlog.service.stream.WriteOp; |
| import com.twitter.distributedlog.service.streamset.DelimiterStreamPartitionConverter; |
| import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter; |
| import com.twitter.distributedlog.service.streamset.StreamPartitionConverter; |
| import com.twitter.distributedlog.thrift.service.HeartbeatOptions; |
| import com.twitter.distributedlog.thrift.service.StatusCode; |
| import com.twitter.distributedlog.thrift.service.WriteContext; |
| import com.twitter.distributedlog.thrift.service.WriteResponse; |
| import com.twitter.distributedlog.util.ConfUtils; |
| import com.twitter.distributedlog.util.FutureUtils; |
| import com.twitter.distributedlog.util.ProtocolUtils; |
| import com.twitter.util.Await; |
| import com.twitter.util.Future; |
| import java.net.URI; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.bookkeeper.feature.SettableFeature; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.apache.bookkeeper.util.ReflectionUtils; |
| import org.apache.commons.configuration.ConfigurationException; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TestName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Test Case for DistributedLog Service. |
| */ |
| public class TestDistributedLogService extends TestDistributedLogBase { |
| |
| private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogService.class); |
| |
| @Rule |
| public TestName testName = new TestName(); |
| |
| private ServerConfiguration serverConf; |
| private DistributedLogConfiguration dlConf; |
| private URI uri; |
| private final CountDownLatch latch = new CountDownLatch(1); |
| private DistributedLogServiceImpl service; |
| |
| @Before |
| @Override |
| public void setup() throws Exception { |
| super.setup(); |
| dlConf = new DistributedLogConfiguration(); |
| dlConf.addConfiguration(conf); |
| dlConf.setLockTimeout(0) |
| .setOutputBufferSize(0) |
| .setPeriodicFlushFrequencyMilliSeconds(10) |
| .setSchedulerShutdownTimeoutMs(100); |
| serverConf = newLocalServerConf(); |
| uri = createDLMURI("/" + testName.getMethodName()); |
| ensureURICreated(uri); |
| service = createService(serverConf, dlConf, latch); |
| } |
| |
| @After |
| @Override |
| public void teardown() throws Exception { |
| if (null != service) { |
| service.shutdown(); |
| } |
| super.teardown(); |
| } |
| |
| private DistributedLogConfiguration newLocalConf() { |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(dlConf); |
| return confLocal; |
| } |
| |
| private ServerConfiguration newLocalServerConf() { |
| ServerConfiguration serverConf = new ServerConfiguration(); |
| serverConf.loadConf(dlConf); |
| serverConf.setServerThreads(1); |
| return serverConf; |
| } |
| |
| private DistributedLogServiceImpl createService( |
| ServerConfiguration serverConf, |
| DistributedLogConfiguration dlConf) throws Exception { |
| return createService(serverConf, dlConf, new CountDownLatch(1)); |
| } |
| |
| private DistributedLogServiceImpl createService( |
| ServerConfiguration serverConf, |
| DistributedLogConfiguration dlConf, |
| CountDownLatch latch) throws Exception { |
| // Build the stream partition converter |
| StreamPartitionConverter converter; |
| try { |
| converter = ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass()); |
| } catch (ConfigurationException e) { |
| logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}", |
| IdentityStreamPartitionConverter.class.getName()); |
| converter = new IdentityStreamPartitionConverter(); |
| } |
| return new DistributedLogServiceImpl( |
| serverConf, |
| dlConf, |
| ConfUtils.getConstDynConf(dlConf), |
| new NullStreamConfigProvider(), |
| uri, |
| converter, |
| new LocalRoutingService(), |
| NullStatsLogger.INSTANCE, |
| NullStatsLogger.INSTANCE, |
| latch, |
| new EqualLoadAppraiser()); |
| } |
| |
| private StreamImpl createUnstartedStream(DistributedLogServiceImpl service, |
| String name) throws Exception { |
| StreamImpl stream = (StreamImpl) service.newStream(name); |
| stream.initialize(); |
| return stream; |
| } |
| |
| private ByteBuffer createRecord(long txid) { |
| return ByteBuffer.wrap(("record-" + txid).getBytes(UTF_8)); |
| } |
| |
| private WriteOp createWriteOp(DistributedLogServiceImpl service, |
| String streamName, |
| long txid) { |
| ByteBuffer data = createRecord(txid); |
| return service.newWriteOp(streamName, data, null); |
| } |
| |
| @Test(timeout = 60000) |
| public void testAcquireStreams() throws Exception { |
| String streamName = testName.getMethodName(); |
| StreamImpl s0 = createUnstartedStream(service, streamName); |
| ServerConfiguration serverConf1 = new ServerConfiguration(); |
| serverConf1.addConfiguration(serverConf); |
| serverConf1.setServerPort(9999); |
| DistributedLogServiceImpl service1 = createService(serverConf1, dlConf); |
| StreamImpl s1 = createUnstartedStream(service1, streamName); |
| |
| // create write ops |
| WriteOp op0 = createWriteOp(service, streamName, 0L); |
| s0.submit(op0); |
| |
| WriteOp op1 = createWriteOp(service1, streamName, 1L); |
| s1.submit(op1); |
| |
| // check pending size |
| assertEquals("Write Op 0 should be pending in service 0", |
| 1, s0.numPendingOps()); |
| assertEquals("Write Op 1 should be pending in service 1", |
| 1, s1.numPendingOps()); |
| |
| // start acquiring s0 |
| s0.start(); |
| WriteResponse wr0 = Await.result(op0.result()); |
| assertEquals("Op 0 should succeed", |
| StatusCode.SUCCESS, wr0.getHeader().getCode()); |
| assertEquals("Service 0 should acquire stream", |
| StreamStatus.INITIALIZED, s0.getStatus()); |
| assertNotNull(s0.getManager()); |
| assertNotNull(s0.getWriter()); |
| assertNull(s0.getLastException()); |
| |
| // start acquiring s1 |
| s1.start(); |
| WriteResponse wr1 = Await.result(op1.result()); |
| assertEquals("Op 1 should fail", |
| StatusCode.FOUND, wr1.getHeader().getCode()); |
| // the stream will be set to ERROR and then be closed. |
| assertTrue("Service 1 should be in unavailable state", |
| StreamStatus.isUnavailable(s1.getStatus())); |
| assertNotNull(s1.getManager()); |
| assertNull(s1.getWriter()); |
| assertNotNull(s1.getLastException()); |
| assertTrue(s1.getLastException() instanceof OwnershipAcquireFailedException); |
| |
| service1.shutdown(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testAcquireStreamsWhenExceedMaxCachedPartitions() throws Exception { |
| String streamName = testName.getMethodName() + "_0000"; |
| |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(dlConf); |
| confLocal.setMaxCachedPartitionsPerProxy(1); |
| |
| ServerConfiguration serverConfLocal = new ServerConfiguration(); |
| serverConfLocal.addConfiguration(serverConf); |
| serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class); |
| |
| DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal); |
| Stream stream = serviceLocal.getLogWriter(streamName); |
| |
| // stream is cached |
| assertNotNull(stream); |
| assertEquals(1, serviceLocal.getStreamManager().numCached()); |
| |
| // create write ops |
| WriteOp op0 = createWriteOp(service, streamName, 0L); |
| stream.submit(op0); |
| WriteResponse wr0 = Await.result(op0.result()); |
| assertEquals("Op 0 should succeed", |
| StatusCode.SUCCESS, wr0.getHeader().getCode()); |
| assertEquals(1, serviceLocal.getStreamManager().numAcquired()); |
| |
| // should fail to acquire another partition |
| try { |
| serviceLocal.getLogWriter(testName.getMethodName() + "_0001"); |
| fail("Should fail to acquire new streams"); |
| } catch (StreamUnavailableException sue) { |
| // expected |
| } |
| assertEquals(1, serviceLocal.getStreamManager().numCached()); |
| assertEquals(1, serviceLocal.getStreamManager().numAcquired()); |
| |
| // should be able to acquire partitions from other streams |
| String anotherStreamName = testName.getMethodName() + "-another_0001"; |
| Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName); |
| assertNotNull(anotherStream); |
| assertEquals(2, serviceLocal.getStreamManager().numCached()); |
| |
| // create write ops |
| WriteOp op1 = createWriteOp(service, anotherStreamName, 0L); |
| anotherStream.submit(op1); |
| WriteResponse wr1 = Await.result(op1.result()); |
| assertEquals("Op 1 should succeed", |
| StatusCode.SUCCESS, wr1.getHeader().getCode()); |
| assertEquals(2, serviceLocal.getStreamManager().numAcquired()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testAcquireStreamsWhenExceedMaxAcquiredPartitions() throws Exception { |
| String streamName = testName.getMethodName() + "_0000"; |
| |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(dlConf); |
| confLocal.setMaxCachedPartitionsPerProxy(-1); |
| confLocal.setMaxAcquiredPartitionsPerProxy(1); |
| |
| ServerConfiguration serverConfLocal = new ServerConfiguration(); |
| serverConfLocal.addConfiguration(serverConf); |
| serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class); |
| |
| DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal); |
| Stream stream = serviceLocal.getLogWriter(streamName); |
| |
| // stream is cached |
| assertNotNull(stream); |
| assertEquals(1, serviceLocal.getStreamManager().numCached()); |
| |
| // create write ops |
| WriteOp op0 = createWriteOp(service, streamName, 0L); |
| stream.submit(op0); |
| WriteResponse wr0 = Await.result(op0.result()); |
| assertEquals("Op 0 should succeed", |
| StatusCode.SUCCESS, wr0.getHeader().getCode()); |
| assertEquals(1, serviceLocal.getStreamManager().numAcquired()); |
| |
| // should be able to cache partitions from same stream |
| String anotherStreamName = testName.getMethodName() + "_0001"; |
| Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName); |
| assertNotNull(anotherStream); |
| assertEquals(2, serviceLocal.getStreamManager().numCached()); |
| |
| // create write ops |
| WriteOp op1 = createWriteOp(service, anotherStreamName, 0L); |
| anotherStream.submit(op1); |
| WriteResponse wr1 = Await.result(op1.result()); |
| assertEquals("Op 1 should fail", |
| StatusCode.STREAM_UNAVAILABLE, wr1.getHeader().getCode()); |
| assertEquals(1, serviceLocal.getStreamManager().numAcquired()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testCloseShouldErrorOutPendingOps() throws Exception { |
| String streamName = testName.getMethodName(); |
| StreamImpl s = createUnstartedStream(service, streamName); |
| |
| int numWrites = 10; |
| List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites); |
| for (int i = 0; i < numWrites; i++) { |
| WriteOp op = createWriteOp(service, streamName, i); |
| s.submit(op); |
| futureList.add(op.result()); |
| } |
| assertEquals(numWrites, s.numPendingOps()); |
| Await.result(s.requestClose("close stream")); |
| assertEquals("Stream " + streamName + " is set to " + StreamStatus.CLOSED, |
| StreamStatus.CLOSED, s.getStatus()); |
| for (int i = 0; i < numWrites; i++) { |
| Future<WriteResponse> future = futureList.get(i); |
| WriteResponse wr = Await.result(future); |
| assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE, |
| StatusCode.STREAM_UNAVAILABLE, wr.getHeader().getCode()); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testCloseTwice() throws Exception { |
| String streamName = testName.getMethodName(); |
| StreamImpl s = createUnstartedStream(service, streamName); |
| |
| int numWrites = 10; |
| List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites); |
| for (int i = 0; i < numWrites; i++) { |
| WriteOp op = createWriteOp(service, streamName, i); |
| s.submit(op); |
| futureList.add(op.result()); |
| } |
| assertEquals(numWrites, s.numPendingOps()); |
| |
| Future<Void> closeFuture0 = s.requestClose("close 0"); |
| assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING, |
| StreamStatus.CLOSING == s.getStatus() |
| || StreamStatus.CLOSED == s.getStatus()); |
| Future<Void> closeFuture1 = s.requestClose("close 1"); |
| assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING, |
| StreamStatus.CLOSING == s.getStatus() |
| || StreamStatus.CLOSED == s.getStatus()); |
| |
| Await.result(closeFuture0); |
| assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED, |
| StreamStatus.CLOSED, s.getStatus()); |
| Await.result(closeFuture1); |
| assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED, |
| StreamStatus.CLOSED, s.getStatus()); |
| |
| for (int i = 0; i < numWrites; i++) { |
| Future<WriteResponse> future = futureList.get(i); |
| WriteResponse wr = Await.result(future); |
| assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE, |
| StatusCode.STREAM_UNAVAILABLE, wr.getHeader().getCode()); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testFailRequestsDuringClosing() throws Exception { |
| String streamName = testName.getMethodName(); |
| StreamImpl s = createUnstartedStream(service, streamName); |
| |
| Future<Void> closeFuture = s.requestClose("close"); |
| assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING, |
| StreamStatus.CLOSING == s.getStatus() |
| || StreamStatus.CLOSED == s.getStatus()); |
| WriteOp op1 = createWriteOp(service, streamName, 0L); |
| s.submit(op1); |
| WriteResponse response1 = Await.result(op1.result()); |
| assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closing", |
| StatusCode.STREAM_UNAVAILABLE, response1.getHeader().getCode()); |
| |
| Await.result(closeFuture); |
| assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED, |
| StreamStatus.CLOSED, s.getStatus()); |
| WriteOp op2 = createWriteOp(service, streamName, 1L); |
| s.submit(op2); |
| WriteResponse response2 = Await.result(op2.result()); |
| assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closed", |
| StatusCode.STREAM_UNAVAILABLE, response2.getHeader().getCode()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testServiceTimeout() throws Exception { |
| DistributedLogConfiguration confLocal = newLocalConf(); |
| confLocal.setOutputBufferSize(Integer.MAX_VALUE) |
| .setImmediateFlushEnabled(false) |
| .setPeriodicFlushFrequencyMilliSeconds(0); |
| ServerConfiguration serverConfLocal = newLocalServerConf(); |
| serverConfLocal.addConfiguration(serverConf); |
| serverConfLocal.setServiceTimeoutMs(200) |
| .setStreamProbationTimeoutMs(100); |
| String streamName = testName.getMethodName(); |
| // create a new service with 200ms timeout |
| DistributedLogServiceImpl localService = createService(serverConfLocal, confLocal); |
| StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager(); |
| |
| int numWrites = 10; |
| List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites); |
| for (int i = 0; i < numWrites; i++) { |
| futureList.add(localService.write(streamName, createRecord(i))); |
| } |
| |
| assertTrue("Stream " + streamName + " should be cached", |
| streamManager.getCachedStreams().containsKey(streamName)); |
| |
| StreamImpl s = (StreamImpl) streamManager.getCachedStreams().get(streamName); |
| // the stream should be set CLOSING |
| while (StreamStatus.CLOSING != s.getStatus() |
| && StreamStatus.CLOSED != s.getStatus()) { |
| TimeUnit.MILLISECONDS.sleep(20); |
| } |
| assertNotNull("Writer should be initialized", s.getWriter()); |
| assertNull("No exception should be thrown", s.getLastException()); |
| Future<Void> closeFuture = s.getCloseFuture(); |
| Await.result(closeFuture); |
| for (int i = 0; i < numWrites; i++) { |
| assertTrue("Write should not fail before closing", |
| futureList.get(i).isDefined()); |
| WriteResponse response = Await.result(futureList.get(i)); |
| assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION, |
| StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() |
| || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() |
| || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode()); |
| } |
| |
| while (streamManager.getCachedStreams().containsKey(streamName)) { |
| TimeUnit.MILLISECONDS.sleep(20); |
| } |
| |
| assertFalse("Stream should be removed from cache", |
| streamManager.getCachedStreams().containsKey(streamName)); |
| assertFalse("Stream should be removed from acquired cache", |
| streamManager.getAcquiredStreams().containsKey(streamName)); |
| |
| localService.shutdown(); |
| } |
| |
| private DistributedLogServiceImpl createConfiguredLocalService() throws Exception { |
| DistributedLogConfiguration confLocal = newLocalConf(); |
| confLocal.setOutputBufferSize(0) |
| .setImmediateFlushEnabled(true) |
| .setPeriodicFlushFrequencyMilliSeconds(0); |
| return createService(serverConf, confLocal); |
| } |
| |
| private ByteBuffer getTestDataBuffer() { |
| return ByteBuffer.wrap("test-data".getBytes()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testNonDurableWrite() throws Exception { |
| DistributedLogConfiguration confLocal = newLocalConf(); |
| confLocal.setOutputBufferSize(Integer.MAX_VALUE) |
| .setImmediateFlushEnabled(false) |
| .setPeriodicFlushFrequencyMilliSeconds(0) |
| .setDurableWriteEnabled(false); |
| ServerConfiguration serverConfLocal = new ServerConfiguration(); |
| serverConfLocal.addConfiguration(serverConf); |
| serverConfLocal.enableDurableWrite(false); |
| serverConfLocal.setServiceTimeoutMs(Integer.MAX_VALUE) |
| .setStreamProbationTimeoutMs(Integer.MAX_VALUE); |
| String streamName = testName.getMethodName(); |
| DistributedLogServiceImpl localService = |
| createService(serverConfLocal, confLocal); |
| StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager(); |
| |
| int numWrites = 10; |
| List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(); |
| for (int i = 0; i < numWrites; i++) { |
| futureList.add(localService.write(streamName, createRecord(i))); |
| } |
| assertTrue("Stream " + streamName + " should be cached", |
| streamManager.getCachedStreams().containsKey(streamName)); |
| List<WriteResponse> resultList = FutureUtils.result(Future.collect(futureList)); |
| for (WriteResponse wr : resultList) { |
| assertEquals(DLSN.InvalidDLSN, DLSN.deserialize(wr.getDlsn())); |
| } |
| |
| localService.shutdown(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testWriteOpNoChecksum() throws Exception { |
| DistributedLogServiceImpl localService = createConfiguredLocalService(); |
| WriteContext ctx = new WriteContext(); |
| Future<WriteResponse> result = localService.writeWithContext("test", getTestDataBuffer(), ctx); |
| WriteResponse resp = Await.result(result); |
| assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); |
| localService.shutdown(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testTruncateOpNoChecksum() throws Exception { |
| DistributedLogServiceImpl localService = createConfiguredLocalService(); |
| WriteContext ctx = new WriteContext(); |
| Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx); |
| WriteResponse resp = Await.result(result); |
| assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); |
| localService.shutdown(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testStreamOpNoChecksum() throws Exception { |
| DistributedLogServiceImpl localService = createConfiguredLocalService(); |
| WriteContext ctx = new WriteContext(); |
| HeartbeatOptions option = new HeartbeatOptions(); |
| option.setSendHeartBeatToReader(true); |
| |
| // hearbeat to acquire the stream and then release the stream |
| Future<WriteResponse> result = localService.heartbeatWithOptions("test", ctx, option); |
| WriteResponse resp = Await.result(result); |
| assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); |
| result = localService.release("test", ctx); |
| resp = Await.result(result); |
| assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); |
| |
| // heartbeat to acquire the stream and then delete the stream |
| result = localService.heartbeatWithOptions("test", ctx, option); |
| resp = Await.result(result); |
| assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); |
| result = localService.delete("test", ctx); |
| resp = Await.result(result); |
| assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode()); |
| |
| // shutdown the local service |
| localService.shutdown(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testWriteOpChecksumBadChecksum() throws Exception { |
| DistributedLogServiceImpl localService = createConfiguredLocalService(); |
| WriteContext ctx = new WriteContext().setCrc32(999); |
| Future<WriteResponse> result = localService.writeWithContext("test", getTestDataBuffer(), ctx); |
| WriteResponse resp = Await.result(result); |
| assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); |
| localService.shutdown(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testWriteOpChecksumBadStream() throws Exception { |
| DistributedLogServiceImpl localService = createConfiguredLocalService(); |
| WriteContext ctx = new WriteContext().setCrc32( |
| ProtocolUtils.writeOpCRC32("test", getTestDataBuffer().array())); |
| Future<WriteResponse> result = localService.writeWithContext("test1", getTestDataBuffer(), ctx); |
| WriteResponse resp = Await.result(result); |
| assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); |
| localService.shutdown(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testWriteOpChecksumBadData() throws Exception { |
| DistributedLogServiceImpl localService = createConfiguredLocalService(); |
| ByteBuffer buffer = getTestDataBuffer(); |
| WriteContext ctx = new WriteContext().setCrc32( |
| ProtocolUtils.writeOpCRC32("test", buffer.array())); |
| |
| // Overwrite 1 byte to corrupt data. |
| buffer.put(1, (byte) 0xab); |
| Future<WriteResponse> result = localService.writeWithContext("test", buffer, ctx); |
| WriteResponse resp = Await.result(result); |
| assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); |
| localService.shutdown(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testStreamOpChecksumBadChecksum() throws Exception { |
| DistributedLogServiceImpl localService = createConfiguredLocalService(); |
| WriteContext ctx = new WriteContext().setCrc32(999); |
| Future<WriteResponse> result = localService.heartbeat("test", ctx); |
| WriteResponse resp = Await.result(result); |
| assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); |
| result = localService.release("test", ctx); |
| resp = Await.result(result); |
| assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); |
| result = localService.delete("test", ctx); |
| resp = Await.result(result); |
| assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); |
| localService.shutdown(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testTruncateOpChecksumBadChecksum() throws Exception { |
| DistributedLogServiceImpl localService = createConfiguredLocalService(); |
| WriteContext ctx = new WriteContext().setCrc32(999); |
| Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx); |
| WriteResponse resp = Await.result(result); |
| assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode()); |
| localService.shutdown(); |
| } |
| |
| private WriteOp getWriteOp(String name, SettableFeature disabledFeature, Long checksum) { |
| return new WriteOp(name, |
| ByteBuffer.wrap("test".getBytes()), |
| new NullStatsLogger(), |
| new NullStatsLogger(), |
| new IdentityStreamPartitionConverter(), |
| new ServerConfiguration(), |
| (byte) 0, |
| checksum, |
| false, |
| disabledFeature, |
| DefaultAccessControlManager.INSTANCE); |
| } |
| |
| @Test(timeout = 60000) |
| public void testStreamOpBadChecksumWithChecksumDisabled() throws Exception { |
| String streamName = testName.getMethodName(); |
| |
| SettableFeature disabledFeature = new SettableFeature("", 0); |
| |
| WriteOp writeOp0 = getWriteOp(streamName, disabledFeature, 919191L); |
| WriteOp writeOp1 = getWriteOp(streamName, disabledFeature, 919191L); |
| |
| try { |
| writeOp0.preExecute(); |
| fail("should have thrown"); |
| } catch (Exception ex) { |
| } |
| |
| disabledFeature.set(1); |
| writeOp1.preExecute(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testStreamOpGoodChecksumWithChecksumDisabled() throws Exception { |
| String streamName = testName.getMethodName(); |
| |
| SettableFeature disabledFeature = new SettableFeature("", 1); |
| WriteOp writeOp0 = getWriteOp( |
| streamName, |
| disabledFeature, |
| ProtocolUtils.writeOpCRC32(streamName, "test".getBytes())); |
| WriteOp writeOp1 = getWriteOp( |
| streamName, |
| disabledFeature, |
| ProtocolUtils.writeOpCRC32(streamName, "test".getBytes())); |
| |
| writeOp0.preExecute(); |
| disabledFeature.set(0); |
| writeOp1.preExecute(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testCloseStreamsShouldFlush() throws Exception { |
| DistributedLogConfiguration confLocal = newLocalConf(); |
| confLocal.setOutputBufferSize(Integer.MAX_VALUE) |
| .setImmediateFlushEnabled(false) |
| .setPeriodicFlushFrequencyMilliSeconds(0); |
| |
| String streamNamePrefix = testName.getMethodName(); |
| DistributedLogServiceImpl localService = createService(serverConf, confLocal); |
| StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager(); |
| |
| int numStreams = 10; |
| int numWrites = 10; |
| List<Future<WriteResponse>> futureList = |
| Lists.newArrayListWithExpectedSize(numStreams * numWrites); |
| for (int i = 0; i < numStreams; i++) { |
| String streamName = streamNamePrefix + "-" + i; |
| HeartbeatOptions hbOptions = new HeartbeatOptions(); |
| hbOptions.setSendHeartBeatToReader(true); |
| // make sure the first log segment of each stream created |
| FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions)); |
| for (int j = 0; j < numWrites; j++) { |
| futureList.add(localService.write(streamName, createRecord(i * numWrites + j))); |
| } |
| } |
| |
| assertEquals("There should be " + numStreams + " streams in cache", |
| numStreams, streamManager.getCachedStreams().size()); |
| while (streamManager.getAcquiredStreams().size() < numStreams) { |
| TimeUnit.MILLISECONDS.sleep(20); |
| } |
| |
| Future<List<Void>> closeResult = localService.closeStreams(); |
| List<Void> closedStreams = Await.result(closeResult); |
| assertEquals("There should be " + numStreams + " streams closed", |
| numStreams, closedStreams.size()); |
| // all writes should be flushed |
| for (Future<WriteResponse> future : futureList) { |
| WriteResponse response = Await.result(future); |
| assertTrue("Op should succeed or be rejected : " + response.getHeader().getCode(), |
| StatusCode.SUCCESS == response.getHeader().getCode() |
| || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() |
| || StatusCode.STREAM_UNAVAILABLE == response.getHeader().getCode()); |
| } |
| assertTrue("There should be no streams in the cache", |
| streamManager.getCachedStreams().isEmpty()); |
| assertTrue("There should be no streams in the acquired cache", |
| streamManager.getAcquiredStreams().isEmpty()); |
| |
| localService.shutdown(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testCloseStreamsShouldAbort() throws Exception { |
| DistributedLogConfiguration confLocal = newLocalConf(); |
| confLocal.setOutputBufferSize(Integer.MAX_VALUE) |
| .setImmediateFlushEnabled(false) |
| .setPeriodicFlushFrequencyMilliSeconds(0); |
| |
| String streamNamePrefix = testName.getMethodName(); |
| DistributedLogServiceImpl localService = createService(serverConf, confLocal); |
| StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager(); |
| |
| int numStreams = 10; |
| int numWrites = 10; |
| List<Future<WriteResponse>> futureList = |
| Lists.newArrayListWithExpectedSize(numStreams * numWrites); |
| for (int i = 0; i < numStreams; i++) { |
| String streamName = streamNamePrefix + "-" + i; |
| HeartbeatOptions hbOptions = new HeartbeatOptions(); |
| hbOptions.setSendHeartBeatToReader(true); |
| // make sure the first log segment of each stream created |
| FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions)); |
| for (int j = 0; j < numWrites; j++) { |
| futureList.add(localService.write(streamName, createRecord(i * numWrites + j))); |
| } |
| } |
| |
| assertEquals("There should be " + numStreams + " streams in cache", |
| numStreams, streamManager.getCachedStreams().size()); |
| while (streamManager.getAcquiredStreams().size() < numStreams) { |
| TimeUnit.MILLISECONDS.sleep(20); |
| } |
| |
| for (Stream s : streamManager.getAcquiredStreams().values()) { |
| StreamImpl stream = (StreamImpl) s; |
| stream.setStatus(StreamStatus.ERROR); |
| } |
| |
| Future<List<Void>> closeResult = localService.closeStreams(); |
| List<Void> closedStreams = Await.result(closeResult); |
| assertEquals("There should be " + numStreams + " streams closed", |
| numStreams, closedStreams.size()); |
| // all writes should be flushed |
| for (Future<WriteResponse> future : futureList) { |
| WriteResponse response = Await.result(future); |
| assertTrue("Op should fail with " + StatusCode.BK_TRANSMIT_ERROR + " or be rejected : " |
| + response.getHeader().getCode(), |
| StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() |
| || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() |
| || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode()); |
| } |
| // acquired streams should all been removed after we close them |
| assertTrue("There should be no streams in the acquired cache", |
| streamManager.getAcquiredStreams().isEmpty()); |
| localService.shutdown(); |
| // cached streams wouldn't be removed immediately after streams are closed |
| // but they should be removed after we shutdown the service |
| assertTrue("There should be no streams in the cache after shutting down the service", |
| streamManager.getCachedStreams().isEmpty()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testShutdown() throws Exception { |
| service.shutdown(); |
| StreamManagerImpl streamManager = (StreamManagerImpl) service.getStreamManager(); |
| WriteResponse response = |
| Await.result(service.write(testName.getMethodName(), createRecord(0L))); |
| assertEquals("Write should fail with " + StatusCode.SERVICE_UNAVAILABLE, |
| StatusCode.SERVICE_UNAVAILABLE, response.getHeader().getCode()); |
| assertTrue("There should be no streams created after shutdown", |
| streamManager.getCachedStreams().isEmpty()); |
| assertTrue("There should be no streams acquired after shutdown", |
| streamManager.getAcquiredStreams().isEmpty()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testGetOwner() throws Exception { |
| ((LocalRoutingService) service.getRoutingService()) |
| .addHost("stream-0", service.getServiceAddress().getSocketAddress()) |
| .setAllowRetrySameHost(false); |
| |
| service.startPlacementPolicy(); |
| |
| WriteResponse response = FutureUtils.result(service.getOwner("stream-1", new WriteContext())); |
| assertEquals(StatusCode.FOUND, response.getHeader().getCode()); |
| assertEquals(service.getServiceAddress().toString(), |
| response.getHeader().getLocation()); |
| |
| // service cache "stream-2" |
| StreamImpl stream = (StreamImpl) service.getStreamManager().getOrCreateStream("stream-2", false); |
| // create write ops to stream-2 to make service acquire the stream |
| WriteOp op = createWriteOp(service, "stream-2", 0L); |
| stream.submit(op); |
| stream.start(); |
| WriteResponse wr = Await.result(op.result()); |
| assertEquals("Op should succeed", |
| StatusCode.SUCCESS, wr.getHeader().getCode()); |
| assertEquals("Service should acquire stream", |
| StreamStatus.INITIALIZED, stream.getStatus()); |
| assertNotNull(stream.getManager()); |
| assertNotNull(stream.getWriter()); |
| assertNull(stream.getLastException()); |
| |
| // the stream is acquired |
| response = FutureUtils.result(service.getOwner("stream-2", new WriteContext())); |
| assertEquals(StatusCode.FOUND, response.getHeader().getCode()); |
| assertEquals(service.getServiceAddress().toString(), |
| response.getHeader().getLocation()); |
| } |
| |
| } |