1、基于fileSink写入文件行数计算TPS
diff --git a/adapter/benchmark/pom.xml b/adapter/benchmark/pom.xml
new file mode 100644
index 0000000..ed56a11
--- /dev/null
+++ b/adapter/benchmark/pom.xml
@@ -0,0 +1,58 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-eventbridge-adapter</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>1.0.0</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-eventbridge-adapter-benchmark</artifactId>
+ <version>1.0.0</version>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-common</artifactId>
+ <version>5.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
new file mode 100644
index 0000000..ed6a91c
--- /dev/null
+++ b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.benchmark;
+
+import org.apache.rocketmq.common.UtilAll;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.LineNumberReader;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+
+public abstract class AbstractEventCommon {
+ protected File file;
+ protected LineNumberReader lineNumberReader;
+ protected AtomicReference<Integer> previousRowCount;
+ protected ScheduledExecutorService executorService;
+ protected LongAdder writeCount = new LongAdder();
+ protected LongAdder costTime = new LongAdder();
+
+ protected String twoDecimal(double doubleValue) {
+ BigDecimal bigDecimal = new BigDecimal(doubleValue).setScale(2, RoundingMode.HALF_UP);
+ return bigDecimal.toString();
+ }
+
+ protected void printStats() throws IOException {
+
+ int currentRowCount = getLineNumber();
+ if (previousRowCount.get() == null || previousRowCount.get() == 0) {
+ previousRowCount.set(currentRowCount);
+ return;
+ }
+
+ // tps: 每秒文件打印的行数
+ final long tps = currentRowCount - previousRowCount.get();
+ previousRowCount.set(currentRowCount);
+ writeCount.add(currentRowCount);
+ costTime.add(1000);
+ // delayTime(条/ms)=接收的数量/花费的时间
+ final double delayTime = writeCount.longValue() / costTime.longValue();
+ // String delayTimeStr = twoDecimal(delayTime);
+
+ String info = String.format("Current Time: %s | TPS: %d ",
+ UtilAll.timeMillisToHumanString2(System.currentTimeMillis()), tps);
+
+ System.out.println(info);
+ }
+
+
+ public abstract void start();
+
+ protected int getLineNumber() throws IOException {
+ lineNumberReader.skip(Long.MAX_VALUE);
+ int lineNumber = lineNumberReader.getLineNumber();
+ return lineNumber;
+ }
+}
\ No newline at end of file
diff --git a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
new file mode 100644
index 0000000..e9acb87
--- /dev/null
+++ b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.adapter.benchmark;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * 整条链路
+ */
+public class EventTPSCommon extends AbstractEventCommon {
+ public static void main(String[] args) {
+ String filePath = System.getProperty("user.home") + "/demo.eventbridge";
+ if (args.length > 0) {
+ filePath = args[0];
+ }
+ EventTPSCommon tpsCommon = null;
+ try {
+ tpsCommon = new EventTPSCommon(filePath);
+ tpsCommon.start();
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public EventTPSCommon(String filePath) throws FileNotFoundException {
+ init(filePath);
+ }
+
+ private void init(String filePath) throws FileNotFoundException {
+ file = new File(filePath);
+ lineNumberReader = new LineNumberReader(new FileReader(file));
+ previousRowCount = new AtomicReference<>();
+ previousRowCount.set(0);
+ executorService = new ScheduledThreadPoolExecutor(1,
+ new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-all-%d").build());
+ }
+
+ @Override
+ public void start() {
+ executorService.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ printStats();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }, 1000, 1000, TimeUnit.MILLISECONDS);
+ }
+}
\ No newline at end of file
diff --git a/adapter/benchmark/src/test/java/org/apache/rocketmq/AppTest.java b/adapter/benchmark/src/test/java/org/apache/rocketmq/AppTest.java
new file mode 100644
index 0000000..4271fe4
--- /dev/null
+++ b/adapter/benchmark/src/test/java/org/apache/rocketmq/AppTest.java
@@ -0,0 +1,20 @@
+package org.apache.rocketmq;
+
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+{
+ /**
+ * Rigorous Test :-)
+ */
+ @Test
+ public void shouldAnswerWithTrue()
+ {
+ assertTrue( true );
+ }
+}
diff --git a/adapter/pom.xml b/adapter/pom.xml
index 887be3e..dc61d42 100644
--- a/adapter/pom.xml
+++ b/adapter/pom.xml
@@ -27,6 +27,7 @@
<module>rpc</module>
<module>runtime</module>
<module>storage</module>
+ <module>benchmark</module>
</modules>
</project>
\ No newline at end of file