| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.distributedlog.benchmark; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| |
| import com.google.common.base.Stopwatch; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import com.twitter.common.zookeeper.ServerSet; |
| import org.apache.distributedlog.api.AsyncLogReader; |
| import org.apache.distributedlog.DLSN; |
| import org.apache.distributedlog.DistributedLogConfiguration; |
| import org.apache.distributedlog.api.DistributedLogManager; |
| import org.apache.distributedlog.LogRecordSet; |
| import org.apache.distributedlog.LogRecordWithDLSN; |
| import org.apache.distributedlog.api.namespace.Namespace; |
| import org.apache.distributedlog.benchmark.thrift.Message; |
| import org.apache.distributedlog.client.serverset.DLZkServerSet; |
| import org.apache.distributedlog.exceptions.DLInterruptedException; |
| import org.apache.distributedlog.api.namespace.NamespaceBuilder; |
| import org.apache.distributedlog.service.DistributedLogClient; |
| import org.apache.distributedlog.service.DistributedLogClientBuilder; |
| import org.apache.distributedlog.common.concurrent.FutureUtils; |
| import org.apache.distributedlog.common.util.SchedulerUtils; |
| import com.twitter.finagle.builder.ClientBuilder; |
| import com.twitter.finagle.stats.StatsReceiver; |
| import com.twitter.finagle.thrift.ClientId$; |
| import com.twitter.util.Duration$; |
| import com.twitter.util.Function; |
| import com.twitter.util.Future; |
| import com.twitter.util.FutureEventListener; |
| import com.twitter.util.Promise; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.List; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.bookkeeper.stats.Counter; |
| import org.apache.bookkeeper.stats.Gauge; |
| import org.apache.bookkeeper.stats.OpStatsLogger; |
| import org.apache.bookkeeper.stats.StatsLogger; |
| import org.apache.thrift.TException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * The benchmark for core library reader. |
| */ |
| public class ReaderWorker implements Worker { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ReaderWorker.class); |
| |
| static final int BACKOFF_MS = 200; |
| |
| final String streamPrefix; |
| final int startStreamId; |
| final int endStreamId; |
| final ScheduledExecutorService executorService; |
| final ExecutorService callbackExecutor; |
| final Namespace namespace; |
| final DistributedLogManager[] dlms; |
| final AsyncLogReader[] logReaders; |
| final StreamReader[] streamReaders; |
| final int numStreams; |
| final boolean readFromHead; |
| |
| final int truncationIntervalInSeconds; |
| // DL Client Related Variables |
| final DLZkServerSet[] serverSets; |
| final List<String> finagleNames; |
| final DistributedLogClient dlc; |
| |
| volatile boolean running = true; |
| |
| final StatsReceiver statsReceiver; |
| final StatsLogger statsLogger; |
| final OpStatsLogger e2eStat; |
| final OpStatsLogger deliveryStat; |
| final OpStatsLogger negativeE2EStat; |
| final OpStatsLogger negativeDeliveryStat; |
| final OpStatsLogger truncationStat; |
| final Counter invalidRecordsCounter; |
| final Counter outOfOrderSequenceIdCounter; |
| |
| class StreamReader implements |
| org.apache.distributedlog.common.concurrent.FutureEventListener<List<LogRecordWithDLSN>>, |
| Runnable, Gauge<Number> { |
| |
| final int streamIdx; |
| final String streamName; |
| DLSN prevDLSN = null; |
| long prevSequenceId = Long.MIN_VALUE; |
| private static final String gaugeLabel = "sequence_id"; |
| |
| StreamReader(int idx, StatsLogger statsLogger) { |
| this.streamIdx = idx; |
| int streamId = startStreamId + streamIdx; |
| streamName = String.format("%s_%d", streamPrefix, streamId); |
| statsLogger.scope(streamName).registerGauge(gaugeLabel, this); |
| } |
| |
| @Override |
| public void onSuccess(final List<LogRecordWithDLSN> records) { |
| for (final LogRecordWithDLSN record : records) { |
| if (record.isRecordSet()) { |
| try { |
| processRecordSet(record); |
| } catch (IOException e) { |
| onFailure(e); |
| } |
| } else { |
| processRecord(record); |
| } |
| } |
| readLoop(); |
| } |
| |
| public void processRecordSet(final LogRecordWithDLSN record) throws IOException { |
| LogRecordSet.Reader reader = LogRecordSet.of(record); |
| LogRecordWithDLSN nextRecord = reader.nextRecord(); |
| while (null != nextRecord) { |
| processRecord(nextRecord); |
| nextRecord = reader.nextRecord(); |
| } |
| } |
| |
| public void processRecord(final LogRecordWithDLSN record) { |
| Message msg; |
| try { |
| msg = Utils.parseMessage(record.getPayload()); |
| } catch (TException e) { |
| invalidRecordsCounter.inc(); |
| LOG.warn("Failed to parse record {} for stream {} : size = {} , ", |
| new Object[] { record, streamIdx, record.getPayload().length, e }); |
| return; |
| } |
| long curTimeMillis = System.currentTimeMillis(); |
| long e2eLatency = curTimeMillis - msg.getPublishTime(); |
| long deliveryLatency = curTimeMillis - record.getTransactionId(); |
| if (e2eLatency >= 0) { |
| e2eStat.registerSuccessfulEvent(e2eLatency); |
| } else { |
| negativeE2EStat.registerSuccessfulEvent(-e2eLatency); |
| } |
| if (deliveryLatency >= 0) { |
| deliveryStat.registerSuccessfulEvent(deliveryLatency); |
| } else { |
| negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency); |
| } |
| |
| prevDLSN = record.getDlsn(); |
| } |
| |
| @Override |
| public void onFailure(Throwable cause) { |
| scheduleReinitStream(streamIdx).map(new Function<Void, Void>() { |
| @Override |
| public Void apply(Void value) { |
| prevDLSN = null; |
| prevSequenceId = Long.MIN_VALUE; |
| readLoop(); |
| return null; |
| } |
| }); |
| } |
| |
| void readLoop() { |
| if (!running) { |
| return; |
| } |
| logReaders[streamIdx].readBulk(10).whenComplete(this); |
| } |
| |
| @Override |
| public void run() { |
| final DLSN dlsnToTruncate = prevDLSN; |
| if (null == dlsnToTruncate) { |
| return; |
| } |
| final Stopwatch stopwatch = Stopwatch.createStarted(); |
| dlc.truncate(streamName, dlsnToTruncate).addEventListener( |
| new FutureEventListener<Boolean>() { |
| @Override |
| public void onSuccess(Boolean value) { |
| truncationStat.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); |
| } |
| |
| @Override |
| public void onFailure(Throwable cause) { |
| truncationStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); |
| LOG.error("Failed to truncate stream {} to {} : ", |
| new Object[]{streamName, dlsnToTruncate, cause}); |
| } |
| }); |
| } |
| |
| @Override |
| public Number getDefaultValue() { |
| return Long.MIN_VALUE; |
| } |
| |
| @Override |
| public synchronized Number getSample() { |
| return prevSequenceId; |
| } |
| |
| void unregisterGauge() { |
| statsLogger.scope(streamName).unregisterGauge(gaugeLabel, this); |
| } |
| } |
| |
| public ReaderWorker(DistributedLogConfiguration conf, |
| URI uri, |
| String streamPrefix, |
| int startStreamId, |
| int endStreamId, |
| int readThreadPoolSize, |
| List<String> serverSetPaths, |
| List<String> finagleNames, |
| int truncationIntervalInSeconds, |
| boolean readFromHead, /* read from the earliest data of log */ |
| StatsReceiver statsReceiver, |
| StatsLogger statsLogger) throws IOException { |
| checkArgument(startStreamId <= endStreamId); |
| this.streamPrefix = streamPrefix; |
| this.startStreamId = startStreamId; |
| this.endStreamId = endStreamId; |
| this.truncationIntervalInSeconds = truncationIntervalInSeconds; |
| this.readFromHead = readFromHead; |
| this.statsReceiver = statsReceiver; |
| this.statsLogger = statsLogger; |
| this.e2eStat = this.statsLogger.getOpStatsLogger("e2e"); |
| this.negativeE2EStat = this.statsLogger.getOpStatsLogger("e2eNegative"); |
| this.deliveryStat = this.statsLogger.getOpStatsLogger("delivery"); |
| this.negativeDeliveryStat = this.statsLogger.getOpStatsLogger("deliveryNegative"); |
| this.truncationStat = this.statsLogger.getOpStatsLogger("truncation"); |
| this.invalidRecordsCounter = this.statsLogger.getCounter("invalid_records"); |
| this.outOfOrderSequenceIdCounter = this.statsLogger.getCounter("out_of_order_seq_id"); |
| this.executorService = Executors.newScheduledThreadPool( |
| readThreadPoolSize, new ThreadFactoryBuilder().setNameFormat("benchmark.reader-%d").build()); |
| this.callbackExecutor = Executors.newFixedThreadPool( |
| Runtime.getRuntime().availableProcessors(), |
| new ThreadFactoryBuilder().setNameFormat("benchmark.reader-callback-%d").build()); |
| this.finagleNames = finagleNames; |
| this.serverSets = createServerSets(serverSetPaths); |
| |
| conf.setDeserializeRecordSetOnReads(false); |
| |
| if (truncationIntervalInSeconds > 0 && (!finagleNames.isEmpty() || !serverSetPaths.isEmpty())) { |
| // Construct client for truncation |
| DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder() |
| .clientId(ClientId$.MODULE$.apply("dlog_loadtest_reader")) |
| .clientBuilder(ClientBuilder.get() |
| .hostConnectionLimit(10) |
| .hostConnectionCoresize(10) |
| .tcpConnectTimeout(Duration$.MODULE$.fromSeconds(1)) |
| .requestTimeout(Duration$.MODULE$.fromSeconds(2))) |
| .redirectBackoffStartMs(100) |
| .redirectBackoffMaxMs(500) |
| .requestTimeoutMs(2000) |
| .statsReceiver(statsReceiver) |
| .thriftmux(true) |
| .name("reader"); |
| |
| if (serverSetPaths.isEmpty()) { |
| // Prepare finagle names |
| String local = finagleNames.get(0); |
| String[] remotes = new String[finagleNames.size() - 1]; |
| finagleNames.subList(1, finagleNames.size()).toArray(remotes); |
| |
| builder = builder.finagleNameStrs(local, remotes); |
| LOG.info("Initialized distributedlog client for truncation @ {}.", finagleNames); |
| } else if (serverSets.length != 0){ |
| ServerSet local = this.serverSets[0].getServerSet(); |
| ServerSet[] remotes = new ServerSet[this.serverSets.length - 1]; |
| for (int i = 1; i < serverSets.length; i++) { |
| remotes[i - 1] = serverSets[i].getServerSet(); |
| } |
| |
| builder = builder.serverSets(local, remotes); |
| LOG.info("Initialized distributedlog client for truncation @ {}.", serverSetPaths); |
| } else { |
| builder = builder.uri(uri); |
| LOG.info("Initialized distributedlog client for namespace {}", uri); |
| } |
| dlc = builder.build(); |
| } else { |
| dlc = null; |
| } |
| |
| // construct the factory |
| this.namespace = NamespaceBuilder.newBuilder() |
| .conf(conf) |
| .uri(uri) |
| .statsLogger(statsLogger.scope("dl")) |
| .build(); |
| this.numStreams = endStreamId - startStreamId; |
| this.dlms = new DistributedLogManager[numStreams]; |
| this.logReaders = new AsyncLogReader[numStreams]; |
| final CountDownLatch latch = new CountDownLatch(numStreams); |
| for (int i = 0; i < numStreams; i++) { |
| final int idx = i; |
| executorService.submit(new Runnable() { |
| @Override |
| public void run() { |
| reinitStream(idx).map(new Function<Void, Void>() { |
| @Override |
| public Void apply(Void value) { |
| LOG.info("Initialized stream reader {}.", idx); |
| latch.countDown(); |
| return null; |
| } |
| }); |
| } |
| }); |
| } |
| try { |
| latch.await(); |
| } catch (InterruptedException e) { |
| throw new DLInterruptedException("Failed to intialize benchmark readers : ", e); |
| } |
| this.streamReaders = new StreamReader[numStreams]; |
| for (int i = 0; i < numStreams; i++) { |
| streamReaders[i] = new StreamReader(i, statsLogger.scope("perstream")); |
| if (truncationIntervalInSeconds > 0) { |
| executorService.scheduleWithFixedDelay(streamReaders[i], |
| truncationIntervalInSeconds, truncationIntervalInSeconds, TimeUnit.SECONDS); |
| } |
| } |
| LOG.info("Initialized benchmark reader on {} streams {} : [{} - {})", |
| new Object[] { numStreams, streamPrefix, startStreamId, endStreamId }); |
| } |
| |
| protected DLZkServerSet[] createServerSets(List<String> serverSetPaths) { |
| DLZkServerSet[] serverSets = new DLZkServerSet[serverSetPaths.size()]; |
| for (int i = 0; i < serverSets.length; i++) { |
| String serverSetPath = serverSetPaths.get(i); |
| serverSets[i] = DLZkServerSet.of(URI.create(serverSetPath), 60000); |
| } |
| return serverSets; |
| } |
| |
| private Future<Void> reinitStream(int idx) { |
| Promise<Void> promise = new Promise<Void>(); |
| reinitStream(idx, promise); |
| return promise; |
| } |
| |
| private void reinitStream(int idx, Promise<Void> promise) { |
| int streamId = startStreamId + idx; |
| String streamName = String.format("%s_%d", streamPrefix, streamId); |
| |
| if (logReaders[idx] != null) { |
| try { |
| FutureUtils.result(logReaders[idx].asyncClose()); |
| } catch (Exception e) { |
| LOG.warn("Failed on closing stream reader {} : ", streamName, e); |
| } |
| logReaders[idx] = null; |
| } |
| if (dlms[idx] != null) { |
| try { |
| dlms[idx].close(); |
| } catch (IOException e) { |
| LOG.warn("Failed on closing dlm {} : ", streamName, e); |
| } |
| dlms[idx] = null; |
| } |
| |
| try { |
| dlms[idx] = namespace.openLog(streamName); |
| } catch (IOException ioe) { |
| LOG.error("Failed on creating dlm {} : ", streamName, ioe); |
| scheduleReinitStream(idx, promise); |
| return; |
| } |
| DLSN lastDLSN; |
| if (readFromHead) { |
| lastDLSN = DLSN.InitialDLSN; |
| } else { |
| try { |
| lastDLSN = dlms[idx].getLastDLSN(); |
| } catch (IOException ioe) { |
| LOG.error("Failed on getting last dlsn from stream {} : ", streamName, ioe); |
| scheduleReinitStream(idx, promise); |
| return; |
| } |
| } |
| try { |
| logReaders[idx] = dlms[idx].getAsyncLogReader(lastDLSN); |
| } catch (IOException ioe) { |
| LOG.error("Failed on opening reader for stream {} starting from {} : ", |
| new Object[] { streamName, lastDLSN, ioe }); |
| scheduleReinitStream(idx, promise); |
| return; |
| } |
| LOG.info("Opened reader for stream {}, starting from {}.", streamName, lastDLSN); |
| promise.setValue(null); |
| } |
| |
| Future<Void> scheduleReinitStream(int idx) { |
| Promise<Void> promise = new Promise<Void>(); |
| scheduleReinitStream(idx, promise); |
| return promise; |
| } |
| |
| void scheduleReinitStream(final int idx, final Promise<Void> promise) { |
| executorService.schedule(new Runnable() { |
| @Override |
| public void run() { |
| reinitStream(idx, promise); |
| } |
| }, BACKOFF_MS, TimeUnit.MILLISECONDS); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| this.running = false; |
| for (AsyncLogReader reader : logReaders) { |
| if (null != reader) { |
| org.apache.distributedlog.util.Utils.ioResult(reader.asyncClose()); |
| } |
| } |
| for (DistributedLogManager dlm : dlms) { |
| if (null != dlm) { |
| dlm.close(); |
| } |
| } |
| namespace.close(); |
| SchedulerUtils.shutdownScheduler(executorService, 2, TimeUnit.MINUTES); |
| SchedulerUtils.shutdownScheduler(callbackExecutor, 2, TimeUnit.MINUTES); |
| if (this.dlc != null) { |
| this.dlc.close(); |
| } |
| for (DLZkServerSet serverSet: serverSets) { |
| serverSet.close(); |
| } |
| // Unregister gauges to prevent GC spirals |
| for (StreamReader sr : streamReaders) { |
| sr.unregisterGauge(); |
| } |
| } |
| |
| @Override |
| public void run() { |
| LOG.info("Starting reader (prefix = {}, numStreams = {}).", |
| streamPrefix, numStreams); |
| for (StreamReader sr : streamReaders) { |
| sr.readLoop(); |
| } |
| } |
| } |