Add simulated backpressure test code
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/ConsumerSlowerPIDControllerTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/ConsumerSlowerPIDControllerTest.java
new file mode 100644
index 0000000..f8c6513
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/ConsumerSlowerPIDControllerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+/**
+ * @Description 模拟消费速度比生产速度慢时,驻留容量的变化以及生产速度的变化
+ */
+public class ConsumerSlowerPIDControllerTest {
+
+    public static void main(String[] args) {
+        PIDContextTest pidContextTest = new PIDContextTest();
+
+        //Kp=1,Ki=0.2,kD=0 此处是参考Spark 默认参数
+        PIDController pidEventQueue = new PIDController(1, 0.008, 0);
+        // 队列驻留数据条数
+        pidEventQueue.setResidentCapacity(300);
+        // 最大生产速度
+        pidEventQueue.setMaxSpeed(100);
+
+        // TargetQueue反压计算参数
+        PIDController pidTargetQueue = new PIDController(1, 0.008, 0);
+        // 队列驻留数据条数
+        pidTargetQueue.setResidentCapacity(300);
+        pidTargetQueue.setMaxSpeed(100);
+
+        // TargetQueue反压估算器
+        RateEstimatorTest rateEstimatorEventQueue =new RateEstimatorTest(pidContextTest, pidEventQueue,"eventQueue");
+        rateEstimatorEventQueue.start();
+
+        RateEstimatorTest rateEstimatorTargetQueue = new RateEstimatorTest(pidContextTest, pidTargetQueue,"targetQueue");
+        rateEstimatorTargetQueue.start();
+
+        // 生产者,模拟从队列获取消息
+        new MockEventBusListenerTest(pidContextTest,rateEstimatorEventQueue).start();
+
+        // 转换器,模拟将eventrecord转换成pusherrecord
+        new TransformTest(pidContextTest,rateEstimatorTargetQueue).start();
+
+        // 模拟消费者
+        new PusherTest(pidContextTest).start();
+    }
+}
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/MockEventBusListenerTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/MockEventBusListenerTest.java
new file mode 100644
index 0000000..a1910d3
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/MockEventBusListenerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+
+import java.util.UUID;
+
+public class MockEventBusListenerTest extends ServiceThread {
+    private PIDContextTest pidContextTest;
+    private RateEstimatorTest rateEstimatorTest;
+
+    public MockEventBusListenerTest(PIDContextTest pidContextTest, RateEstimatorTest rateEstimatorTest) {
+        this.pidContextTest = pidContextTest;
+        this.rateEstimatorTest = rateEstimatorTest;
+    }
+
+    @Override
+    public String getServiceName() {
+        return RateEstimatorTest.class.getSimpleName();
+    }
+
+    @Override
+    public void run() {
+        while (!stopped) {
+            // 用于控制速度。例如计算得到速度为20条/s,令牌桶设置为20。需要1秒生成20个,每次拿一个,拿20次拿完即止,等待下一次采样
+            if (rateEstimatorTest.getNewSpeed() == 0) {
+                this.waitForRunning(300);
+            }
+            // 通过令牌桶控制速度
+            rateEstimatorTest.acquire(1);
+            // 通过阻塞队列控制速度
+            //rateEstimatorTest.acquire();
+            try {
+                pidContextTest.getEventQueue().put(UUID.randomUUID().toString());
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PIDContextTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PIDContextTest.java
new file mode 100644
index 0000000..ffbbfd3
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PIDContextTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.concurrent.*;
+
+public class PIDContextTest {
+    private BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>(50000);
+    private BlockingQueue<String> targetQueue = new LinkedBlockingQueue<>(50000);
+    private ExecutorService threadPoolExecutor;
+
+    public PIDContextTest() {
+        threadPoolExecutor = initDefaultThreadPoolExecutor();
+    }
+
+    public BlockingQueue<String> getEventQueue() {
+        return eventQueue;
+    }
+
+    public BlockingQueue<String> getTargetQueue() {
+        return targetQueue;
+    }
+
+
+    public ExecutorService getThreadPoolExecutor() {
+        return threadPoolExecutor;
+    }
+
+    public synchronized boolean  canExecute() {
+        // System.out.printf("thread queue size=>%s", ((ThreadPoolExecutor) threadPoolExecutor).getQueue().size());
+        return ((ThreadPoolExecutor) threadPoolExecutor).getQueue().size() < 300;
+    }
+
+    /**
+     * init default thread poll param, support auto config
+     *
+     * @return
+     */
+    private ExecutorService initDefaultThreadPoolExecutor() {
+        ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("pid-test");
+        return new ThreadPoolExecutor(4, 8, 10, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(300), threadFactory.build());
+    }
+}
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PIDController.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PIDController.java
new file mode 100644
index 0000000..37d31f0
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PIDController.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+import java.text.SimpleDateFormat;
+
+/**
+ * @Description   
+ *   (1)增大比例系数Kp一般将加快系统的响应,在有静差的情况下有利于减小静差。但过大的比例系数会使系统有较大的超调,并产生振荡,使系统的稳定性变坏;
+ *   (2)增大积分时间TI一般有利于减小超调,减小振荡,使系统更加稳定,但系统静差的消除将随之减慢;
+ *   (3)增大微分时间TD亦有利于加快系统的响应,减小振荡,使系统稳定性增加,但系统对干扰的抑制能力减弱,对扰动有较敏感的响应;另外,过大的微分系数也将使系统的稳定性变坏。
+ */
+public class PIDController {
+    private double Kp;  // 比例系数
+    private double Ki;  // 积分系数
+    private double Kd;  // 微分系数
+
+    private long residentCapacity;  // 目标值 队列容量
+    private long integral;  // 积分累计值
+    private long lastError; // 上一次误差值
+
+    // 最小生产速度
+    private long minSpeed = 1;
+    // 最大生产速度
+    private long maxSpeed = 300;
+
+    // 当前速度
+    private long currentSpeed = 20;
+
+    // 振幅输出转换成生产速度的缩放因子
+    private double speedScalingFactor = 1;
+
+    private volatile static int time = 0;
+
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    public PIDController(double Kp, double Ki, double Kd) {
+        this.Kp = Kp;
+        this.Ki = Ki;
+        this.Kd = Kd;
+    }
+
+    public long getMaxSpeed() {
+        return maxSpeed;
+    }
+
+    public void setMaxSpeed(long maxSpeed) {
+        this.maxSpeed = maxSpeed;
+    }
+
+    public long getResidentCapacity() {
+        return residentCapacity;
+    }
+
+    public void setResidentCapacity(long residentCapacity) {
+        this.residentCapacity = residentCapacity;
+    }
+
+    /**
+     * PID算法实现
+     *
+     * @param input         当前队列容量
+     * @param queueCapacity 队列最大初始化大小
+     * @return
+     */
+    public long compute(long input, long queueCapacity) {
+        long error = residentCapacity - input;
+
+        // 计算积分值
+        integral += error;
+
+        // 防止积分值过大或过小
+        if (integral > queueCapacity) {
+            integral = queueCapacity;
+        } else if (integral < -queueCapacity) {
+            integral = -queueCapacity;
+        }
+
+        // 计算微分值
+        long derivative = error - lastError;
+
+        // 计算输出值
+        long output = Math.round(Kp * error + Ki * integral + Kd * derivative);
+
+        // 保存上一次误差值
+        lastError = error;
+
+        return output;
+    }
+
+    /**
+     * 将输出振幅映射到速度,此处speedScalingFactor默认为1 直接映射成速度
+     *
+     * @param input
+     * @param queueCapacity
+     * @return
+     */
+    public long mappeSpeed(long input, long queueCapacity, String queueName) {
+        long output = compute(input, queueCapacity);
+
+        long speedIncrement = Math.round(output * speedScalingFactor);  // 根据实际情况调整比例系数,将输出值转换为速度增量
+        long newSpeed = currentSpeed + speedIncrement;
+        if (newSpeed > maxSpeed) {
+            newSpeed = maxSpeed;
+        } else if (newSpeed < minSpeed) {
+            newSpeed = minSpeed;
+        }
+        if ("targetQueue".equals(queueName))
+            System.out.printf("data.push([" + (time++) + "," + input + "]); \n");
+        //System.out.printf("输出振幅:output=>%s,\t上一次采样生产速度:currentSpeed=>%s,\t下次生产速度:newSpeed=>%s,\t (%s)数据容量:input=>%s,\t\tcurrentTime=>%s \n", output, currentSpeed, newSpeed, queueName, input, simpleDateFormat.format(new Date()));
+        currentSpeed = newSpeed;
+        return newSpeed;  // 设置新的生产速度
+    }
+}
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/ProductSlowerPIDControllerTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/ProductSlowerPIDControllerTest.java
new file mode 100644
index 0000000..47f45f3
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/ProductSlowerPIDControllerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+/**
+ * @Description 消费速度比生产速度快时,驻留容量的变化以及生产速度的变化
+ */
+public class ProductSlowerPIDControllerTest {
+
+    public static void main(String[] args) {
+        PIDContextTest pidContextTest = new PIDContextTest();
+
+        //Kp=1,Ki=0.2,kD=0 此处是参考Spark 默认参数
+        PIDController pid = new PIDController(5, 0, 0);
+        pid.setResidentCapacity(1000);
+        pid.setMaxSpeed(20);
+
+        RateEstimatorTest rateEstimatorEventQueue =new RateEstimatorTest(pidContextTest, pid,"eventQueue");
+        rateEstimatorEventQueue.start();
+
+        RateEstimatorTest rateEstimatorTargetQueue = new RateEstimatorTest(pidContextTest, pid,"targetQueue");
+        rateEstimatorTargetQueue.start();
+
+        new MockEventBusListenerTest(pidContextTest,rateEstimatorEventQueue).start();
+
+        new TransformTest(pidContextTest,rateEstimatorTargetQueue).start();
+
+        new PusherTest(pidContextTest).start();
+    }
+}
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PusherTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PusherTest.java
new file mode 100644
index 0000000..b15db56
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/PusherTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+
+import java.text.SimpleDateFormat;
+import java.util.Objects;
+import java.util.Random;
+
+public class PusherTest extends ServiceThread {
+    private PIDContextTest pidContextTest;
+
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    public PusherTest(PIDContextTest pidContextTest) {
+        this.pidContextTest = pidContextTest;
+    }
+
+    @Override
+    public String getServiceName() {
+        return PusherTest.class.getSimpleName();
+    }
+
+    Random random = new Random();
+
+    @Override
+    public void run() {
+        while (!stopped) {
+            try {
+                if (!pidContextTest.canExecute()) {
+                    this.waitForRunning(1000);
+                }
+                String a = pidContextTest.getTargetQueue().take();
+                // System.out.printf("处理信息:a=>%s,\t当前队列数据容量:input=>%s,\t\tcurrentTime=>%s \n", a, pidContextTest.getTargetQueue().size(), simpleDateFormat.format(new Date()));
+                if (Objects.isNull(a)) {
+                    this.waitForRunning(random.nextInt(1000));
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            pidContextTest.getThreadPoolExecutor().execute(() -> {
+                // 当下游消费速度在一定范围波动时
+                 this.waitForRunning(random.nextInt(100) + 100);
+            });
+        }
+    }
+}
\ No newline at end of file
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/RateEstimatorTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/RateEstimatorTest.java
new file mode 100644
index 0000000..b8e0f06
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/RateEstimatorTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+import com.google.common.util.concurrent.RateLimiter;
+import lombok.SneakyThrows;
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class RateEstimatorTest extends ServiceThread {
+    private PIDContextTest pidContextTest;
+
+    private PIDController pid;  // PID控制器
+
+    private volatile long newSpeed;
+    private volatile RateLimiter rateLimiter;
+
+    private BlockingQueue<String> speedLimiter = new LinkedBlockingQueue<>(300);
+
+    private String queueName;
+
+    public RateEstimatorTest(PIDContextTest pidContextTest, PIDController pid, String queueName) {
+        this.pidContextTest = pidContextTest;
+        this.pid = pid;
+        this.queueName = queueName;
+        rateLimiter = RateLimiter.create(1);
+    }
+
+    @Override
+    public String getServiceName() {
+        return MockEventBusListenerTest.class.getSimpleName();
+    }
+
+    @Override
+    public void run() {
+        while (!stopped) {
+            long input;
+            if ("eventQueue".equals(queueName)) {
+                input = pidContextTest.getEventQueue().size();
+            } else {
+                input = pidContextTest.getTargetQueue().size();
+            }
+
+            // double output = pid.compute(input, 10000);
+            newSpeed = pid.mappeSpeed(input, 50000, queueName);
+            rateLimiter = RateLimiter.create((newSpeed > 0) ? newSpeed : 1);
+            create(newSpeed);
+            this.waitForRunning(500);
+        }
+    }
+
+    public long getNewSpeed() {
+        return newSpeed;
+    }
+
+    public double acquire(int permits) {
+        return rateLimiter.acquire(permits);
+    }
+
+    @SneakyThrows
+    private void create(long newSpeed) {
+        if (newSpeed > 0) {
+            speedLimiter.clear();
+        }
+        for (int i = 0; i < newSpeed; i++) {
+            speedLimiter.put(i + "");
+        }
+    }
+
+    @SneakyThrows
+    public String acquire() {
+        return speedLimiter.take();
+    }
+}
diff --git a/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/TransformTest.java b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/TransformTest.java
new file mode 100644
index 0000000..43e7b86
--- /dev/null
+++ b/adapter/runtimer/src/test/java/org/apache/rocketmq/eventbridge/runtimer/service/backpressure/TransformTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.runtimer.service.backpressure;
+
+import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+
+import java.text.SimpleDateFormat;
+import java.util.Objects;
+
+public class TransformTest extends ServiceThread {
+    private PIDContextTest pidContextTest;
+    private RateEstimatorTest rateEstimatorTest;
+
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    public TransformTest(PIDContextTest pidContextTest, RateEstimatorTest rateEstimatorTest) {
+        this.pidContextTest = pidContextTest;
+        this.rateEstimatorTest = rateEstimatorTest;
+    }
+
+    @Override
+    public String getServiceName() {
+        return TransformTest.class.getSimpleName();
+    }
+
+    @Override
+    public void run() {
+        while (!stopped) {
+            try {
+                String a = pidContextTest.getEventQueue().take();
+                if (Objects.isNull(a)) {
+                    this.waitForRunning(1000);
+                }
+                // 模拟有5个转换器
+                for (int i = 0; i < 5; i++) {
+                    // 通过令牌桶控制速度
+                    rateEstimatorTest.acquire(1);
+                    // 通过阻塞队列控制速度
+                    // rateEstimatorTest.acquire();
+                    pidContextTest.getTargetQueue().put(a);
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
\ No newline at end of file