blob: 944203c3e5548794a270b19ff950e77945911aaf [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Test the bookie client.
*/
public class BookieClientTest {
BookieServer bs;
File tmpDir;
public int port = 13645;
public EventLoopGroup eventLoopGroup;
public OrderedExecutor executor;
private ScheduledExecutorService scheduler;
@Before
public void setUp() throws Exception {
tmpDir = IOUtils.createTempDir("bookieClient", "test");
// Since this test does not rely on the BookKeeper client needing to
// know via ZooKeeper which Bookies are available, okay, so pass in null
// for the zkServers input parameter when constructing the BookieServer.
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setBookiePort(port)
.setJournalDirName(tmpDir.getPath())
.setLedgerDirNames(new String[] { tmpDir.getPath() })
.setMetadataServiceUri(null);
bs = new BookieServer(conf);
bs.start();
eventLoopGroup = new NioEventLoopGroup();
executor = OrderedExecutor.newBuilder()
.name("BKClientOrderedSafeExecutor")
.numThreads(2)
.build();
scheduler = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("BookKeeperClientScheduler"));
}
@After
public void tearDown() throws Exception {
scheduler.shutdown();
bs.shutdown();
recursiveDelete(tmpDir);
eventLoopGroup.shutdownGracefully();
executor.shutdown();
}
private static void recursiveDelete(File dir) {
File[] children = dir.listFiles();
if (children != null) {
for (File child : children) {
recursiveDelete(child);
}
}
dir.delete();
}
static class ResultStruct {
int rc = -123456;
ByteBuffer entry;
}
ReadEntryCallback recb = new ReadEntryCallback() {
public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf bb, Object ctx) {
ResultStruct rs = (ResultStruct) ctx;
synchronized (rs) {
rs.rc = rc;
if (BKException.Code.OK == rc && bb != null) {
bb.readerIndex(24);
rs.entry = bb.nioBuffer();
}
rs.notifyAll();
}
}
};
WriteCallback wrcb = new WriteCallback() {
public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
if (ctx != null) {
synchronized (ctx) {
if (ctx instanceof ResultStruct) {
ResultStruct rs = (ResultStruct) ctx;
rs.rc = rc;
}
ctx.notifyAll();
}
}
}
};
@Test
public void testWriteGaps() throws Exception {
final Object notifyObject = new Object();
byte[] passwd = new byte[20];
Arrays.fill(passwd, (byte) 'a');
BookieId addr = bs.getBookieId();
ResultStruct arc = new ResultStruct();
BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup,
UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
ByteBufList bb = createByteBuffer(1, 1, 1);
bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
synchronized (arc) {
arc.wait(1000);
assertEquals(0, arc.rc);
bc.readEntry(addr, 1, 1, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(1, arc.entry.getInt());
}
bb = createByteBuffer(2, 1, 2);
bc.addEntry(addr, 1, passwd, 2, bb, wrcb, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
bb = createByteBuffer(3, 1, 3);
bc.addEntry(addr, 1, passwd, 3, bb, wrcb, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
bb = createByteBuffer(5, 1, 5);
bc.addEntry(addr, 1, passwd, 5, bb, wrcb, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
bb = createByteBuffer(7, 1, 7);
bc.addEntry(addr, 1, passwd, 7, bb, wrcb, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
synchronized (notifyObject) {
bb = createByteBuffer(11, 1, 11);
bc.addEntry(addr, 1, passwd, 11, bb, wrcb, notifyObject, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
notifyObject.wait();
}
synchronized (arc) {
bc.readEntry(addr, 1, 6, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
synchronized (arc) {
bc.readEntry(addr, 1, 7, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(7, arc.entry.getInt(), BookieProtocol.FLAG_NONE);
}
synchronized (arc) {
bc.readEntry(addr, 1, 1, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(1, arc.entry.getInt());
}
synchronized (arc) {
bc.readEntry(addr, 1, 2, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(2, arc.entry.getInt());
}
synchronized (arc) {
bc.readEntry(addr, 1, 3, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(3, arc.entry.getInt());
}
synchronized (arc) {
bc.readEntry(addr, 1, 4, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
synchronized (arc) {
bc.readEntry(addr, 1, 11, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(11, arc.entry.getInt());
}
synchronized (arc) {
bc.readEntry(addr, 1, 5, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(5, arc.entry.getInt());
}
synchronized (arc) {
bc.readEntry(addr, 1, 10, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
synchronized (arc) {
bc.readEntry(addr, 1, 12, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
synchronized (arc) {
bc.readEntry(addr, 1, 13, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
}
private ByteBufList createByteBuffer(int i, long lid, long eid) {
ByteBuf bb = Unpooled.buffer(4 + 24);
bb.writeLong(lid);
bb.writeLong(eid);
bb.writeLong(eid - 1);
bb.writeInt(i);
return ByteBufList.get(bb);
}
@Test
public void testNoLedger() throws Exception {
ResultStruct arc = new ResultStruct();
BookieId addr = bs.getBookieId();
BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup,
UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
synchronized (arc) {
bc.readEntry(addr, 2, 13, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(BKException.Code.NoSuchLedgerExistsException, arc.rc);
}
}
@Test
public void testGetBookieInfoWithLimitStatsLogging() throws IOException, InterruptedException {
testGetBookieInfo(true);
}
@Test
public void testGetBookieInfoWithoutLimitStatsLogging() throws IOException, InterruptedException {
testGetBookieInfo(false);
}
public void testGetBookieInfo(boolean limitStatsLogging) throws IOException, InterruptedException {
BookieId bookieId = bs.getBookieId();
BookieSocketAddress addr = bs.getLocalAddress();
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setLimitStatsLogging(limitStatsLogging);
TestStatsProvider statsProvider = new TestStatsProvider();
TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
BookieClient bc = new BookieClientImpl(clientConf, new NioEventLoopGroup(), UnpooledByteBufAllocator.DEFAULT,
executor, scheduler, statsLogger, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
| BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
class CallbackObj {
int rc;
long requested;
long freeDiskSpace, totalDiskCapacity;
CountDownLatch latch = new CountDownLatch(1);
CallbackObj(long requested) {
this.requested = requested;
this.rc = 0;
this.freeDiskSpace = 0L;
this.totalDiskCapacity = 0L;
}
}
CallbackObj obj = new CallbackObj(flags);
bc.getBookieInfo(bookieId, flags, new GetBookieInfoCallback() {
@Override
public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) {
CallbackObj obj = (CallbackObj) ctx;
obj.rc = rc;
if (rc == Code.OK) {
if ((obj.requested & BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) {
obj.freeDiskSpace = bInfo.getFreeDiskSpace();
}
if ((obj.requested
& BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE) != 0) {
obj.totalDiskCapacity = bInfo.getTotalDiskSpace();
}
}
obj.latch.countDown();
}
}, obj);
obj.latch.await();
System.out.println("Return code: " + obj.rc + "FreeDiskSpace: " + obj.freeDiskSpace + " TotalCapacity: "
+ obj.totalDiskCapacity);
assertTrue("GetBookieInfo failed with error " + obj.rc, obj.rc == Code.OK);
assertTrue("GetBookieInfo failed with error " + obj.rc, obj.freeDiskSpace <= obj.totalDiskCapacity);
assertTrue("GetBookieInfo failed with error " + obj.rc, obj.totalDiskCapacity > 0);
TestOpStatsLogger perChannelBookieClientScopeOfThisAddr = (TestOpStatsLogger) statsLogger
.scope(BookKeeperClientStats.CHANNEL_SCOPE)
.scopeLabel(BookKeeperClientStats.BOOKIE_LABEL, addr.toBookieId().toString())
.getOpStatsLogger(BookKeeperClientStats.GET_BOOKIE_INFO_OP);
int expectedBookieInfoSuccessCount = (limitStatsLogging) ? 0 : 1;
assertEquals("BookieInfoSuccessCount", expectedBookieInfoSuccessCount,
perChannelBookieClientScopeOfThisAddr.getSuccessCount());
}
}