[ISSUE #5208] develop mcp protocol (#5203)

* basic arch

* refine

* basic arch

* build the basic mcp server without streamable http

* build the basic mcp server without streamable http

* build the basic mcp server without streamable http

* build the basic mcp server with streamable http

* build the basic mcp server

* build the basic mcp server

* build the basic mcp server

* build the basic mcp server

* build the basic mcp server

* build the basic mcp server

* build the basic mcp server

* build the basic mcp server

* build the basic mcp server

* Update RemoteSubscribeInstance.java

* Update McpSinkHandlerRetryWrapper.java

* Update CommonMcpSinkHandler.java

* Update McpSinkConnector.java

* Update McpSinkHandler.java

* Update McpExportRecord.java

* Update McpConnectRecord.java

* Update McpExportRecordPage.java

* Update McpExportMetadata.java

* Update McpSourceConnector.java

* Update McpToolRegistry.java

* Update McpServerConfig.java

* Update Protocol.java

* Update McpSourceConnector.java

* Update McpSourceConstants.java

* Update McpStandardProtocol.java

* Update McpRequest.java

* Update McpResponse.java

* Update McpSinkHandlerRetryWrapper.java

* Update AbstractMcpSinkHandler.java

---------

Co-authored-by: youyun.0601 <youyun.0601@bytedance.com>
Co-authored-by: wqliang <wqliang@users.noreply.github.com>
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpRetryConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpRetryConfig.java
new file mode 100644
index 0000000..44889f9
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpRetryConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.config.connector.mcp;
+
+import lombok.Data;
+
+@Data
+public class McpRetryConfig {
+    // maximum number of retries, default 2, minimum 0
+    private int maxRetries = 2;
+
+    // retry interval, default 1000ms
+    private int interval = 1000;
+
+    // Default value is false, indicating that only requests with network-level errors will be retried.
+    // If set to true, all failed requests will be retried, including network-level errors and non-2xx responses.
+    private boolean retryOnNonSuccess = false;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSinkConfig.java
new file mode 100644
index 0000000..ce64551
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSinkConfig.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.config.connector.mcp;
+
+import org.apache.eventmesh.common.config.connector.SinkConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class McpSinkConfig extends SinkConfig {
+
+    public SinkConnectorConfig connectorConfig;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSourceConfig.java
new file mode 100644
index 0000000..320cc37
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSourceConfig.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.config.connector.mcp;
+
+import org.apache.eventmesh.common.config.connector.SourceConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class McpSourceConfig extends SourceConfig {
+
+    public SourceConnectorConfig connectorConfig;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SinkConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SinkConnectorConfig.java
new file mode 100644
index 0000000..54a02fb
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SinkConnectorConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.config.connector.mcp;
+
+
+import lombok.Data;
+
+@Data
+public class SinkConnectorConfig {
+
+    private String connectorName;
+
+    private String[] urls;
+
+    // keepAlive, default true
+    private boolean keepAlive = true;
+
+    // timeunit: ms, default 60000ms
+    private int keepAliveTimeout = 60 * 1000; // Keep units consistent
+
+    // timeunit: ms, default 5000ms, recommended scope: 5000ms - 10000ms
+    private int connectionTimeout = 5000;
+
+    // timeunit: ms, default 5000ms
+    private int idleTimeout = 5000;
+
+    // maximum number of HTTP/1 connections a client will pool, default 50
+    private int maxConnectionPoolSize = 50;
+
+    // retry config
+    private McpRetryConfig retryConfig = new McpRetryConfig();
+
+    private String deliveryStrategy = "ROUND_ROBIN";
+
+    private boolean skipDeliverException = false;
+
+    // managed pipelining param, default true
+    private boolean isParallelized = true;
+
+    private int parallelism = 2;
+
+
+    /**
+     * Fill default values if absent (When there are multiple default values for a field)
+     *
+     * @param config SinkConnectorConfig
+     */
+    public static void populateFieldsWithDefaults(SinkConnectorConfig config) {
+        /*
+         * set default values for idleTimeout
+         * recommended scope: common(5s - 10s), webhook(15s - 30s)
+         */
+        final int commonHttpIdleTimeout = 5000;
+
+        // Set default values for idleTimeout
+        if (config.getIdleTimeout() == 0) {
+            config.setIdleTimeout(commonHttpIdleTimeout);
+        }
+
+    }
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SourceConnectorConfig.java
new file mode 100644
index 0000000..1880894
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SourceConnectorConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.config.connector.mcp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.Data;
+
+@Data
+public class SourceConnectorConfig {
+
+    private String connectorName;
+
+    private String path = "/";
+
+    private int port;
+
+    // timeunit: ms, default 5000ms
+    private int idleTimeout = 5000;
+
+    /**
+     * <ul>
+     *     <li>The maximum size allowed for form attributes when Content-Type is application/x-www-form-urlencoded or multipart/form-data </li>
+     *     <li>Default is 1MB (1024 * 1024 bytes). </li>
+     *     <li>If you receive a "size exceed allowed maximum capacity" error, you can increase this value. </li>
+     *     <li>Note: This applies only when handling form data submissions.</li>
+     * </ul>
+     */
+    private int maxFormAttributeSize = 1024 * 1024;
+
+    // max size of the queue, default 1000
+    private int maxStorageSize = 1000;
+
+    // batch size, default 10
+    private int batchSize = 10;
+
+    // protocol, default Mcp
+    private String protocol = "Mcp";
+
+    // extra config, e.g. GitHub secret
+    private Map<String, String> extraConfig = new HashMap<>();
+
+    // data consistency enabled, default false
+    private boolean dataConsistencyEnabled = false;
+
+    private String forwardPath;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
index f232854..9b1bfe6 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
@@ -162,6 +162,17 @@
         }
     }
 
+    public static <T> T parseObject(String text, TypeReference<T> typeReference) {
+        if (StringUtils.isEmpty(text)) {
+            return null;
+        }
+        try {
+            return OBJECT_MAPPER.readValue(text, typeReference);
+        } catch (JsonProcessingException e) {
+            throw new JsonException("deserialize json string to object error", e);
+        }
+    }
+
     /**
      * parse json string to object.
      *
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml
index 0cd7b5b..5f66dd0 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml
@@ -16,4 +16,4 @@
 #
 
 sourceEnable: true
-sinkEnable: false
+sinkEnable: true
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/build.gradle b/eventmesh-connectors/eventmesh-connector-mcp/build.gradle
new file mode 100644
index 0000000..82072c2
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/build.gradle
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+dependencies {
+    api project(":eventmesh-openconnect:eventmesh-openconnect-java")
+    implementation project(":eventmesh-common")
+    implementation project(":eventmesh-connectors:eventmesh-connector-http")
+    implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
+    implementation "io.cloudevents:cloudevents-core"
+    implementation "com.google.guava:guava"
+    implementation "io.cloudevents:cloudevents-json-jackson"
+    implementation ("io.grpc:grpc-protobuf:1.68.0") {
+        exclude group: "com.google.protobuf", module: "protobuf-java"
+    }
+    implementation 'io.cloudevents:cloudevents-http-vertx:3.0.0'
+    implementation 'io.vertx:vertx-web:4.5.8'
+    implementation 'io.vertx:vertx-web-client:4.5.9'
+    implementation 'dev.failsafe:failsafe:3.3.2'
+
+
+    testImplementation 'org.apache.httpcomponents.client5:httpclient5:5.4'
+    testImplementation 'org.apache.httpcomponents.client5:httpclient5-fluent:5.4'
+    testImplementation 'org.mock-server:mockserver-netty:5.15.0'
+    implementation 'io.netty:netty-codec-http:4.1.114.Final'
+    compileOnly 'org.projectlombok:lombok'
+    annotationProcessor 'org.projectlombok:lombok'
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/gradle.properties b/eventmesh-connectors/eventmesh-connector-mcp/gradle.properties
new file mode 100644
index 0000000..5e98eb9
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/gradle.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+pluginType=connector
+pluginName=mcp
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/config/McpServerConfig.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/config/McpServerConfig.java
new file mode 100644
index 0000000..33357b6
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/config/McpServerConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.eventmesh.connector.mcp.config;
+
+import org.apache.eventmesh.common.config.connector.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class McpServerConfig extends Config {
+
+    private boolean sourceEnable;
+
+    private boolean sinkEnable;
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/server/McpConnectServer.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/server/McpConnectServer.java
new file mode 100644
index 0000000..71ea9ae
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/server/McpConnectServer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.server;
+
+import org.apache.eventmesh.connector.mcp.config.McpServerConfig;
+import org.apache.eventmesh.connector.mcp.sink.McpSinkConnector;
+import org.apache.eventmesh.connector.mcp.source.McpSourceConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+public class McpConnectServer {
+    public static void main(String[] args) throws Exception {
+        McpServerConfig serverConfig = ConfigUtil.parse(McpServerConfig.class, "server-config.yml");
+
+        if (serverConfig.isSourceEnable()) {
+            Application mcpSourceApp = new Application();
+            mcpSourceApp.run(McpSourceConnector.class);
+        }
+
+        if (serverConfig.isSinkEnable()) {
+            Application mcpSinkApp = new Application();
+            mcpSinkApp.run(McpSinkConnector.class);
+        }
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/McpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/McpSinkConnector.java
new file mode 100644
index 0000000..3d65fb9
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/McpSinkConnector.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink;
+
+import org.apache.eventmesh.common.EventMeshThreadFactory;
+import org.apache.eventmesh.common.config.connector.Config;
+import org.apache.eventmesh.common.config.connector.mcp.McpSinkConfig;
+import org.apache.eventmesh.common.config.connector.mcp.SinkConnectorConfig;
+import org.apache.eventmesh.connector.mcp.sink.handler.McpSinkHandler;
+import org.apache.eventmesh.connector.mcp.sink.handler.impl.CommonMcpSinkHandler;
+import org.apache.eventmesh.connector.mcp.sink.handler.impl.McpSinkHandlerRetryWrapper;
+import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import lombok.Getter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class McpSinkConnector implements Sink, ConnectorCreateService<Sink> {
+
+    private McpSinkConfig mcpSinkConfig;
+
+    @Getter
+    private McpSinkHandler sinkHandler;
+
+    private ThreadPoolExecutor executor;
+
+    private final AtomicBoolean isStart = new AtomicBoolean(false);
+
+    @Override
+    public Class<? extends Config> configClass() {
+        return McpSinkConfig.class;
+    }
+
+    @Override
+    public Sink create() {
+        return new McpSinkConnector();
+    }
+
+    @Override
+    public void init(Config config) throws Exception {
+        this.mcpSinkConfig = (McpSinkConfig) config;
+        doInit();
+    }
+
+    @Override
+    public void init(ConnectorContext connectorContext) throws Exception {
+        SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;
+        this.mcpSinkConfig = (McpSinkConfig) sinkConnectorContext.getSinkConfig();
+        doInit();
+    }
+
+    @SneakyThrows
+    private void doInit() {
+        // Fill default values if absent
+        SinkConnectorConfig.populateFieldsWithDefaults(this.mcpSinkConfig.connectorConfig);
+        // Create different handlers for different configurations
+        McpSinkHandler nonRetryHandler;
+
+        nonRetryHandler = new CommonMcpSinkHandler(this.mcpSinkConfig.connectorConfig);
+
+        int maxRetries = this.mcpSinkConfig.connectorConfig.getRetryConfig().getMaxRetries();
+        if (maxRetries == 0) {
+            // Use the original sink handler
+            this.sinkHandler = nonRetryHandler;
+        } else if (maxRetries > 0) {
+            // Wrap the sink handler with a retry handler
+            this.sinkHandler = new McpSinkHandlerRetryWrapper(this.mcpSinkConfig.connectorConfig, nonRetryHandler);
+        } else {
+            throw new IllegalArgumentException("Max retries must be greater than or equal to 0.");
+        }
+
+        boolean isParallelized = this.mcpSinkConfig.connectorConfig.isParallelized();
+        int parallelism = isParallelized ? this.mcpSinkConfig.connectorConfig.getParallelism() : 1;
+
+        // Use the executor's built-in work queue (unbounded) instead of a custom queue
+        executor = new ThreadPoolExecutor(
+                parallelism,
+                parallelism,
+                0L,
+                TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(), // Built-in queue, unbounded (no capacity limit)
+                new EventMeshThreadFactory("mcp-sink-handler")
+        );
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.sinkHandler.start();
+        isStart.set(true);
+    }
+
+    @Override
+    public void commit(ConnectRecord record) {
+
+    }
+
+    @Override
+    public String name() {
+        return this.mcpSinkConfig.connectorConfig.getConnectorName();
+    }
+
+    @Override
+    public void onException(ConnectRecord record) {
+
+    }
+
+    @Override
+    public void stop() throws Exception {
+        isStart.set(false);
+
+        log.info("Stopping mcp sink connector, shutting down executor...");
+        executor.shutdown();
+
+        try {
+            // Wait for existing tasks to complete
+            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+                log.warn("Executor did not terminate gracefully, forcing shutdown");
+                executor.shutdownNow();
+                // Wait a bit more for tasks to respond to being cancelled
+                if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+                    log.error("Executor did not terminate after forced shutdown");
+                }
+            }
+        } catch (InterruptedException e) {
+            log.warn("Interrupted while waiting for executor termination");
+            executor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+
+        this.sinkHandler.stop();
+    }
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) {
+        if (!isStart.get()) {
+            log.warn("Connector is not started, ignoring sink records");
+            return;
+        }
+
+        for (ConnectRecord sinkRecord : sinkRecords) {
+            if (Objects.isNull(sinkRecord)) {
+                log.warn("ConnectRecord data is null, ignore.");
+                continue;
+            }
+            log.info("McpSinkConnector put record: {}", sinkRecord);
+
+            try {
+                // Use executor.submit() instead of custom queue
+                executor.submit(() -> {
+                    try {
+                        sinkHandler.handle(sinkRecord);
+                    } catch (Exception e) {
+                        log.error("Failed to handle sink record via mcp", e);
+                    }
+                });
+            } catch (Exception e) {
+                log.error("Failed to submit sink record to executor", e);
+            }
+        }
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpAttemptEvent.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpAttemptEvent.java
new file mode 100644
index 0000000..451fc35
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpAttemptEvent.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.data;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Single MCP attempt event
+ */
+public class McpAttemptEvent {
+
+    public static final String PREFIX = "mcp-attempt-event-";
+
+    private final int maxAttempts;
+
+    private final AtomicInteger attempts;
+
+    private Throwable lastException;
+
+
+    public McpAttemptEvent(int maxAttempts) {
+        this.maxAttempts = maxAttempts;
+        this.attempts = new AtomicInteger(0);
+    }
+
+    /**
+     * Increment the attempts
+     */
+    public void incrementAttempts() {
+        attempts.incrementAndGet();
+    }
+
+    /**
+     * Update the event, incrementing the attempts and setting the last exception
+     *
+     * @param exception the exception to update, can be null
+     */
+    public void updateEvent(Throwable exception) {
+        // increment the attempts
+        incrementAttempts();
+
+        // update the last exception
+        lastException = exception;
+    }
+
+    /**
+     * Check if the attempts are less than the maximum attempts
+     *
+     * @return true if the attempts are less than the maximum attempts, false otherwise
+     */
+    public boolean canAttempt() {
+        return attempts.get() < maxAttempts;
+    }
+
+    public boolean isComplete() {
+        if (attempts.get() == 0) {
+            // Not started yet
+            return false;
+        }
+
+        // If no further attempt can be made or the last exception is null, the event is complete
+        return !canAttempt() || lastException == null;
+    }
+
+
+    public int getMaxAttempts() {
+        return maxAttempts;
+    }
+
+    public int getAttempts() {
+        return attempts.get();
+    }
+
+    public Throwable getLastException() {
+        return lastException;
+    }
+
+    /**
+     * Get the limited exception message with the default limit of 256
+     *
+     * @return the limited exception message
+     */
+    public String getLimitedExceptionMessage() {
+        return getLimitedExceptionMessage(256);
+    }
+
+    /**
+     * Get the limited exception message with the specified limit
+     *
+     * @param maxLimit the maximum limit of the exception message
+     * @return the limited exception message
+     */
+    public String getLimitedExceptionMessage(int maxLimit) {
+        if (lastException == null) {
+            return "";
+        }
+        String message = lastException.getMessage();
+        if (message == null) {
+            return "";
+        }
+        if (message.length() > maxLimit) {
+            return message.substring(0, maxLimit);
+        }
+        return message;
+    }
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpConnectRecord.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpConnectRecord.java
new file mode 100644
index 0000000..26f9c1e
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpConnectRecord.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.data;
+
+import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.KeyValue;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import lombok.Builder;
+import lombok.Getter;
+
+/**
+ * a special ConnectRecord for McpSinkConnector
+ */
+@Getter
+@Builder
+public class McpConnectRecord implements Serializable {
+
+    private static final long serialVersionUID = 5271462532332251473L;
+
+    /**
+     * The unique identifier for the McpConnectRecord
+     */
+    private final String mcpRecordId = UUID.randomUUID().toString();
+
+    /**
+     * The time when the McpConnectRecord was created
+     */
+    private LocalDateTime createTime;
+
+    /**
+     * The type of the McpConnectRecord
+     */
+    private String type;
+
+    /**
+     * The event id of the McpConnectRecord
+     */
+    private String eventId;
+
+    private Object data;
+
+    private KeyValue extensions;
+
+    @Override
+    public String toString() {
+        return "McpConnectRecord{"
+                + "createTime=" + createTime
+                + ", mcpRecordId='" + mcpRecordId
+                + ", type='" + type
+                + ", eventId='" + eventId
+                + ", data=" + data
+                + ", extensions=" + extensions
+                + '}';
+    }
+
+    /**
+     * Convert ConnectRecord to McpConnectRecord
+     *
+     * @param record the ConnectRecord to convert
+     * @return the converted McpConnectRecord
+     */
+    public static McpConnectRecord convertConnectRecord(ConnectRecord record, String type) {
+        Map<String, ?> offsetMap = new HashMap<>();
+        if (record != null && record.getPosition() != null && record.getPosition().getRecordOffset() != null) {
+            if (HttpRecordOffset.class.equals(record.getPosition().getRecordOffsetClazz())) {
+                offsetMap = ((HttpRecordOffset) record.getPosition().getRecordOffset()).getOffsetMap();
+            }
+        }
+        String offset = "0";
+        if (!offsetMap.isEmpty()) {
+            offset = offsetMap.values().iterator().next().toString();
+        }
+        if (record.getData() instanceof byte[]) {
+            String data = Base64.getEncoder().encodeToString((byte[]) record.getData());
+            record.addExtension("isBase64", true);
+            return McpConnectRecord.builder()
+                    .type(type)
+                    .createTime(LocalDateTime.now())
+                    .eventId(type + "-" + offset)
+                    .data(data)
+                    .extensions(record.getExtensions())
+                    .build();
+        } else {
+            record.addExtension("isBase64", false);
+            return McpConnectRecord.builder()
+                    .type(type)
+                    .createTime(LocalDateTime.now())
+                    .eventId(type + "-" + offset)
+                    .data(record.getData())
+                    .extensions(record.getExtensions())
+                    .build();
+        }
+    }
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportMetadata.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportMetadata.java
new file mode 100644
index 0000000..72595b2
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportMetadata.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.data;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * Metadata for an MCP export operation.
+ *
+ * <p>A plain data holder: accessors, equals/hashCode/toString and a builder are generated by Lombok.</p>
+ */
+@Data
+@Builder
+public class McpExportMetadata implements Serializable {
+
+    private static final long serialVersionUID = 1121010466793041920L;
+
+    // NOTE(review): presumably the URL the exported record was sent to - confirm against the code that fills it
+    private String url;
+
+    // NOTE(review): presumably a status code (e.g. the HTTP response code) - confirm
+    private int code;
+
+    // NOTE(review): presumably a status or error message accompanying the code - confirm
+    private String message;
+
+    // NOTE(review): presumably the time the result was received - confirm which clock/zone is used
+    private LocalDateTime receivedTime;
+
+    // NOTE(review): presumably the id of the record this metadata belongs to - confirm
+    private String recordId;
+
+    // NOTE(review): presumably identifies the record or attempt that triggered a retry - confirm
+    private String retriedBy;
+
+    // NOTE(review): presumably the number of retries performed so far - confirm
+    private int retryNum;
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecord.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecord.java
new file mode 100644
index 0000000..c9a35c1
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecord.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.data;
+
+import java.io.Serializable;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Represents an MCP export record containing metadata and data to be exported.
+ *
+ * <p>A plain data holder: accessors, equals/hashCode/toString and an all-args constructor are generated by Lombok.</p>
+ */
+@Data
+@AllArgsConstructor
+public class McpExportRecord implements Serializable {
+
+    private static final long serialVersionUID = 6010283911452947157L;
+
+    // Metadata describing the export operation
+    private McpExportMetadata metadata;
+
+    // The payload to export; declared as Object, so serializing this record requires a serializable value
+    private Object data;
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecordPage.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecordPage.java
new file mode 100644
index 0000000..3e0e615
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecordPage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.data;
+
+import java.io.Serializable;
+import java.util.List;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Represents a page of MCP export records.
+ *
+ * <p>A plain data holder: accessors, equals/hashCode/toString and an all-args constructor are generated by Lombok.</p>
+ */
+@Data
+@AllArgsConstructor
+public class McpExportRecordPage implements Serializable {
+
+    private static final long serialVersionUID = 1143791658357035990L;
+
+    // NOTE(review): presumably the index of this page - confirm whether numbering starts at 0 or 1
+    private int pageNum;
+
+    // NOTE(review): presumably the requested page size, not necessarily pageItems.size() - confirm
+    private int pageSize;
+
+    // The export records contained in this page
+    private List<McpExportRecord> pageItems;
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/MultiMcpRequestContext.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/MultiMcpRequestContext.java
new file mode 100644
index 0000000..f24d0e3
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/MultiMcpRequestContext.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.data;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Bookkeeping shared by all requests that are sent for one record: counts the requests that are still
+ * outstanding and remembers the most recent failed attempt event.
+ */
+public class MultiMcpRequestContext {
+
+    public static final String NAME = "multi-http-request-context";
+
+    /**
+     * Number of requests that have not been processed yet.
+     */
+    private final AtomicInteger remainingRequests;
+
+    /**
+     * Most recent attempt event that ended in failure; a later failure replaces an earlier one.
+     */
+    private McpAttemptEvent lastFailedEvent;
+
+    public MultiMcpRequestContext(int remainingEvents) {
+        this.remainingRequests = new AtomicInteger(remainingEvents);
+    }
+
+    public McpAttemptEvent getLastFailedEvent() {
+        return this.lastFailedEvent;
+    }
+
+    public void setLastFailedEvent(McpAttemptEvent lastFailedEvent) {
+        this.lastFailedEvent = lastFailedEvent;
+    }
+
+    public int getRemainingRequests() {
+        return this.remainingRequests.get();
+    }
+
+    /**
+     * Marks one request as processed by lowering the counter by 1.
+     */
+    public void decrementRemainingRequests() {
+        this.remainingRequests.addAndGet(-1);
+    }
+
+    /**
+     * Tells whether the counter of outstanding requests has reached zero.
+     *
+     * @return true when no request remains, false otherwise.
+     */
+    public boolean isAllRequestsProcessed() {
+        final int outstanding = this.remainingRequests.get();
+        return outstanding == 0;
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/AbstractMcpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/AbstractMcpSinkHandler.java
new file mode 100644
index 0000000..5c7435d
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/AbstractMcpSinkHandler.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.handler;
+
+import org.apache.eventmesh.common.config.connector.mcp.SinkConnectorConfig;
+import org.apache.eventmesh.connector.mcp.sink.data.McpAttemptEvent;
+import org.apache.eventmesh.connector.mcp.sink.data.McpConnectRecord;
+import org.apache.eventmesh.connector.mcp.sink.data.MultiMcpRequestContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import lombok.Getter;
+
+/**
+ * Base class for MCP sink handlers. It selects the target URL(s) of every record according to the configured
+ * {@link McpDeliveryStrategy}, converts the record to a {@link McpConnectRecord} and passes it to
+ * {@link #deliver(URI, McpConnectRecord, Map, ConnectRecord)}.
+ */
+public abstract class AbstractMcpSinkHandler implements McpSinkHandler {
+    @Getter
+    private final SinkConnectorConfig sinkConnectorConfig;
+
+    @Getter
+    private final List<URI> urls;
+
+    private final McpDeliveryStrategy deliveryStrategy;
+
+    // Index of the URL used for the next ROUND_ROBIN delivery.
+    // NOTE(review): not synchronized; assumes handle() is invoked from a single thread - confirm
+    private int roundRobinIndex = 0;
+
+    /**
+     * Creates the handler from the sink configuration.
+     *
+     * @param sinkConnectorConfig the sink configuration providing the delivery strategy, the URLs and the retry settings
+     * @throws IllegalArgumentException if no URL is configured, a URL is not a valid URI,
+     *                                  or the delivery strategy name is unknown
+     */
+    protected AbstractMcpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+        this.sinkConnectorConfig = sinkConnectorConfig;
+        this.deliveryStrategy = McpDeliveryStrategy.valueOf(sinkConnectorConfig.getDeliveryStrategy());
+        // Initialize URLs
+        String[] urlStrings = sinkConnectorConfig.getUrls();
+        if (urlStrings == null || urlStrings.length == 0) {
+            // Without this check ROUND_ROBIN fails on the first record and BROADCAST silently delivers nothing
+            throw new IllegalArgumentException("At least one sink URL must be configured");
+        }
+        this.urls = Arrays.stream(urlStrings)
+                .map(URI::create)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Processes a ConnectRecord by sending it to one URL (ROUND_ROBIN) or to all URLs (BROADCAST).
+     * This method should be called for each ConnectRecord that needs to be processed.
+     *
+     * @param record the ConnectRecord to process
+     */
+    @Override
+    public void handle(ConnectRecord record) {
+        // build attributes
+        Map<String, Object> attributes = new ConcurrentHashMap<>();
+
+        switch (deliveryStrategy) {
+            case ROUND_ROBIN:
+                // exactly one request is outstanding for this record
+                attributes.put(MultiMcpRequestContext.NAME, new MultiMcpRequestContext(1));
+                URI url = urls.get(roundRobinIndex);
+                roundRobinIndex = (roundRobinIndex + 1) % urls.size();
+                sendRecordToUrl(record, attributes, url);
+                break;
+            case BROADCAST:
+                // one request per URL is outstanding for this record
+                attributes.put(MultiMcpRequestContext.NAME, new MultiMcpRequestContext(urls.size()));
+                // send the record to all URLs
+                urls.forEach(url0 -> sendRecordToUrl(record, attributes, url0));
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown delivery strategy: " + deliveryStrategy);
+        }
+    }
+
+    /**
+     * Converts the record for the given URL, registers a fresh attempt event for it and delivers it.
+     *
+     * @param record     the ConnectRecord to send
+     * @param attributes the attributes shared by all requests of this record
+     * @param url        the target URL
+     */
+    private void sendRecordToUrl(ConnectRecord record, Map<String, Object> attributes, URI url) {
+        // convert ConnectRecord to McpConnectRecord
+        String type = String.format("%s.%s.%s",
+                this.sinkConnectorConfig.getConnectorName(), url.getScheme(),
+                "common");
+        McpConnectRecord mcpConnectRecord = McpConnectRecord.convertConnectRecord(record, type);
+
+        // add AttemptEvent to the attributes; the first attempt plus the configured retries are allowed
+        McpAttemptEvent attemptEvent = new McpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
+        attributes.put(McpAttemptEvent.PREFIX + mcpConnectRecord.getMcpRecordId(), attemptEvent);
+
+        // deliver the record
+        deliver(url, mcpConnectRecord, attributes, record);
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpDeliveryStrategy.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpDeliveryStrategy.java
new file mode 100644
index 0000000..07cbbe3
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpDeliveryStrategy.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.handler;
+
+/**
+ * Strategies for choosing the target URL(s) of a record when several sink URLs are configured.
+ */
+public enum McpDeliveryStrategy {
+    // Each record goes to a single URL; AbstractMcpSinkHandler cycles through the configured URLs in order
+    ROUND_ROBIN,
+    // Each record is sent to every configured URL
+    BROADCAST
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpSinkHandler.java
new file mode 100644
index 0000000..c10d963
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpSinkHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.handler;
+
+import org.apache.eventmesh.connector.mcp.sink.data.McpConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.URI;
+import java.util.Map;
+
+import io.vertx.core.Future;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+
+/**
+ * Interface for handling ConnectRecords via HTTP or HTTPS. Classes implementing this interface are responsible for processing ConnectRecords by
+ * sending them over HTTP or HTTPS, with additional support for handling multiple requests and asynchronous processing.
+ *
+ * <p>Any class that needs to process ConnectRecords via HTTP or HTTPS should implement this interface.
+ * Implementing classes must provide implementations for the {@link #start()}, {@link #handle(ConnectRecord)},
+ * {@link #deliver(URI, McpConnectRecord, Map, ConnectRecord)}, and {@link #stop()} methods.</p>
+ *
+ * <p>Implementing classes should ensure thread safety and handle MCP communication efficiently.
+ * The {@link #start()} method initializes any necessary resources for MCP communication. The {@link #handle(ConnectRecord)} method processes a
+ * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, McpConnectRecord, Map, ConnectRecord)} method processes a
+ * McpConnectRecord on the specified URL and returns its own processing chain. The {@link #stop()} method releases any resources used for MCP
+ * communication.</p>
+ *
+ * <p>It's recommended to handle exceptions gracefully within the {@link #deliver(URI, McpConnectRecord, Map, ConnectRecord)} method
+ * to prevent message loss or processing interruptions.</p>
+ */
+public interface McpSinkHandler {
+
+    /**
+     * Initializes the MCP handler. This method should be called before using the handler.
+     */
+    void start();
+
+    /**
+     * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
+     *
+     * @param record the ConnectRecord to process
+     */
+    void handle(ConnectRecord record);
+
+
+    /**
+     * Processes a McpConnectRecord on the specified URL while returning its own processing logic
+     *
+     * @param url              URI to which the McpConnectRecord should be sent
+     * @param mcpConnectRecord McpConnectRecord to process
+     * @param attributes       additional attributes to be used in processing
+     * @param connectRecord    the original ConnectRecord the McpConnectRecord was converted from
+     * @return processing chain
+     */
+    Future<HttpResponse<Buffer>> deliver(URI url, McpConnectRecord mcpConnectRecord, Map<String, Object> attributes, ConnectRecord connectRecord);
+
+    /**
+     * Cleans up and releases resources used by the MCP handler. This method should be called when the handler is no longer needed.
+     */
+    void stop();
+}
+
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/CommonMcpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/CommonMcpSinkHandler.java
new file mode 100644
index 0000000..1d884f4
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/CommonMcpSinkHandler.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.handler.impl;
+
+import org.apache.eventmesh.common.config.connector.mcp.SinkConnectorConfig;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.http.util.HttpUtils;
+import org.apache.eventmesh.connector.mcp.sink.data.McpAttemptEvent;
+import org.apache.eventmesh.connector.mcp.sink.data.McpConnectRecord;
+import org.apache.eventmesh.connector.mcp.sink.data.MultiMcpRequestContext;
+import org.apache.eventmesh.connector.mcp.sink.handler.AbstractMcpSinkHandler;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.URI;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Common MCP Sink Handler implementation to handle ConnectRecords by sending them over MCP to configured URLs.
+ *
+ * <p>This handler initializes a WebClient for making HTTP requests based on the provided SinkConnectorConfig.
+ * It handles processing ConnectRecords by converting them to HttpConnectRecord and sending them asynchronously to each configured URL using the
+ * WebClient.</p>
+ *
+ * <p>The handler uses Vert.x's WebClient to perform HTTP/HTTPS requests. It initializes the WebClient in the {@link #start()}
+ * method and closes it in the {@link #stop()} method to manage resources efficiently.</p>
+ *
+ * <p>Each ConnectRecord is processed and sent to all configured URLs concurrently using asynchronous HTTP requests.</p>
+ */
+@Slf4j
+@Getter
+public class CommonMcpSinkHandler extends AbstractMcpSinkHandler {
+
+    private WebClient webClient;
+
+
+    public CommonMcpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+        super(sinkConnectorConfig);
+    }
+
+    /**
+     * Initializes the WebClient for making HTTP requests based on the provided SinkConnectorConfig.
+     */
+    @Override
+    public void start() {
+        // Create WebClient
+        doInitWebClient();
+    }
+
+    /**
+     * Initializes the WebClient with the provided configuration options.
+     */
+    private void doInitWebClient() {
+        SinkConnectorConfig sinkConnectorConfig = getSinkConnectorConfig();
+        final Vertx vertx = Vertx.vertx();
+        WebClientOptions options = new WebClientOptions()
+            .setKeepAlive(sinkConnectorConfig.isKeepAlive())
+            .setKeepAliveTimeout(sinkConnectorConfig.getKeepAliveTimeout() / 1000)
+            .setIdleTimeout(sinkConnectorConfig.getIdleTimeout())
+            .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
+            .setConnectTimeout(sinkConnectorConfig.getConnectionTimeout())
+            .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize())
+            .setPipelining(sinkConnectorConfig.isParallelized());
+        this.webClient = WebClient.create(vertx, options);
+    }
+
+    /**
+     * Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified
+     * URL using the WebClient.
+     *
+     * @param url               URI to which the HttpConnectRecord should be sent
+     * @param mcpConnectRecord HttpConnectRecord to process
+     * @param attributes        additional attributes to be used in processing
+     * @return processing chain
+     */
+    @Override
+    public Future<HttpResponse<Buffer>> deliver(URI url, McpConnectRecord mcpConnectRecord, Map<String, Object> attributes,
+                                                ConnectRecord connectRecord) {
+        // create headers
+        Map<String, Object> extensionMap = new HashMap<>();
+        Set<String> extensionKeySet = mcpConnectRecord.getExtensions().keySet();
+        for (String extensionKey : extensionKeySet) {
+            Object v = mcpConnectRecord.getExtensions().getObject(extensionKey);
+            extensionMap.put(extensionKey, v);
+        }
+
+        MultiMap headers = HttpHeaders.headers()
+            .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8")
+            .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8")
+            .set("extension", JsonUtils.toJSONString(extensionMap));
+        // get timestamp and offset
+        Long timestamp = mcpConnectRecord.getCreateTime()
+            .atZone(ZoneId.systemDefault())
+            .toInstant()
+            .toEpochMilli();
+
+        // send the request
+        return this.webClient.post(url.getPath())
+            .host(url.getHost())
+            .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort())
+            .putHeaders(headers)
+            .ssl(Objects.equals(url.getScheme(), "https"))
+            .sendJson(mcpConnectRecord.getData())
+            .onSuccess(res -> {
+                log.info("Request sent successfully. Record: timestamp={}", timestamp);
+
+                Exception e = null;
+
+                // log the response
+                if (HttpUtils.is2xxSuccessful(res.statusCode())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Received successful response: statusCode={}. Record: timestamp={}, responseBody={}",
+                            res.statusCode(), timestamp, res.bodyAsString());
+                    } else {
+                        log.info("Received successful response: statusCode={}. Record: timestamp={}", res.statusCode(), timestamp);
+                    }
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, responseBody={}",
+                            res.statusCode(), timestamp, res.bodyAsString());
+                    } else {
+                        log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}", res.statusCode(), timestamp);
+                    }
+
+                    e = new RuntimeException("Unexpected HTTP response code: " + res.statusCode());
+                }
+
+                // try callback
+                tryCallback(mcpConnectRecord, e, attributes, connectRecord);
+            }).onFailure(err -> {
+                log.error("Request failed to send. Record: timestamp={}", timestamp, err);
+
+                // try callback
+                tryCallback(mcpConnectRecord, err, attributes, connectRecord);
+            });
+    }
+
+    /**
+     * Tries to call the callback based on the result of the request.
+     *
+     * @param mcpConnectRecord the McpConnectRecord to use
+     * @param e                 the exception thrown during the request, may be null
+     * @param attributes        additional attributes to be used in processing
+     */
+    private void tryCallback(McpConnectRecord mcpConnectRecord, Throwable e, Map<String, Object> attributes, ConnectRecord record) {
+        // get and update the attempt event
+        McpAttemptEvent attemptEvent = (McpAttemptEvent) attributes.get(McpAttemptEvent.PREFIX + mcpConnectRecord.getMcpRecordId());
+        attemptEvent.updateEvent(e);
+
+        // get and update the multiHttpRequestContext
+        MultiMcpRequestContext multiMcpRequestContext = getAndUpdateMultiMcpRequestContext(attributes, attemptEvent);
+
+        if (multiMcpRequestContext.isAllRequestsProcessed()) {
+            // do callback
+            if (record.getCallback() == null) {
+                if (log.isDebugEnabled()) {
+                    log.warn("ConnectRecord callback is null. Ignoring callback. {}", record);
+                } else {
+                    log.warn("ConnectRecord callback is null. Ignoring callback.");
+                }
+                return;
+            }
+
+            // get the last failed event
+            McpAttemptEvent lastFailedEvent = multiMcpRequestContext.getLastFailedEvent();
+            if (lastFailedEvent == null) {
+                // success
+                record.getCallback().onSuccess(convertToSendResult(record));
+            } else {
+                // failure
+                record.getCallback().onException(buildSendExceptionContext(record, lastFailedEvent.getLastException()));
+            }
+        } else {
+            log.warn("still have requests to process, size {}|attempt num {}",
+                multiMcpRequestContext.getRemainingRequests(), attemptEvent.getAttempts());
+        }
+    }
+
+
+    /**
+     * Gets and updates the multi mcp request context based on the provided attributes and HttpConnectRecord.
+     *
+     * @param attributes   the attributes to use
+     * @param attemptEvent the McpAttemptEvent to use
+     * @return the updated multi mcp request context
+     */
+    private MultiMcpRequestContext getAndUpdateMultiMcpRequestContext(Map<String, Object> attributes, McpAttemptEvent attemptEvent) {
+        // get the multi http request context
+        MultiMcpRequestContext multiMcpRequestContext = (MultiMcpRequestContext) attributes.get(MultiMcpRequestContext.NAME);
+
+        // Check if the current attempted event has completed
+        if (attemptEvent.isComplete()) {
+            // decrement the counter
+            multiMcpRequestContext.decrementRemainingRequests();
+
+            if (attemptEvent.getLastException() != null) {
+                // if all attempts are exhausted, set the last failed event
+                multiMcpRequestContext.setLastFailedEvent(attemptEvent);
+            }
+        }
+
+        return multiMcpRequestContext;
+    }
+
+    private SendResult convertToSendResult(ConnectRecord record) {
+        SendResult result = new SendResult();
+        result.setMessageId(record.getRecordId());
+        if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
+            result.setTopic(record.getExtension("topic"));
+        }
+        return result;
+    }
+
+    private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) {
+        SendExceptionContext sendExceptionContext = new SendExceptionContext();
+        sendExceptionContext.setMessageId(record.getRecordId());
+        sendExceptionContext.setCause(e);
+        if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
+            sendExceptionContext.setTopic(record.getExtension("topic"));
+        }
+        return sendExceptionContext;
+    }
+
+
+    /**
+     * Cleans up and releases resources used by the MCP handler.
+     */
+    @Override
+    public void stop() {
+        if (this.webClient != null) {
+            this.webClient.close();
+        } else {
+            log.warn("WebClient is null, ignore.");
+        }
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/McpSinkHandlerRetryWrapper.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/McpSinkHandlerRetryWrapper.java
new file mode 100644
index 0000000..c2e9908
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/McpSinkHandlerRetryWrapper.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.handler.impl;
+
+import org.apache.eventmesh.common.config.connector.mcp.McpRetryConfig;
+import org.apache.eventmesh.common.config.connector.mcp.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.util.HttpUtils;
+import org.apache.eventmesh.connector.mcp.sink.data.McpConnectRecord;
+import org.apache.eventmesh.connector.mcp.sink.handler.AbstractMcpSinkHandler;
+import org.apache.eventmesh.connector.mcp.sink.handler.McpSinkHandler;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.ConnectException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Map;
+
+import io.vertx.core.Future;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+
+import lombok.extern.slf4j.Slf4j;
+
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+
+/**
+ * McpSinkHandlerRetryWrapper is a wrapper class for the McpSinkHandler that provides retry functionality for failed Mcp requests.
+ */
+@Slf4j
+public class McpSinkHandlerRetryWrapper extends AbstractMcpSinkHandler {
+
+    private final McpRetryConfig mcpRetryConfig;
+
+    private final McpSinkHandler sinkHandler;
+
+    private final RetryPolicy<HttpResponse<Buffer>> retryPolicy;
+
+    public McpSinkHandlerRetryWrapper(SinkConnectorConfig sinkConnectorConfig, McpSinkHandler sinkHandler) {
+        super(sinkConnectorConfig);
+        this.sinkHandler = sinkHandler;
+        this.mcpRetryConfig = getSinkConnectorConfig().getRetryConfig();
+        this.retryPolicy = buildRetryPolicy();
+    }
+
+    private RetryPolicy<HttpResponse<Buffer>> buildRetryPolicy() {
+        return RetryPolicy.<HttpResponse<Buffer>>builder()
+            .handleIf(e -> e instanceof ConnectException)
+            .handleResultIf(response -> mcpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
+            .withMaxRetries(mcpRetryConfig.getMaxRetries())
+            .withDelay(Duration.ofMillis(mcpRetryConfig.getInterval()))
+            .onRetry(event -> {
+                if (log.isDebugEnabled()) {
+                    log.warn("Failed to deliver message after {} attempts. Retrying in {} ms. Error: {}",
+                        event.getAttemptCount(), mcpRetryConfig.getInterval(), event.getLastException());
+                } else {
+                    log.warn("Failed to deliver message after {} attempts. Retrying in {} ms.",
+                        event.getAttemptCount(), mcpRetryConfig.getInterval());
+                }
+            }).onFailure(event -> {
+                if (log.isDebugEnabled()) {
+                    log.error("Failed to deliver message after {} attempts. Error: {}",
+                        event.getAttemptCount(), event.getException());
+                } else {
+                    log.error("Failed to deliver message after {} attempts.",
+                        event.getAttemptCount());
+                }
+            }).build();
+    }
+
+    /**
+     * Initializes the WebClient for making Mcp requests based on the provided SinkConnectorConfig.
+     */
+    @Override
+    public void start() {
+        sinkHandler.start();
+    }
+
+
+    /**
+     * Processes McpConnectRecord on specified URL while returning its own processing logic This method provides the retry power to process the
+     * McpConnectRecord
+     *
+     * @param url               URI to which the McpConnectRecord should be sent
+     * @param mcpConnectRecord McpConnectRecord to process
+     * @param attributes        additional attributes to pass to the processing chain
+     * @return processing chain
+     */
+    @Override
+    public Future<HttpResponse<Buffer>> deliver(URI url, McpConnectRecord mcpConnectRecord, Map<String, Object> attributes,
+                                                ConnectRecord connectRecord) {
+        Failsafe.with(retryPolicy)
+            .getStageAsync(() -> sinkHandler.deliver(url, mcpConnectRecord, attributes, connectRecord).toCompletionStage());
+        return null;
+    }
+
+
+    /**
+     * Cleans up and releases resources used by the Mcp handler.
+     */
+    @Override
+    public void stop() {
+        sinkHandler.stop();
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConnector.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConnector.java
new file mode 100644
index 0000000..e5fe258
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConnector.java
@@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source;
+
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CACHE_CONTROL_NO_CACHE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CONNECTION_KEEP_ALIVE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CONTENT_TYPE_JSON;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CONTENT_TYPE_JSON_PLAIN;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CONTENT_TYPE_SSE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CORS_ALLOWED_HEADERS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CORS_ALLOWED_METHODS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CORS_ALLOW_ALL;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CORS_EXPOSED_HEADERS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_CONNECTOR_NAME;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_IDLE_TIMEOUT_MS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_NO_MESSAGE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_PROTOCOL_VERSION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_SERVER_NAME;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_SERVER_VERSION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.ENDPOINT_HEALTH;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.ERROR_INTERNAL;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.ERROR_INVALID_PARAMS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.ERROR_METHOD_NOT_FOUND;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_ACCEPT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CACHE_CONTROL;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CONNECTION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CONTENT_TYPE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CORS_ALLOW_HEADERS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CORS_ALLOW_METHODS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CORS_ALLOW_ORIGIN;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CORS_EXPOSE_HEADERS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_X_ACCEL_BUFFERING;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HTTP_METHOD_OPTIONS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HTTP_STATUS_INTERNAL_ERROR;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HTTP_STATUS_NO_CONTENT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_CAPABILITIES;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_CONNECTOR;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_CONTENT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_DESCRIPTION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_ERROR;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_ERROR_CODE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_ERROR_MESSAGE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_ID;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_JSONRPC;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_METHOD;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_NAME;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_PARAMS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_PROPERTIES;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_PROTOCOL_VERSION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_REQUIRED;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_RESULT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_SERVER_INFO;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_STATUS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_TEXT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_TOOLS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_TYPE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_VERSION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.MAX_POLL_WAIT_TIME_MS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_INITIALIZE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_NOTIFICATIONS_INITIALIZED;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_PING;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_TOOLS_CALL;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_TOOLS_LIST;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_DESC_MESSAGE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_DESC_MESSAGE_CONTENT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_DESC_TOPIC;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_MESSAGE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_TOPIC;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.SSE_DATA_OPEN;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.SSE_EVENT_OPEN;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.SSE_HEARTBEAT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_JSONRPC_VERSION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_STATUS_UP;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_TYPE_OBJECT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_TYPE_STRING;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_TYPE_TEXT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.X_ACCEL_BUFFERING_NO;
+
+import org.apache.eventmesh.common.config.connector.Config;
+import org.apache.eventmesh.common.config.connector.mcp.McpSourceConfig;
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.connector.mcp.source.protocol.Protocol;
+import org.apache.eventmesh.connector.mcp.source.protocol.ProtocolFactory;
+import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
+import org.apache.eventmesh.openconnect.api.source.Source;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.handler.BodyHandler;
+import io.vertx.ext.web.handler.LoggerHandler;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * MCP Source Connector for EventMesh. Implements an MCP protocol server that lets AI clients interact with EventMesh via the MCP protocol.
+ */
+@Slf4j
+public class McpSourceConnector implements Source, ConnectorCreateService<Source> {
+
+    private McpSourceConfig sourceConfig; // connector configuration, assigned by init(...)
+
+    private BlockingQueue<Object> queue; // bounded buffer handed to the protocol handlers and drained by poll()
+
+    private int batchSize; // upper bound on the number of records returned by a single poll()
+
+    private String forwardPath; // path of the forwarding route that the sendEventMeshMessage tool posts to
+
+    private Route route; // route on the connector's base path, wired to the protocol handler in doInit()
+
+    private Protocol protocol; // installs the route handlers and converts queued objects into ConnectRecords
+
+    private HttpServer server; // created in doInit(), starts listening in start()
+
+    private Vertx vertx; // created in doInit(), closed in stop()
+
+    private WebClient webClient; // used by the sendEventMeshMessage tool to post to the forwarding route
+
+    private McpToolRegistry toolRegistry; // tools served through tools/list and tools/call
+
+    @Getter
+    private volatile boolean started = false; // set to true once the HTTP server reports a successful listen
+
+    @Getter
+    private volatile boolean destroyed = false; // set to true once the HTTP server reports a successful close
+
+    @Override
+    public Class<? extends Config> configClass() {
+        return McpSourceConfig.class; // the config type that init(Config) casts its argument to
+    }
+
+    @Override
+    public Source create() {
+        return new McpSourceConnector(); // fresh instance; its fields are populated later by init(...)
+    }
+
+    @Override
+    public void init(Config config) {
+        this.sourceConfig = (McpSourceConfig) config; // must be the type announced by configClass()
+        doInit(); // shared setup: queue, protocol, tools, routes and the (not yet listening) HTTP server
+    }
+
+    @Override
+    public void init(ConnectorContext connectorContext) {
+        // the context is treated as a SourceConnectorContext and its source config as this connector's McpSourceConfig
+        this.sourceConfig = (McpSourceConfig) ((SourceConnectorContext) connectorContext).getSourceConfig();
+        doInit();
+    }
+
+    /**
+     * Initialize the connector: queue, protocol, tool registry, HTTP routes and the HTTP server (created here, started in start())
+     */
+    private void doInit() {
+        log.info("Initializing MCP Source Connector...");
+
+        // Initialize queue (bounded by the configured maximum storage size)
+        int maxQueueSize = this.sourceConfig.getConnectorConfig().getMaxStorageSize();
+        this.queue = new LinkedBlockingQueue<>(maxQueueSize);
+
+        // Initialize batch size
+        this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize();
+
+        String protocolName = this.sourceConfig.getConnectorConfig().getProtocol();
+        this.protocol = ProtocolFactory.getInstance(this.sourceConfig.connectorConfig, protocolName);
+
+        // Initialize tool registry
+        this.toolRegistry = new McpToolRegistry();
+        registerDefaultTools(); // the tool lambdas read webClient/forwardPath only when invoked, so both may be set below
+
+        // Initialize Vertx and router
+        this.vertx = Vertx.vertx();
+        final Router router = Router.router(vertx);
+        this.webClient = WebClient.create(vertx);
+
+        final String basePath = this.sourceConfig.connectorConfig.getPath();
+        this.forwardPath = this.sourceConfig.connectorConfig.getForwardPath();
+
+        // Configure CORS (must be before all routes)
+        router.route().handler(ctx -> {
+            ctx.response()
+                .putHeader(HEADER_CORS_ALLOW_ORIGIN, CORS_ALLOW_ALL)
+                .putHeader(HEADER_CORS_ALLOW_METHODS, CORS_ALLOWED_METHODS)
+                .putHeader(HEADER_CORS_ALLOW_HEADERS, CORS_ALLOWED_HEADERS)
+                .putHeader(HEADER_CORS_EXPOSE_HEADERS, CORS_EXPOSED_HEADERS);
+
+            if (HTTP_METHOD_OPTIONS.equals(ctx.request().method().name())) {
+                ctx.response().setStatusCode(HTTP_STATUS_NO_CONTENT).end();
+            } else {
+                ctx.next();
+            }
+        });
+
+        // Body handler
+        router.route().handler(BodyHandler.create());
+
+        // Main endpoint - handles both JSON-RPC and SSE requests
+        router.post(basePath)
+            .handler(LoggerHandler.create())
+            .handler(ctx -> {
+                String accept = ctx.request().getHeader(HEADER_ACCEPT);
+
+                // SSE only when the client sent a non-empty Accept header that is a prefix of the SSE content type.
+                // A missing or empty Accept must fall through to JSON-RPC: every string starts with "", so it used to match.
+                if (accept != null && !accept.isEmpty() && CONTENT_TYPE_SSE.startsWith(accept)) {
+                    handleSseRequest(ctx);
+                } else {
+                    handleJsonRpcRequest(ctx);
+                }
+            });
+
+        // GET request for SSE support
+        router.get(basePath)
+            .handler(this::handleSseRequest);
+
+        // Health check endpoint
+        router.get(basePath + ENDPOINT_HEALTH).handler(ctx -> {
+            JsonObject health = new JsonObject()
+                .put(KEY_STATUS, VALUE_STATUS_UP)
+                .put(KEY_CONNECTOR, DEFAULT_CONNECTOR_NAME)
+                .put(KEY_TOOLS, toolRegistry.getToolCount());
+            ctx.response()
+                .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON)
+                .end(health.encode());
+        });
+
+        Route forwardRoute = router.route().path(forwardPath).handler(LoggerHandler.create());
+
+        this.route = router.route()
+            .path(this.sourceConfig.connectorConfig.getPath())
+            .handler(LoggerHandler.create());
+
+        // set protocol handler (both routes feed the same queue)
+        this.protocol.setHandler(route, queue);
+        this.protocol.setHandler(forwardRoute, queue);
+
+        // Create server (listening only begins in start())
+        this.server = vertx.createHttpServer(new HttpServerOptions()
+                .setPort(this.sourceConfig.connectorConfig.getPort())
+                .setHandle100ContinueAutomatically(true)
+                .setIdleTimeout(DEFAULT_IDLE_TIMEOUT_MS)
+                .setIdleTimeoutUnit(TimeUnit.MILLISECONDS))
+            .requestHandler(router);
+
+        log.info("MCP Source Connector initialized on http://127.0.0.1:{}{}",
+            this.sourceConfig.connectorConfig.getPort(), basePath);
+    }
+
+    /**
+     * Register default MCP tools: "echo" and "sendEventMeshMessage" (the latter posts the call to the local forward route)
+     */
+    private void registerDefaultTools() {
+        // Echo tool
+        toolRegistry.registerTool(
+            "echo",
+            "Echo back the input message",
+            createEchoSchema(),
+            args -> {
+                String message = args.getString(PARAM_MESSAGE, DEFAULT_NO_MESSAGE);
+                return createTextContent("Echo: " + message);
+            }
+        );
+
+        // EventMesh message sending tool
+        toolRegistry.registerTool(
+            "sendEventMeshMessage",
+            "Send a message to EventMesh",
+            createSendMessageSchema(),
+            args -> {
+                String topic = args.getString(PARAM_TOPIC);
+                Object message = args.getString(PARAM_MESSAGE);
+                // Fire-and-forget: the forward completes asynchronously and its outcome is only logged, not returned
+                webClient.post(this.sourceConfig.connectorConfig.getPort(), "127.0.0.1", this.forwardPath)
+                    .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON_PLAIN)
+                    .sendBuffer(Buffer.buffer(
+                        new JsonObject()
+                            .put("type", "mcp.tools.call")
+                            .put("tool", "sendEventMeshMessage")
+                            .put("arguments", new JsonObject().put("message", message).put("topic", topic))
+                            .encode()
+                    ), ar -> {
+                        if (ar.succeeded()) {
+                            log.info("forwarded tools/call to {} OK, status={}", forwardPath, ar.result().statusCode());
+                        } else {
+                            log.warn("forward tools/call failed: {}", ar.cause().toString());
+                        }
+                    });
+
+                return createTextContent(
+                    String.format("Message sent to topic '%s': %s", topic, message)
+                );
+            }
+        );
+
+        log.info("Registered {} MCP tools", toolRegistry.getToolCount());
+    }
+
+    /**
+     * Handle JSON-RPC request (HTTP mode)
+     *
+     * @param ctx Routing context
+     */
+    private void handleJsonRpcRequest(RoutingContext ctx) {
+        String body = ctx.body().asString();
+
+        try {
+            JsonObject request = new JsonObject(body);
+            JsonObject response = handleMcpRequest(request);
+
+            if (response != null) {
+                ctx.response()
+                    .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON)
+                    .end(response.encode());
+            } else {
+                // Notification messages don't need response
+                ctx.response().setStatusCode(HTTP_STATUS_NO_CONTENT).end();
+            }
+
+        } catch (Exception e) {
+            JsonObject error = createErrorResponse(null, ERROR_INTERNAL,
+                "Internal error: " + e.getMessage());
+            ctx.response()
+                .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON)
+                .setStatusCode(HTTP_STATUS_INTERNAL_ERROR)
+                .end(error.encode());
+        }
+    }
+
+    /**
+     * Handle SSE request (Server-Sent Events mode)
+     *
+     * @param ctx Routing context
+     */
+    private void handleSseRequest(RoutingContext ctx) {
+        ctx.response()
+            .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_SSE)
+            .putHeader(HEADER_CACHE_CONTROL, CACHE_CONTROL_NO_CACHE)
+            .putHeader(HEADER_CONNECTION, CONNECTION_KEEP_ALIVE)
+            .putHeader(HEADER_X_ACCEL_BUFFERING, X_ACCEL_BUFFERING_NO)
+            .setChunked(true);
+
+        // Send connection established event
+        ctx.response().write(SSE_EVENT_OPEN);
+        ctx.response().write(SSE_DATA_OPEN);
+
+        // Heartbeat (optional)
+        long timerId = vertx.setPeriodic(DEFAULT_HEARTBEAT_INTERVAL_MS, id -> {
+            if (!ctx.response().closed()) {
+                ctx.response().write(SSE_HEARTBEAT);
+            } else {
+                vertx.cancelTimer(id);
+            }
+        });
+
+        ctx.request().connection().closeHandler(v -> {
+            vertx.cancelTimer(timerId);
+        });
+    }
+
+    /**
+     * Dispatch an MCP JSON-RPC request to the handler of its method; unknown, missing or null methods yield a method-not-found error
+     *
+     * @param request JSON-RPC request object
+     * @return JSON-RPC response object, or null for notifications
+     */
+    private JsonObject handleMcpRequest(JsonObject request) {
+        String method = request.getString(KEY_METHOD, "");
+        Object id = request.getValue(KEY_ID);
+        JsonObject params = request.getJsonObject(KEY_PARAMS);
+        // The default only covers an absent key; an explicit JSON null would make switch throw a NullPointerException.
+        switch (method == null ? "" : method) {
+            case METHOD_INITIALIZE:
+                return handleInitialize(id, params);
+            case METHOD_NOTIFICATIONS_INITIALIZED:
+                return null; // Notifications don't need response
+            case METHOD_TOOLS_LIST:
+                return handleToolsList(id);
+            case METHOD_TOOLS_CALL:
+                return handleToolsCall(id, params);
+            case METHOD_PING:
+                return createSuccessResponse(id, new JsonObject());
+            default:
+                return createErrorResponse(id, ERROR_METHOD_NOT_FOUND,
+                    "Method not found: " + method);
+        }
+    }
+
+    /**
+     * Handle initialize method
+     *
+     * @param id     Request ID
+     * @param params Request parameters
+     * @return Initialize response
+     */
+    private JsonObject handleInitialize(Object id, JsonObject params) {
+        String clientVersion = params != null
+            ? params.getString(KEY_PROTOCOL_VERSION, DEFAULT_PROTOCOL_VERSION)
+            : DEFAULT_PROTOCOL_VERSION;
+
+        JsonObject result = new JsonObject()
+            .put(KEY_PROTOCOL_VERSION, clientVersion)
+            .put(KEY_SERVER_INFO, new JsonObject()
+                .put(KEY_NAME, DEFAULT_SERVER_NAME)
+                .put(KEY_VERSION, DEFAULT_SERVER_VERSION))
+            .put(KEY_CAPABILITIES, new JsonObject()
+                .put(KEY_TOOLS, new JsonObject()));
+
+        return createSuccessResponse(id, result);
+    }
+
+    /**
+     * Handle tools/list method
+     *
+     * @param id Request ID
+     * @return Tools list response
+     */
+    private JsonObject handleToolsList(Object id) {
+        JsonArray tools = toolRegistry.getToolsArray();
+        JsonObject result = new JsonObject().put(KEY_TOOLS, tools);
+        return createSuccessResponse(id, result);
+    }
+
+    /**
+     * Handle tools/call method
+     *
+     * @param id     Request ID
+     * @param params Tool call parameters
+     * @return Tool execution result
+     */
+    private JsonObject handleToolsCall(Object id, JsonObject params) {
+        if (params == null) {
+            return createErrorResponse(id, ERROR_INVALID_PARAMS, "Invalid params");
+        }
+
+        String toolName = params.getString(KEY_NAME);
+        JsonObject arguments = params.getJsonObject("arguments", new JsonObject());
+
+        log.info("Calling tool: {} with arguments: {}", toolName, arguments);
+
+        try {
+            JsonObject content = toolRegistry.executeTool(toolName, arguments);
+            JsonObject result = new JsonObject()
+                .put(KEY_CONTENT, new JsonArray().add(content));
+
+            return createSuccessResponse(id, result);
+
+        } catch (IllegalArgumentException e) {
+            return createErrorResponse(id, ERROR_INVALID_PARAMS, e.getMessage());
+        } catch (Exception e) {
+            log.error("Tool execution error", e);
+            return createErrorResponse(id, ERROR_INTERNAL,
+                "Tool execution failed: " + e.getMessage());
+        }
+    }
+
+    // ========== JSON-RPC Response Builders ==========
+
+    /**
+     * Create a success response
+     *
+     * @param id     Request ID
+     * @param result Result object
+     * @return JSON-RPC success response
+     */
+    private JsonObject createSuccessResponse(Object id, JsonObject result) {
+        return new JsonObject()
+            .put(KEY_JSONRPC, VALUE_JSONRPC_VERSION)
+            .put(KEY_ID, id)
+            .put(KEY_RESULT, result);
+    }
+
+    /**
+     * Create an error response
+     *
+     * @param id      Request ID
+     * @param code    Error code
+     * @param message Error message
+     * @return JSON-RPC error response
+     */
+    private JsonObject createErrorResponse(Object id, int code, String message) {
+        return new JsonObject()
+            .put(KEY_JSONRPC, VALUE_JSONRPC_VERSION)
+            .put(KEY_ID, id)
+            .put(KEY_ERROR, new JsonObject()
+                .put(KEY_ERROR_CODE, code)
+                .put(KEY_ERROR_MESSAGE, message));
+    }
+
+    // ========== Schema Creation Helpers ==========
+
+    /**
+     * Create JSON schema for echo tool
+     *
+     * @return Echo tool input schema
+     */
+    private JsonObject createEchoSchema() {
+        return new JsonObject()
+            .put(KEY_TYPE, VALUE_TYPE_OBJECT)
+            .put(KEY_PROPERTIES, new JsonObject()
+                .put(PARAM_MESSAGE, new JsonObject()
+                    .put(KEY_TYPE, VALUE_TYPE_STRING)
+                    .put(KEY_DESCRIPTION, PARAM_DESC_MESSAGE)))
+            .put(KEY_REQUIRED, new JsonArray().add(PARAM_MESSAGE));
+    }
+
+    /**
+     * Create JSON schema for send message tool
+     *
+     * @return Send message tool input schema
+     */
+    private JsonObject createSendMessageSchema() {
+        return new JsonObject()
+            .put(KEY_TYPE, VALUE_TYPE_OBJECT)
+            .put(KEY_PROPERTIES, new JsonObject()
+                .put(PARAM_TOPIC, new JsonObject()
+                    .put(KEY_TYPE, VALUE_TYPE_STRING)
+                    .put(KEY_DESCRIPTION, PARAM_DESC_TOPIC))
+                .put(PARAM_MESSAGE, new JsonObject()
+                    .put(KEY_TYPE, VALUE_TYPE_STRING)
+                    .put(KEY_DESCRIPTION, PARAM_DESC_MESSAGE_CONTENT)))
+            .put(KEY_REQUIRED, new JsonArray().add(PARAM_TOPIC).add(PARAM_MESSAGE));
+    }
+
+    /**
+     * Create text content object
+     *
+     * @param text Text content
+     * @return MCP text content object
+     */
+    private JsonObject createTextContent(String text) {
+        return new JsonObject()
+            .put(KEY_TYPE, VALUE_TYPE_TEXT)
+            .put(KEY_TEXT, text);
+    }
+
+    // ========== Source Interface Implementation ==========
+
+    @Override
+    public void start() {
+        this.server.listen(res -> {
+            if (res.succeeded()) {
+                this.started = true;
+                log.info("McpSourceConnector started on port: {}",
+                    this.sourceConfig.getConnectorConfig().getPort());
+                log.info("MCP endpoints available at:");
+                log.info("  - POST {} (JSON-RPC)", this.sourceConfig.connectorConfig.getPath());
+                log.info("  - GET {} (SSE)", this.sourceConfig.connectorConfig.getPath());
+                log.info("  - GET {}{} (Health check)",
+                    this.sourceConfig.connectorConfig.getPath(), ENDPOINT_HEALTH);
+            } else {
+                log.error("McpSourceConnector failed to start on port: {}",
+                    this.sourceConfig.getConnectorConfig().getPort());
+                throw new EventMeshException("failed to start Vertx server", res.cause());
+            }
+        });
+    }
+
+    @Override
+    public void commit(ConnectRecord record) {
+        if (!sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) {
+            return;
+        }
+        log.debug("McpSourceConnector commit record: {}", record.getRecordId()); // trace only, MCP needs no commit logic
+    }
+
+    @Override
+    public String name() {
+        return this.sourceConfig.getConnectorConfig().getConnectorName(); // read from configuration, so init(...) must have run
+    }
+
+    @Override
+    public void onException(ConnectRecord record) {
+        log.error("Exception occurred for record: {}", record.getRecordId());
+        // Logging only, nothing else is attempted here: MCP errors are already handled via JSON-RPC error responses
+    }
+
+    @Override
+    public void stop() {
+        if (this.server != null) {
+            this.server.close(res -> {
+                if (res.succeeded()) {
+                    this.destroyed = true;
+                    log.info("McpSourceConnector stopped on port: {}",
+                        this.sourceConfig.getConnectorConfig().getPort());
+                } else {
+                    log.error("McpSourceConnector failed to stop on port: {}",
+                        this.sourceConfig.getConnectorConfig().getPort());
+                    throw new EventMeshException("failed to stop Vertx server", res.cause());
+                }
+            });
+        } else {
+            log.warn("McpSourceConnector server is null, ignore.");
+        }
+
+        if (this.vertx != null) {
+            this.vertx.close();
+        }
+    }
+
+    @Override
+    public List<ConnectRecord> poll() {
+        long startTime = System.currentTimeMillis();
+        long remainingTime = MAX_POLL_WAIT_TIME_MS; // total waiting budget shared by all queue reads of this call
+
+        List<ConnectRecord> connectRecords = new ArrayList<>(batchSize);
+        for (int i = 0; i < batchSize; i++) {
+            try {
+                Object obj = queue.poll(remainingTime, TimeUnit.MILLISECONDS);
+                if (obj == null) {
+                    break; // nothing arrived in time: return what has been collected so far
+                }
+                // Convert MCP tool calls to ConnectRecord
+                connectRecords.add(protocol.convertToConnectRecord(obj));
+
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                remainingTime = Math.max(0L, MAX_POLL_WAIT_TIME_MS - elapsedTime);
+            } catch (Exception e) {
+                if (e instanceof InterruptedException) {
+                    Thread.currentThread().interrupt(); // keep the interruption visible to the polling thread
+                }
+                log.error("Failed to poll from queue.", e);
+                throw new RuntimeException(e);
+            }
+        }
+        return connectRecords;
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConstants.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConstants.java
new file mode 100644
index 0000000..423cbbc
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConstants.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source;
+
+/**
+ * Shared constant values for the MCP source connector; not instantiable.
+ */
+public final class McpSourceConstants {
+
+    // ========== Server Configuration ==========
+
+    /**
+     * Connector name used by default
+     */
+    public static final String DEFAULT_CONNECTOR_NAME = "mcp-source";
+
+    /**
+     * Server name reported by default over the MCP protocol
+     */
+    public static final String DEFAULT_SERVER_NAME = "eventmesh-mcp-connector";
+
+    /**
+     * Server version reported by default
+     */
+    public static final String DEFAULT_SERVER_VERSION = "1.0.0";
+
+    /**
+     * MCP protocol version used by default
+     */
+    public static final String DEFAULT_PROTOCOL_VERSION = "2024-11-05";
+
+    /**
+     * Idle timeout default, in milliseconds (60 seconds)
+     */
+    public static final int DEFAULT_IDLE_TIMEOUT_MS = 60_000;
+
+    /**
+     * Heartbeat interval default, in milliseconds (30 seconds)
+     */
+    public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 30_000L;
+
+    /**
+     * Upper bound on poll wait time, in milliseconds (5 seconds)
+     */
+    public static final long MAX_POLL_WAIT_TIME_MS = 5_000L;
+
+    // ========== HTTP Headers ==========
+
+    /**
+     * Name of the Content-Type header
+     */
+    public static final String HEADER_CONTENT_TYPE = "Content-Type";
+
+    /**
+     * Name of the Accept header
+     */
+    public static final String HEADER_ACCEPT = "Accept";
+
+    /**
+     * Name of the Access-Control-Allow-Origin header
+     */
+    public static final String HEADER_CORS_ALLOW_ORIGIN = "Access-Control-Allow-Origin";
+
+    /**
+     * Name of the Access-Control-Allow-Methods header
+     */
+    public static final String HEADER_CORS_ALLOW_METHODS = "Access-Control-Allow-Methods";
+
+    /**
+     * Name of the Access-Control-Allow-Headers header
+     */
+    public static final String HEADER_CORS_ALLOW_HEADERS = "Access-Control-Allow-Headers";
+
+    /**
+     * Name of the Access-Control-Expose-Headers header
+     */
+    public static final String HEADER_CORS_EXPOSE_HEADERS = "Access-Control-Expose-Headers";
+
+    /**
+     * Name of the Cache-Control header
+     */
+    public static final String HEADER_CACHE_CONTROL = "Cache-Control";
+
+    /**
+     * Name of the Connection header
+     */
+    public static final String HEADER_CONNECTION = "Connection";
+
+    /**
+     * Name of the X-Accel-Buffering header
+     */
+    public static final String HEADER_X_ACCEL_BUFFERING = "X-Accel-Buffering";
+
+    // ========== CORS Values ==========
+
+    /**
+     * Wildcard value that allows every origin
+     */
+    public static final String CORS_ALLOW_ALL = "*";
+
+    /**
+     * HTTP methods listed as allowed for CORS
+     */
+    public static final String CORS_ALLOWED_METHODS = "GET, POST, OPTIONS";
+
+    /**
+     * Request headers listed as allowed for CORS
+     */
+    public static final String CORS_ALLOWED_HEADERS = "Content-Type, Authorization, Accept";
+
+    /**
+     * Response headers listed as exposed for CORS
+     */
+    public static final String CORS_EXPOSED_HEADERS = "Content-Type";
+
+    // ========== Content Types ==========
+
+    /**
+     * JSON media type including the UTF-8 charset parameter
+     */
+    public static final String CONTENT_TYPE_JSON = "application/json; charset=utf-8";
+
+    /**
+     * Server-Sent Events media type including the UTF-8 charset parameter
+     */
+    public static final String CONTENT_TYPE_SSE = "text/event-stream; charset=utf-8";
+
+    /**
+     * JSON media type without parameters (for matching)
+     */
+    public static final String CONTENT_TYPE_JSON_PLAIN = "application/json";
+
+    // ========== HTTP Status Codes ==========
+
+    /**
+     * Status code 204 (No Content)
+     */
+    public static final int HTTP_STATUS_NO_CONTENT = 204;
+
+    /**
+     * Status code 500 (Internal Server Error)
+     */
+    public static final int HTTP_STATUS_INTERNAL_ERROR = 500;
+
+    // ========== JSON-RPC Methods ==========
+
+    /**
+     * Method name: initialize
+     */
+    public static final String METHOD_INITIALIZE = "initialize";
+
+    /**
+     * Method name: notifications/initialized
+     */
+    public static final String METHOD_NOTIFICATIONS_INITIALIZED = "notifications/initialized";
+
+    /**
+     * Method name: tools/list
+     */
+    public static final String METHOD_TOOLS_LIST = "tools/list";
+
+    /**
+     * Method name: tools/call
+     */
+    public static final String METHOD_TOOLS_CALL = "tools/call";
+
+    /**
+     * Method name: ping
+     */
+    public static final String METHOD_PING = "ping";
+
+    // ========== JSON-RPC Error Codes ==========
+
+    /**
+     * Error code for invalid params
+     */
+    public static final int ERROR_INVALID_PARAMS = -32602;
+
+    /**
+     * Error code for method not found
+     */
+    public static final int ERROR_METHOD_NOT_FOUND = -32601;
+
+    /**
+     * Error code for internal error
+     */
+    public static final int ERROR_INTERNAL = -32603;
+
+    // ========== JSON Keys ==========
+
+    /**
+     * Key holding the JSON-RPC version
+     */
+    public static final String KEY_JSONRPC = "jsonrpc";
+
+    /**
+     * Value of the JSON-RPC version
+     */
+    public static final String VALUE_JSONRPC_VERSION = "2.0";
+
+    /**
+     * Key: method
+     */
+    public static final String KEY_METHOD = "method";
+
+    /**
+     * Key: id
+     */
+    public static final String KEY_ID = "id";
+
+    /**
+     * Key: params
+     */
+    public static final String KEY_PARAMS = "params";
+
+    /**
+     * Key: result
+     */
+    public static final String KEY_RESULT = "result";
+
+    /**
+     * Key: error
+     */
+    public static final String KEY_ERROR = "error";
+
+    /**
+     * Key for the error code
+     */
+    public static final String KEY_ERROR_CODE = "code";
+
+    /**
+     * Key for the error message
+     */
+    public static final String KEY_ERROR_MESSAGE = "message";
+
+    /**
+     * Key for the protocol version
+     */
+    public static final String KEY_PROTOCOL_VERSION = "protocolVersion";
+
+    /**
+     * Key for the server info
+     */
+    public static final String KEY_SERVER_INFO = "serverInfo";
+
+    /**
+     * Key: capabilities
+     */
+    public static final String KEY_CAPABILITIES = "capabilities";
+
+    /**
+     * Key: tools
+     */
+    public static final String KEY_TOOLS = "tools";
+
+    /**
+     * Key: name
+     */
+    public static final String KEY_NAME = "name";
+
+    /**
+     * Key: version
+     */
+    public static final String KEY_VERSION = "version";
+
+    /**
+     * Key: content
+     */
+    public static final String KEY_CONTENT = "content";
+
+    /**
+     * Key: type
+     */
+    public static final String KEY_TYPE = "type";
+
+    /**
+     * Key: text
+     */
+    public static final String KEY_TEXT = "text";
+
+    /**
+     * Key: description
+     */
+    public static final String KEY_DESCRIPTION = "description";
+
+    /**
+     * Key for the input schema
+     */
+    public static final String KEY_INPUT_SCHEMA = "inputSchema";
+
+    /**
+     * Key: properties
+     */
+    public static final String KEY_PROPERTIES = "properties";
+
+    /**
+     * Key: required
+     */
+    public static final String KEY_REQUIRED = "required";
+
+    /**
+     * Key: status
+     */
+    public static final String KEY_STATUS = "status";
+
+    /**
+     * Key: connector
+     */
+    public static final String KEY_CONNECTOR = "connector";
+
+    // ========== JSON Values ==========
+
+    /**
+     * Type value: object
+     */
+    public static final String VALUE_TYPE_OBJECT = "object";
+
+    /**
+     * Type value: string
+     */
+    public static final String VALUE_TYPE_STRING = "string";
+
+    /**
+     * Type value: text
+     */
+    public static final String VALUE_TYPE_TEXT = "text";
+
+    /**
+     * Status value: UP
+     */
+    public static final String VALUE_STATUS_UP = "UP";
+
+    // ========== HTTP Methods ==========
+
+    /**
+     * HTTP method name: OPTIONS
+     */
+    public static final String HTTP_METHOD_OPTIONS = "OPTIONS";
+
+    // ========== SSE Events ==========
+
+    /**
+     * SSE "event" line announcing the open event
+     */
+    public static final String SSE_EVENT_OPEN = "event: open\n";
+
+    /**
+     * SSE "data" line for the open event, ending with a blank line
+     */
+    public static final String SSE_DATA_OPEN = "data: {\"type\":\"open\"}\n\n";
+
+    /**
+     * SSE comment line used as heartbeat, ending with a blank line
+     */
+    public static final String SSE_HEARTBEAT = ": heartbeat\n\n";
+
+    /**
+     * Cache-Control value: no-cache
+     */
+    public static final String CACHE_CONTROL_NO_CACHE = "no-cache";
+
+    /**
+     * Connection value: keep-alive
+     */
+    public static final String CONNECTION_KEEP_ALIVE = "keep-alive";
+
+    /**
+     * X-Accel-Buffering value: no
+     */
+    public static final String X_ACCEL_BUFFERING_NO = "no";
+
+    // ========== Tool Parameter Names ==========
+
+    /**
+     * Name of the message parameter
+     */
+    public static final String PARAM_MESSAGE = "message";
+
+    /**
+     * Name of the topic parameter
+     */
+    public static final String PARAM_TOPIC = "topic";
+
+    // ========== Tool Parameter Descriptions ==========
+
+    /**
+     * Description text for the message parameter
+     */
+    public static final String PARAM_DESC_MESSAGE = "Message to echo";
+
+    /**
+     * Description text for the topic parameter
+     */
+    public static final String PARAM_DESC_TOPIC = "EventMesh topic";
+
+    /**
+     * Description text for the message content parameter
+     */
+    public static final String PARAM_DESC_MESSAGE_CONTENT = "Message content";
+
+    // ========== Default Values ==========
+
+    /**
+     * Fallback text for when no message is provided
+     */
+    public static final String DEFAULT_NO_MESSAGE = "No message";
+
+    // ========== Endpoint Paths ==========
+
+    /**
+     * Suffix of the health check endpoint
+     */
+    public static final String ENDPOINT_HEALTH = "/health";
+
+    private McpSourceConstants() {
+        // Constants holder only; instances are never created
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpToolRegistry.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpToolRegistry.java
new file mode 100644
index 0000000..a59d93f
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpToolRegistry.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * MCP Tool Registry
+ * Keeps the registered MCP tools in a ConcurrentHashMap keyed by tool name
+ */
+@Slf4j
+public class McpToolRegistry {
+
+    private final Map<String, McpTool> tools = new ConcurrentHashMap<>();
+
+    /**
+     * Register an MCP tool; a tool already registered under the same name is replaced
+     * @param name Tool name
+     * @param description Tool description
+     * @param inputSchema JSON schema for tool input parameters
+     * @param executor Tool execution logic
+     */
+    public void registerTool(String name, String description,
+                             JsonObject inputSchema, ToolExecutor executor) {
+        McpTool tool = new McpTool(name, description, inputSchema, executor);
+        tools.put(name, tool);
+        log.info("Registered MCP tool: {}", name);
+    }
+
+    /**
+     * Execute a specified tool
+     * @param name Tool name
+     * @param arguments Tool arguments as JSON object
+     * @return Tool execution result as MCP content object
+     * @throws IllegalArgumentException if name is null or no tool is registered under it
+     * @throws RuntimeException if tool execution fails
+     */
+    public JsonObject executeTool(String name, JsonObject arguments) {
+        McpTool tool = name == null ? null : tools.get(name); // ConcurrentHashMap rejects null keys with an NPE
+        if (tool == null) {
+            throw new IllegalArgumentException("Unknown tool: " + name);
+        }
+
+        try {
+            return tool.executor.execute(arguments);
+        } catch (Exception e) {
+            log.error("Tool execution failed: {}", name, e);
+            throw new RuntimeException("Tool execution failed: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Get all tools as a JSON array
+     * @return JSON array with one object (name, description, inputSchema) per registered tool
+     */
+    public JsonArray getToolsArray() {
+        JsonArray array = new JsonArray();
+        for (McpTool tool : tools.values()) {
+            JsonObject toolObj = new JsonObject()
+                    .put("name", tool.name)
+                    .put("description", tool.description)
+                    .put("inputSchema", tool.inputSchema);
+            array.add(toolObj);
+        }
+        return array;
+    }
+
+    /**
+     * Get the number of registered tools
+     * @return Total count of registered tools
+     */
+    public int getToolCount() {
+        return tools.size();
+    }
+
+    /**
+     * Check if a tool exists
+     * @param name Tool name to check
+     * @return true if tool is registered, false otherwise (including for a null name)
+     */
+    public boolean hasTool(String name) {
+        return name != null && tools.containsKey(name);
+    }
+
+    /**
+     * Tool Executor Interface
+     * Functional interface for defining tool execution logic
+     */
+    @FunctionalInterface
+    public interface ToolExecutor {
+        /**
+         * Execute tool logic
+         * @param arguments Tool input arguments as JSON object
+         * @return MCP content object (must contain 'type' and 'text' fields)
+         * @throws Exception if execution fails
+         */
+        JsonObject execute(JsonObject arguments) throws Exception;
+    }
+
+    /**
+     * MCP Tool Definition
+     * Immutable holder for one registered tool's metadata and executor
+     */
+    private static class McpTool {
+        final String name;
+        final String description;
+        final JsonObject inputSchema;
+        final ToolExecutor executor;
+
+        McpTool(String name, String description, JsonObject inputSchema, ToolExecutor executor) {
+            this.name = name;
+            this.description = description;
+            this.inputSchema = inputSchema;
+            this.executor = executor;
+        }
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpRequest.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpRequest.java
new file mode 100644
index 0000000..38b9be8
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpRequest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source.data;
+
+import java.io.Serializable;
+
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.RoutingContext;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * MCP (Model Context Protocol) request data holder; Lombok generates accessors, builder and constructors.
+ * Field declaration order is the {@code @AllArgsConstructor} parameter order, so keep the order stable.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class McpRequest implements Serializable {
+
+    private static final long serialVersionUID = -483500600756490500L;
+
+    /**
+     * Protocol name
+     */
+    private String protocolName;
+
+    /**
+     * Session ID for tracking the request
+     */
+    private String sessionId;
+
+    /**
+     * MCP method name
+     */
+    private String method;
+
+    /**
+     * Tool name
+     */
+    private String toolName;
+
+    /**
+     * Tool arguments. NOTE(review): confirm JsonObject supports Java serialization, as this class is Serializable
+     */
+    private JsonObject arguments;
+
+    /**
+     * Tool execution result. NOTE(review): same Java-serialization question as for arguments
+     */
+    private JsonObject result;
+
+    /**
+     * Request timestamp. NOTE(review): unit is not visible here, presumably epoch milliseconds; confirm
+     */
+    private long timestamp;
+
+    /**
+     * Whether the tool execution succeeded
+     */
+    private boolean success;
+
+    /**
+     * Error message if execution failed
+     */
+    private String errorMessage;
+
+    /**
+     * Vert.x routing context for HTTP response handling; transient, so Java serialization skips it
+     */
+    private transient RoutingContext routingContext;
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpResponse.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpResponse.java
new file mode 100644
index 0000000..93e1cbc
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpResponse.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source.data;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter.Feature;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * MCP Response
+ * Represents a response message for MCP protocol operations
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class McpResponse implements Serializable {
+
+    private static final long serialVersionUID = 8616938575207104455L;
+
+    /**
+     * Response status: "success", "error", etc.
+     */
+    private String status;
+
+    /**
+     * Response message
+     */
+    private String msg;
+
+    /**
+     * Response timestamp
+     */
+    private LocalDateTime handleTime;
+
+    /**
+     * Additional error code for error responses
+     */
+    private Integer errorCode;
+
+    /**
+     * Additional data payload
+     */
+    private Object data;
+
+    /**
+     * Convert to JSON string
+     *
+     * @return JSON string representation
+     */
+    public String toJsonStr() {
+        return JSON.toJSONString(this, Feature.WriteMapNullValue);
+    }
+
+    /**
+     * Create a success response
+     *
+     * @return Success response
+     */
+    public static McpResponse success() {
+        return new McpResponse("success", "Operation completed successfully",
+                LocalDateTime.now(), null, null);
+    }
+
+    /**
+     * Create a success response with message
+     *
+     * @param msg Success message
+     * @return Success response
+     */
+    public static McpResponse success(String msg) {
+        return new McpResponse("success", msg, LocalDateTime.now(), null, null);
+    }
+
+    /**
+     * Create a success response with data
+     *
+     * @param msg Success message
+     * @param data Response data
+     * @return Success response with data
+     */
+    public static McpResponse success(String msg, Object data) {
+        return new McpResponse("success", msg, LocalDateTime.now(), null, data);
+    }
+
+    /**
+     * Create an error response
+     *
+     * @param msg Error message
+     * @return Error response
+     */
+    public static McpResponse error(String msg) {
+        return new McpResponse("error", msg, LocalDateTime.now(), null, null);
+    }
+
+    /**
+     * Create an error response with error code
+     *
+     * @param msg Error message
+     * @param errorCode Error code
+     * @return Error response with code
+     */
+    public static McpResponse error(String msg, Integer errorCode) {
+        return new McpResponse("error", msg, LocalDateTime.now(), errorCode, null);
+    }
+
+    /**
+     * Create a base response
+     *
+     * @param msg Message
+     * @return Base response
+     */
+    public static McpResponse base(String msg) {
+        return new McpResponse("info", msg, LocalDateTime.now(), null, null);
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/Protocol.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/Protocol.java
new file mode 100644
index 0000000..f18397a
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/Protocol.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source.protocol;
+
+import org.apache.eventmesh.common.config.connector.mcp.SourceConnectorConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.concurrent.BlockingQueue;
+
+import io.vertx.ext.web.Route;
+
+/**
+ * Protocol Interface.
+ * All protocols should implement this interface; ProtocolFactory creates them via a no-arg constructor.
+ */
+public interface Protocol {
+
+    /**
+     * Initialize the protocol. ProtocolFactory calls this once, right after creating the instance.
+     *
+     * @param sourceConnectorConfig source connector config
+     */
+    void initialize(SourceConnectorConfig sourceConnectorConfig);
+
+
+    /**
+     * Set this protocol's handler on the given route (presumably feeding received messages into queue; confirm).
+     *
+     * @param route Vert.x web route to attach the handler to
+     * @param queue blocking queue handed to the handler
+     */
+    void setHandler(Route route, BlockingQueue<Object> queue);
+
+
+    /**
+     * Convert the message to ConnectRecord.
+     *
+     * @param message message object to convert
+     * @return ConnectRecord built from the message
+     */
+    ConnectRecord convertToConnectRecord(Object message);
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/ProtocolFactory.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/ProtocolFactory.java
new file mode 100644
index 0000000..1c30d4e
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/ProtocolFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source.protocol;
+
+import org.apache.eventmesh.common.config.connector.mcp.SourceConnectorConfig;
+import org.apache.eventmesh.connector.mcp.source.protocol.impl.McpStandardProtocol;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Protocol factory. Stores {@link Protocol} classes by case-insensitive name and creates instances of them.
+ */
+public class ProtocolFactory {
+    // lower-cased protocol name -> protocol class
+    private static final ConcurrentHashMap<String, Class<?>> protocols = new ConcurrentHashMap<>();
+
+    static {
+        // register all protocols
+        registerProtocol(McpStandardProtocol.PROTOCOL_NAME, McpStandardProtocol.class);
+    }
+
+    /**
+     * Register a protocol class under a case-insensitive name, replacing any class already registered for it.
+     *
+     * @param name  name of the protocol
+     * @param clazz class of the protocol; must implement {@link Protocol}
+     * @throws IllegalArgumentException if {@code clazz} does not implement {@link Protocol}
+     */
+    public static void registerProtocol(String name, Class<?> clazz) {
+        if (!Protocol.class.isAssignableFrom(clazz)) {
+            throw new IllegalArgumentException("Class " + clazz.getName() + " does not implement Protocol interface");
+        }
+        // Locale.ROOT keeps the key independent of the JVM default locale (e.g. Turkish dotless i).
+        protocols.put(name.toLowerCase(java.util.Locale.ROOT), clazz);
+    }
+
+    /**
+     * Create and initialize a new instance of the named protocol; a fresh instance is built on every call.
+     *
+     * @param sourceConnectorConfig config passed to {@link Protocol#initialize(SourceConnectorConfig)}
+     * @param name                  name of the protocol (case-insensitive)
+     * @return newly created, initialized protocol instance
+     * @throws IllegalArgumentException if the name is not registered or the class cannot be instantiated
+     */
+    public static Protocol getInstance(SourceConnectorConfig sourceConnectorConfig, String name) {
+        // get the class by name (case insensitive, same normalization as registerProtocol)
+        Class<?> clazz = Optional.ofNullable(protocols.get(name.toLowerCase(java.util.Locale.ROOT)))
+                .orElseThrow(() -> new IllegalArgumentException("Protocol " + name + " is not registered"));
+        try {
+            // Use the no-arg constructor explicitly: Class.newInstance() is deprecated and leaks checked exceptions.
+            Protocol protocol = (Protocol) clazz.getDeclaredConstructor().newInstance();
+            protocol.initialize(sourceConnectorConfig);
+            return protocol;
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalArgumentException("Failed to instantiate protocol " + name, e);
+        }
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/impl/McpStandardProtocol.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/impl/McpStandardProtocol.java
new file mode 100644
index 0000000..737dae8
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/impl/McpStandardProtocol.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source.protocol.impl;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.config.connector.mcp.SourceConnectorConfig;
+import org.apache.eventmesh.connector.mcp.source.data.McpRequest;
+import org.apache.eventmesh.connector.mcp.source.data.McpResponse;
+import org.apache.eventmesh.connector.mcp.source.protocol.Protocol;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.Base64;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.handler.BodyHandler;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * MCP Standard Protocol Implementation
+ * Handles MCP (Model Context Protocol) requests and converts them to EventMesh ConnectRecords
+ */
+@Slf4j
+public class McpStandardProtocol implements Protocol {
+
+    /**
+     * Protocol name constant
+     */
+    public static final String PROTOCOL_NAME = "MCP";
+
+    // Keys of the extensions that convertToConnectRecord attaches to (or reads from) a ConnectRecord
+    private static final String EXTENSION_PROTOCOL        = "protocol";
+    private static final String EXTENSION_SESSION_ID      = "sessionid";
+    private static final String EXTENSION_TOOL_NAME       = "toolname";
+    private static final String EXTENSION_METHOD          = "method";          // value of McpRequest#getMethod
+    private static final String EXTENSION_REQUEST_ID      = "requestid";       // not referenced in this class
+    private static final String EXTENSION_SUCCESS         = "success";         // McpRequest#isSuccess as a string
+    private static final String EXTENSION_ERROR_MESSAGE   = "errormessage";
+    private static final String EXTENSION_ROUTING_CONTEXT = "routingcontext";
+    private static final String EXTENSION_IS_BASE64       = "isbase64";        // only read, never written, in this class
+    private static final String METADATA_EXTENSION_KEY    = "extension";       // not referenced in this class
+
+    private SourceConnectorConfig sourceConnectorConfig;
+
+    /**
+     * Keep the connector configuration for the route handler, which reads it on every request
+     *
+     * @param sourceConnectorConfig Source connector configuration; stored by reference, not validated here
+     */
+    @Override
+    public void initialize(SourceConnectorConfig sourceConnectorConfig) {
+        this.sourceConnectorConfig = sourceConnectorConfig;
+        log.info("Initialized MCP Standard Protocol");
+    }
+
+    /**
+     * Set the handler for the route
+     * POST bodies are parsed as JSON, wrapped in an McpRequest and offered to the queue (400 / 503 on failure)
+     *
+     * @param route Vert.x route to configure
+     * @param queue Queue for storing requests
+     */
+    @Override
+    public void setHandler(Route route, BlockingQueue<Object> queue) {
+        route.method(HttpMethod.POST)
+                .handler(BodyHandler.create())
+                .handler(ctx -> {
+                    try {
+                        // Parse the request body
+                        String bodyString = ctx.body().asString(Constants.DEFAULT_CHARSET.toString());
+
+                        // Try to parse as JSON
+                        JsonObject requestJson;
+                        try {
+                            requestJson = new JsonObject(bodyString);
+                        } catch (Exception e) {
+                            log.error("Failed to parse request as JSON: {}", bodyString, e);
+                            ctx.response()
+                                    .setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
+                                    .putHeader("Content-Type", "application/json")
+                                    .end(McpResponse.error("Invalid JSON format").toJsonStr());
+                            return;
+                        }
+
+                        // Extract the request fields; "type" and "tool" default to an empty string
+                        String method = requestJson.getString("type", "");
+                        String toolName = requestJson.getString("tool", "");
+                        JsonObject params = requestJson.getJsonObject("arguments");
+
+                        // Generate session ID if not present
+                        String sessionId = ctx.request().getHeader("Mcp-Session-Id");
+                        if (sessionId == null || sessionId.isEmpty()) {
+                            sessionId = generateSessionId();
+                        }
+
+                        // Create MCP request based on method type
+                        McpRequest mcpRequest = createMcpRequest(
+                                method,
+                                params,
+                                sessionId,
+                                toolName,
+                                ctx
+                        );
+
+                        // Queue the request
+                        if (!queue.offer(mcpRequest)) {
+                            log.error("Failed to queue MCP request: queue is full");
+                            ctx.response()
+                                    .setStatusCode(HttpResponseStatus.SERVICE_UNAVAILABLE.code())
+                                    .putHeader("Content-Type", "application/json")
+                                    .end(McpResponse.error("Service temporarily unavailable").toJsonStr());
+                            return;
+                        }
+
+                        // If data consistency is not enabled, return immediate response
+                        if (!sourceConnectorConfig.isDataConsistencyEnabled()) {
+                            ctx.response()
+                                    .setStatusCode(HttpResponseStatus.OK.code())
+                                    .putHeader("Content-Type", "application/json")
+                                    .end(McpResponse.success().toJsonStr());
+                        }
+                        // Otherwise, response will be sent after processing (via commit)
+                    } catch (Exception e) {
+                        log.error("Error handling MCP request", e);
+                        ctx.response()
+                                .setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
+                                .putHeader("Content-Type", "application/json")
+                                .end(McpResponse.error("Internal server error: " + e.getMessage()).toJsonStr());
+                    }
+                })
+                .failureHandler(ctx -> {
+                    // failure() is null when the route was failed with only a status code, so guard it
+                    Throwable failure = ctx.failure();
+                    log.error("Failed to handle MCP request", failure);
+                    String reason = failure == null || failure.getMessage() == null ? "Request failed" : failure.getMessage();
+                    ctx.response()
+                            .setStatusCode(ctx.statusCode() > 0 ? ctx.statusCode() : 500)
+                            .putHeader("Content-Type", "application/json")
+                            .end(McpResponse.error(reason).toJsonStr());
+                });
+    }
+
+    /**
+     * Create MCP request from the fields parsed out of the request body
+     * <p>
+     * A "mcp.tools.call" request with non-null params is built as a tool call: it starts with
+     * success=false and reads its tool name and arguments from params. Every other request,
+     * "initialize" included, is built with success=true and no arguments.
+     * </p>
+     *
+     * @param method Request type taken from the body
+     * @param params The "arguments" object of the body, may be null
+     * @param sessionId Session identifier
+     * @param tool Tool name from the body's "tool" field, kept when params carries no "name"
+     * @param ctx Routing context
+     * @return Constructed McpRequest
+     */
+    private McpRequest createMcpRequest(
+            String method,
+            JsonObject params,
+            String sessionId,
+            String tool,
+            io.vertx.ext.web.RoutingContext ctx) {
+
+        McpRequest.McpRequestBuilder builder = McpRequest.builder()
+                .protocolName(PROTOCOL_NAME)
+                .sessionId(sessionId)
+                .method(method)
+                .toolName(tool)
+                .timestamp(System.currentTimeMillis())
+                .routingContext(ctx);
+
+        // Handle different method types
+        if ("mcp.tools.call".equals(method) && params != null) {
+            // Tool call request: a "name" inside params takes precedence, but when it is
+            // absent the tool name from the request is kept instead of being replaced by null
+            String named = params.getString("name");
+            String toolName = named != null ? named : tool;
+            JsonObject arguments = params.getJsonObject("arguments", new JsonObject());
+
+            builder.toolName(toolName)
+                    .arguments(arguments)
+                    .success(false); // Will be set to true after execution
+        } else {
+            // "initialize" and every other method carry no arguments and count as successful
+            builder.success(true);
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Build a ConnectRecord from a queued MCP request
+     * The payload comes from extractData; request attributes are attached as extensions
+     *
+     * @param message Queued object, must be a non-null McpRequest
+     * @return ConnectRecord carrying the request payload and extensions
+     * @throws IllegalArgumentException if message is null or not an McpRequest
+     */
+    @Override
+    public ConnectRecord convertToConnectRecord(Object message) {
+        // Null is rejected first so that it gets its own message
+        if (message == null) {
+            throw new IllegalArgumentException("Message cannot be null");
+        }
+
+        if (!(message instanceof McpRequest)) {
+            String actualType = message.getClass().getName();
+            throw new IllegalArgumentException(String.format("Expected McpRequest but got %s", actualType));
+        }
+        McpRequest mcpRequest = (McpRequest) message;
+
+        // A non-positive timestamp on the request is replaced by the current time
+        long recordTime = mcpRequest.getTimestamp();
+        if (recordTime <= 0) {
+            recordTime = System.currentTimeMillis();
+        }
+
+        // Payload is chosen by extractData; the first two constructor arguments stay null
+        Object payload = extractData(mcpRequest);
+        ConnectRecord result = new ConnectRecord(null, null, recordTime, payload);
+
+        // Protocol marker is always present
+        result.addExtension(EXTENSION_PROTOCOL, PROTOCOL_NAME);
+
+        // Session ID, method and tool name are attached only when the request has them
+        addExtensionIfPresent(result, EXTENSION_SESSION_ID, mcpRequest.getSessionId());
+        addExtensionIfPresent(result, EXTENSION_METHOD, mcpRequest.getMethod());
+        addExtensionIfPresent(result, EXTENSION_TOOL_NAME, mcpRequest.getToolName());
+
+        // Success flag is stored as a string
+        result.addExtension(EXTENSION_SUCCESS, String.valueOf(mcpRequest.isSuccess()));
+
+        // Error message is attached for failed requests only
+        Object failureText = mcpRequest.isSuccess() ? null : mcpRequest.getErrorMessage();
+        addExtensionIfPresent(result, EXTENSION_ERROR_MESSAGE, failureText);
+
+        // Base64 handling runs before the routing context is attached; none of the extensions
+        // added above is isbase64, so it only acts if the record already carries that extension
+        handleBase64Decoding(result);
+
+        // Routing context is kept for response handling
+        addExtensionIfPresent(result, EXTENSION_ROUTING_CONTEXT, mcpRequest.getRoutingContext());
+
+        return result;
+    }
+
+    /**
+     * Attach an extension to the record unless the value is null
+     * Keeps the null checks of convertToConnectRecord in one place
+     *
+     * @param target Record that receives the extension
+     * @param key Extension key
+     * @param value Extension value, skipped when null
+     */
+    private static void addExtensionIfPresent(ConnectRecord target, String key, Object value) {
+        if (value != null) {
+            target.addExtension(key, value);
+        }
+    }
+
+    /**
+     * Extract data from MCP request
+     * Priority: result (successful requests only) > arguments > stub naming the tool and timestamp
+     */
+    private Object extractData(McpRequest request) {
+        if (request.isSuccess() && request.getResult() != null) {
+            return request.getResult().encode();
+        }
+
+        if (request.getArguments() != null) {
+            return request.getArguments().encode();
+        }
+
+        // Built with JsonObject instead of String.format so the tool name is escaped (a null name encodes as null)
+        return new JsonObject().put("tool", request.getToolName()).put("timestamp", request.getTimestamp()).encode();
+    }
+
+    /**
+     * Replace the record data with its Base64-decoded bytes when the isbase64 extension is true
+     *
+     * @param connectRecord Record to inspect and, if flagged, update in place
+     */
+    private void handleBase64Decoding(ConnectRecord connectRecord) {
+        Object flag = connectRecord.getExtensionObj(EXTENSION_IS_BASE64);
+
+        // A Boolean flag is used as is, anything else goes through its string form;
+        // a missing flag means there is nothing to do
+        boolean encoded = flag instanceof Boolean
+                ? (Boolean) flag
+                : flag != null && Boolean.parseBoolean(String.valueOf(flag));
+        if (!encoded) {
+            return;
+        }
+
+        Object payload = connectRecord.getData();
+        if (payload == null) {
+            return;
+        }
+
+        try {
+            byte[] rawBytes = Base64.getDecoder().decode(payload.toString());
+            connectRecord.setData(rawBytes);
+            log.debug("Decoded Base64 data: {} bytes", rawBytes.length);
+        } catch (IllegalArgumentException e) {
+            // Decoding failed: the record keeps its original data
+            log.error("Failed to decode Base64 data: {}", e.getMessage());
+        }
+    }
+
+    /**
+     * Build a session ID for requests that arrive without one
+     *
+     * @return "mcp-session-" followed by a random UUID
+     */
+    private String generateSessionId() {
+        return "mcp-session-".concat(UUID.randomUUID().toString());
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService
new file mode 100644
index 0000000..01a46e9
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+MCP-Source=org.apache.eventmesh.connector.mcp.source.McpSourceConnector
+MCP-Sink=org.apache.eventmesh.connector.mcp.sink.McpSinkConnector
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/server-config.yml
new file mode 100644
index 0000000..8009f5c
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/server-config.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sourceEnable: true
+sinkEnable: false
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/sink-config.yml
new file mode 100644
index 0000000..c04f886
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/sink-config.yml
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+  meshAddress: 127.0.0.1:10000
+  subject: TopicTest
+  idc: FT
+  env: PRD
+  group: mcpSink
+  appId: 5032
+  userName: mcpSourceUser
+  passWord: mcpPassWord
+connectorConfig:
+  connectorName: mcpSink
+  urls:
+    - http://127.0.0.1:7092/test
+  keepAlive: true
+  keepAliveTimeout: 60000
+  idleTimeout: 5000   # time unit: ms, recommended range: common (5s - 10s), webhook (15s - 60s)
+  connectionTimeout: 5000   # time unit: ms, recommended range: 5s - 10s
+  maxConnectionPoolSize: 5
+  retryConfig:
+    maxRetries: 2
+    interval: 1000
+    retryOnNonSuccess: false
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/source-config.yml
new file mode 100644
index 0000000..66ad75d
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/source-config.yml
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TopicTest
+    idc: FT
+    env: PRD
+    group: mcpSource
+    appId: 5032
+    userName: mcpSourceUser
+    passWord: mcpPassWord
+connectorConfig:
+    connectorName: mcpSource
+    path: /test
+    port: 7091
+    idleTimeout: 5000   # timeunit: ms
+    maxFormAttributeSize: 1048576 # unit: bytes, default: 1048576 (1MB). This applies only when handling form data submissions.
+    protocol: MCP # Case insensitive, default: CloudEvent, options: CloudEvent, GitHub, Common
+    extraConfig: # extra config for different protocol, e.g. GitHub secret
+        streamType: chunked
+        contentType: application/json
+        reconnection: true
+    forwardPath: /forward
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/test/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-mcp/src/test/resources/server-config.yml
new file mode 100644
index 0000000..0cd7b5b
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/test/resources/server-config.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sourceEnable: true
+sinkEnable: false
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/RemoteSubscribeInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/RemoteSubscribeInstance.java
index 99837d2..f045bf0 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/RemoteSubscribeInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/RemoteSubscribeInstance.java
@@ -32,23 +32,66 @@
 import org.apache.eventmesh.common.utils.IPUtils;
 import org.apache.eventmesh.common.utils.JsonUtils;
 
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 
 import io.netty.handler.codec.http.HttpMethod;
 
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class RemoteSubscribeInstance {
 
     static final CloseableHttpClient httpClient = HttpClients.createDefault();
 
-    public static void main(String[] args) {
-        subscribeRemote();
+    public static void main(String[] args) throws IOException {
+        subscribeLocal();
+        //subscribeRemote();
         // unsubscribeRemote();
     }
 
+    private static void subscribeLocal() throws IOException {
+        SubscriptionItem subscription = new SubscriptionItem();
+        subscription.setTopic(ExampleConstants.EVENTMESH_HTTP_ASYNC_TEST_TOPIC);
+        subscription.setMode(SubscriptionMode.CLUSTERING);
+        subscription.setType(SubscriptionType.ASYNC);
+
+        // Request body: url, consumer group and the list of topics, serialized to JSON
+        Map<String, Object> payload = new HashMap<>();
+        payload.put("url", "http://127.0.0.1:8088/sub/test");
+        payload.put("consumerGroup", "EventMeshTest-consumerGroup");
+        payload.put("topic", Collections.singletonList(subscription));
+        String requestJson = JsonUtils.toJSONString(payload);
+        // POST the JSON to the /eventmesh/subscribe/local endpoint with the demo headers
+        HttpPost request = new HttpPost("http://127.0.0.1:10105/eventmesh/subscribe/local");
+        request.setHeader("Content-Type", "application/json");
+        request.setHeader("env", "prod");
+        request.setHeader("idc", "default");
+        request.setHeader("sys", "http-client-demo");
+        request.setHeader("username", "eventmesh");
+        request.setHeader("passwd", "eventmesh");
+        request.setHeader("ip", IPUtils.getLocalAddress());
+        request.setHeader("language", "JAVA");
+        request.setEntity(new StringEntity(requestJson, StandardCharsets.UTF_8));
+
+        try (CloseableHttpResponse response = httpClient.execute(request)) {
+            String responseText = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+            log.info("respStatusLine:{}", response.getStatusLine());
+            log.info("respBody:{}", responseText);
+        }
+    }
+
     private static void subscribeRemote() {
         SubscriptionItem subscriptionItem = new SubscriptionItem();
         subscriptionItem.setTopic(ExampleConstants.EVENTMESH_HTTP_ASYNC_TEST_TOPIC);
diff --git a/settings.gradle b/settings.gradle
index b013a57..327ca7e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -76,6 +76,7 @@
 include 'eventmesh-connectors:eventmesh-connector-http'
 include 'eventmesh-connectors:eventmesh-connector-chatgpt'
 include 'eventmesh-connectors:eventmesh-connector-canal'
+include 'eventmesh-connectors:eventmesh-connector-mcp'
 
 include 'eventmesh-storage-plugin:eventmesh-storage-api'
 include 'eventmesh-storage-plugin:eventmesh-storage-standalone'