DL-2: DistributedLog should work with the official apache bookkeeper
This change is to upgrade bookkeeper version to BK 4.5.0.
- upgrade bookkeeper version to 4.5.0-SNAPSHOT (still waiting a few pull requests to merge apache/bookkeeper#297 apache/bookkeeper#287
- change registerSuccessEvent for StatsLogger to add TimeUnit
- use netty4 eventloop
- move twitter repository dependencies to proxy related module only. core library will not depend on scala dependency anymore.
This change is a collaboration change with sijie
We will provide a performance comparison between 0.4.0 (using Twitter BK) and 0.5.0 (using BK 4.5.) in a separate pull request or email.
Author: Jia Zhai <zhaijia03@gmail.com>
Author: Jia Zhai <zhaijia@apache.org>
Author: Sijie Guo <sijie@apache.org>
Reviewers: Leigh Stewart <lstewart@apache.org>
This closes #135 from zhaijack/bump_dl_version
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java
index 5c9b2a9..015cf46 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java
@@ -228,12 +228,14 @@
writer.write(new LogRecord(requestMillis, data)).whenComplete(new FutureEventListener<DLSN>() {
@Override
public void onSuccess(DLSN value) {
- requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
+ requestStat.registerSuccessfulEvent(
+ System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS);
}
@Override
public void onFailure(Throwable cause) {
- requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
+ requestStat.registerFailedEvent(
+ System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS);
LOG.error("Failed to publish, rescue it : ", cause);
scheduleRescue(streamIdx, writer, 0);
}
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
index ad95a59..b81dad4 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java
@@ -156,14 +156,14 @@
long e2eLatency = curTimeMillis - msg.getPublishTime();
long deliveryLatency = curTimeMillis - record.getTransactionId();
if (e2eLatency >= 0) {
- e2eStat.registerSuccessfulEvent(e2eLatency);
+ e2eStat.registerSuccessfulEvent(e2eLatency, TimeUnit.MILLISECONDS);
} else {
- negativeE2EStat.registerSuccessfulEvent(-e2eLatency);
+ negativeE2EStat.registerSuccessfulEvent(-e2eLatency, TimeUnit.MILLISECONDS);
}
if (deliveryLatency >= 0) {
- deliveryStat.registerSuccessfulEvent(deliveryLatency);
+ deliveryStat.registerSuccessfulEvent(deliveryLatency, TimeUnit.MILLISECONDS);
} else {
- negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency);
+ negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency, TimeUnit.MILLISECONDS);
}
prevDLSN = record.getDlsn();
@@ -200,12 +200,14 @@
new FutureEventListener<Boolean>() {
@Override
public void onSuccess(Boolean value) {
- truncationStat.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
+ truncationStat.registerSuccessfulEvent(
+ stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
}
@Override
public void onFailure(Throwable cause) {
- truncationStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
+ truncationStat.registerFailedEvent(
+ stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
LOG.error("Failed to truncate stream {} to {} : ",
new Object[]{streamName, dlsnToTruncate, cause});
}
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
index 46f9dfc..9879e71 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
@@ -272,10 +272,12 @@
@Override
public void run() {
if (null != dlsn) {
- requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
+ requestStat.registerSuccessfulEvent(
+ System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS);
} else {
LOG.error("Failed to publish to {} : ", streamName, cause);
- requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
+ requestStat.registerFailedEvent(
+ System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS);
exceptionsLogger.getCounter(cause.getClass().getName()).inc();
if (cause instanceof DLException) {
DLException dle = (DLException) cause;
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
index 4c8e372..a2dbdd7 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
@@ -109,11 +109,12 @@
reader = FutureUtils.result(dlm.openAsyncLogReader(lastDLSN));
}
long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS);
- openReaderStats.registerSuccessfulEvent(elapsedMs);
+ openReaderStats.registerSuccessfulEvent(elapsedMs, TimeUnit.MICROSECONDS);
logger.info("It took {} ms to position the reader to transaction id = {}, dlsn = {}",
lastTxId, lastDLSN);
} catch (Exception ioe) {
- openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ openReaderStats.registerFailedEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
logger.warn("Failed to create reader for stream {} reading from tx id = {}, dlsn = {}.",
new Object[] { streamName, lastTxId, lastDLSN });
}
@@ -133,7 +134,7 @@
stopwatch.start();
records = FutureUtils.result(reader.readBulk(batchSize));
long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
- blockingReadStats.registerSuccessfulEvent(elapsedMicros);
+ blockingReadStats.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
if (!records.isEmpty()) {
readCounter.add(records.size());
LogRecordWithDLSN lastRecord = records.get(records.size() - 1);
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
index cbd7f67..3e8c8bb 100644
--- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
@@ -98,10 +98,11 @@
try {
reader = dlm.getInputStream(lastTxId);
long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS);
- openReaderStats.registerSuccessfulEvent(elapsedMs);
+ openReaderStats.registerSuccessfulEvent(elapsedMs, TimeUnit.MICROSECONDS);
logger.info("It took {} ms to position the reader to transaction id {}", lastTxId);
} catch (IOException ioe) {
- openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ openReaderStats.registerFailedEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
logger.warn("Failed to create reader for stream {} reading from {}.", streamName, lastTxId);
}
if (null == reader) {
@@ -129,11 +130,11 @@
if (null != record) {
long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
if (nonBlocking) {
- nonBlockingReadStats.registerSuccessfulEvent(elapsedMicros);
+ nonBlockingReadStats.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
} else {
numCatchupBytes += record.getPayload().length;
++numCatchupReads;
- blockingReadStats.registerSuccessfulEvent(elapsedMicros);
+ blockingReadStats.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS);
}
lastTxId = record.getTransactionId();
} else {
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java
index 61a20f1..6ae269b 100644
--- a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java
@@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.CachingStatsLogger;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
@@ -63,16 +64,29 @@
final OpStatsLogger firstLogger = first.getOpStatsLogger(statName);
final OpStatsLogger secondLogger = second.getOpStatsLogger(statName);
return new OpStatsLogger() {
+
@Override
- public void registerFailedEvent(long l) {
- firstLogger.registerFailedEvent(l);
- secondLogger.registerFailedEvent(l);
+ public void registerFailedEvent(long l, TimeUnit timeUnit) {
+ firstLogger.registerFailedEvent(l, timeUnit);
+ secondLogger.registerFailedEvent(l, timeUnit);
}
@Override
- public void registerSuccessfulEvent(long l) {
- firstLogger.registerSuccessfulEvent(l);
- secondLogger.registerSuccessfulEvent(l);
+ public void registerSuccessfulEvent(long l, TimeUnit timeUnit) {
+ firstLogger.registerSuccessfulEvent(l, timeUnit);
+ secondLogger.registerSuccessfulEvent(l, timeUnit);
+ }
+
+ @Override
+ public void registerSuccessfulValue(long l) {
+ firstLogger.registerSuccessfulValue(l);
+ secondLogger.registerSuccessfulValue(l);
+ }
+
+ @Override
+ public void registerFailedValue(long l) {
+ firstLogger.registerFailedValue(l);
+ secondLogger.registerFailedValue(l);
}
@Override
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java
index e71a799..4145b39 100644
--- a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java
@@ -44,11 +44,11 @@
@Override
public void onSuccess(T value) {
- opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
@Override
public void onFailure(Throwable cause) {
- opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
}
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
index ddfb7ae..a887c59 100644
--- a/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
@@ -312,7 +312,8 @@
Stopwatch.createStarted());
underlyFuture.complete(1234L);
FutureUtils.result(statsFuture);
- verify(statsLogger, times(1)).registerSuccessfulEvent(anyLong());
+ verify(statsLogger, times(1))
+ .registerSuccessfulEvent(anyLong(), eq(TimeUnit.MICROSECONDS));
}
@Test
@@ -325,7 +326,8 @@
Stopwatch.createStarted());
underlyFuture.completeExceptionally(new TestException());
FutureUtils.result(FutureUtils.ignore(statsFuture));
- verify(statsLogger, times(1)).registerFailedEvent(anyLong());
+ verify(statsLogger, times(1))
+ .registerFailedEvent(anyLong(), eq(TimeUnit.MICROSECONDS));
}
@Test
diff --git a/distributedlog-core-twitter/pom.xml b/distributedlog-core-twitter/pom.xml
index bec9b03..9cf11be 100644
--- a/distributedlog-core-twitter/pom.xml
+++ b/distributedlog-core-twitter/pom.xml
@@ -138,4 +138,11 @@
</plugin>
</plugins>
</build>
+ <repositories>
+ <repository>
+ <id>twitter-repo</id>
+ <name>Twitter Maven Repo</name>
+ <url>http://maven.twttr.com</url>
+ </repository>
+ </repositories>
</project>
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
index a9c7b21..67911f7 100644
--- a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java
@@ -45,11 +45,13 @@
@Override
public void onSuccess(T value) {
- opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ opStatsLogger.registerSuccessfulEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
@Override
public void onFailure(Throwable cause) {
- opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ opStatsLogger.registerFailedEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
}
diff --git a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index 39a0f77..dc9d09d 100644
--- a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -17,16 +17,9 @@
*/
package org.apache.bookkeeper.client;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import org.apache.bookkeeper.proto.BookkeeperProtocol;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.Unpooled;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -36,6 +29,12 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Reader used for DL tools to read entries
@@ -93,17 +92,21 @@
final Set<ReadResult<InputStream>> readResults = new HashSet<ReadResult<InputStream>>();
ReadEntryCallback readEntryCallback = new ReadEntryCallback() {
@Override
- public void readEntryComplete(int rc, long lid, long eid, ChannelBuffer buffer, Object ctx) {
+ public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object ctx) {
BookieSocketAddress bookieAddress = (BookieSocketAddress) ctx;
ReadResult<InputStream> rr;
if (BKException.Code.OK != rc) {
rr = new ReadResult<InputStream>(eid, rc, null, bookieAddress.getSocketAddress());
} else {
+ ByteBuf content;
try {
- ChannelBufferInputStream is = lh.macManager.verifyDigestAndReturnData(eid, buffer);
- rr = new ReadResult<InputStream>(eid, BKException.Code.OK, is, bookieAddress.getSocketAddress());
+ content = lh.macManager.verifyDigestAndReturnData(eid, buffer);
+ ByteBuf toRet = Unpooled.copiedBuffer(content);
+ rr = new ReadResult<InputStream>(eid, BKException.Code.OK, new ByteBufInputStream(toRet), bookieAddress.getSocketAddress());
} catch (BKException.BKDigestMatchException e) {
rr = new ReadResult<InputStream>(eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress());
+ } finally {
+ buffer.release();
}
}
readResults.add(rr);
@@ -184,7 +187,7 @@
final Set<ReadResult<Long>> readResults = new HashSet<ReadResult<Long>>();
ReadEntryCallback readEntryCallback = new ReadEntryCallback() {
@Override
- public void readEntryComplete(int rc, long lid, long eid, ChannelBuffer buffer, Object ctx) {
+ public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object ctx) {
InetSocketAddress bookieAddress = (InetSocketAddress) ctx;
ReadResult<Long> rr;
if (BKException.Code.OK != rc) {
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
index 26a4a76..5be9e19 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -163,8 +163,10 @@
void completeExceptionally(Throwable throwable) {
Stopwatch stopwatch = Stopwatch.createStarted();
if (promise.completeExceptionally(throwable)) {
- futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
- delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS));
+ futureSetLatency.registerFailedEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
+ delayUntilPromiseSatisfied.registerFailedEvent(
+ enqueueTime.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
}
@@ -191,10 +193,12 @@
if (LOG.isTraceEnabled()) {
LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size());
}
- delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS));
+ delayUntilPromiseSatisfied.registerSuccessfulEvent(
+ enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
Stopwatch stopwatch = Stopwatch.createStarted();
promise.complete(records);
- futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ futureSetLatency.registerSuccessfulEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
}
@@ -396,7 +400,8 @@
private synchronized CompletableFuture<List<LogRecordWithDLSN>> readInternal(int numEntries,
long deadlineTime,
TimeUnit deadlineTimeUnit) {
- timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
+ timeBetweenReadNexts.registerSuccessfulEvent(
+ readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
readNextDelayStopwatch.reset().start();
final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit);
@@ -443,7 +448,8 @@
}
}
- readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS));
+ readNextExecTime.registerSuccessfulEvent(
+ readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
readNextDelayStopwatch.reset().start();
return readRequest.getPromise();
@@ -553,7 +559,8 @@
public void run() {
synchronized(scheduleLock) {
if (scheduleDelayStopwatch.isRunning()) {
- scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ scheduleLatency.registerSuccessfulEvent(
+ scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
Stopwatch runTime = Stopwatch.createStarted();
@@ -573,7 +580,8 @@
if (null == nextRequest) {
LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName());
scheduleCount.set(0);
- backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+ backgroundReaderRunTime.registerSuccessfulEvent(
+ runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
return;
}
@@ -599,7 +607,8 @@
if (!(lastException.get().getCause() instanceof LogNotFoundException)) {
LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get());
}
- backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+ backgroundReaderRunTime.registerFailedEvent(
+ runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
return;
}
@@ -646,7 +655,8 @@
if (nextRequest.hasReadRecords()) {
long remainingWaitTime = nextRequest.getRemainingWaitTime();
if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) {
- backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+ backgroundReaderRunTime.registerSuccessfulEvent(
+ runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
scheduleDelayStopwatch.reset().start();
scheduleCount.set(0);
// the request could still wait for more records
@@ -681,7 +691,8 @@
} else {
if (0 == scheduleCountLocal) {
LOG.trace("Schedule count dropping to zero", lastException.get());
- backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
+ backgroundReaderRunTime.registerSuccessfulEvent(
+ runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
return;
}
scheduleCountLocal = scheduleCount.decrementAndGet();
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java
index a7d0d25..9f2e750 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java
@@ -426,13 +426,16 @@
).whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
@Override
public void onSuccess(LogRecordWithDLSN value) {
- recoverLastEntryStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
- recoverScannedEntriesStats.registerSuccessfulEvent(numRecordsScanned.get());
+ recoverLastEntryStats.registerSuccessfulEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
+ recoverScannedEntriesStats.registerSuccessfulValue(numRecordsScanned.get());
}
@Override
public void onFailure(Throwable cause) {
- recoverLastEntryStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ recoverLastEntryStats.registerFailedEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
}
});
}
@@ -508,9 +511,9 @@
LOG.warn("{} received inprogress log segment in {} millis: {}",
new Object[] { getFullyQualifiedName(), elapsedMillis, metadata });
}
- getInprogressSegmentStat.registerSuccessfulEvent(elapsedMicroSec);
+ getInprogressSegmentStat.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
} else {
- negativeGetInprogressSegmentStat.registerSuccessfulEvent(-elapsedMicroSec);
+ negativeGetInprogressSegmentStat.registerSuccessfulEvent(-elapsedMicroSec, TimeUnit.MICROSECONDS);
}
} else {
long elapsedMillis = ts - metadata.getCompletionTime();
@@ -520,9 +523,9 @@
LOG.warn("{} received completed log segment in {} millis : {}",
new Object[] { getFullyQualifiedName(), elapsedMillis, metadata });
}
- getCompletedSegmentStat.registerSuccessfulEvent(elapsedMicroSec);
+ getCompletedSegmentStat.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS);
} else {
- negativeGetCompletedSegmentStat.registerSuccessfulEvent(-elapsedMicroSec);
+ negativeGetCompletedSegmentStat.registerSuccessfulEvent(-elapsedMicroSec, TimeUnit.MICROSECONDS);
}
}
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
index a4016c8..6f201a6 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -1131,8 +1131,10 @@
final BKTransmitPacket transmitPacket = (BKTransmitPacket) ctx;
// Time from transmit until receipt of addComplete callback
- addCompleteTime.registerSuccessfulEvent(TimeUnit.MICROSECONDS.convert(
- System.nanoTime() - transmitPacket.getTransmitTime(), TimeUnit.NANOSECONDS));
+ addCompleteTime.registerSuccessfulEvent(
+ TimeUnit.MICROSECONDS.convert(
+ System.nanoTime() - transmitPacket.getTransmitTime(), TimeUnit.NANOSECONDS),
+ TimeUnit.MICROSECONDS);
if (BKException.Code.OK == rc) {
EntryBuffer recordSet = transmitPacket.getRecordSet();
@@ -1149,9 +1151,13 @@
@Override
public Void call() {
final Stopwatch deferredTime = Stopwatch.createStarted();
- addCompleteQueuedTime.registerSuccessfulEvent(queuedTime.elapsed(TimeUnit.MICROSECONDS));
+ addCompleteQueuedTime.registerSuccessfulEvent(
+ queuedTime.elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get());
- addCompleteDeferredTime.registerSuccessfulEvent(deferredTime.elapsed(TimeUnit.MICROSECONDS));
+ addCompleteDeferredTime.registerSuccessfulEvent(
+ deferredTime.elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MILLISECONDS);
return null;
}
@Override
@@ -1199,14 +1205,16 @@
if (transmitResult.get() != BKException.Code.OK) {
if (recordSet.hasUserRecords()) {
- transmitDataPacketSize.registerFailedEvent(recordSet.getNumBytes());
+ transmitDataPacketSize.registerFailedEvent(
+ recordSet.getNumBytes(), TimeUnit.MICROSECONDS);
}
} else {
// If we had data that we flushed then we need it to make sure that
// background flush in the next pass will make the previous writes
// visible by advancing the lastAck
if (recordSet.hasUserRecords()) {
- transmitDataPacketSize.registerSuccessfulEvent(recordSet.getNumBytes());
+ transmitDataPacketSize.registerSuccessfulEvent(
+ recordSet.getNumBytes(), TimeUnit.MICROSECONDS);
controlFlushNeeded = true;
if (immediateFlushEnabled) {
if (0 == minDelayBetweenImmediateFlushMs) {
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
index 1293d00..a310ea5 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
@@ -430,9 +430,11 @@
return writer;
} finally {
if (success) {
- openOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ openOpStats.registerSuccessfulEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
} else {
- openOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ openOpStats.registerFailedEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
}
}
@@ -743,9 +745,13 @@
return completedLogSegment;
} finally {
if (success) {
- closeOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ closeOpStats.registerSuccessfulEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
} else {
- closeOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ closeOpStats.registerFailedEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
}
}
}
@@ -1196,12 +1202,16 @@
promise.whenComplete(new FutureEventListener<LogSegmentMetadata>() {
@Override
public void onSuccess(LogSegmentMetadata segment) {
- deleteOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ deleteOpStats.registerSuccessfulEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
}
@Override
public void onFailure(Throwable cause) {
- deleteOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ deleteOpStats.registerFailedEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
}
});
entryStore.deleteLogSegment(ledgerMetadata)
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
index 2ea3b5d..a50c391 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
@@ -18,6 +18,9 @@
package org.apache.distributedlog;
import com.google.common.base.Optional;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
@@ -40,8 +43,6 @@
import org.apache.distributedlog.util.ConfUtils;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.zookeeper.KeeperException;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +67,7 @@
private final String zkServers;
private final String ledgersPath;
private final byte[] passwd;
- private final ClientSocketChannelFactory channelFactory;
+ private final EventLoopGroup eventLoopGroup;
private final HashedWheelTimer requestTimer;
private final StatsLogger statsLogger;
@@ -80,8 +81,10 @@
@SuppressWarnings("deprecation")
private synchronized void commonInitialization(
- DistributedLogConfiguration conf, String ledgersPath,
- ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer)
+ DistributedLogConfiguration conf,
+ String ledgersPath,
+ EventLoopGroup eventLoopGroup,
+ StatsLogger statsLogger, HashedWheelTimer requestTimer)
throws IOException, InterruptedException, KeeperException {
ClientConfiguration bkConfig = new ClientConfiguration();
bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout());
@@ -106,11 +109,10 @@
final DNSToSwitchMapping dnsResolver =
NetUtils.getDNSResolver(dnsResolverCls, conf.getBkDNSResolverOverrides());
- this.bkc = BookKeeper.newBuilder()
- .config(bkConfig)
- .zk(zkc.get())
- .channelFactory(channelFactory)
- .statsLogger(statsLogger)
+ this.bkc = BookKeeper.forConfig(bkConfig)
+ .setZookeeper(zkc.get())
+ .setEventLoopGroup(eventLoopGroup)
+ .setStatsLogger(statsLogger)
.dnsResolver(dnsResolver)
.requestTimer(requestTimer)
.featureProvider(featureProvider.orNull())
@@ -122,7 +124,7 @@
String zkServers,
ZooKeeperClient zkc,
String ledgersPath,
- ClientSocketChannelFactory channelFactory,
+ EventLoopGroup eventLoopGroup,
HashedWheelTimer requestTimer,
StatsLogger statsLogger,
Optional<FeatureProvider> featureProvider) {
@@ -131,7 +133,7 @@
this.zkServers = zkServers;
this.ledgersPath = ledgersPath;
this.passwd = conf.getBKDigestPW().getBytes(UTF_8);
- this.channelFactory = channelFactory;
+ this.eventLoopGroup = eventLoopGroup;
this.requestTimer = requestTimer;
this.statsLogger = statsLogger;
this.featureProvider = featureProvider;
@@ -162,7 +164,7 @@
}
try {
- commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer);
+ commonInitialization(conf, ledgersPath, eventLoopGroup, statsLogger, requestTimer);
} catch (InterruptedException e) {
throw new DLInterruptedException("Interrupted on creating bookkeeper client " + name + " : ", e);
} catch (KeeperException e) {
@@ -216,7 +218,7 @@
promise.completeExceptionally(BKException.create(rc));
}
}
- }, null);
+ }, null, Collections.emptyMap());
return promise;
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java
index a356f9f..1149ad5 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java
@@ -19,14 +19,12 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.feature.Feature;
-
/**
* Builder to build bookkeeper client.
*/
@@ -55,7 +53,7 @@
// statsLogger
private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
// client channel factory
- private ClientSocketChannelFactory channelFactory = null;
+ private EventLoopGroup eventLoopGroup = null;
// request timer
private HashedWheelTimer requestTimer = null;
// feature provider
@@ -150,12 +148,12 @@
/**
* Build BookKeeper client using existing <i>channelFactory</i>.
*
- * @param channelFactory
- * Channel Factory used to build bookkeeper client.
+ * @param eventLoopGroup
+ * event loop group used to build bookkeeper client.
* @return bookkeeper client builder.
*/
- public synchronized BookKeeperClientBuilder channelFactory(ClientSocketChannelFactory channelFactory) {
- this.channelFactory = channelFactory;
+ public synchronized BookKeeperClientBuilder eventLoopGroup(EventLoopGroup eventLoopGroup) {
+ this.eventLoopGroup = eventLoopGroup;
return this;
}
@@ -204,6 +202,15 @@
private BookKeeperClient buildClient() {
validateParameters();
- return new BookKeeperClient(dlConfig, name, zkServers, zkc, ledgersPath, channelFactory, requestTimer, statsLogger, featureProvider);
+ return new BookKeeperClient(
+ dlConfig,
+ name,
+ zkServers,
+ zkc,
+ ledgersPath,
+ eventLoopGroup,
+ requestTimer,
+ statsLogger,
+ featureProvider);
}
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
index f2c510d..bf3a491 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
@@ -21,7 +21,6 @@
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.LocalBookKeeper;
@@ -33,14 +32,12 @@
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -180,36 +177,6 @@
return uri;
}
- public BookieServer newBookie() throws Exception {
- ServerConfiguration bookieConf = new ServerConfiguration();
- bookieConf.setZkTimeout(zkTimeoutSec * 1000);
- bookieConf.setBookiePort(0);
- bookieConf.setAllowLoopback(true);
- File tmpdir = File.createTempFile("bookie" + UUID.randomUUID() + "_",
- "test");
- if (!tmpdir.delete()) {
- LOG.debug("Fail to delete tmpdir " + tmpdir);
- }
- if (!tmpdir.mkdir()) {
- throw new IOException("Fail to create tmpdir " + tmpdir);
- }
- tmpDirs.add(tmpdir);
-
- bookieConf.setZkServers(zkEnsemble);
- bookieConf.setJournalDirName(tmpdir.getPath());
- bookieConf.setLedgerDirNames(new String[]{tmpdir.getPath()});
-
- BookieServer b = new BookieServer(bookieConf);
- b.start();
- for (int i = 0; i < 10 && !b.isRunning(); i++) {
- Thread.sleep(10000);
- }
- if (!b.isRunning()) {
- throw new IOException("Bookie would not start");
- }
- return b;
- }
-
/**
* Check that a number of bookies are available
*
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
index 21fe227..36ff437 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
@@ -20,6 +20,12 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import java.util.concurrent.ThreadFactory;
+import org.apache.commons.lang.SystemUtils;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.DistributedLogConfiguration;
@@ -58,9 +64,6 @@
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.Stat;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +72,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -130,7 +132,7 @@
// {@link org.apache.distributedlog.BookKeeperClient#get()}. So it is safe to
// keep builders and their client wrappers here, as they will be used when
// instantiating readers or writers.
- private ClientSocketChannelFactory channelFactory;
+ private EventLoopGroup eventLoopGroup;
private HashedWheelTimer requestTimer;
private BookKeeperClientBuilder sharedWriterBKCBuilder;
private BookKeeperClient writerBKC;
@@ -250,11 +252,22 @@
return bkdlConfig;
}
+ static EventLoopGroup getDefaultEventLoopGroup(int numThreads) {
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("DL-io-%s").build();
+ if (SystemUtils.IS_OS_LINUX) {
+ try {
+ return new EpollEventLoopGroup(numThreads, threadFactory);
+ } catch (Throwable t) {
+ LOG.warn("Could not use Netty Epoll event loop for bookie server:", t);
+ return new NioEventLoopGroup(numThreads, threadFactory);
+ }
+ } else {
+ return new NioEventLoopGroup(numThreads, threadFactory);
+ }
+ }
+
private void initializeBookKeeperClients() throws IOException {
- this.channelFactory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()),
- Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()),
- conf.getBKClientNumberIOThreads());
+ this.eventLoopGroup = getDefaultEventLoopGroup(conf.getBKClientNumberIOThreads());
this.requestTimer = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("DLFactoryTimer-%d").build(),
conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
@@ -265,7 +278,7 @@
conf,
bkdlConfig.getBkZkServersForWriter(),
bkdlConfig.getBkLedgersPath(),
- channelFactory,
+ eventLoopGroup,
requestTimer,
Optional.of(featureProvider.scope("bkc")),
statsLogger);
@@ -280,7 +293,7 @@
conf,
bkdlConfig.getBkZkServersForReader(),
bkdlConfig.getBkLedgersPath(),
- channelFactory,
+ eventLoopGroup,
requestTimer,
Optional.<FeatureProvider>absent(),
statsLogger);
@@ -393,7 +406,7 @@
writerZKC.close();
readerZKC.close();
// release bookkeeper resources
- channelFactory.releaseExternalResources();
+ eventLoopGroup.shutdownGracefully();
LOG.info("Release external resources used by channel factory.");
requestTimer.stop();
LOG.info("Stopped request timer");
@@ -582,7 +595,7 @@
DistributedLogConfiguration conf,
String zkServers,
String ledgersPath,
- ClientSocketChannelFactory channelFactory,
+ EventLoopGroup eventLoopGroup,
HashedWheelTimer requestTimer,
Optional<FeatureProvider> featureProviderOptional,
StatsLogger statsLogger) {
@@ -591,7 +604,7 @@
.dlConfig(conf)
.zkServers(zkServers)
.ledgersPath(ledgersPath)
- .channelFactory(channelFactory)
+ .eventLoopGroup(eventLoopGroup)
.requestTimer(requestTimer)
.featureProvider(featureProviderOptional)
.statsLogger(statsLogger);
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ChainedRequestLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ChainedRequestLimiter.java
index fdad69b..bbb4f7e 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ChainedRequestLimiter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ChainedRequestLimiter.java
@@ -70,7 +70,7 @@
limiter.apply(request);
}
} finally {
- applyTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ applyTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
}
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
index 77151df..664d0bc 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java
@@ -147,11 +147,15 @@
promise.whenComplete(new FutureEventListener<ZKDistributedLock>() {
@Override
public void onSuccess(ZKDistributedLock lock) {
- acquireStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ acquireStats.registerSuccessfulEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
}
@Override
public void onFailure(Throwable cause) {
- acquireStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ acquireStats.registerFailedEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
// release the lock if fail to acquire
asyncClose();
}
@@ -496,7 +500,9 @@
synchronized (ZKDistributedLock.this) {
lockReacquireFuture = null;
}
- reacquireStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ reacquireStats.registerSuccessfulEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
}
@Override
@@ -509,7 +515,9 @@
"Exception on re-acquiring lock", cause);
}
}
- reacquireStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ reacquireStats.registerFailedEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
}
});
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
index 9fdcbf1..1325751 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java
@@ -838,9 +838,13 @@
throw new LockingException(lockPath, message, ex);
} finally {
if (success) {
- tryStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ tryStats.registerSuccessfulEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
} else {
- tryStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ tryStats.registerFailedEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
}
// This can only happen for a Throwable thats not an
// Exception, i.e. an Error
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
index 64229d1..88c5d0f 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
@@ -1665,7 +1665,7 @@
System.out.println("Skip inprogress log segment " + segment);
return;
}
- LedgerHandle lh = bkAdmin.openLedger(segment.getLogSegmentId(), true);
+ LedgerHandle lh = bkAdmin.openLedger(segment.getLogSegmentId());
long lac = lh.getLastAddConfirmed();
Enumeration<LedgerEntry> entries = lh.readEntries(lac, lac);
if (!entries.hasMoreElements()) {
@@ -1727,6 +1727,7 @@
abstract protected int runBKCmd(ZooKeeperClient zkc, BookKeeperClient bkc) throws Exception;
}
+ /**
static class RecoverCommand extends PerBKCommand {
final List<Long> ledgers = new ArrayList<Long>();
@@ -2035,6 +2036,7 @@
return "recover [options] <bookiesSrc>";
}
}
+ **/
/**
* Per Ledger Command, which parse common options for per ledger. e.g. ledger id.
@@ -2854,7 +2856,8 @@
addCommand(new ListCommand());
addCommand(new ReadLastConfirmedCommand());
addCommand(new ReadEntriesCommand());
- addCommand(new RecoverCommand());
+ // TODO: Fix it later, tracking by https://github.com/apache/distributedlog/issues/150
+ // addCommand(new RecoverCommand());
addCommand(new RecoverLedgerCommand());
addCommand(new ShowCommand());
addCommand(new TruncateCommand());
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
index 3697b3f..35cdfa0 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/util/SimplePermitLimiter.java
@@ -83,7 +83,7 @@
@Override
public boolean acquire() {
- permitsMetric.registerSuccessfulEvent(permits.get());
+ permitsMetric.registerSuccessfulValue(permits.get());
if (permits.incrementAndGet() <= permitsMax || isDarkmode()) {
return true;
} else {
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDLMTestUtil.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDLMTestUtil.java
index a4db3ad..e47d020 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDLMTestUtil.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDLMTestUtil.java
@@ -32,6 +32,12 @@
public class TestDLMTestUtil {
static final Logger LOG = LoggerFactory.getLogger(TestDLMTestUtil.class);
+ static {
+ // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words
+ // are disabled by default due to security reasons
+ System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+ }
+
@Rule
public TestName testNames = new TestName();
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
index 5e4ba07..5b55cd0 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
@@ -67,6 +67,12 @@
public class TestDistributedLogBase {
static final Logger LOG = LoggerFactory.getLogger(TestDistributedLogBase.class);
+ static {
+ // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words
+ // are disabled by default due to security reasons
+ System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+ }
+
// Num worker threads should be one, since the exec service is used for the ordered
// future pool in test cases, and setting to > 1 will therefore result in unordered
// write ops.
diff --git a/distributedlog-core/src/test/resources/bk_server.conf b/distributedlog-core/src/test/resources/bk_server.conf
index bd5ae93..0a746d6 100644
--- a/distributedlog-core/src/test/resources/bk_server.conf
+++ b/distributedlog-core/src/test/resources/bk_server.conf
@@ -122,7 +122,7 @@
journalFlushWhenQueueEmpty=false
journalRemoveFromPageCache=true
journalAdaptiveGroupWrites=true
-journalMaxGroupWaitMSec=15
+journalMaxGroupWaitMSec=2
journalBufferedEntriesThreshold=180
journalBufferedWritesThreshold=262144
journalMaxGroupedEntriesToCommit=200
@@ -138,8 +138,8 @@
fileInfoMaxIdleTime=3600
# Bookie Threads Settings
-numAddWorkerThreads=24
-numJournalCallbackThreads=48
-numReadWorkerThreads=72
-numLongPollWorkerThreads=72
+numAddWorkerThreads=1
+numJournalCallbackThreads=1
+numReadWorkerThreads=4
+numLongPollWorkerThreads=4
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java
index 38d3ba3..b164fef 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/io/LZ4CompressionCodec.java
@@ -58,7 +58,7 @@
Stopwatch watch = Stopwatch.createStarted();
byte[] compressed = compressor.compress(data, offset, length);
- compressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS));
+ compressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
return compressed;
}
@@ -75,7 +75,7 @@
while (true) {
try {
byte[] decompressed = safeDecompressor.decompress(data, offset, length, outLength);
- decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS));
+ decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
return decompressed;
} catch (LZ4Exception e) {
outLength *= 2;
@@ -95,7 +95,7 @@
Stopwatch watch = Stopwatch.createStarted();
byte[] decompressed = fastDecompressor.decompress(data, offset, decompressedSize);
- decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS));
+ decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
return decompressed;
}
}
diff --git a/distributedlog-proxy-client/pom.xml b/distributedlog-proxy-client/pom.xml
index 3ae7357..718be4d 100644
--- a/distributedlog-proxy-client/pom.xml
+++ b/distributedlog-proxy-client/pom.xml
@@ -169,4 +169,11 @@
</plugin>
</plugins>
</build>
+ <repositories>
+ <repository>
+ <id>twitter-repo</id>
+ <name>Twitter Maven Repo</name>
+ <url>http://maven.twttr.com</url>
+ </repository>
+ </repositories>
</project>
diff --git a/distributedlog-proxy-protocol/pom.xml b/distributedlog-proxy-protocol/pom.xml
index 468ee09..609be7e 100644
--- a/distributedlog-proxy-protocol/pom.xml
+++ b/distributedlog-proxy-protocol/pom.xml
@@ -127,4 +127,11 @@
</plugin>
</plugins>
</build>
+ <repositories>
+ <repository>
+ <id>twitter-repo</id>
+ <name>Twitter Maven Repo</name>
+ <url>http://maven.twttr.com</url>
+ </repository>
+ </repositories>
</project>
diff --git a/distributedlog-proxy-server/pom.xml b/distributedlog-proxy-server/pom.xml
index 0e294f3..a80a4d2 100644
--- a/distributedlog-proxy-server/pom.xml
+++ b/distributedlog-proxy-server/pom.xml
@@ -272,4 +272,11 @@
</dependencies>
</profile>
</profiles>
+ <repositories>
+ <repository>
+ <id>twitter-repo</id>
+ <name>Twitter Maven Repo</name>
+ <url>http://maven.twttr.com</url>
+ </repository>
+ </repositories>
</project>
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java
index ee64580..872fecb 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java
@@ -42,11 +42,13 @@
final Stopwatch stopwatch = Stopwatch.createStarted();
try {
result = service.apply(req);
- serviceExec.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ serviceExec.registerSuccessfulEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
} finally {
outstandingAsync.dec();
if (null == result) {
- serviceExec.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ serviceExec.registerFailedEvent(
+ stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
}
return result;
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
index 1336ddd..2ecaf36 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
@@ -17,6 +17,7 @@
*/
package org.apache.distributedlog.service.placement;
+import java.util.concurrent.TimeUnit;
import org.apache.distributedlog.client.routing.RoutingService;
import org.apache.distributedlog.api.namespace.Namespace;
import com.twitter.util.Duration;
@@ -175,14 +176,16 @@
}).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
@Override
public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
- placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
+ placementCalcStats.registerSuccessfulEvent(
+ System.currentTimeMillis() - startTime, TimeUnit.MICROSECONDS);
return BoxedUnit.UNIT;
}
}).onFailure(new Function<Throwable, BoxedUnit>() {
@Override
public BoxedUnit apply(Throwable t) {
logger.error("Failure calculating loads", t);
- placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
+ placementCalcStats.registerFailedEvent(
+ System.currentTimeMillis() - startTime, TimeUnit.MICROSECONDS);
return BoxedUnit.UNIT;
}
});
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
index 7700184..12c4783 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
@@ -96,7 +96,8 @@
.addEventListener(new FutureEventListener<Response>() {
@Override
public void onSuccess(Response response) {
- opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ opStatsLogger.registerSuccessfulEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
setResponse(response);
}
@Override
@@ -119,7 +120,8 @@
OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause;
fail(ResponseUtils.ownerToHeader(oafe.getCurrentOwner()));
} else {
- opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ opStatsLogger.registerFailedEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
fail(ResponseUtils.exceptionToHeader(cause));
}
}
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
index 372703a..93dedb6 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
@@ -122,16 +122,19 @@
@Override
public void onSuccess(BulkWriteResponse response) {
if (response.getHeader().getCode() == StatusCode.SUCCESS) {
- latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+ latencyStat.registerSuccessfulEvent(
+ stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
bytes.add(size);
bulkWriteBytes.add(size);
} else {
- latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+ latencyStat.registerFailedEvent(
+ stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
}
@Override
public void onFailure(Throwable cause) {
- latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+ latencyStat.registerFailedEvent(
+ stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
});
}
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
index df3d64f..71a9531 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
@@ -654,9 +654,11 @@
Stopwatch stopwatch,
Promise<Boolean> acquirePromise) {
if (success) {
- streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ streamAcquireStat.registerSuccessfulEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
} else {
- streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ streamAcquireStat.registerFailedEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
for (StreamOp op : oldPendingOps) {
executeOp(op, success);
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
index 0a8a2da..e3aa703 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
@@ -102,16 +102,19 @@
@Override
public void onSuccess(WriteResponse response) {
if (response.getHeader().getCode() == StatusCode.SUCCESS) {
- latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+ latencyStat.registerSuccessfulEvent(
+ stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
bytes.add(size);
writeBytes.add(size);
} else {
- latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+ latencyStat.registerFailedEvent(
+ stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
}
@Override
public void onFailure(Throwable cause) {
- latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+ latencyStat.registerFailedEvent(
+ stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
});
}
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
index c3c5d81..d790016 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java
@@ -84,14 +84,14 @@
@Override
public WriteResponse map(WriteResponse response) {
opStatsLogger.registerSuccessfulEvent(
- stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
return response;
}
@Override
public WriteResponse handle(Throwable cause) {
opStatsLogger.registerFailedEvent(
- stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
return ResponseUtils.write(ResponseUtils.exceptionToHeader(cause));
}
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
index 58b5b2a..a8ed1b2 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
@@ -49,6 +49,12 @@
*/
public abstract class DistributedLogServerTestCase {
+ static {
+ // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words
+ // are disabled by default due to security reasons
+ System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+ }
+
protected static DistributedLogConfiguration conf =
new DistributedLogConfiguration().setLockTimeout(10)
.setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/README.md b/distributedlog-tutorials/distributedlog-mapreduce/README.md
deleted file mode 100644
index ab50d28..0000000
--- a/distributedlog-tutorials/distributedlog-mapreduce/README.md
+++ /dev/null
Binary files differ
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/pom.xml b/distributedlog-tutorials/distributedlog-mapreduce/pom.xml
deleted file mode 100644
index e7693df..0000000
--- a/distributedlog-tutorials/distributedlog-mapreduce/pom.xml
+++ /dev/null
@@ -1,56 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>distributedlog</artifactId>
- <groupId>org.apache.distributedlog</groupId>
- <version>0.5.0-SNAPSHOT</version>
- <relativePath>../..</relativePath>
- </parent>
- <groupId>org.apache.distributedlog</groupId>
- <artifactId>distributedlog-mapreduce</artifactId>
- <name>Apache DistributedLog :: Tutorials :: MapReduce Tutorial</name>
- <url>http://maven.apache.org</url>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.libdir>${basedir}/lib</project.libdir>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.distributedlog</groupId>
- <artifactId>distributedlog-core</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.distributedlog</groupId>
- <artifactId>distributedlog-proxy-client</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>2.7.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
- <version>2.7.2</version>
- </dependency>
- </dependencies>
-</project>
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java
deleted file mode 100644
index 6fd017c..0000000
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.mapreduce;
-
-import com.google.common.collect.Lists;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.api.DistributedLogManager;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecordWithDLSN;
-import org.apache.distributedlog.LogSegmentMetadata;
-import org.apache.distributedlog.api.namespace.Namespace;
-import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.api.namespace.NamespaceBuilder;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BookKeeperAccessor;
-import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * InputFormat to read data from a distributedlog stream.
- */
-public class DistributedLogInputFormat
- extends InputFormat<DLSN, LogRecordWithDLSN> implements Configurable {
-
- private static final String DL_URI = "distributedlog.uri";
- private static final String DL_STREAM = "distributedlog.stream";
-
- protected Configuration conf;
- protected DistributedLogConfiguration dlConf;
- protected URI dlUri;
- protected Namespace namespace;
- protected String streamName;
- protected DistributedLogManager dlm;
-
- /** {@inheritDoc} */
- @Override
- public void setConf(Configuration configuration) {
- this.conf = configuration;
- dlConf = new DistributedLogConfiguration();
- dlUri = URI.create(configuration.get(DL_URI, ""));
- streamName = configuration.get(DL_STREAM, "");
- try {
- namespace = NamespaceBuilder.newBuilder()
- .conf(dlConf)
- .uri(dlUri)
- .build();
- dlm = namespace.openLog(streamName);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public List<InputSplit> getSplits(JobContext jobContext)
- throws IOException, InterruptedException {
- List<LogSegmentMetadata> segments = dlm.getLogSegments();
- List<InputSplit> inputSplits = Lists.newArrayListWithCapacity(segments.size());
- BookKeeper bk = ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC().get();
- LedgerManager lm = BookKeeperAccessor.getLedgerManager(bk);
- final AtomicInteger rcHolder = new AtomicInteger(0);
- final AtomicReference<LedgerMetadata> metadataHolder = new AtomicReference<LedgerMetadata>(null);
- for (LogSegmentMetadata segment : segments) {
- final CountDownLatch latch = new CountDownLatch(1);
- lm.readLedgerMetadata(segment.getLogSegmentId(),
- new BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata>() {
- @Override
- public void operationComplete(int rc, LedgerMetadata ledgerMetadata) {
- metadataHolder.set(ledgerMetadata);
- rcHolder.set(rc);
- latch.countDown();
- }
- });
- latch.await();
- if (BKException.Code.OK != rcHolder.get()) {
- throw new IOException("Faild to get log segment metadata for " + segment + " : "
- + BKException.getMessage(rcHolder.get()));
- }
- inputSplits.add(new LogSegmentSplit(segment, metadataHolder.get()));
- }
- return inputSplits;
- }
-
- @Override
- public RecordReader<DLSN, LogRecordWithDLSN> createRecordReader(
- InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
- throws IOException, InterruptedException {
- return new LogSegmentReader(
- streamName,
- dlConf,
- ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC().get(),
- (LogSegmentSplit) inputSplit);
- }
-}
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentReader.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentReader.java
deleted file mode 100644
index 541db3b..0000000
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentReader.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.mapreduce;
-
-import org.apache.distributedlog.*;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.Enumeration;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Record Reader to read from a log segment split
- */
-class LogSegmentReader extends RecordReader<DLSN, LogRecordWithDLSN> {
-
- final String streamName;
- final BookKeeper bk;
- final LedgerHandle lh;
- final LogSegmentMetadata metadata;
-
- long entryId = -1L;
- Entry.Reader reader = null;
- LogRecordWithDLSN currentRecord = null;
- int readPos = 0;
-
- LogSegmentReader(String streamName,
- DistributedLogConfiguration conf,
- BookKeeper bk,
- LogSegmentSplit split)
- throws IOException {
- this.streamName = streamName;
- this.bk = bk;
- this.metadata = split.getMetadata();
- try {
- this.lh = bk.openLedgerNoRecovery(
- split.getLogSegmentId(),
- BookKeeper.DigestType.CRC32,
- conf.getBKDigestPW().getBytes(UTF_8));
- } catch (BKException e) {
- throw new IOException(e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
- }
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- // do nothing
- }
-
- @Override
- public boolean nextKeyValue()
- throws IOException, InterruptedException {
- LogRecordWithDLSN record;
- currentRecord = null;
- if (null != reader) {
- record = reader.nextRecord();
- if (null != record) {
- currentRecord = record;
- readPos = record.getPositionWithinLogSegment();
- return true;
- } else {
- return false;
- }
- }
- ++entryId;
- if (entryId > lh.getLastAddConfirmed()) {
- return false;
- }
- try {
- Enumeration<LedgerEntry> entries =
- lh.readEntries(entryId, entryId);
- if (entries.hasMoreElements()) {
- LedgerEntry entry = entries.nextElement();
- reader = Entry.newBuilder()
- .setLogSegmentInfo(metadata.getLogSegmentSequenceNumber(),
- metadata.getStartSequenceId())
- .setEntryId(entry.getEntryId())
- .setEnvelopeEntry(
- LogSegmentMetadata.supportsEnvelopedEntries(metadata.getVersion()))
- .deserializeRecordSet(true)
- .setInputStream(entry.getEntryInputStream())
- .buildReader();
- }
- return nextKeyValue();
- } catch (BKException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public DLSN getCurrentKey()
- throws IOException, InterruptedException {
- return currentRecord.getDlsn();
- }
-
- @Override
- public LogRecordWithDLSN getCurrentValue()
- throws IOException, InterruptedException {
- return currentRecord;
- }
-
- @Override
- public float getProgress()
- throws IOException, InterruptedException {
- if (metadata.getRecordCount() > 0) {
- return ((float) (readPos + 1)) / metadata.getRecordCount();
- }
- return 1;
- }
-
- @Override
- public void close() throws IOException {
- try {
- lh.close();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- } catch (BKException e) {
- throw new IOException(e);
- }
- }
-}
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentSplit.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentSplit.java
deleted file mode 100644
index 132e24d..0000000
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentSplit.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.mapreduce;
-
-import com.google.common.collect.Sets;
-import org.apache.distributedlog.LogSegmentMetadata;
-import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Set;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * A input split that reads from a log segment.
- */
-public class LogSegmentSplit extends InputSplit implements Writable {
-
- private LogSegmentMetadata logSegmentMetadata;
- private LedgerMetadata ledgerMetadata;
-
- public LogSegmentSplit() {}
-
- public LogSegmentSplit(LogSegmentMetadata logSegmentMetadata,
- LedgerMetadata ledgerMetadata) {
- this.logSegmentMetadata = logSegmentMetadata;
- this.ledgerMetadata = ledgerMetadata;
- }
-
- public LogSegmentMetadata getMetadata() {
- return logSegmentMetadata;
- }
-
- public long getLogSegmentId() {
- return logSegmentMetadata.getLogSegmentId();
- }
-
- @Override
- public long getLength()
- throws IOException, InterruptedException {
- return logSegmentMetadata.getRecordCount();
- }
-
- @Override
- public String[] getLocations()
- throws IOException, InterruptedException {
- Set<String> locations = Sets.newHashSet();
- for (ArrayList<BookieSocketAddress> ensemble : ledgerMetadata.getEnsembles().values()) {
- for (BookieSocketAddress host : ensemble) {
- locations.add(host.getHostName());
- }
- }
- return locations.toArray(new String[locations.size()]);
- }
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {
- String lsMetadataStr = logSegmentMetadata.getFinalisedData();
- dataOutput.writeUTF(lsMetadataStr);
- String lhMetadataStr = new String(ledgerMetadata.serialize(), UTF_8);
- dataOutput.writeUTF(lhMetadataStr);
- }
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- String lsMetadataStr = dataInput.readUTF();
- logSegmentMetadata = LogSegmentMetadata.parseData("",
- lsMetadataStr.getBytes(UTF_8));
- String lhMetadataStr = dataInput.readUTF();
- ledgerMetadata = LedgerMetadata.parseConfig(lhMetadataStr.getBytes(UTF_8),
- Version.ANY);
- }
-}
diff --git a/distributedlog-tutorials/distributedlog-messaging/pom.xml b/distributedlog-tutorials/distributedlog-messaging/pom.xml
index d10e1b5..f346752 100644
--- a/distributedlog-tutorials/distributedlog-messaging/pom.xml
+++ b/distributedlog-tutorials/distributedlog-messaging/pom.xml
@@ -61,7 +61,7 @@
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
- <version>0.5.0-1</version>
+ <version>${libthrift.version}</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
@@ -114,4 +114,11 @@
</plugin>
</plugins>
</build>
+ <repositories>
+ <repository>
+ <id>twitter-repo</id>
+ <name>Twitter Maven Repo</name>
+ <url>http://maven.twttr.com</url>
+ </repository>
+ </repositories>
</project>
diff --git a/distributedlog-tutorials/pom.xml b/distributedlog-tutorials/pom.xml
index 1346656..dc718d6 100644
--- a/distributedlog-tutorials/pom.xml
+++ b/distributedlog-tutorials/pom.xml
@@ -29,7 +29,6 @@
<module>distributedlog-basic</module>
<module>distributedlog-messaging</module>
<module>distributedlog-kafka</module>
- <module>distributedlog-mapreduce</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
diff --git a/pom.xml b/pom.xml
index 7b44cf7..2712b92 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,13 +97,13 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- dependencies -->
- <bookkeeper.version>4.3.7-TWTTR-OSS</bookkeeper.version>
+ <bookkeeper.version>4.5.0</bookkeeper.version>
<codahale.metrics.version>3.0.1</codahale.metrics.version>
<commons-cli.version>1.1</commons-cli.version>
<commons-codec.version>1.6</commons-codec.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-lang3.version>3.3.2</commons-lang3.version>
- <curator.version>3.2.1</curator.version>
+ <curator.version>4.0.0</curator.version>
<finagle.version>6.34.0</finagle.version>
<freebuilder.version>1.12.3</freebuilder.version>
<guava.version>20.0</guava.version>
@@ -117,7 +117,7 @@
<scrooge.version>4.6.0</scrooge.version>
<slf4j.version>1.6.4</slf4j.version>
<stats-util.version>0.0.58</stats-util.version>
- <zookeeper.version>3.5.1-alpha</zookeeper.version>
+ <zookeeper.version>3.5.3-beta</zookeeper.version>
<!-- plugin dependencies -->
<apache-rat-plugin.version>0.7</apache-rat-plugin.version>
<cobertura-maven-plugin.version>2.7</cobertura-maven-plugin.version>
@@ -171,7 +171,7 @@
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
<configuration>
- <tarLongFileMode>gnu</tarLongFileMode>
+ <tarLongFileMode>gnu</tarLongFileMode>
<descriptors>
<descriptor>src/assemble/src.xml</descriptor>
</descriptors>
@@ -231,13 +231,14 @@
<exclude>ChangeLog</exclude>
<exclude>**/README.md</exclude>
<exclude>**/apidocs/*</exclude>
- <exclude>GROUPS</exclude>
- <exclude>OWNERS</exclude>
- <exclude>CONFIG.ini</exclude>
+ <exclude>GROUPS</exclude>
+ <exclude>OWNERS</exclude>
+ <exclude>CONFIG.ini</exclude>
<exclude>**/**.md</exclude>
<exclude>scripts/dev/reviewers</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
<exclude>**/org/apache/distributedlog/thrift/*</exclude>
+ <exclude>.repository/**</exclude>
</excludes>
</configuration>
</plugin>
@@ -260,19 +261,4 @@
</plugin>
</plugins>
</build>
- <repositories>
- <repository>
- <id>bookkeeper-twitter-mvn-repo</id>
- <url>https://raw.github.com/twitter/bookkeeper/mvn-repo/${bookkeeper.version}</url>
- <snapshots>
- <enabled>true</enabled>
- <updatePolicy>always</updatePolicy>
- </snapshots>
- </repository>
- <repository>
- <id>twitter-repo</id>
- <name>Twitter Maven Repo</name>
- <url>http://maven.twttr.com</url>
- </repository>
- </repositories>
</project>