[ISSUE #5208] develop mcp protocol (#5203)
* basic arch
* refine
* basic arch
* build the basic mcp server without streamable http
* build the basic mcp server without streamable http
* build the basic mcp server without streamable http
* build the basic mcp server with streamable http
* build the basic mcp server
* build the basic mcp server
* build the basic mcp server
* build the basic mcp server
* build the basic mcp server
* build the basic mcp server
* build the basic mcp server
* build the basic mcp server
* build the basic mcp server
* Update RemoteSubscribeInstance.java
* Update McpSinkHandlerRetryWrapper.java
* Update CommonMcpSinkHandler.java
* Update McpSinkConnector.java
* Update McpSinkHandler.java
* Update McpExportRecord.java
* Update McpConnectRecord.java
* Update McpExportRecordPage.java
* Update McpExportMetadata.java
* Update McpSourceConnector.java
* Update McpToolRegistry.java
* Update McpServerConfig.java
* Update Protocol.java
* Update McpSourceConnector.java
* Update McpSourceConstants.java
* Update McpStandardProtocol.java
* Update McpRequest.java
* Update McpResponse.java
* Update McpSinkHandlerRetryWrapper.java
* Update AbstractMcpSinkHandler.java
---------
Co-authored-by: youyun.0601 <youyun.0601@bytedance.com>
Co-authored-by: wqliang <wqliang@users.noreply.github.com>
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpRetryConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpRetryConfig.java
new file mode 100644
index 0000000..44889f9
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpRetryConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.config.connector.mcp;
+
+import lombok.Data;
+
+@Data
+public class McpRetryConfig {
+ // Maximum number of retries, default 2. Must be >= 0; 0 means failed requests are not retried.
+ private int maxRetries = 2;
+
+ // Interval between retries in milliseconds, default 1000ms
+ private int interval = 1000;
+
+ // false (default): only requests that failed with a network-level error are retried.
+ // true: every failed request is retried, i.e. network-level errors as well as non-2xx responses.
+ private boolean retryOnNonSuccess = false;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSinkConfig.java
new file mode 100644
index 0000000..ce64551
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSinkConfig.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.config.connector.mcp;
+
+import org.apache.eventmesh.common.config.connector.SinkConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/** MCP sink configuration: the inherited SinkConfig plus the MCP-specific connector settings. */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class McpSinkConfig extends SinkConfig {
+ // NOTE(review): public although @Data already generates accessors; McpSinkConnector reads this field directly.
+ public SinkConnectorConfig connectorConfig;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSourceConfig.java
new file mode 100644
index 0000000..320cc37
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/McpSourceConfig.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.config.connector.mcp;
+
+import org.apache.eventmesh.common.config.connector.SourceConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/** MCP source configuration: the inherited SourceConfig plus the MCP-specific connector settings. */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class McpSourceConfig extends SourceConfig {
+ // NOTE(review): public although @Data already generates accessors — confirm direct field access is needed.
+ public SourceConnectorConfig connectorConfig;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SinkConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SinkConnectorConfig.java
new file mode 100644
index 0000000..54a02fb
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SinkConnectorConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.config.connector.mcp;
+
+
+import lombok.Data;
+
+@Data
+public class SinkConnectorConfig {
+
+ private String connectorName;
+
+ private String[] urls;
+
+ // keep-alive switch, default true
+ private boolean keepAlive = true;
+
+ // timeunit: ms, default 60000ms
+ private int keepAliveTimeout = 60 * 1000; // written in ms so all timeouts in this class share one unit
+
+ // timeunit: ms, default 5000ms, recommended scope: 5000ms - 10000ms
+ private int connectionTimeout = 5000;
+
+ // timeunit: ms, default 5000ms; a configured value of 0 is reset to 5000 by populateFieldsWithDefaults
+ private int idleTimeout = 5000;
+
+ // maximum number of HTTP/1 connections a client will pool, default 50
+ private int maxConnectionPoolSize = 50;
+
+ // retry config; a default McpRetryConfig instance is used when none is supplied
+ private McpRetryConfig retryConfig = new McpRetryConfig();
+
+ private String deliveryStrategy = "ROUND_ROBIN";
+
+ private boolean skipDeliverException = false;
+
+ // whether records are handled in parallel, default true; when false McpSinkConnector uses a single worker
+ private boolean isParallelized = true;
+ // number of worker threads McpSinkConnector uses when isParallelized is true, default 2
+ private int parallelism = 2;
+
+
+ /**
+ * Fill default values if absent (for fields that can have more than one sensible default).
+ * Currently only idleTimeout is handled: a value of 0 is replaced with 5000 ms.
+ * @param config SinkConnectorConfig to update in place
+ */
+ public static void populateFieldsWithDefaults(SinkConnectorConfig config) {
+ /*
+ * set default values for idleTimeout
+ * recommended scope: common(5s - 10s), webhook(15s - 30s)
+ */
+ final int commonHttpIdleTimeout = 5000;
+
+ // An idleTimeout of 0 is treated as absent and replaced with the default
+ if (config.getIdleTimeout() == 0) {
+ config.setIdleTimeout(commonHttpIdleTimeout);
+ }
+
+ }
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SourceConnectorConfig.java
new file mode 100644
index 0000000..1880894
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mcp/SourceConnectorConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.config.connector.mcp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.Data;
+
+@Data
+public class SourceConnectorConfig {
+
+ private String connectorName;
+
+ private String path = "/";
+
+ private int port;
+
+ // timeunit: ms, default 5000ms
+ private int idleTimeout = 5000;
+
+ /**
+ * <ul>
+ * <li>The maximum size allowed for form attributes when Content-Type is application/x-www-form-urlencoded or multipart/form-data </li>
+ * <li>Default is 1MB (1024 * 1024 bytes). </li>
+ * <li>If you receive a "size exceed allowed maximum capacity" error, you can increase this value. </li>
+ * <li>Note: This applies only when handling form data submissions.</li>
+ * </ul>
+ */
+ private int maxFormAttributeSize = 1024 * 1024;
+
+ // max size of the queue, default 1000
+ private int maxStorageSize = 1000;
+
+ // batch size, default 10
+ private int batchSize = 10;
+
+ // protocol, default Mcp
+ private String protocol = "Mcp";
+
+ // extra config, e.g. GitHub secret
+ private Map<String, String> extraConfig = new HashMap<>();
+
+ // data consistency enabled, default false
+ private boolean dataConsistencyEnabled = false;
+
+ private String forwardPath;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
index f232854..9b1bfe6 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
@@ -162,6 +162,17 @@
}
}
+ /**
+ * Parse a json string into the generic type captured by typeReference; null or empty text yields null.
+ */
+ public static <T> T parseObject(String text, TypeReference<T> typeReference) {
+ try {
+ return StringUtils.isEmpty(text) ? null : OBJECT_MAPPER.readValue(text, typeReference);
+ } catch (JsonProcessingException e) {
+ throw new JsonException("deserialize json string to object error", e);
+ }
+ }
+
/**
* parse json string to object.
*
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml
index 0cd7b5b..5f66dd0 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml
@@ -16,4 +16,4 @@
#
sourceEnable: true
-sinkEnable: false
+sinkEnable: true
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/build.gradle b/eventmesh-connectors/eventmesh-connector-mcp/build.gradle
new file mode 100644
index 0000000..82072c2
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/build.gradle
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+dependencies {
+ api project(":eventmesh-openconnect:eventmesh-openconnect-java")
+ implementation project(":eventmesh-common")
+ implementation project(":eventmesh-connectors:eventmesh-connector-http")
+ implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
+ implementation "io.cloudevents:cloudevents-core"
+ implementation "com.google.guava:guava"
+ implementation "io.cloudevents:cloudevents-json-jackson"
+ implementation ("io.grpc:grpc-protobuf:1.68.0") {
+ exclude group: "com.google.protobuf", module: "protobuf-java"
+ }
+ implementation 'io.cloudevents:cloudevents-http-vertx:3.0.0'
+ implementation 'io.vertx:vertx-web:4.5.8'
+ implementation 'io.vertx:vertx-web-client:4.5.9'
+ implementation 'dev.failsafe:failsafe:3.3.2'
+ // NOTE(review): vertx-web is pinned to 4.5.8 while vertx-web-client is 4.5.9 — confirm the mismatch is intended
+
+ testImplementation 'org.apache.httpcomponents.client5:httpclient5:5.4'
+ testImplementation 'org.apache.httpcomponents.client5:httpclient5-fluent:5.4'
+ testImplementation 'org.mock-server:mockserver-netty:5.15.0'
+ implementation 'io.netty:netty-codec-http:4.1.114.Final'
+ compileOnly 'org.projectlombok:lombok'
+ annotationProcessor 'org.projectlombok:lombok'
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/gradle.properties b/eventmesh-connectors/eventmesh-connector-mcp/gradle.properties
new file mode 100644
index 0000000..5e98eb9
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/gradle.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+pluginType=connector
+pluginName=mcp
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/config/McpServerConfig.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/config/McpServerConfig.java
new file mode 100644
index 0000000..33357b6
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/config/McpServerConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.eventmesh.connector.mcp.config;
+
+import org.apache.eventmesh.common.config.connector.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/** Flags parsed from server-config.yml; McpConnectServer starts the source and/or sink connector accordingly. */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class McpServerConfig extends Config {
+
+ private boolean sourceEnable;
+
+ private boolean sinkEnable;
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/server/McpConnectServer.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/server/McpConnectServer.java
new file mode 100644
index 0000000..71ea9ae
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/server/McpConnectServer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.server;
+
+import org.apache.eventmesh.connector.mcp.config.McpServerConfig;
+import org.apache.eventmesh.connector.mcp.sink.McpSinkConnector;
+import org.apache.eventmesh.connector.mcp.source.McpSourceConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+public class McpConnectServer {
+ public static void main(String[] args) throws Exception {
+ McpServerConfig serverConfig = ConfigUtil.parse(McpServerConfig.class, "server-config.yml");
+ // Source and sink are switched independently; each enabled role runs in its own Application instance.
+ if (serverConfig.isSourceEnable()) {
+ Application mcpSourceApp = new Application();
+ mcpSourceApp.run(McpSourceConnector.class);
+ }
+ // When both flags are false, neither connector is started by this method.
+ if (serverConfig.isSinkEnable()) {
+ Application mcpSinkApp = new Application();
+ mcpSinkApp.run(McpSinkConnector.class);
+ }
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/McpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/McpSinkConnector.java
new file mode 100644
index 0000000..3d65fb9
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/McpSinkConnector.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink;
+
+import org.apache.eventmesh.common.EventMeshThreadFactory;
+import org.apache.eventmesh.common.config.connector.Config;
+import org.apache.eventmesh.common.config.connector.mcp.McpSinkConfig;
+import org.apache.eventmesh.common.config.connector.mcp.SinkConnectorConfig;
+import org.apache.eventmesh.connector.mcp.sink.handler.McpSinkHandler;
+import org.apache.eventmesh.connector.mcp.sink.handler.impl.CommonMcpSinkHandler;
+import org.apache.eventmesh.connector.mcp.sink.handler.impl.McpSinkHandlerRetryWrapper;
+import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import lombok.Getter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+/** Sink connector that hands each ConnectRecord to an McpSinkHandler on a fixed-size worker pool. */
+@Slf4j
+public class McpSinkConnector implements Sink, ConnectorCreateService<Sink> {
+ // Configuration supplied through one of the init(...) overloads
+ private McpSinkConfig mcpSinkConfig;
+ // Handler that processes records: the plain handler, or a retry wrapper around it when maxRetries > 0
+ @Getter
+ private McpSinkHandler sinkHandler;
+ // Worker pool created in doInit(); put(...) submits one task per record to it
+ private ThreadPoolExecutor executor;
+ // Set by start() and cleared by stop(); put(...) ignores records while it is false
+ private final AtomicBoolean isStart = new AtomicBoolean(false);
+
+ @Override
+ public Class<? extends Config> configClass() {
+ return McpSinkConfig.class;
+ }
+
+ @Override
+ public Sink create() {
+ return new McpSinkConnector();
+ }
+
+ @Override
+ public void init(Config config) throws Exception {
+ this.mcpSinkConfig = (McpSinkConfig) config;
+ doInit();
+ }
+
+ @Override
+ public void init(ConnectorContext connectorContext) throws Exception {
+ SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;
+ this.mcpSinkConfig = (McpSinkConfig) sinkConnectorContext.getSinkConfig();
+ doInit();
+ }
+
+ @SneakyThrows
+ private void doInit() {
+ // Fill default values if absent
+ SinkConnectorConfig.populateFieldsWithDefaults(this.mcpSinkConfig.connectorConfig);
+ // Base handler without retry; it is used directly or wrapped below, depending on maxRetries
+ McpSinkHandler nonRetryHandler;
+
+ nonRetryHandler = new CommonMcpSinkHandler(this.mcpSinkConfig.connectorConfig);
+
+ int maxRetries = this.mcpSinkConfig.connectorConfig.getRetryConfig().getMaxRetries();
+ if (maxRetries == 0) {
+ // Use the original sink handler
+ this.sinkHandler = nonRetryHandler;
+ } else if (maxRetries > 0) {
+ // Wrap the sink handler with a retry handler
+ this.sinkHandler = new McpSinkHandlerRetryWrapper(this.mcpSinkConfig.connectorConfig, nonRetryHandler);
+ } else {
+ throw new IllegalArgumentException("Max retries must be greater than or equal to 0.");
+ }
+ // With parallelization disabled the pool is sized to a single worker thread
+ boolean isParallelized = this.mcpSinkConfig.connectorConfig.isParallelized();
+ int parallelism = isParallelized ? this.mcpSinkConfig.connectorConfig.getParallelism() : 1;
+
+ // Fixed-size pool (core == max == parallelism); pending tasks wait in the executor's own work queue
+ executor = new ThreadPoolExecutor(
+ parallelism,
+ parallelism,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(), // no-arg constructor: capacity is Integer.MAX_VALUE, i.e. effectively unbounded
+ new EventMeshThreadFactory("mcp-sink-handler")
+ );
+ }
+
+ @Override
+ public void start() throws Exception {
+ this.sinkHandler.start();
+ isStart.set(true);
+ }
+
+ @Override
+ public void commit(ConnectRecord record) {
+ // No-op: this sink performs no action on commit
+ }
+
+ @Override
+ public String name() {
+ return this.mcpSinkConfig.connectorConfig.getConnectorName();
+ }
+
+ @Override
+ public void onException(ConnectRecord record) {
+ // No-op: this sink performs no action here
+ }
+
+ @Override
+ public void stop() throws Exception {
+ isStart.set(false);
+ // NOTE(review): executor and sinkHandler are only assigned in doInit(); stop() before init() would NPE — confirm lifecycle
+ log.info("Stopping mcp sink connector, shutting down executor...");
+ executor.shutdown();
+
+ try {
+ // Wait for existing tasks to complete
+ if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+ log.warn("Executor did not terminate gracefully, forcing shutdown");
+ executor.shutdownNow();
+ // Wait a bit more for tasks to respond to being cancelled
+ if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ log.error("Executor did not terminate after forced shutdown");
+ }
+ }
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while waiting for executor termination");
+ executor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ // The handler is stopped only after the executor shutdown sequence above has finished
+ this.sinkHandler.stop();
+ }
+
+ @Override
+ public void put(List<ConnectRecord> sinkRecords) {
+ if (!isStart.get()) {
+ log.warn("Connector is not started, ignoring sink records");
+ return;
+ }
+
+ for (ConnectRecord sinkRecord : sinkRecords) {
+ if (Objects.isNull(sinkRecord)) {
+ log.warn("ConnectRecord data is null, ignore.");
+ continue;
+ }
+ log.info("McpSinkConnector put record: {}", sinkRecord);
+
+ try {
+ // Hand the record to the worker pool; exceptions raised inside the task are logged, not rethrown
+ executor.submit(() -> {
+ try {
+ sinkHandler.handle(sinkRecord);
+ } catch (Exception e) {
+ log.error("Failed to handle sink record via mcp", e);
+ }
+ });
+ } catch (Exception e) {
+ log.error("Failed to submit sink record to executor", e);
+ }
+ }
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpAttemptEvent.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpAttemptEvent.java
new file mode 100644
index 0000000..451fc35
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpAttemptEvent.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.data;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tracks the attempts made for a single MCP event: how many were made, the allowed maximum and the last exception.
+ */
+public class McpAttemptEvent {
+ // Name prefix for attempt events; it is not referenced anywhere inside this class.
+ public static final String PREFIX = "mcp-attempt-event-";
+
+ private final int maxAttempts;
+
+ private final AtomicInteger attempts;
+
+ private Throwable lastException;
+
+ /** Creates an event with zero recorded attempts; {@code maxAttempts} is the bound used by {@link #canAttempt()}. */
+ public McpAttemptEvent(int maxAttempts) {
+ this.maxAttempts = maxAttempts;
+ this.attempts = new AtomicInteger(0);
+ }
+
+ /**
+ * Increment the attempt counter by one.
+ */
+ public void incrementAttempts() {
+ attempts.incrementAndGet();
+ }
+
+ /**
+ * Record the outcome of one attempt: bump the counter and remember the exception it ended with.
+ *
+ * @param exception the failure of this attempt, or null if the attempt had none
+ */
+ public void updateEvent(Throwable exception) {
+ // count this attempt first
+ incrementAttempts();
+
+ // then overwrite the previous outcome; null clears an earlier failure
+ lastException = exception;
+ }
+
+ /**
+ * Check whether another attempt is still allowed.
+ *
+ * @return true if the attempts are less than the maximum attempts, false otherwise
+ */
+ public boolean canAttempt() {
+ return attempts.get() < maxAttempts;
+ }
+ /** Returns true once an attempt was recorded and either none remain or the last one had no exception. */
+ public boolean isComplete() {
+ if (attempts.get() == 0) {
+ // Nothing has been attempted yet
+ return false;
+ }
+
+ // Complete when the attempts are used up, or when the last attempt recorded no exception
+ return !canAttempt() || lastException == null;
+ }
+
+
+ public int getMaxAttempts() {
+ return maxAttempts;
+ }
+
+ public int getAttempts() {
+ return attempts.get();
+ }
+
+ public Throwable getLastException() {
+ return lastException;
+ }
+
+ /**
+ * Get the last exception's message, limited to the default of 256 characters.
+ *
+ * @return the limited exception message, or "" if there is no exception or no message
+ */
+ public String getLimitedExceptionMessage() {
+ return getLimitedExceptionMessage(256);
+ }
+
+ /**
+ * Get the last exception's message, cut to at most maxLimit characters.
+ * Returns "" when there is no exception, no message, or maxLimit is not positive.
+ * @param maxLimit the maximum number of characters to return
+ * @return the (possibly truncated) message, never null
+ */
+ public String getLimitedExceptionMessage(int maxLimit) {
+ if (lastException == null || maxLimit <= 0) {
+ return "";
+ }
+ String message = lastException.getMessage();
+ if (message == null) {
+ return "";
+ }
+ if (message.length() > maxLimit) {
+ return message.substring(0, maxLimit);
+ }
+ return message;
+ }
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpConnectRecord.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpConnectRecord.java
new file mode 100644
index 0000000..26f9c1e
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpConnectRecord.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.data;
+
+import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.KeyValue;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import lombok.Builder;
+import lombok.Getter;
+
+/**
+ * a special ConnectRecord for McpSinkConnector
+ */
+@Getter
+@Builder
+public class McpConnectRecord implements Serializable {
+
+ private static final long serialVersionUID = 5271462532332251473L;
+
+ /**
+ * The unique identifier for the McpConnectRecord
+ */
+ private final String mcpRecordId = UUID.randomUUID().toString();
+
+ /**
+ * The time when the McpConnectRecord was created
+ */
+ private LocalDateTime createTime;
+
+ /**
+ * The type of the McpConnectRecord
+ */
+ private String type;
+
+ /**
+ * The event id of the McpConnectRecord
+ */
+ private String eventId;
+
+ private Object data;
+
+ private KeyValue extensions;
+
+ @Override
+ public String toString() {
+ return "McpConnectRecord{"
+ + "createTime=" + createTime
+ + ", mcpRecordId='" + mcpRecordId
+ + ", type='" + type
+ + ", eventId='" + eventId
+ + ", data=" + data
+ + ", extensions=" + extensions
+ + '}';
+ }
+
+ /**
+ * Convert ConnectRecord to McpConnectRecord
+ *
+ * @param record the ConnectRecord to convert
+ * @return the converted McpConnectRecord
+ */
+ public static McpConnectRecord convertConnectRecord(ConnectRecord record, String type) {
+ Map<String, ?> offsetMap = new HashMap<>();
+ if (record != null && record.getPosition() != null && record.getPosition().getRecordOffset() != null) {
+ if (HttpRecordOffset.class.equals(record.getPosition().getRecordOffsetClazz())) {
+ offsetMap = ((HttpRecordOffset) record.getPosition().getRecordOffset()).getOffsetMap();
+ }
+ }
+ String offset = "0";
+ if (!offsetMap.isEmpty()) {
+ offset = offsetMap.values().iterator().next().toString();
+ }
+ if (record.getData() instanceof byte[]) {
+ String data = Base64.getEncoder().encodeToString((byte[]) record.getData());
+ record.addExtension("isBase64", true);
+ return McpConnectRecord.builder()
+ .type(type)
+ .createTime(LocalDateTime.now())
+ .eventId(type + "-" + offset)
+ .data(data)
+ .extensions(record.getExtensions())
+ .build();
+ } else {
+ record.addExtension("isBase64", false);
+ return McpConnectRecord.builder()
+ .type(type)
+ .createTime(LocalDateTime.now())
+ .eventId(type + "-" + offset)
+ .data(record.getData())
+ .extensions(record.getExtensions())
+ .build();
+ }
+ }
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportMetadata.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportMetadata.java
new file mode 100644
index 0000000..72595b2
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportMetadata.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.data;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * Metadata for an MCP export operation. Lombok generates the accessors ({@code @Data}) and a builder ({@code @Builder}).
+ */
+@Data
+@Builder
+public class McpExportMetadata implements Serializable {
+
+ private static final long serialVersionUID = 1121010466793041920L;
+ // URL associated with the export — presumably the endpoint the record was delivered to; TODO confirm
+ private String url;
+ // Result code — presumably the HTTP status code of the delivery; TODO confirm
+ private int code;
+ // Result message — presumably the response or error text of the delivery; TODO confirm
+ private String message;
+ // Time the result was received — NOTE(review): the time zone is not evident from this class
+ private LocalDateTime receivedTime;
+ // Identifier of the record this metadata belongs to — presumably McpConnectRecord#mcpRecordId; TODO confirm
+ private String recordId;
+ // NOTE(review): looks like the id of the record that triggered/performed the retry — confirm against the producer
+ private String retriedBy;
+ // Retry counter — presumably the number of retries performed for the record; TODO confirm
+ private int retryNum;
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecord.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecord.java
new file mode 100644
index 0000000..c9a35c1
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecord.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.data;
+
+import java.io.Serializable;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Represents an MCP export record containing metadata and data to be exported. Lombok generates the accessors and the all-arguments constructor.
+ */
+@Data
+@AllArgsConstructor
+public class McpExportRecord implements Serializable {
+
+ private static final long serialVersionUID = 6010283911452947157L;
+ // Metadata describing this exported record (see McpExportMetadata)
+ private McpExportMetadata metadata;
+ // The exported payload; declared as Object, so no particular type is enforced here
+ private Object data;
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecordPage.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecordPage.java
new file mode 100644
index 0000000..3e0e615
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/McpExportRecordPage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.data;
+
+import java.io.Serializable;
+import java.util.List;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Represents a page of MCP export records. Lombok generates the accessors and the all-arguments constructor.
+ */
+@Data
+@AllArgsConstructor
+public class McpExportRecordPage implements Serializable {
+
+ private static final long serialVersionUID = 1143791658357035990L;
+ // Number of this page — NOTE(review): whether it is zero- or one-based is not evident from this class
+ private int pageNum;
+ // Page size — presumably the requested number of records per page; TODO confirm
+ private int pageSize;
+ // The export records contained in this page
+ private List<McpExportRecord> pageItems;
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/MultiMcpRequestContext.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/MultiMcpRequestContext.java
new file mode 100644
index 0000000..f24d0e3
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/data/MultiMcpRequestContext.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.data;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Multi Mcp request context: tracks how many requests issued for one record are still pending and the last failed attempt.
+ */
+public class MultiMcpRequestContext {
+    // Key under which the context is stored in the attributes map that accompanies a record
+    public static final String NAME = "multi-http-request-context";
+
+    /**
+     * The remaining requests to be processed.
+     */
+    private final AtomicInteger remainingRequests;
+
+    /**
+     * The last failed event. If retries occur but still fail, it will be logged, and only the last one will be retained.
+     * Volatile so that a failure recorded on one thread is visible to whichever thread inspects the context.
+     */
+    private volatile McpAttemptEvent lastFailedEvent;
+
+    public MultiMcpRequestContext(int remainingEvents) {
+        this.remainingRequests = new AtomicInteger(remainingEvents);
+    }
+
+    /**
+     * Decrement the remaining requests by 1.
+     */
+    public void decrementRemainingRequests() {
+        remainingRequests.decrementAndGet();
+    }
+
+    /**
+     * Check if all requests have been processed.
+     *
+     * @return true if the remaining-request counter is exactly 0, false otherwise.
+     */
+    public boolean isAllRequestsProcessed() {
+        return remainingRequests.get() == 0;
+    }
+
+    public int getRemainingRequests() {
+        return remainingRequests.get();
+    }
+
+    public McpAttemptEvent getLastFailedEvent() {
+        return lastFailedEvent;
+    }
+
+    public void setLastFailedEvent(McpAttemptEvent lastFailedEvent) {
+        this.lastFailedEvent = lastFailedEvent;
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/AbstractMcpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/AbstractMcpSinkHandler.java
new file mode 100644
index 0000000..5c7435d
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/AbstractMcpSinkHandler.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.handler;
+
+import org.apache.eventmesh.common.config.connector.mcp.SinkConnectorConfig;
+import org.apache.eventmesh.connector.mcp.sink.data.McpAttemptEvent;
+import org.apache.eventmesh.connector.mcp.sink.data.McpConnectRecord;
+import org.apache.eventmesh.connector.mcp.sink.data.MultiMcpRequestContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import lombok.Getter;
+
+public abstract class AbstractMcpSinkHandler implements McpSinkHandler {
+    @Getter
+    private final SinkConnectorConfig sinkConnectorConfig;
+
+    @Getter
+    private final List<URI> urls;
+
+    private final McpDeliveryStrategy deliveryStrategy;
+
+    private int roundRobinIndex = 0;
+
+    /**
+     * Stores the configuration, resolves the delivery strategy by its enum name and parses the configured URLs.
+     */
+    protected AbstractMcpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+        this.sinkConnectorConfig = sinkConnectorConfig;
+        this.deliveryStrategy = McpDeliveryStrategy.valueOf(sinkConnectorConfig.getDeliveryStrategy());
+        this.urls = Arrays.stream(sinkConnectorConfig.getUrls()).map(URI::create).collect(Collectors.toList());
+    }
+
+    /**
+     * Dispatches a ConnectRecord to one URL (ROUND_ROBIN) or to every URL (BROADCAST), depending on the delivery strategy.
+     *
+     * @param record the ConnectRecord to process
+     */
+    @Override
+    public void handle(ConnectRecord record) {
+        // attributes shared by every request issued for this record
+        final Map<String, Object> attributes = new ConcurrentHashMap<>();
+
+        switch (deliveryStrategy) {
+            case ROUND_ROBIN:
+                // exactly one request is expected for this record
+                attributes.put(MultiMcpRequestContext.NAME, new MultiMcpRequestContext(1));
+                final URI target = urls.get(roundRobinIndex);
+                roundRobinIndex = (roundRobinIndex + 1) % urls.size();
+                sendRecordToUrl(record, attributes, target);
+                break;
+            case BROADCAST:
+                // one request per configured URL
+                attributes.put(MultiMcpRequestContext.NAME, new MultiMcpRequestContext(urls.size()));
+                for (URI each : urls) {
+                    sendRecordToUrl(record, attributes, each);
+                }
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown delivery strategy: " + deliveryStrategy);
+        }
+    }
+
+    /**
+     * Converts the record to a McpConnectRecord for the given URL, registers a fresh attempt event under the record id and delivers it.
+     */
+    private void sendRecordToUrl(ConnectRecord record, Map<String, Object> attributes, URI url) {
+        final String type = this.sinkConnectorConfig.getConnectorName() + "." + url.getScheme() + ".common";
+        final McpConnectRecord converted = McpConnectRecord.convertConnectRecord(record, type);
+
+        // one initial attempt plus the configured number of retries
+        final McpAttemptEvent firstAttempt = new McpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
+        attributes.put(McpAttemptEvent.PREFIX + converted.getMcpRecordId(), firstAttempt);
+        deliver(url, converted, attributes, record);
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpDeliveryStrategy.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpDeliveryStrategy.java
new file mode 100644
index 0000000..07cbbe3
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpDeliveryStrategy.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.handler;
+
+public enum McpDeliveryStrategy {
+ ROUND_ROBIN, // each record is sent to a single URL, taken in rotation from the configured list
+ BROADCAST // each record is sent to every configured URL
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpSinkHandler.java
new file mode 100644
index 0000000..c10d963
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/McpSinkHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.handler;
+
+import org.apache.eventmesh.connector.mcp.sink.data.McpConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.URI;
+import java.util.Map;
+
+import io.vertx.core.Future;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+
+/**
+ * Interface for handling ConnectRecords via HTTP or HTTPS. Classes implementing this interface are responsible for processing ConnectRecords by
+ * sending them over HTTP or HTTPS, with additional support for handling multiple requests and asynchronous processing.
+ *
+ * <p>Any class that needs to process ConnectRecords via HTTP or HTTPS should implement this interface.
+ * Implementing classes must provide implementations for the {@link #start()}, {@link #handle(ConnectRecord)},
+ * {@link #deliver(URI, McpConnectRecord, Map, ConnectRecord)}, and {@link #stop()} methods.</p>
+ *
+ * <p>Implementing classes should ensure thread safety and handle MCP communication efficiently.
+ * The {@link #start()} method initializes any necessary resources for MCP communication. The {@link #handle(ConnectRecord)} method processes a
+ * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, McpConnectRecord, Map, ConnectRecord)} method sends the McpConnectRecord
+ * to the specified URL and returns the processing chain. The {@link #stop()} method releases any resources used for MCP communication.</p>
+ *
+ * <p>It's recommended to handle exceptions gracefully within the {@link #deliver(URI, McpConnectRecord, Map, ConnectRecord)} method
+ * to prevent message loss or processing interruptions.</p>
+ */
+public interface McpSinkHandler {
+
+ /**
+ * Initializes the MCP handler. This method should be called before using the handler.
+ */
+ void start();
+
+ /**
+ * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
+ *
+ * @param record the ConnectRecord to process
+ */
+ void handle(ConnectRecord record);
+
+ /**
+ * Sends a McpConnectRecord to the specified URL and returns the resulting processing chain.
+ *
+ * @param url URI to which the McpConnectRecord should be sent
+ * @param mcpConnectRecord McpConnectRecord to process
+ * @param attributes additional attributes to be used in processing
+ * @param connectRecord the original ConnectRecord that accompanies the McpConnectRecord
+ * @return processing chain
+ */
+ Future<HttpResponse<Buffer>> deliver(URI url, McpConnectRecord mcpConnectRecord, Map<String, Object> attributes, ConnectRecord connectRecord);
+
+ /**
+ * Cleans up and releases resources used by the MCP handler. This method should be called when the handler is no longer needed.
+ */
+ void stop();
+}
+
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/CommonMcpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/CommonMcpSinkHandler.java
new file mode 100644
index 0000000..1d884f4
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/CommonMcpSinkHandler.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.handler.impl;
+
+import org.apache.eventmesh.common.config.connector.mcp.SinkConnectorConfig;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.http.util.HttpUtils;
+import org.apache.eventmesh.connector.mcp.sink.data.McpAttemptEvent;
+import org.apache.eventmesh.connector.mcp.sink.data.McpConnectRecord;
+import org.apache.eventmesh.connector.mcp.sink.data.MultiMcpRequestContext;
+import org.apache.eventmesh.connector.mcp.sink.handler.AbstractMcpSinkHandler;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.URI;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Common MCP Sink Handler implementation to handle ConnectRecords by sending them over MCP to configured URLs.
+ *
+ * <p>This handler initializes a WebClient for making HTTP requests based on the provided SinkConnectorConfig.
+ * It handles processing ConnectRecords by converting them to HttpConnectRecord and sending them asynchronously to each configured URL using the
+ * WebClient.</p>
+ *
+ * <p>The handler uses Vert.x's WebClient to perform HTTP/HTTPS requests. It initializes the WebClient in the {@link #start()}
+ * method and closes it in the {@link #stop()} method to manage resources efficiently.</p>
+ *
+ * <p>Each ConnectRecord is processed and sent to all configured URLs concurrently using asynchronous HTTP requests.</p>
+ */
+@Slf4j
+@Getter
+public class CommonMcpSinkHandler extends AbstractMcpSinkHandler {
+
+ private WebClient webClient;
+
+
+ public CommonMcpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+ super(sinkConnectorConfig);
+ }
+
+ /**
+ * Initializes the WebClient for making HTTP requests based on the provided SinkConnectorConfig.
+ */
+ @Override
+ public void start() {
+ // Create WebClient
+ doInitWebClient();
+ }
+
+ /**
+ * Initializes the WebClient with the provided configuration options.
+ */
+ private void doInitWebClient() {
+ SinkConnectorConfig sinkConnectorConfig = getSinkConnectorConfig();
+ final Vertx vertx = Vertx.vertx();
+ WebClientOptions options = new WebClientOptions()
+ .setKeepAlive(sinkConnectorConfig.isKeepAlive())
+ .setKeepAliveTimeout(sinkConnectorConfig.getKeepAliveTimeout() / 1000)
+ .setIdleTimeout(sinkConnectorConfig.getIdleTimeout())
+ .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
+ .setConnectTimeout(sinkConnectorConfig.getConnectionTimeout())
+ .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize())
+ .setPipelining(sinkConnectorConfig.isParallelized());
+ this.webClient = WebClient.create(vertx, options);
+ }
+
+ /**
+ * Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified
+ * URL using the WebClient.
+ *
+ * @param url URI to which the HttpConnectRecord should be sent
+ * @param mcpConnectRecord HttpConnectRecord to process
+ * @param attributes additional attributes to be used in processing
+ * @return processing chain
+ */
+ @Override
+ public Future<HttpResponse<Buffer>> deliver(URI url, McpConnectRecord mcpConnectRecord, Map<String, Object> attributes,
+ ConnectRecord connectRecord) {
+ // create headers
+ Map<String, Object> extensionMap = new HashMap<>();
+ Set<String> extensionKeySet = mcpConnectRecord.getExtensions().keySet();
+ for (String extensionKey : extensionKeySet) {
+ Object v = mcpConnectRecord.getExtensions().getObject(extensionKey);
+ extensionMap.put(extensionKey, v);
+ }
+
+ MultiMap headers = HttpHeaders.headers()
+ .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8")
+ .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8")
+ .set("extension", JsonUtils.toJSONString(extensionMap));
+ // get timestamp and offset
+ Long timestamp = mcpConnectRecord.getCreateTime()
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+
+ // send the request
+ return this.webClient.post(url.getPath())
+ .host(url.getHost())
+ .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort())
+ .putHeaders(headers)
+ .ssl(Objects.equals(url.getScheme(), "https"))
+ .sendJson(mcpConnectRecord.getData())
+ .onSuccess(res -> {
+ log.info("Request sent successfully. Record: timestamp={}", timestamp);
+
+ Exception e = null;
+
+ // log the response
+ if (HttpUtils.is2xxSuccessful(res.statusCode())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received successful response: statusCode={}. Record: timestamp={}, responseBody={}",
+ res.statusCode(), timestamp, res.bodyAsString());
+ } else {
+ log.info("Received successful response: statusCode={}. Record: timestamp={}", res.statusCode(), timestamp);
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, responseBody={}",
+ res.statusCode(), timestamp, res.bodyAsString());
+ } else {
+ log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}", res.statusCode(), timestamp);
+ }
+
+ e = new RuntimeException("Unexpected HTTP response code: " + res.statusCode());
+ }
+
+ // try callback
+ tryCallback(mcpConnectRecord, e, attributes, connectRecord);
+ }).onFailure(err -> {
+ log.error("Request failed to send. Record: timestamp={}", timestamp, err);
+
+ // try callback
+ tryCallback(mcpConnectRecord, err, attributes, connectRecord);
+ });
+ }
+
+ /**
+ * Tries to call the callback based on the result of the request.
+ *
+ * @param mcpConnectRecord the McpConnectRecord to use
+ * @param e the exception thrown during the request, may be null
+ * @param attributes additional attributes to be used in processing
+ */
+ private void tryCallback(McpConnectRecord mcpConnectRecord, Throwable e, Map<String, Object> attributes, ConnectRecord record) {
+ // get and update the attempt event
+ McpAttemptEvent attemptEvent = (McpAttemptEvent) attributes.get(McpAttemptEvent.PREFIX + mcpConnectRecord.getMcpRecordId());
+ attemptEvent.updateEvent(e);
+
+ // get and update the multiHttpRequestContext
+ MultiMcpRequestContext multiMcpRequestContext = getAndUpdateMultiMcpRequestContext(attributes, attemptEvent);
+
+ if (multiMcpRequestContext.isAllRequestsProcessed()) {
+ // do callback
+ if (record.getCallback() == null) {
+ if (log.isDebugEnabled()) {
+ log.warn("ConnectRecord callback is null. Ignoring callback. {}", record);
+ } else {
+ log.warn("ConnectRecord callback is null. Ignoring callback.");
+ }
+ return;
+ }
+
+ // get the last failed event
+ McpAttemptEvent lastFailedEvent = multiMcpRequestContext.getLastFailedEvent();
+ if (lastFailedEvent == null) {
+ // success
+ record.getCallback().onSuccess(convertToSendResult(record));
+ } else {
+ // failure
+ record.getCallback().onException(buildSendExceptionContext(record, lastFailedEvent.getLastException()));
+ }
+ } else {
+ log.warn("still have requests to process, size {}|attempt num {}",
+ multiMcpRequestContext.getRemainingRequests(), attemptEvent.getAttempts());
+ }
+ }
+
+
+ /**
+ * Gets and updates the multi mcp request context based on the provided attributes and HttpConnectRecord.
+ *
+ * @param attributes the attributes to use
+ * @param attemptEvent the McpAttemptEvent to use
+ * @return the updated multi mcp request context
+ */
+ private MultiMcpRequestContext getAndUpdateMultiMcpRequestContext(Map<String, Object> attributes, McpAttemptEvent attemptEvent) {
+ // get the multi http request context
+ MultiMcpRequestContext multiMcpRequestContext = (MultiMcpRequestContext) attributes.get(MultiMcpRequestContext.NAME);
+
+ // Check if the current attempted event has completed
+ if (attemptEvent.isComplete()) {
+ // decrement the counter
+ multiMcpRequestContext.decrementRemainingRequests();
+
+ if (attemptEvent.getLastException() != null) {
+ // if all attempts are exhausted, set the last failed event
+ multiMcpRequestContext.setLastFailedEvent(attemptEvent);
+ }
+ }
+
+ return multiMcpRequestContext;
+ }
+
+ private SendResult convertToSendResult(ConnectRecord record) {
+ SendResult result = new SendResult();
+ result.setMessageId(record.getRecordId());
+ if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
+ result.setTopic(record.getExtension("topic"));
+ }
+ return result;
+ }
+
+ private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) {
+ SendExceptionContext sendExceptionContext = new SendExceptionContext();
+ sendExceptionContext.setMessageId(record.getRecordId());
+ sendExceptionContext.setCause(e);
+ if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
+ sendExceptionContext.setTopic(record.getExtension("topic"));
+ }
+ return sendExceptionContext;
+ }
+
+
+ /**
+ * Cleans up and releases resources used by the MCP handler.
+ */
+ @Override
+ public void stop() {
+ if (this.webClient != null) {
+ this.webClient.close();
+ } else {
+ log.warn("WebClient is null, ignore.");
+ }
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/McpSinkHandlerRetryWrapper.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/McpSinkHandlerRetryWrapper.java
new file mode 100644
index 0000000..c2e9908
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/sink/handler/impl/McpSinkHandlerRetryWrapper.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.sink.handler.impl;
+
+import org.apache.eventmesh.common.config.connector.mcp.McpRetryConfig;
+import org.apache.eventmesh.common.config.connector.mcp.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.util.HttpUtils;
+import org.apache.eventmesh.connector.mcp.sink.data.McpConnectRecord;
+import org.apache.eventmesh.connector.mcp.sink.handler.AbstractMcpSinkHandler;
+import org.apache.eventmesh.connector.mcp.sink.handler.McpSinkHandler;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.ConnectException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Map;
+
+import io.vertx.core.Future;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+
+import lombok.extern.slf4j.Slf4j;
+
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+
+/**
+ * McpSinkHandlerRetryWrapper is a wrapper class for the McpSinkHandler that provides retry functionality for failed Mcp requests.
+ */
+@Slf4j
+public class McpSinkHandlerRetryWrapper extends AbstractMcpSinkHandler {
+
+ private final McpRetryConfig mcpRetryConfig;
+
+ private final McpSinkHandler sinkHandler;
+
+ private final RetryPolicy<HttpResponse<Buffer>> retryPolicy;
+
+ public McpSinkHandlerRetryWrapper(SinkConnectorConfig sinkConnectorConfig, McpSinkHandler sinkHandler) {
+ super(sinkConnectorConfig);
+ this.sinkHandler = sinkHandler;
+ this.mcpRetryConfig = getSinkConnectorConfig().getRetryConfig();
+ this.retryPolicy = buildRetryPolicy();
+ }
+
+ private RetryPolicy<HttpResponse<Buffer>> buildRetryPolicy() {
+ return RetryPolicy.<HttpResponse<Buffer>>builder()
+ .handleIf(e -> e instanceof ConnectException)
+ .handleResultIf(response -> mcpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
+ .withMaxRetries(mcpRetryConfig.getMaxRetries())
+ .withDelay(Duration.ofMillis(mcpRetryConfig.getInterval()))
+ .onRetry(event -> {
+ if (log.isDebugEnabled()) {
+ log.warn("Failed to deliver message after {} attempts. Retrying in {} ms. Error: {}",
+ event.getAttemptCount(), mcpRetryConfig.getInterval(), event.getLastException());
+ } else {
+ log.warn("Failed to deliver message after {} attempts. Retrying in {} ms.",
+ event.getAttemptCount(), mcpRetryConfig.getInterval());
+ }
+ }).onFailure(event -> {
+ if (log.isDebugEnabled()) {
+ log.error("Failed to deliver message after {} attempts. Error: {}",
+ event.getAttemptCount(), event.getException());
+ } else {
+ log.error("Failed to deliver message after {} attempts.",
+ event.getAttemptCount());
+ }
+ }).build();
+ }
+
+ /**
+ * Initializes the WebClient for making Mcp requests based on the provided SinkConnectorConfig.
+ */
+ @Override
+ public void start() {
+ sinkHandler.start();
+ }
+
+
+ /**
+ * Processes McpConnectRecord on specified URL while returning its own processing logic This method provides the retry power to process the
+ * McpConnectRecord
+ *
+ * @param url URI to which the McpConnectRecord should be sent
+ * @param mcpConnectRecord McpConnectRecord to process
+ * @param attributes additional attributes to pass to the processing chain
+ * @return processing chain
+ */
+ @Override
+ public Future<HttpResponse<Buffer>> deliver(URI url, McpConnectRecord mcpConnectRecord, Map<String, Object> attributes,
+ ConnectRecord connectRecord) {
+ Failsafe.with(retryPolicy)
+ .getStageAsync(() -> sinkHandler.deliver(url, mcpConnectRecord, attributes, connectRecord).toCompletionStage());
+ return null;
+ }
+
+
+ /**
+ * Cleans up and releases resources used by the Mcp handler.
+ */
+ @Override
+ public void stop() {
+ sinkHandler.stop();
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConnector.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConnector.java
new file mode 100644
index 0000000..e5fe258
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConnector.java
@@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source;
+
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CACHE_CONTROL_NO_CACHE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CONNECTION_KEEP_ALIVE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CONTENT_TYPE_JSON;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CONTENT_TYPE_JSON_PLAIN;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CONTENT_TYPE_SSE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CORS_ALLOWED_HEADERS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CORS_ALLOWED_METHODS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CORS_ALLOW_ALL;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.CORS_EXPOSED_HEADERS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_CONNECTOR_NAME;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_IDLE_TIMEOUT_MS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_NO_MESSAGE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_PROTOCOL_VERSION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_SERVER_NAME;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.DEFAULT_SERVER_VERSION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.ENDPOINT_HEALTH;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.ERROR_INTERNAL;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.ERROR_INVALID_PARAMS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.ERROR_METHOD_NOT_FOUND;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_ACCEPT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CACHE_CONTROL;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CONNECTION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CONTENT_TYPE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CORS_ALLOW_HEADERS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CORS_ALLOW_METHODS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CORS_ALLOW_ORIGIN;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_CORS_EXPOSE_HEADERS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HEADER_X_ACCEL_BUFFERING;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HTTP_METHOD_OPTIONS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HTTP_STATUS_INTERNAL_ERROR;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.HTTP_STATUS_NO_CONTENT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_CAPABILITIES;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_CONNECTOR;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_CONTENT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_DESCRIPTION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_ERROR;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_ERROR_CODE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_ERROR_MESSAGE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_ID;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_JSONRPC;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_METHOD;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_NAME;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_PARAMS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_PROPERTIES;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_PROTOCOL_VERSION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_REQUIRED;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_RESULT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_SERVER_INFO;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_STATUS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_TEXT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_TOOLS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_TYPE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.KEY_VERSION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.MAX_POLL_WAIT_TIME_MS;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_INITIALIZE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_NOTIFICATIONS_INITIALIZED;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_PING;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_TOOLS_CALL;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.METHOD_TOOLS_LIST;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_DESC_MESSAGE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_DESC_MESSAGE_CONTENT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_DESC_TOPIC;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_MESSAGE;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.PARAM_TOPIC;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.SSE_DATA_OPEN;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.SSE_EVENT_OPEN;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.SSE_HEARTBEAT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_JSONRPC_VERSION;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_STATUS_UP;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_TYPE_OBJECT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_TYPE_STRING;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.VALUE_TYPE_TEXT;
+import static org.apache.eventmesh.connector.mcp.source.McpSourceConstants.X_ACCEL_BUFFERING_NO;
+
+import org.apache.eventmesh.common.config.connector.Config;
+import org.apache.eventmesh.common.config.connector.mcp.McpSourceConfig;
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.connector.mcp.source.protocol.Protocol;
+import org.apache.eventmesh.connector.mcp.source.protocol.ProtocolFactory;
+import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
+import org.apache.eventmesh.openconnect.api.source.Source;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.handler.BodyHandler;
+import io.vertx.ext.web.handler.LoggerHandler;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * MCP Source Connector for EventMesh. Implements an MCP protocol server that allows AI clients to interact with EventMesh via the MCP protocol.
+ */
+@Slf4j
+public class McpSourceConnector implements Source, ConnectorCreateService<Source> {
+
+ private McpSourceConfig sourceConfig;
+
+ private BlockingQueue<Object> queue;
+
+ private int batchSize;
+
+ private String forwardPath;
+
+ private Route route;
+
+ private Protocol protocol;
+
+ private HttpServer server;
+
+ private Vertx vertx;
+
+ private WebClient webClient;
+
+ private McpToolRegistry toolRegistry;
+
+ @Getter
+ private volatile boolean started = false;
+
+ @Getter
+ private volatile boolean destroyed = false;
+
+ @Override
+ public Class<? extends Config> configClass() {
+ return McpSourceConfig.class; // the same type both init overloads cast the supplied configuration to
+ }
+
+ @Override
+ public Source create() {
+ return new McpSourceConnector(); // fresh instance; neither init overload has been run on it yet
+ }
+
+ @Override
+ public void init(Config config) {
+ this.sourceConfig = (McpSourceConfig) config; // plain downcast: any other Config type fails here with ClassCastException
+ doInit();
+ }
+
+    @Override
+    public void init(ConnectorContext connectorContext) {
+        // unwrap the source config carried by the SourceConnectorContext, then run the shared initialization
+        this.sourceConfig = (McpSourceConfig) ((SourceConnectorContext) connectorContext).getSourceConfig();
+        doInit();
+    }
+
+    /**
+     * Builds the queue, protocol, tool registry, Vert.x router and HTTP server from sourceConfig; listening starts in start().
+     */
+    private void doInit() {
+        log.info("Initializing MCP Source Connector...");
+
+        // Initialize queue
+        int maxQueueSize = this.sourceConfig.getConnectorConfig().getMaxStorageSize();
+        this.queue = new LinkedBlockingQueue<>(maxQueueSize);
+
+        // Initialize batch size
+        this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize();
+
+        String protocolName = this.sourceConfig.getConnectorConfig().getProtocol();
+        this.protocol = ProtocolFactory.getInstance(this.sourceConfig.connectorConfig, protocolName);
+
+        // Initialize tool registry
+        this.toolRegistry = new McpToolRegistry();
+        registerDefaultTools();
+
+        // Initialize Vertx and router
+        this.vertx = Vertx.vertx();
+        final Router router = Router.router(vertx);
+        this.webClient = WebClient.create(vertx);
+
+        final String basePath = this.sourceConfig.connectorConfig.getPath();
+        this.forwardPath = this.sourceConfig.connectorConfig.getForwardPath();
+
+        // Configure CORS (must be before all routes)
+        router.route().handler(ctx -> {
+            ctx.response()
+                .putHeader(HEADER_CORS_ALLOW_ORIGIN, CORS_ALLOW_ALL)
+                .putHeader(HEADER_CORS_ALLOW_METHODS, CORS_ALLOWED_METHODS)
+                .putHeader(HEADER_CORS_ALLOW_HEADERS, CORS_ALLOWED_HEADERS)
+                .putHeader(HEADER_CORS_EXPOSE_HEADERS, CORS_EXPOSED_HEADERS);
+
+            if (HTTP_METHOD_OPTIONS.equals(ctx.request().method().name())) {
+                ctx.response().setStatusCode(HTTP_STATUS_NO_CONTENT).end();
+            } else {
+                ctx.next();
+            }
+        });
+
+        // Body handler
+        router.route().handler(BodyHandler.create());
+
+        // Main endpoint - handles both JSON-RPC and SSE requests
+        router.post(basePath)
+            .handler(LoggerHandler.create())
+            .handler(ctx -> {
+                String accept = ctx.request().getHeader(HEADER_ACCEPT);
+
+                // Stream only when the client sent an Accept header that matches SSE. A missing or empty header must
+                // fall through to JSON-RPC: startsWith("") is always true and used to send such POSTs down the SSE path.
+                if (accept != null && !accept.isEmpty() && CONTENT_TYPE_SSE.startsWith(accept)) {
+                    handleSseRequest(ctx);
+                } else {
+                    handleJsonRpcRequest(ctx);
+                }
+            });
+
+        // GET request for SSE support
+        router.get(basePath)
+            .handler(this::handleSseRequest);
+
+        // Health check endpoint
+        router.get(basePath + ENDPOINT_HEALTH).handler(ctx -> {
+            JsonObject health = new JsonObject()
+                .put(KEY_STATUS, VALUE_STATUS_UP)
+                .put(KEY_CONNECTOR, DEFAULT_CONNECTOR_NAME)
+                .put(KEY_TOOLS, toolRegistry.getToolCount());
+            ctx.response()
+                .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON)
+                .end(health.encode());
+        });
+
+        Route forwardRoute = router.route().path(forwardPath).handler(LoggerHandler.create());
+
+        this.route = router.route()
+            .path(this.sourceConfig.connectorConfig.getPath())
+            .handler(LoggerHandler.create());
+
+        // set protocol handler
+        this.protocol.setHandler(route, queue);
+        this.protocol.setHandler(forwardRoute, queue);
+
+        // Create server
+        this.server = vertx.createHttpServer(new HttpServerOptions()
+            .setPort(this.sourceConfig.connectorConfig.getPort())
+            .setHandle100ContinueAutomatically(true)
+            .setIdleTimeout(DEFAULT_IDLE_TIMEOUT_MS)
+            .setIdleTimeoutUnit(TimeUnit.MILLISECONDS))
+            .requestHandler(router);
+
+        log.info("MCP Source Connector initialized on http://127.0.0.1:{}{}",
+            this.sourceConfig.connectorConfig.getPort(), basePath);
+    }
+
+    /**
+     * Registers the built-in "echo" and "sendEventMeshMessage" tools in the tool registry.
+     */
+    private void registerDefaultTools() {
+        // Echo tool
+        toolRegistry.registerTool(
+            "echo",
+            "Echo back the input message",
+            createEchoSchema(),
+            args -> {
+                String message = args.getString(PARAM_MESSAGE, DEFAULT_NO_MESSAGE);
+                return createTextContent("Echo: " + message);
+            }
+        );
+
+        // EventMesh message sending tool
+        toolRegistry.registerTool(
+            "sendEventMeshMessage",
+            "Send a message to EventMesh",
+            createSendMessageSchema(),
+            args -> {
+                String topic = args.getString(PARAM_TOPIC);
+                String message = args.getString(PARAM_MESSAGE);
+                // forward the call to this connector's own forwardPath route on 127.0.0.1
+                webClient.post(this.sourceConfig.connectorConfig.getPort(), "127.0.0.1", this.forwardPath)
+                    .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON_PLAIN) // header name, not the CORS exposed-headers value
+                    .sendBuffer(Buffer.buffer(
+                        new JsonObject()
+                            .put("type", "mcp.tools.call")
+                            .put("tool", "sendEventMeshMessage")
+                            .put("arguments", new JsonObject().put("message", message).put("topic", topic))
+                            .encode()
+                    ), ar -> {
+                        if (ar.succeeded()) {
+                            log.info("forwarded tools/call to {} OK, status={}", forwardPath, ar.result().statusCode());
+                        } else {
+                            log.warn("forward tools/call failed: {}", ar.cause().toString());
+                        }
+                    });
+                // fire-and-forget: the text below is returned without waiting for the forward to complete
+                return createTextContent(
+                    String.format("Message sent to topic '%s': %s", topic, message)
+                );
+            }
+        );
+
+        log.info("Registered {} MCP tools", toolRegistry.getToolCount());
+    }
+
+ /**
+ * Handle JSON-RPC request (HTTP mode)
+ *
+ * @param ctx Routing context
+ */
+ private void handleJsonRpcRequest(RoutingContext ctx) {
+ String body = ctx.body().asString();
+
+ try {
+ JsonObject request = new JsonObject(body);
+ JsonObject response = handleMcpRequest(request);
+
+ if (response != null) {
+ ctx.response()
+ .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON)
+ .end(response.encode());
+ } else {
+ // Notification messages don't need response
+ ctx.response().setStatusCode(HTTP_STATUS_NO_CONTENT).end();
+ }
+
+ } catch (Exception e) {
+ JsonObject error = createErrorResponse(null, ERROR_INTERNAL,
+ "Internal error: " + e.getMessage());
+ ctx.response()
+ .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON)
+ .setStatusCode(HTTP_STATUS_INTERNAL_ERROR)
+ .end(error.encode());
+ }
+ }
+
+ /**
+ * Handle SSE request (Server-Sent Events mode)
+ *
+ * @param ctx Routing context
+ */
+ private void handleSseRequest(RoutingContext ctx) {
+ ctx.response()
+ .putHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_SSE)
+ .putHeader(HEADER_CACHE_CONTROL, CACHE_CONTROL_NO_CACHE)
+ .putHeader(HEADER_CONNECTION, CONNECTION_KEEP_ALIVE)
+ .putHeader(HEADER_X_ACCEL_BUFFERING, X_ACCEL_BUFFERING_NO)
+ .setChunked(true);
+
+ // Send connection established event
+ ctx.response().write(SSE_EVENT_OPEN);
+ ctx.response().write(SSE_DATA_OPEN);
+
+ // Heartbeat (optional)
+ long timerId = vertx.setPeriodic(DEFAULT_HEARTBEAT_INTERVAL_MS, id -> {
+ if (!ctx.response().closed()) {
+ ctx.response().write(SSE_HEARTBEAT);
+ } else {
+ vertx.cancelTimer(id);
+ }
+ });
+
+ ctx.request().connection().closeHandler(v -> {
+ vertx.cancelTimer(timerId);
+ });
+ }
+
+ /**
+ * Dispatches one MCP JSON-RPC request to the handler matching its method name.
+ *
+ * @param request JSON-RPC request object; KEY_METHOD, KEY_ID and KEY_PARAMS are read from it
+ * @return JSON-RPC response object, or null for the METHOD_NOTIFICATIONS_INITIALIZED notification
+ */
+ private JsonObject handleMcpRequest(JsonObject request) {
+ String method = request.getString(KEY_METHOD, ""); // "" is the fallback value passed to getString
+ Object id = request.getValue(KEY_ID);
+ JsonObject params = request.getJsonObject(KEY_PARAMS);
+
+ switch (method) {
+ case METHOD_INITIALIZE:
+ return handleInitialize(id, params);
+ case METHOD_NOTIFICATIONS_INITIALIZED:
+ return null; // Notifications don't need response
+ case METHOD_TOOLS_LIST:
+ return handleToolsList(id);
+ case METHOD_TOOLS_CALL:
+ return handleToolsCall(id, params);
+ case METHOD_PING:
+ return createSuccessResponse(id, new JsonObject()); // ping is answered with an empty result object
+ default:
+ return createErrorResponse(id, ERROR_METHOD_NOT_FOUND,
+ "Method not found: " + method);
+ }
+ }
+
+ /**
+ * Handle initialize method
+ *
+ * @param id Request ID
+ * @param params Request parameters
+ * @return Initialize response
+ */
+ private JsonObject handleInitialize(Object id, JsonObject params) {
+ String clientVersion = params != null
+ ? params.getString(KEY_PROTOCOL_VERSION, DEFAULT_PROTOCOL_VERSION)
+ : DEFAULT_PROTOCOL_VERSION;
+
+ JsonObject result = new JsonObject()
+ .put(KEY_PROTOCOL_VERSION, clientVersion)
+ .put(KEY_SERVER_INFO, new JsonObject()
+ .put(KEY_NAME, DEFAULT_SERVER_NAME)
+ .put(KEY_VERSION, DEFAULT_SERVER_VERSION))
+ .put(KEY_CAPABILITIES, new JsonObject()
+ .put(KEY_TOOLS, new JsonObject()));
+
+ return createSuccessResponse(id, result);
+ }
+
+ /**
+ * Handle tools/list method
+ *
+ * @param id Request ID
+ * @return Tools list response
+ */
+ private JsonObject handleToolsList(Object id) {
+ JsonArray tools = toolRegistry.getToolsArray();
+ JsonObject result = new JsonObject().put(KEY_TOOLS, tools);
+ return createSuccessResponse(id, result);
+ }
+
+ /**
+ * Handle tools/call method
+ *
+ * @param id Request ID
+ * @param params Tool call parameters
+ * @return Tool execution result
+ */
+ private JsonObject handleToolsCall(Object id, JsonObject params) {
+ if (params == null) {
+ return createErrorResponse(id, ERROR_INVALID_PARAMS, "Invalid params");
+ }
+
+ String toolName = params.getString(KEY_NAME);
+ JsonObject arguments = params.getJsonObject("arguments", new JsonObject());
+
+ log.info("Calling tool: {} with arguments: {}", toolName, arguments);
+
+ try {
+ JsonObject content = toolRegistry.executeTool(toolName, arguments);
+ JsonObject result = new JsonObject()
+ .put(KEY_CONTENT, new JsonArray().add(content));
+
+ return createSuccessResponse(id, result);
+
+ } catch (IllegalArgumentException e) {
+ return createErrorResponse(id, ERROR_INVALID_PARAMS, e.getMessage());
+ } catch (Exception e) {
+ log.error("Tool execution error", e);
+ return createErrorResponse(id, ERROR_INTERNAL,
+ "Tool execution failed: " + e.getMessage());
+ }
+ }
+
+ // ========== JSON-RPC Response Builders ==========
+
+ /**
+ * Create a success response
+ *
+ * @param id Request ID
+ * @param result Result object
+ * @return JSON-RPC success response
+ */
+ private JsonObject createSuccessResponse(Object id, JsonObject result) {
+ return new JsonObject()
+ .put(KEY_JSONRPC, VALUE_JSONRPC_VERSION)
+ .put(KEY_ID, id)
+ .put(KEY_RESULT, result);
+ }
+
+ /**
+ * Create an error response
+ *
+ * @param id Request ID
+ * @param code Error code
+ * @param message Error message
+ * @return JSON-RPC error response
+ */
+ private JsonObject createErrorResponse(Object id, int code, String message) {
+ return new JsonObject()
+ .put(KEY_JSONRPC, VALUE_JSONRPC_VERSION)
+ .put(KEY_ID, id)
+ .put(KEY_ERROR, new JsonObject()
+ .put(KEY_ERROR_CODE, code)
+ .put(KEY_ERROR_MESSAGE, message));
+ }
+
+ // ========== Schema Creation Helpers ==========
+
+ /**
+ * Create JSON schema for echo tool
+ *
+ * @return Echo tool input schema
+ */
+ private JsonObject createEchoSchema() {
+ return new JsonObject()
+ .put(KEY_TYPE, VALUE_TYPE_OBJECT)
+ .put(KEY_PROPERTIES, new JsonObject()
+ .put(PARAM_MESSAGE, new JsonObject()
+ .put(KEY_TYPE, VALUE_TYPE_STRING)
+ .put(KEY_DESCRIPTION, PARAM_DESC_MESSAGE)))
+ .put(KEY_REQUIRED, new JsonArray().add(PARAM_MESSAGE));
+ }
+
+ /**
+ * Create JSON schema for send message tool
+ *
+ * @return Send message tool input schema
+ */
+ private JsonObject createSendMessageSchema() {
+ return new JsonObject()
+ .put(KEY_TYPE, VALUE_TYPE_OBJECT)
+ .put(KEY_PROPERTIES, new JsonObject()
+ .put(PARAM_TOPIC, new JsonObject()
+ .put(KEY_TYPE, VALUE_TYPE_STRING)
+ .put(KEY_DESCRIPTION, PARAM_DESC_TOPIC))
+ .put(PARAM_MESSAGE, new JsonObject()
+ .put(KEY_TYPE, VALUE_TYPE_STRING)
+ .put(KEY_DESCRIPTION, PARAM_DESC_MESSAGE_CONTENT)))
+ .put(KEY_REQUIRED, new JsonArray().add(PARAM_TOPIC).add(PARAM_MESSAGE));
+ }
+
+ /**
+ * Create text content object
+ *
+ * @param text Text content
+ * @return MCP text content object
+ */
+ private JsonObject createTextContent(String text) {
+ return new JsonObject()
+ .put(KEY_TYPE, VALUE_TYPE_TEXT)
+ .put(KEY_TEXT, text);
+ }
+
+ // ========== Source Interface Implementation ==========
+
+    @Override
+    public void start() {
+        this.server.listen(res -> {
+            if (res.failed()) {
+                log.error("McpSourceConnector failed to start on port: {}",
+                    this.sourceConfig.getConnectorConfig().getPort());
+                // NOTE(review): raised inside the listen callback, so presumably not seen by the caller of start() - confirm
+                throw new EventMeshException("failed to start Vertx server", res.cause());
+            }
+            this.started = true;
+            log.info("McpSourceConnector started on port: {}",
+                this.sourceConfig.getConnectorConfig().getPort());
+            log.info("MCP endpoints available at:");
+            log.info(" - POST {} (JSON-RPC)", this.sourceConfig.connectorConfig.getPath());
+            log.info(" - GET {} (SSE)", this.sourceConfig.connectorConfig.getPath());
+            log.info(" - GET {}{} (Health check)",
+                this.sourceConfig.connectorConfig.getPath(), ENDPOINT_HEALTH);
+        });
+    }
+
+    @Override
+    public void commit(ConnectRecord record) {
+        if (!sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) {
+            return;
+        }
+        log.debug("McpSourceConnector commit record: {}", record.getRecordId()); // MCP needs no further commit logic
+    }
+
+ @Override
+ public String name() {
+ return this.sourceConfig.getConnectorConfig().getConnectorName(); // the name is taken from the connector configuration
+ }
+
+ @Override
+ public void onException(ConnectRecord record) {
+ log.error("Exception occurred for record: {}", record.getRecordId()); // only logged; nothing is retried or re-queued here
+ // MCP errors are already handled via JSON-RPC error responses
+ }
+
+    @Override
+    public void stop() {
+        if (this.server == null) {
+            log.warn("McpSourceConnector server is null, ignore.");
+        } else {
+            this.server.close(res -> {
+                if (res.failed()) {
+                    log.error("McpSourceConnector failed to stop on port: {}",
+                        this.sourceConfig.getConnectorConfig().getPort());
+                    throw new EventMeshException("failed to stop Vertx server", res.cause());
+                }
+                this.destroyed = true;
+                log.info("McpSourceConnector stopped on port: {}",
+                    this.sourceConfig.getConnectorConfig().getPort());
+            });
+        }
+
+        // the Vert.x instance is closed whether or not a server reference exists
+        if (this.vertx != null) {
+            this.vertx.close();
+        }
+    }
+
+    @Override
+    public List<ConnectRecord> poll() {
+        final long startTime = System.currentTimeMillis();
+        long remainingTime = MAX_POLL_WAIT_TIME_MS;
+        List<ConnectRecord> connectRecords = new ArrayList<>(batchSize);
+        for (int i = 0; i < batchSize; i++) {
+            try {
+                Object obj = queue.poll(remainingTime, TimeUnit.MILLISECONDS);
+                if (obj == null) {
+                    break;
+                }
+                // Convert MCP tool calls to ConnectRecord
+                connectRecords.add(protocol.convertToConnectRecord(obj));
+                // shrink the wait budget so later polls only wait for what is left of MAX_POLL_WAIT_TIME_MS
+                remainingTime = Math.max(0L, MAX_POLL_WAIT_TIME_MS - (System.currentTimeMillis() - startTime));
+            } catch (InterruptedException e) {
+                // restore the interrupt flag cleared by queue.poll before propagating, so callers can still observe it
+                Thread.currentThread().interrupt();
+                log.error("Failed to poll from queue.", e);
+                throw new RuntimeException(e);
+            } catch (Exception e) {
+                log.error("Failed to poll from queue.", e);
+                throw new RuntimeException(e);
+            }
+        }
+        return connectRecords;
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConstants.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConstants.java
new file mode 100644
index 0000000..423cbbc
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpSourceConstants.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source;
+
+/**
+ * Constants for the MCP source connector: server defaults, HTTP/CORS headers, JSON-RPC names and codes, SSE frames
+ */
+public final class McpSourceConstants {
+
+ private McpSourceConstants() {
+ // Utility class, no instantiation
+ }
+
+ // ========== Server Configuration ==========
+
+ /**
+ * Default connector name
+ */
+ public static final String DEFAULT_CONNECTOR_NAME = "mcp-source";
+
+ /**
+ * Default server name for MCP protocol
+ */
+ public static final String DEFAULT_SERVER_NAME = "eventmesh-mcp-connector";
+
+ /**
+ * Default server version
+ */
+ public static final String DEFAULT_SERVER_VERSION = "1.0.0";
+
+ /**
+ * Default MCP protocol version (a date-style revision string)
+ */
+ public static final String DEFAULT_PROTOCOL_VERSION = "2024-11-05";
+
+ /**
+ * Default idle timeout in milliseconds (60 seconds)
+ */
+ public static final int DEFAULT_IDLE_TIMEOUT_MS = 60000;
+
+ /**
+ * Default heartbeat interval in milliseconds (30 seconds)
+ */
+ public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 30000;
+
+ /**
+ * Maximum poll wait time in milliseconds (5 seconds)
+ */
+ public static final long MAX_POLL_WAIT_TIME_MS = 5000;
+
+ // ========== HTTP Headers ==========
+
+ /**
+ * Content-Type header name
+ */
+ public static final String HEADER_CONTENT_TYPE = "Content-Type";
+
+ /**
+ * Accept header name
+ */
+ public static final String HEADER_ACCEPT = "Accept";
+
+ /**
+ * Access-Control-Allow-Origin header name
+ */
+ public static final String HEADER_CORS_ALLOW_ORIGIN = "Access-Control-Allow-Origin";
+
+ /**
+ * Access-Control-Allow-Methods header name
+ */
+ public static final String HEADER_CORS_ALLOW_METHODS = "Access-Control-Allow-Methods";
+
+ /**
+ * Access-Control-Allow-Headers header name
+ */
+ public static final String HEADER_CORS_ALLOW_HEADERS = "Access-Control-Allow-Headers";
+
+ /**
+ * Access-Control-Expose-Headers header name
+ */
+ public static final String HEADER_CORS_EXPOSE_HEADERS = "Access-Control-Expose-Headers";
+
+ /**
+ * Cache-Control header name
+ */
+ public static final String HEADER_CACHE_CONTROL = "Cache-Control";
+
+ /**
+ * Connection header name
+ */
+ public static final String HEADER_CONNECTION = "Connection";
+
+ /**
+ * X-Accel-Buffering header name
+ */
+ public static final String HEADER_X_ACCEL_BUFFERING = "X-Accel-Buffering";
+
+ // ========== CORS Values ==========
+
+ /**
+ * CORS wildcard value that allows all origins
+ */
+ public static final String CORS_ALLOW_ALL = "*";
+
+ /**
+ * CORS allowed methods (comma-separated header value)
+ */
+ public static final String CORS_ALLOWED_METHODS = "GET, POST, OPTIONS";
+
+ /**
+ * CORS allowed request headers (comma-separated header value)
+ */
+ public static final String CORS_ALLOWED_HEADERS = "Content-Type, Authorization, Accept";
+
+ /**
+ * CORS exposed response headers
+ */
+ public static final String CORS_EXPOSED_HEADERS = "Content-Type";
+
+ // ========== Content Types ==========
+
+ /**
+ * JSON content type with UTF-8 charset
+ */
+ public static final String CONTENT_TYPE_JSON = "application/json; charset=utf-8";
+
+ /**
+ * Server-Sent Events content type with UTF-8 charset
+ */
+ public static final String CONTENT_TYPE_SSE = "text/event-stream; charset=utf-8";
+
+ /**
+ * Plain JSON content type without charset parameter (for matching)
+ */
+ public static final String CONTENT_TYPE_JSON_PLAIN = "application/json";
+
+ // ========== HTTP Status Codes ==========
+
+ /**
+ * HTTP 204 No Content
+ */
+ public static final int HTTP_STATUS_NO_CONTENT = 204;
+
+ /**
+ * HTTP 500 Internal Server Error
+ */
+ public static final int HTTP_STATUS_INTERNAL_ERROR = 500;
+
+ // ========== JSON-RPC Methods ==========
+
+ /**
+ * JSON-RPC method name: initialize
+ */
+ public static final String METHOD_INITIALIZE = "initialize";
+
+ /**
+ * JSON-RPC method name: notifications/initialized
+ */
+ public static final String METHOD_NOTIFICATIONS_INITIALIZED = "notifications/initialized";
+
+ /**
+ * JSON-RPC method name: tools/list
+ */
+ public static final String METHOD_TOOLS_LIST = "tools/list";
+
+ /**
+ * JSON-RPC method name: tools/call
+ */
+ public static final String METHOD_TOOLS_CALL = "tools/call";
+
+ /**
+ * JSON-RPC method name: ping
+ */
+ public static final String METHOD_PING = "ping";
+
+ // ========== JSON-RPC Error Codes ==========
+
+ /**
+ * JSON-RPC 2.0 "Invalid params" error code
+ */
+ public static final int ERROR_INVALID_PARAMS = -32602;
+
+ /**
+ * JSON-RPC 2.0 "Method not found" error code
+ */
+ public static final int ERROR_METHOD_NOT_FOUND = -32601;
+
+ /**
+ * JSON-RPC 2.0 "Internal error" error code
+ */
+ public static final int ERROR_INTERNAL = -32603;
+
+ // ========== JSON Keys ==========
+
+ /**
+ * JSON-RPC version key
+ */
+ public static final String KEY_JSONRPC = "jsonrpc";
+
+ /**
+ * JSON-RPC version value stored under the version key
+ */
+ public static final String VALUE_JSONRPC_VERSION = "2.0";
+
+ /**
+ * Method key
+ */
+ public static final String KEY_METHOD = "method";
+
+ /**
+ * ID key
+ */
+ public static final String KEY_ID = "id";
+
+ /**
+ * Params key
+ */
+ public static final String KEY_PARAMS = "params";
+
+ /**
+ * Result key
+ */
+ public static final String KEY_RESULT = "result";
+
+ /**
+ * Error key
+ */
+ public static final String KEY_ERROR = "error";
+
+ /**
+ * Error code key
+ */
+ public static final String KEY_ERROR_CODE = "code";
+
+ /**
+ * Error message key
+ */
+ public static final String KEY_ERROR_MESSAGE = "message";
+
+ /**
+ * Protocol version key
+ */
+ public static final String KEY_PROTOCOL_VERSION = "protocolVersion";
+
+ /**
+ * Server info key
+ */
+ public static final String KEY_SERVER_INFO = "serverInfo";
+
+ /**
+ * Capabilities key
+ */
+ public static final String KEY_CAPABILITIES = "capabilities";
+
+ /**
+ * Tools key
+ */
+ public static final String KEY_TOOLS = "tools";
+
+ /**
+ * Name key
+ */
+ public static final String KEY_NAME = "name";
+
+ /**
+ * Version key
+ */
+ public static final String KEY_VERSION = "version";
+
+ /**
+ * Content key
+ */
+ public static final String KEY_CONTENT = "content";
+
+ /**
+ * Type key
+ */
+ public static final String KEY_TYPE = "type";
+
+ /**
+ * Text key
+ */
+ public static final String KEY_TEXT = "text";
+
+ /**
+ * Description key
+ */
+ public static final String KEY_DESCRIPTION = "description";
+
+ /**
+ * Input schema key
+ */
+ public static final String KEY_INPUT_SCHEMA = "inputSchema";
+
+ /**
+ * Properties key
+ */
+ public static final String KEY_PROPERTIES = "properties";
+
+ /**
+ * Required key
+ */
+ public static final String KEY_REQUIRED = "required";
+
+ /**
+ * Status key
+ */
+ public static final String KEY_STATUS = "status";
+
+ /**
+ * Connector key
+ */
+ public static final String KEY_CONNECTOR = "connector";
+
+ // ========== JSON Values ==========
+
+ /**
+ * Object type value
+ */
+ public static final String VALUE_TYPE_OBJECT = "object";
+
+ /**
+ * String type value
+ */
+ public static final String VALUE_TYPE_STRING = "string";
+
+ /**
+ * Text type value
+ */
+ public static final String VALUE_TYPE_TEXT = "text";
+
+ /**
+ * Status UP value
+ */
+ public static final String VALUE_STATUS_UP = "UP";
+
+ // ========== HTTP Methods ==========
+
+ /**
+ * OPTIONS HTTP method
+ */
+ public static final String HTTP_METHOD_OPTIONS = "OPTIONS";
+
+ // ========== SSE Events ==========
+
+ /**
+ * SSE "event:" line naming the open event (newline-terminated)
+ */
+ public static final String SSE_EVENT_OPEN = "event: open\n";
+
+ /**
+ * SSE "data:" line for the open event; the trailing blank line ends the event
+ */
+ public static final String SSE_DATA_OPEN = "data: {\"type\":\"open\"}\n\n";
+
+ /**
+ * SSE heartbeat frame; a line starting with ':' is an SSE comment
+ */
+ public static final String SSE_HEARTBEAT = ": heartbeat\n\n";
+
+ /**
+ * Cache-Control header value: no-cache
+ */
+ public static final String CACHE_CONTROL_NO_CACHE = "no-cache";
+
+ /**
+ * Connection header value: keep-alive
+ */
+ public static final String CONNECTION_KEEP_ALIVE = "keep-alive";
+
+ /**
+ * X-Accel-Buffering header value: no
+ */
+ public static final String X_ACCEL_BUFFERING_NO = "no";
+
+ // ========== Tool Parameter Names ==========
+
+ /**
+ * Message parameter name
+ */
+ public static final String PARAM_MESSAGE = "message";
+
+ /**
+ * Topic parameter name
+ */
+ public static final String PARAM_TOPIC = "topic";
+
+ // ========== Tool Parameter Descriptions ==========
+
+ /**
+ * Message parameter description
+ */
+ public static final String PARAM_DESC_MESSAGE = "Message to echo";
+
+ /**
+ * Topic parameter description
+ */
+ public static final String PARAM_DESC_TOPIC = "EventMesh topic";
+
+ /**
+ * Message content parameter description
+ */
+ public static final String PARAM_DESC_MESSAGE_CONTENT = "Message content";
+
+ // ========== Default Values ==========
+
+ /**
+ * Default message when no message provided
+ */
+ public static final String DEFAULT_NO_MESSAGE = "No message";
+
+ // ========== Endpoint Paths ==========
+
+ /**
+ * Health check endpoint suffix
+ */
+ public static final String ENDPOINT_HEALTH = "/health";
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpToolRegistry.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpToolRegistry.java
new file mode 100644
index 0000000..a59d93f
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/McpToolRegistry.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * MCP Tool Registry
+ * Keeps the registered MCP tools in a concurrent map keyed by tool name
+ */
+@Slf4j
+public class McpToolRegistry {
+
+    private final Map<String, McpTool> tools = new ConcurrentHashMap<>();
+
+    /**
+     * Register an MCP tool; a tool already registered under the same name is replaced
+     * @param name Tool name, used as the lookup key
+     * @param description Tool description
+     * @param inputSchema JSON schema for tool input parameters
+     * @param executor Tool execution logic
+     */
+    public void registerTool(String name, String description, JsonObject inputSchema, ToolExecutor executor) {
+        tools.put(name, new McpTool(name, description, inputSchema, executor));
+        log.info("Registered MCP tool: {}", name);
+    }
+
+    /**
+     * Execute a specified tool
+     * @param name Tool name; null is treated like an unregistered name
+     * @param arguments Tool arguments as JSON object, handed to the executor as-is
+     * @return Tool execution result as MCP content object
+     * @throws IllegalArgumentException if tool not found
+     * @throws RuntimeException if tool execution fails (the executor's exception is kept as cause)
+     */
+    public JsonObject executeTool(String name, JsonObject arguments) {
+        // ConcurrentHashMap.get(null) throws NullPointerException, so map a null name to "not found".
+        McpTool tool = name == null ? null : tools.get(name);
+        if (tool == null) {
+            throw new IllegalArgumentException("Unknown tool: " + name);
+        }
+
+        try {
+            return tool.executor.execute(arguments);
+        } catch (Exception e) {
+            log.error("Tool execution failed: {}", name, e);
+            throw new RuntimeException("Tool execution failed: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Get all tools as a JSON array
+     * @return JSON array with one object (name, description, inputSchema) per registered tool
+     */
+    public JsonArray getToolsArray() {
+        JsonArray array = new JsonArray();
+        for (McpTool tool : tools.values()) {
+            JsonObject toolObj = new JsonObject()
+                .put(McpSourceConstants.KEY_NAME, tool.name)
+                .put(McpSourceConstants.KEY_DESCRIPTION, tool.description)
+                .put(McpSourceConstants.KEY_INPUT_SCHEMA, tool.inputSchema);
+            array.add(toolObj);
+        }
+        return array;
+    }
+
+    /**
+     * Get the number of registered tools
+     * @return Total count of registered tools
+     */
+    public int getToolCount() {
+        return tools.size();
+    }
+
+    /**
+     * Check if a tool exists
+     * @param name Tool name to check; null is reported as not registered
+     * @return true if tool is registered, false otherwise
+     */
+    public boolean hasTool(String name) {
+        // Guard first: ConcurrentHashMap.containsKey(null) throws NullPointerException.
+        return name != null && tools.containsKey(name);
+    }
+
+    /**
+     * Tool Executor Interface
+     * Functional interface for defining tool execution logic
+     */
+    @FunctionalInterface
+    public interface ToolExecutor {
+        /**
+         * Execute tool logic
+         * @param arguments Tool input arguments as JSON object
+         * @return MCP content object (must contain 'type' and 'text' fields)
+         * @throws Exception if execution fails
+         */
+        JsonObject execute(JsonObject arguments) throws Exception;
+    }
+
+    /**
+     * MCP Tool Definition
+     * Immutable holder for one registered tool: its metadata plus the executor to run
+     */
+    private static class McpTool {
+        final String name;
+        final String description;
+        final JsonObject inputSchema;
+        final ToolExecutor executor;
+
+        McpTool(String name, String description, JsonObject inputSchema, ToolExecutor executor) {
+            this.name = name;
+            this.description = description;
+            this.inputSchema = inputSchema;
+            this.executor = executor;
+        }
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpRequest.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpRequest.java
new file mode 100644
index 0000000..38b9be8
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpRequest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source.data;
+
+import java.io.Serializable;
+
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.RoutingContext;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * MCP Protocol Request
+ * Holds one MCP (Model Context Protocol) request together with its outcome fields and HTTP routing context
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class McpRequest implements Serializable {
+
+ private static final long serialVersionUID = -483500600756490500L;
+
+ /**
+ * Protocol name
+ */
+ private String protocolName;
+
+ /**
+ * Session ID for tracking the request
+ */
+ private String sessionId;
+
+ /**
+ * MCP method name
+ */
+ private String method;
+
+ /**
+ * Tool name
+ */
+ private String toolName;
+
+ /**
+ * Tool arguments. NOTE(review): verify JsonObject is java.io.Serializable before Java-serializing this class
+ */
+ private JsonObject arguments;
+
+ /**
+ * Tool execution result
+ */
+ private JsonObject result;
+
+ /**
+ * Request timestamp (unit not visible here; presumably epoch milliseconds - TODO confirm)
+ */
+ private long timestamp;
+
+ /**
+ * Whether the tool execution succeeded
+ */
+ private boolean success;
+
+ /**
+ * Error message if execution failed
+ */
+ private String errorMessage;
+
+ /**
+ * Vert.x routing context for HTTP response handling; transient, so it is left out of Java serialization
+ */
+ private transient RoutingContext routingContext;
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpResponse.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpResponse.java
new file mode 100644
index 0000000..93e1cbc
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/data/McpResponse.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source.data;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter.Feature;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * MCP Response
+ * Status/message envelope for MCP protocol operations, normally built through the static factories
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class McpResponse implements Serializable {
+
+    private static final long serialVersionUID = 8616938575207104455L;
+
+    /**
+     * Status label of the response: "success", "error", etc.
+     */
+    private String status;
+
+    /**
+     * Human-readable message
+     */
+    private String msg;
+
+    /**
+     * Time at which the response was created
+     */
+    private LocalDateTime handleTime;
+
+    /**
+     * Optional error code, set for error responses
+     */
+    private Integer errorCode;
+
+    /**
+     * Optional data payload
+     */
+    private Object data;
+
+    /**
+     * Serialize this response to JSON with the {@code WriteMapNullValue} feature enabled.
+     * @return JSON string representation
+     */
+    public String toJsonStr() {
+        return JSON.toJSONString(this, Feature.WriteMapNullValue);
+    }
+
+    /**
+     * Common path of all factories: stamps the response with {@code LocalDateTime.now()}.
+     * The arguments map onto the fields of the same name.
+     */
+    private static McpResponse stamped(String status, String msg, Integer errorCode, Object data) {
+        return new McpResponse(status, msg, LocalDateTime.now(), errorCode, data);
+    }
+
+    /**
+     * Build a success response carrying the default message.
+     * @return Success response
+     */
+    public static McpResponse success() {
+        return stamped("success", "Operation completed successfully", null, null);
+    }
+
+    /**
+     * Build a success response with a custom message.
+     * @param msg Success message
+     * @return Success response
+     */
+    public static McpResponse success(String msg) {
+        return stamped("success", msg, null, null);
+    }
+
+    /**
+     * Build a success response with a custom message and a payload.
+     * @param msg Success message
+     * @param data Response data
+     * @return Success response with data
+     */
+    public static McpResponse success(String msg, Object data) {
+        return stamped("success", msg, null, data);
+    }
+
+    /**
+     * Build an error response without an error code.
+     * @param msg Error message
+     * @return Error response
+     */
+    public static McpResponse error(String msg) {
+        return stamped("error", msg, null, null);
+    }
+
+    /**
+     * Build an error response that carries an error code.
+     * @param msg Error message
+     * @param errorCode Error code
+     * @return Error response with code
+     */
+    public static McpResponse error(String msg, Integer errorCode) {
+        return stamped("error", msg, errorCode, null);
+    }
+
+    /**
+     * Build a base response; its status is "info".
+     * @param msg Message
+     * @return Base response
+     */
+    public static McpResponse base(String msg) {
+        return stamped("info", msg, null, null);
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/Protocol.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/Protocol.java
new file mode 100644
index 0000000..f18397a
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/Protocol.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source.protocol;
+
+import org.apache.eventmesh.common.config.connector.mcp.SourceConnectorConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.concurrent.BlockingQueue;
+
+import io.vertx.ext.web.Route;
+
+/**
+ * Protocol Interface.
+ * All protocols should implement this interface; ProtocolFactory creates them reflectively with a no-arg constructor.
+ */
+public interface Protocol {
+
+ /**
+ * Initialize the protocol. ProtocolFactory calls this once, right after creating the instance.
+ *
+ * @param sourceConnectorConfig source connector config
+ */
+ void initialize(SourceConnectorConfig sourceConnectorConfig);
+
+
+ /**
+ * Attach the protocol's message handling to the given route.
+ *
+ * @param route Vert.x route to handle
+ * @param queue queue of received messages (presumably filled by the handler - TODO confirm in implementations)
+ */
+ void setHandler(Route route, BlockingQueue<Object> queue);
+
+
+ /**
+ * Convert the message to ConnectRecord.
+ *
+ * @param message message object, as taken from the queue by the connector's poll()
+ * @return ConnectRecord
+ */
+ ConnectRecord convertToConnectRecord(Object message);
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/ProtocolFactory.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/ProtocolFactory.java
new file mode 100644
index 0000000..1c30d4e
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/ProtocolFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source.protocol;
+
+import org.apache.eventmesh.common.config.connector.mcp.SourceConnectorConfig;
+import org.apache.eventmesh.connector.mcp.source.protocol.impl.McpStandardProtocol;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Protocol factory. This class is responsible for storing and creating instances of {@link Protocol} classes.
+ */
+public class ProtocolFactory {
+    // lower-cased protocol name -> protocol class
+    private static final ConcurrentHashMap<String, Class<?>> protocols = new ConcurrentHashMap<>();
+
+    static {
+        // register all protocols
+        registerProtocol(McpStandardProtocol.PROTOCOL_NAME, McpStandardProtocol.class);
+    }
+
+
+    /**
+     * Register a protocol; registering a name again replaces the previously stored class.
+     *
+     * @param name  name of the protocol (stored lower-cased, so lookups are case-insensitive)
+     * @param clazz class of the protocol
+     * @throws IllegalArgumentException if {@code clazz} does not implement {@link Protocol}
+     */
+    public static void registerProtocol(String name, Class<?> clazz) {
+        if (!Protocol.class.isAssignableFrom(clazz)) {
+            throw new IllegalArgumentException("Class " + clazz.getName() + " does not implement Protocol interface");
+        }
+        protocols.put(name.toLowerCase(), clazz);
+    }
+
+    /**
+     * Create and initialize a new instance of the named protocol. Instances are not cached.
+     *
+     * @param sourceConnectorConfig config passed to {@link Protocol#initialize(SourceConnectorConfig)}
+     * @param name name of the protocol (case-insensitive)
+     * @return a new, initialized protocol instance
+     * @throws IllegalArgumentException if the protocol is not registered or cannot be instantiated
+     */
+    public static Protocol getInstance(SourceConnectorConfig sourceConnectorConfig, String name) {
+        // get the class by name (case insensitive)
+        Class<?> clazz = Optional.ofNullable(protocols.get(name.toLowerCase()))
+            .orElseThrow(() -> new IllegalArgumentException("Protocol " + name + " is not registered"));
+        try {
+            // Class.newInstance() is deprecated: it rethrows checked constructor exceptions undeclared.
+            Protocol protocol = (Protocol) clazz.getDeclaredConstructor().newInstance();
+            protocol.initialize(sourceConnectorConfig);
+            return protocol;
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalArgumentException("Failed to instantiate protocol " + name, e);
+        }
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/impl/McpStandardProtocol.java b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/impl/McpStandardProtocol.java
new file mode 100644
index 0000000..737dae8
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/java/org/apache/eventmesh/connector/mcp/source/protocol/impl/McpStandardProtocol.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.mcp.source.protocol.impl;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.config.connector.mcp.SourceConnectorConfig;
+import org.apache.eventmesh.connector.mcp.source.data.McpRequest;
+import org.apache.eventmesh.connector.mcp.source.data.McpResponse;
+import org.apache.eventmesh.connector.mcp.source.protocol.Protocol;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.Base64;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.handler.BodyHandler;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * MCP Standard Protocol Implementation
+ * Handles MCP (Model Context Protocol) requests and converts them to EventMesh ConnectRecords
+ */
+@Slf4j
+public class McpStandardProtocol implements Protocol {
+
+ /**
+ * Protocol name; also written to the "protocol" extension of every ConnectRecord built by this class
+ */
+ public static final String PROTOCOL_NAME = "MCP";
+
+ // ConnectRecord extension keys (all lower-case, no separators)
+ private static final String EXTENSION_PROTOCOL = "protocol";
+ private static final String EXTENSION_SESSION_ID = "sessionid";
+ private static final String EXTENSION_TOOL_NAME = "toolname";
+ private static final String EXTENSION_METHOD = "method";
+ private static final String EXTENSION_REQUEST_ID = "requestid"; // not referenced in this class
+ private static final String EXTENSION_SUCCESS = "success";
+ private static final String EXTENSION_ERROR_MESSAGE = "errormessage";
+ private static final String EXTENSION_ROUTING_CONTEXT = "routingcontext";
+ private static final String EXTENSION_IS_BASE64 = "isbase64"; // read by handleBase64Decoding; never written by this class
+ private static final String METADATA_EXTENSION_KEY = "extension"; // not referenced in this class
+
+ private SourceConnectorConfig sourceConnectorConfig;
+
+    /**
+     * Stores the connector configuration for later use by the request handler.
+     *
+     * @param sourceConnectorConfig source connector configuration; consulted when deciding whether to reply immediately
+     */
+    @Override
+    public void initialize(SourceConnectorConfig sourceConnectorConfig) {
+        this.sourceConnectorConfig = sourceConnectorConfig;
+        log.info("Initialized MCP Standard Protocol");
+    }
+
+ /**
+ * Set the handler for the route
+ * This method is called when using the protocol in a generic HTTP connector context
+ *
+ * @param route Vert.x route to configure
+ * @param queue Queue for storing requests
+ */
+ @Override
+ public void setHandler(Route route, BlockingQueue<Object> queue) {
+ route.method(HttpMethod.POST)
+ .handler(BodyHandler.create())
+ .handler(ctx -> {
+ try {
+ // Parse the request body
+ String bodyString = ctx.body().asString(Constants.DEFAULT_CHARSET.toString());
+
+ // Try to parse as JSON
+ JsonObject requestJson;
+ try {
+ requestJson = new JsonObject(bodyString);
+ } catch (Exception e) {
+ log.error("Failed to parse request as JSON: {}", bodyString, e);
+ ctx.response()
+ .setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
+ .putHeader("Content-Type", "application/json")
+ .end(McpResponse.error("Invalid JSON format").toJsonStr());
+ return;
+ }
+
+ // Extract JSON-RPC fields
+ String method = requestJson.getString("type", "");
+ String toolName = requestJson.getString("tool", "");
+ JsonObject params = requestJson.getJsonObject("arguments");
+
+ // Generate session ID if not present
+ String sessionId = ctx.request().getHeader("Mcp-Session-Id");
+ if (sessionId == null || sessionId.isEmpty()) {
+ sessionId = generateSessionId();
+ }
+
+ // Create MCP request based on method type
+ McpRequest mcpRequest = createMcpRequest(
+ method,
+ params,
+ sessionId,
+ toolName,
+ ctx
+ );
+
+ // Queue the request
+ if (!queue.offer(mcpRequest)) {
+ log.error("Failed to queue MCP request: queue is full");
+ ctx.response()
+ .setStatusCode(HttpResponseStatus.SERVICE_UNAVAILABLE.code())
+ .putHeader("Content-Type", "application/json")
+ .end(McpResponse.error("Service temporarily unavailable").toJsonStr());
+ return;
+ }
+
+ // If data consistency is not enabled, return immediate response
+ if (!sourceConnectorConfig.isDataConsistencyEnabled()) {
+ ctx.response()
+ .setStatusCode(HttpResponseStatus.OK.code())
+ .putHeader("Content-Type", "application/json")
+ .end(McpResponse.success().toJsonStr());
+ }
+ // Otherwise, response will be sent after processing (via commit)
+
+ } catch (Exception e) {
+ log.error("Error handling MCP request", e);
+ ctx.response()
+ .setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
+ .putHeader("Content-Type", "application/json")
+ .end(McpResponse.error("Internal server error: " + e.getMessage()).toJsonStr());
+ }
+ })
+ .failureHandler(ctx -> {
+ log.error("Failed to handle MCP request", ctx.failure());
+
+ // Return error response
+ ctx.response()
+ .setStatusCode(ctx.statusCode() > 0 ? ctx.statusCode() : 500)
+ .putHeader("Content-Type", "application/json")
+ .end(McpResponse.error(ctx.failure().getMessage()).toJsonStr());
+ });
+ }
+
+    /**
+     * Builds an {@link McpRequest} from the fields parsed out of the HTTP request.
+     * <p>
+     * For {@code mcp.tools.call} with non-null params, the tool name and arguments are taken from
+     * {@code params.name} and {@code params.arguments}; when {@code params.name} is absent the tool name passed
+     * in is kept instead of being overwritten with {@code null}. Such requests start with {@code success=false};
+     * every other request is built with {@code success=true}.
+     *
+     * @param method    method name of the request
+     * @param params    request parameters, may be {@code null}
+     * @param sessionId Session identifier
+     * @param tool      Tool name from the request envelope
+     * @param ctx       Routing context
+     * @return Constructed McpRequest
+     */
+    private McpRequest createMcpRequest(
+        String method,
+        JsonObject params,
+        String sessionId,
+        String tool,
+        io.vertx.ext.web.RoutingContext ctx) {
+
+        McpRequest.McpRequestBuilder builder = McpRequest.builder()
+            .protocolName(PROTOCOL_NAME)
+            .sessionId(sessionId)
+            .method(method)
+            .toolName(tool)
+            .timestamp(System.currentTimeMillis())
+            .routingContext(ctx);
+
+        boolean isToolCall = "mcp.tools.call".equals(method) && params != null;
+        if (!isToolCall) {
+            // "initialize" and every other method are built the same way
+            return builder.success(true).build();
+        }
+
+        // Only override the envelope tool name when params actually names a tool
+        String toolName = params.getString("name");
+        if (toolName != null) {
+            builder.toolName(toolName);
+        }
+        JsonObject arguments = params.getJsonObject("arguments", new JsonObject());
+
+        return builder
+            .arguments(arguments)
+            .success(false) // Will be set to true after execution
+            .build();
+    }
+
+    /**
+     * Converts a queued {@link McpRequest} into a {@link ConnectRecord}.
+     * <p>
+     * The payload chosen by {@code extractData} becomes the record data; protocol name, session id, method,
+     * tool name, success flag, error message (failed requests only) and routing context become extensions.
+     *
+     * @param message MCP request message; must be a non-null {@link McpRequest}
+     * @return ConnectRecord representation
+     * @throws IllegalArgumentException if {@code message} is null or not an {@link McpRequest}
+     */
+    @Override
+    public ConnectRecord convertToConnectRecord(Object message) {
+        // Reject anything that is not an McpRequest, with a dedicated message for null
+        if (message == null) {
+            throw new IllegalArgumentException("Message cannot be null");
+        }
+
+        if (!(message instanceof McpRequest)) {
+            throw new IllegalArgumentException(
+                String.format("Expected McpRequest but got %s", message.getClass().getName())
+            );
+        }
+        final McpRequest mcpRequest = (McpRequest) message;
+
+        // Use the request timestamp when it is positive, the current time otherwise
+        long recordTime = mcpRequest.getTimestamp();
+        if (recordTime <= 0) {
+            recordTime = System.currentTimeMillis();
+        }
+
+        // Payload priority is decided by extractData
+        final Object payload = extractData(mcpRequest);
+        final ConnectRecord result = new ConnectRecord(null, null, recordTime, payload);
+
+        // The protocol extension is always present
+        result.addExtension(EXTENSION_PROTOCOL, PROTOCOL_NAME);
+
+        // Session id, method and tool name are attached only when the request carries them
+        if (mcpRequest.getSessionId() != null) {
+            result.addExtension(EXTENSION_SESSION_ID, mcpRequest.getSessionId());
+        }
+
+        if (mcpRequest.getMethod() != null) {
+            result.addExtension(EXTENSION_METHOD, mcpRequest.getMethod());
+        }
+
+        if (mcpRequest.getToolName() != null) {
+            result.addExtension(EXTENSION_TOOL_NAME, mcpRequest.getToolName());
+        }
+
+        // The success flag is stored in its string form
+        final boolean succeeded = mcpRequest.isSuccess();
+        result.addExtension(EXTENSION_SUCCESS, String.valueOf(succeeded));
+
+        // An error message only accompanies failed requests
+        if (!succeeded && mcpRequest.getErrorMessage() != null) {
+            result.addExtension(EXTENSION_ERROR_MESSAGE, mcpRequest.getErrorMessage());
+        }
+
+        // Handle Base64 decoding if needed
+        handleBase64Decoding(result);
+
+        // Add routing context for response handling
+        if (mcpRequest.getRoutingContext() != null) {
+            result.addExtension(EXTENSION_ROUTING_CONTEXT, mcpRequest.getRoutingContext());
+        }
+
+        return result;
+    }
+
+    /**
+     * Extract data from MCP request as JSON text
+     * Priority: result (successful requests only) > arguments > fallback object with tool name and timestamp
+     */
+    private Object extractData(McpRequest request) {
+        if (request.isSuccess() && request.getResult() != null) {
+            return request.getResult().encode();
+        }
+        if (request.getArguments() != null) {
+            return request.getArguments().encode();
+        }
+
+        // Built with JsonObject: the tool name comes from the HTTP body and must be escaped, and %d is locale-sensitive
+        return new JsonObject().put("tool", String.valueOf(request.getToolName()))
+            .put("timestamp", request.getTimestamp()).encode();
+    }
+
+ /**
+ * Handle Base64 decoding if isBase64 flag is set
+ */
+ private void handleBase64Decoding(ConnectRecord connectRecord) {
+ Object isBase64Obj = connectRecord.getExtensionObj(EXTENSION_IS_BASE64);
+
+ if (isBase64Obj == null) {
+ return;
+ }
+
+ // Parse boolean value
+ boolean isBase64;
+ if (isBase64Obj instanceof Boolean) {
+ isBase64 = (Boolean) isBase64Obj;
+ } else {
+ isBase64 = Boolean.parseBoolean(String.valueOf(isBase64Obj));
+ }
+
+ // Decode if needed
+ if (isBase64 && connectRecord.getData() != null) {
+ try {
+ String dataStr = connectRecord.getData().toString();
+ byte[] decodedData = Base64.getDecoder().decode(dataStr);
+ connectRecord.setData(decodedData);
+ log.debug("Decoded Base64 data: {} bytes", decodedData.length);
+ } catch (IllegalArgumentException e) {
+ log.error("Failed to decode Base64 data: {}", e.getMessage());
+ // Keep original data if decoding fails
+ }
+ }
+ }
+
+    /**
+     * Creates a session identifier: the literal prefix {@code mcp-session-} followed by a random UUID.
+     *
+     * @return Generated session ID
+     */
+    private String generateSessionId() {
+        return "mcp-session-".concat(UUID.randomUUID().toString());
+    }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService
new file mode 100644
index 0000000..01a46e9
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+MCP-Source=org.apache.eventmesh.connector.mcp.source.McpSourceConnector
+MCP-Sink=org.apache.eventmesh.connector.mcp.sink.McpSinkConnector
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/server-config.yml
new file mode 100644
index 0000000..8009f5c
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/server-config.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sourceEnable: true
+sinkEnable: false
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/sink-config.yml
new file mode 100644
index 0000000..c04f886
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/sink-config.yml
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TopicTest
+ idc: FT
+ env: PRD
+ group: mcpSink
+ appId: 5032
+ userName: mcpSourceUser
+ passWord: mcpPassWord
+connectorConfig:
+ connectorName: mcpSink
+ urls:
+ - http://127.0.0.1:7092/test
+ keepAlive: true
+ keepAliveTimeout: 60000
+ idleTimeout: 5000 # timeunit: ms, recommended scope: common(5s - 10s), webhook(15s - 60s)
+ connectionTimeout: 5000 # timeunit: ms, recommended scope: 5 - 10s
+ maxConnectionPoolSize: 5
+ retryConfig:
+ maxRetries: 2
+ interval: 1000
+ retryOnNonSuccess: false
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/source-config.yml
new file mode 100644
index 0000000..66ad75d
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/main/resources/source-config.yml
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TopicTest
+ idc: FT
+ env: PRD
+ group: mcpSource
+ appId: 5032
+ userName: mcpSourceUser
+ passWord: mcpPassWord
+connectorConfig:
+ connectorName: mcpSource
+ path: /test
+ port: 7091
+ idleTimeout: 5000 # timeunit: ms
+ maxFormAttributeSize: 1048576 # timeunit: byte, default: 1048576(1MB). This applies only when handling form data submissions.
+ protocol: MCP # Case insensitive, options: MCP (the only protocol this connector registers out of the box)
+ extraConfig: # extra config for different protocol, e.g. GitHub secret
+ streamType: chunked
+ contentType: application/json
+ reconnection: true
+ forwardPath: /forward
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-mcp/src/test/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-mcp/src/test/resources/server-config.yml
new file mode 100644
index 0000000..0cd7b5b
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-mcp/src/test/resources/server-config.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sourceEnable: true
+sinkEnable: false
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/RemoteSubscribeInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/RemoteSubscribeInstance.java
index 99837d2..f045bf0 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/RemoteSubscribeInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/RemoteSubscribeInstance.java
@@ -32,23 +32,66 @@
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
import io.netty.handler.codec.http.HttpMethod;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class RemoteSubscribeInstance {
static final CloseableHttpClient httpClient = HttpClients.createDefault();
- public static void main(String[] args) {
- subscribeRemote();
+ public static void main(String[] args) throws IOException {
+ subscribeLocal();
+ //subscribeRemote();
// unsubscribeRemote();
}
+    /** Posts a subscription for the async test topic to the local subscribe endpoint and logs the reply. */
+    private static void subscribeLocal() throws IOException {
+        SubscriptionItem subscriptionItem = new SubscriptionItem();
+        subscriptionItem.setTopic(ExampleConstants.EVENTMESH_HTTP_ASYNC_TEST_TOPIC);
+        subscriptionItem.setMode(SubscriptionMode.CLUSTERING);
+        subscriptionItem.setType(SubscriptionType.ASYNC);
+
+        Map<String, Object> payload = new HashMap<>();
+        payload.put("url", "http://127.0.0.1:8088/sub/test");
+        payload.put("consumerGroup", "EventMeshTest-consumerGroup");
+        payload.put("topic", Collections.singletonList(subscriptionItem));
+        String serializedPayload = JsonUtils.toJSONString(payload);
+
+        HttpPost request = new HttpPost("http://127.0.0.1:10105/eventmesh/subscribe/local");
+        request.setHeader("Content-Type", "application/json");
+        request.setHeader("env", "prod");
+        request.setHeader("idc", "default");
+        request.setHeader("sys", "http-client-demo");
+        request.setHeader("username", "eventmesh");
+        request.setHeader("passwd", "eventmesh");
+        request.setHeader("ip", IPUtils.getLocalAddress());
+        request.setHeader("language", "JAVA");
+        request.setEntity(new StringEntity(serializedPayload, StandardCharsets.UTF_8));
+
+        try (CloseableHttpResponse response = httpClient.execute(request)) {
+            String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+            log.info("respStatusLine:{}", response.getStatusLine());
+            log.info("respBody:{}", responseBody);
+        }
+    }
+
private static void subscribeRemote() {
SubscriptionItem subscriptionItem = new SubscriptionItem();
subscriptionItem.setTopic(ExampleConstants.EVENTMESH_HTTP_ASYNC_TEST_TOPIC);
diff --git a/settings.gradle b/settings.gradle
index b013a57..327ca7e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -76,6 +76,7 @@
include 'eventmesh-connectors:eventmesh-connector-http'
include 'eventmesh-connectors:eventmesh-connector-chatgpt'
include 'eventmesh-connectors:eventmesh-connector-canal'
+include 'eventmesh-connectors:eventmesh-connector-mcp'
include 'eventmesh-storage-plugin:eventmesh-storage-api'
include 'eventmesh-storage-plugin:eventmesh-storage-standalone'