Add simulated backpressure test code
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/ConsumerSlowerPIDControllerTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/ConsumerSlowerPIDControllerTest.java
new file mode 100644
index 0000000..f8c6513
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/ConsumerSlowerPIDControllerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+/**
+ * @Description 模拟消费速度比生产速度慢时,驻留容量的变化以及生产速度的变化
+ */
+public class ConsumerSlowerPIDControllerTest {
+
+ public static void main(String[] args) {
+ PIDContextTest pidContextTest = new PIDContextTest();
+
+ //Kp=1,Ki=0.2,kD=0 此处是参考Spark 默认参数
+ PIDController pidEventQueue = new PIDController(1, 0.008, 0);
+ // 队列驻留数据条数
+ pidEventQueue.setResidentCapacity(300);
+ // 最大生产速度
+ pidEventQueue.setMaxSpeed(100);
+
+ // TargetQueue反压计算参数
+ PIDController pidTargetQueue = new PIDController(1, 0.008, 0);
+ // 队列驻留数据条数
+ pidTargetQueue.setResidentCapacity(300);
+ pidTargetQueue.setMaxSpeed(100);
+
+ // TargetQueue反压估算器
+ RateEstimatorTest rateEstimatorEventQueue =new RateEstimatorTest(pidContextTest, pidEventQueue,"eventQueue");
+ rateEstimatorEventQueue.start();
+
+ RateEstimatorTest rateEstimatorTargetQueue = new RateEstimatorTest(pidContextTest, pidTargetQueue,"targetQueue");
+ rateEstimatorTargetQueue.start();
+
+ // 生产者,模拟从队列获取消息
+ new MockEventBusListenerTest(pidContextTest,rateEstimatorEventQueue).start();
+
+ // 转换器,模拟将eventrecord转换成pusherrecord
+ new TransformTest(pidContextTest,rateEstimatorTargetQueue).start();
+
+ // 模拟消费者
+ new PusherTest(pidContextTest).start();
+ }
+}
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/MockEventBusListenerTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/MockEventBusListenerTest.java
new file mode 100644
index 0000000..a1910d3
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/MockEventBusListenerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+
+import java.util.UUID;
+
+public class MockEventBusListenerTest extends ServiceThread {
+ private PIDContextTest pidContextTest;
+ private RateEstimatorTest rateEstimatorTest;
+
+ public MockEventBusListenerTest(PIDContextTest pidContextTest, RateEstimatorTest rateEstimatorTest) {
+ this.pidContextTest = pidContextTest;
+ this.rateEstimatorTest = rateEstimatorTest;
+ }
+
+ @Override
+ public String getServiceName() {
+ return RateEstimatorTest.class.getSimpleName();
+ }
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ // 用于控制速度。例如计算得到速度为20条/s,令牌桶设置为20。需要1秒生成20个,每次拿一个,拿20次拿完即止,等待下一次采样
+ if (rateEstimatorTest.getNewSpeed() == 0) {
+ this.waitForRunning(300);
+ }
+ // 通过令牌桶控制速度
+ rateEstimatorTest.acquire(1);
+ // 通过阻塞队列控制速度
+ //rateEstimatorTest.acquire();
+ try {
+ pidContextTest.getEventQueue().put(UUID.randomUUID().toString());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PIDContextTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PIDContextTest.java
new file mode 100644
index 0000000..ffbbfd3
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PIDContextTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.concurrent.*;
+
+public class PIDContextTest {
+ private BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>(50000);
+ private BlockingQueue<String> targetQueue = new LinkedBlockingQueue<>(50000);
+ private ExecutorService threadPoolExecutor;
+
+ public PIDContextTest() {
+ threadPoolExecutor = initDefaultThreadPoolExecutor();
+ }
+
+ public BlockingQueue<String> getEventQueue() {
+ return eventQueue;
+ }
+
+ public BlockingQueue<String> getTargetQueue() {
+ return targetQueue;
+ }
+
+
+ public ExecutorService getThreadPoolExecutor() {
+ return threadPoolExecutor;
+ }
+
+ public synchronized boolean canExecute() {
+ // System.out.printf("thread queue size=>%s", ((ThreadPoolExecutor) threadPoolExecutor).getQueue().size());
+ return ((ThreadPoolExecutor) threadPoolExecutor).getQueue().size() < 300;
+ }
+
+ /**
+ * init default thread poll param, support auto config
+ *
+ * @return
+ */
+ private ExecutorService initDefaultThreadPoolExecutor() {
+ ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("pid-test");
+ return new ThreadPoolExecutor(4, 8, 10, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(300), threadFactory.build());
+ }
+}
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PIDController.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PIDController.java
new file mode 100644
index 0000000..37d31f0
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PIDController.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+import java.text.SimpleDateFormat;
+
+/**
+ * @Description
+ * (1)增大比例系数Kp一般将加快系统的响应,在有静差的情况下有利于减小静差。但过大的比例系数会使系统有较大的超调,并产生振荡,使系统的稳定性变坏;
+ * (2)增大积分时间TI一般有利于减小超调,减小振荡,使系统更加稳定,但系统静差的消除将随之减慢;
+ * (3)增大微分时间TD亦有利于加快系统的响应,减小振荡,使系统稳定性增加,但系统对干扰的抑制能力减弱,对扰动有较敏感的响应;另外,过大的微分系数也将使系统的稳定性变坏。
+ */
+public class PIDController {
+ private double Kp; // 比例系数
+ private double Ki; // 积分系数
+ private double Kd; // 微分系数
+
+ private long residentCapacity; // 目标值 队列容量
+ private long integral; // 积分累计值
+ private long lastError; // 上一次误差值
+
+ // 最小生产速度
+ private long minSpeed = 1;
+ // 最大生产速度
+ private long maxSpeed = 300;
+
+ // 当前速度
+ private long currentSpeed = 20;
+
+ // 振幅输出转换成生产速度的缩放因子
+ private double speedScalingFactor = 1;
+
+ private volatile static int time = 0;
+
+ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ public PIDController(double Kp, double Ki, double Kd) {
+ this.Kp = Kp;
+ this.Ki = Ki;
+ this.Kd = Kd;
+ }
+
+ public long getMaxSpeed() {
+ return maxSpeed;
+ }
+
+ public void setMaxSpeed(long maxSpeed) {
+ this.maxSpeed = maxSpeed;
+ }
+
+ public long getResidentCapacity() {
+ return residentCapacity;
+ }
+
+ public void setResidentCapacity(long residentCapacity) {
+ this.residentCapacity = residentCapacity;
+ }
+
+ /**
+ * PID算法实现
+ *
+ * @param input 当前队列容量
+ * @param queueCapacity 队列最大初始化大小
+ * @return
+ */
+ public long compute(long input, long queueCapacity) {
+ long error = residentCapacity - input;
+
+ // 计算积分值
+ integral += error;
+
+ // 防止积分值过大或过小
+ if (integral > queueCapacity) {
+ integral = queueCapacity;
+ } else if (integral < -queueCapacity) {
+ integral = -queueCapacity;
+ }
+
+ // 计算微分值
+ long derivative = error - lastError;
+
+ // 计算输出值
+ long output = Math.round(Kp * error + Ki * integral + Kd * derivative);
+
+ // 保存上一次误差值
+ lastError = error;
+
+ return output;
+ }
+
+ /**
+ * 将输出振幅映射到速度,此处speedScalingFactor默认为1 直接映射成速度
+ *
+ * @param input
+ * @param queueCapacity
+ * @return
+ */
+ public long mappeSpeed(long input, long queueCapacity, String queueName) {
+ long output = compute(input, queueCapacity);
+
+ long speedIncrement = Math.round(output * speedScalingFactor); // 根据实际情况调整比例系数,将输出值转换为速度增量
+ long newSpeed = currentSpeed + speedIncrement;
+ if (newSpeed > maxSpeed) {
+ newSpeed = maxSpeed;
+ } else if (newSpeed < minSpeed) {
+ newSpeed = minSpeed;
+ }
+ if ("targetQueue".equals(queueName))
+ System.out.printf("data.push([" + (time++) + "," + input + "]); \n");
+ //System.out.printf("输出振幅:output=>%s,\t上一次采样生产速度:currentSpeed=>%s,\t下次生产速度:newSpeed=>%s,\t (%s)数据容量:input=>%s,\t\tcurrentTime=>%s \n", output, currentSpeed, newSpeed, queueName, input, simpleDateFormat.format(new Date()));
+ currentSpeed = newSpeed;
+ return newSpeed; // 设置新的生产速度
+ }
+}
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/ProductSlowerPIDControllerTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/ProductSlowerPIDControllerTest.java
new file mode 100644
index 0000000..47f45f3
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/ProductSlowerPIDControllerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+/**
+ * @Description 消费速度比生产速度快时,驻留容量的变化以及生产速度的变化
+ */
+public class ProductSlowerPIDControllerTest {
+
+ public static void main(String[] args) {
+ PIDContextTest pidContextTest = new PIDContextTest();
+
+ //Kp=1,Ki=0.2,kD=0 此处是参考Spark 默认参数
+ PIDController pid = new PIDController(5, 0, 0);
+ pid.setResidentCapacity(1000);
+ pid.setMaxSpeed(20);
+
+ RateEstimatorTest rateEstimatorEventQueue =new RateEstimatorTest(pidContextTest, pid,"eventQueue");
+ rateEstimatorEventQueue.start();
+
+ RateEstimatorTest rateEstimatorTargetQueue = new RateEstimatorTest(pidContextTest, pid,"targetQueue");
+ rateEstimatorTargetQueue.start();
+
+ new MockEventBusListenerTest(pidContextTest,rateEstimatorEventQueue).start();
+
+ new TransformTest(pidContextTest,rateEstimatorTargetQueue).start();
+
+ new PusherTest(pidContextTest).start();
+ }
+}
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PusherTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PusherTest.java
new file mode 100644
index 0000000..b15db56
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PusherTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+
+import java.text.SimpleDateFormat;
+import java.util.Objects;
+import java.util.Random;
+
+public class PusherTest extends ServiceThread {
+ private PIDContextTest pidContextTest;
+
+ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ public PusherTest(PIDContextTest pidContextTest) {
+ this.pidContextTest = pidContextTest;
+ }
+
+ @Override
+ public String getServiceName() {
+ return PusherTest.class.getSimpleName();
+ }
+
+ Random random = new Random();
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ try {
+ if (!pidContextTest.canExecute()) {
+ this.waitForRunning(1000);
+ }
+ String a = pidContextTest.getTargetQueue().take();
+ // System.out.printf("处理信息:a=>%s,\t当前队列数据容量:input=>%s,\t\tcurrentTime=>%s \n", a, pidContextTest.getTargetQueue().size(), simpleDateFormat.format(new Date()));
+ if (Objects.isNull(a)) {
+ this.waitForRunning(random.nextInt(1000));
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ pidContextTest.getThreadPoolExecutor().execute(() -> {
+ // 当下游消费速度在一定范围波动时
+ this.waitForRunning(random.nextInt(100) + 100);
+ });
+ }
+ }
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/RateEstimatorTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/RateEstimatorTest.java
new file mode 100644
index 0000000..b8e0f06
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/RateEstimatorTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+import com.google.common.util.concurrent.RateLimiter;
+import lombok.SneakyThrows;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class RateEstimatorTest extends ServiceThread {
+ private PIDContextTest pidContextTest;
+
+ private PIDController pid; // PID控制器
+
+ private volatile long newSpeed;
+ private volatile RateLimiter rateLimiter;
+
+ private BlockingQueue<String> speedLimiter = new LinkedBlockingQueue<>(300);
+
+ private String queueName;
+
+ public RateEstimatorTest(PIDContextTest pidContextTest, PIDController pid, String queueName) {
+ this.pidContextTest = pidContextTest;
+ this.pid = pid;
+ this.queueName = queueName;
+ rateLimiter = RateLimiter.create(1);
+ }
+
+ @Override
+ public String getServiceName() {
+ return MockEventBusListenerTest.class.getSimpleName();
+ }
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ long input;
+ if ("eventQueue".equals(queueName)) {
+ input = pidContextTest.getEventQueue().size();
+ } else {
+ input = pidContextTest.getTargetQueue().size();
+ }
+
+ // double output = pid.compute(input, 10000);
+ newSpeed = pid.mappeSpeed(input, 50000, queueName);
+ rateLimiter = RateLimiter.create((newSpeed > 0) ? newSpeed : 1);
+ create(newSpeed);
+ this.waitForRunning(500);
+ }
+ }
+
+ public long getNewSpeed() {
+ return newSpeed;
+ }
+
+ public double acquire(int permits) {
+ return rateLimiter.acquire(permits);
+ }
+
+ @SneakyThrows
+ private void create(long newSpeed) {
+ if (newSpeed > 0) {
+ speedLimiter.clear();
+ }
+ for (int i = 0; i < newSpeed; i++) {
+ speedLimiter.put(i + "");
+ }
+ }
+
+ @SneakyThrows
+ public String acquire() {
+ return speedLimiter.take();
+ }
+}
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/TransformTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/TransformTest.java
new file mode 100644
index 0000000..43e7b86
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/TransformTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+
+import java.text.SimpleDateFormat;
+import java.util.Objects;
+
+public class TransformTest extends ServiceThread {
+ private PIDContextTest pidContextTest;
+ private RateEstimatorTest rateEstimatorTest;
+
+ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ public TransformTest(PIDContextTest pidContextTest, RateEstimatorTest rateEstimatorTest) {
+ this.pidContextTest = pidContextTest;
+ this.rateEstimatorTest = rateEstimatorTest;
+ }
+
+ @Override
+ public String getServiceName() {
+ return TransformTest.class.getSimpleName();
+ }
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ try {
+ String a = pidContextTest.getEventQueue().take();
+ if (Objects.isNull(a)) {
+ this.waitForRunning(1000);
+ }
+ // 模拟有5个转换器
+ for (int i = 0; i < 5; i++) {
+ // 通过令牌桶控制速度
+ rateEstimatorTest.acquire(1);
+ // 通过阻塞队列控制速度
+ // rateEstimatorTest.acquire();
+ pidContextTest.getTargetQueue().put(a);
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
\ No newline at end of file