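The EventMesh Java SDK implements producers and consumers for gRPC synchronous, asynchronous, and broadcast messages. Both require an instance of `EventMeshGrpcClientConfig` that specifies the configuration of the EventMesh gRPC client. Its `liteEventMeshAddr`, `userName`, and `password` fields should match the values in the `eventmesh.properties` file of the EventMesh runtime.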
```java
import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;

import io.cloudevents.CloudEvent;

public class CloudEventsAsyncSubscribe implements ReceiveMsgHook<CloudEvent> {

    public static void main(String[] args) throws InterruptedException {
        EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
            .serverAddr("localhost")
            .serverPort(10205)
            .consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
            .env("env").idc("idc")
            .sys("1234").build();
        /* ... */
    }
}
```
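The EventMesh runtime delivers messages from producers to a stream consumer as a stream of events. The consumer should implement the `ReceiveMsgHook` interface, which is defined in `ReceiveMsgHook.java`.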
```java
public interface ReceiveMsgHook<T> {

    Optional<T> handle(T msg) throws Throwable;

    String getProtocolType();
}
```
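The `EventMeshGrpcConsumer` class implements the `registerListener`, `subscribe`, and `unsubscribe` methods. The `subscribe` method accepts a list of `SubscriptionItem` objects that define the topics to subscribe to. The `registerListener` method accepts an instance of a class that implements `ReceiveMsgHook`. The `handle` method is invoked whenever the consumer receives a message on a subscribed topic. If the `SubscriptionType` is `SYNC`, the return value of `handle` is sent back to the producer.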
```java
import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;

import java.util.Collections;
import java.util.Optional;

import io.cloudevents.CloudEvent;

import lombok.extern.slf4j.Slf4j;

// Assumption: `log` is provided by Lombok's @Slf4j; the original snippet does not show its declaration.
@Slf4j
public class CloudEventsAsyncSubscribe implements ReceiveMsgHook<CloudEvent> {

    public static CloudEventsAsyncSubscribe handler = new CloudEventsAsyncSubscribe();

    public static void main(String[] args) throws InterruptedException {
        /* ... */
        // Subscribe to one topic in clustering mode with asynchronous delivery.
        SubscriptionItem subscriptionItem = new SubscriptionItem(
            "eventmesh-async-topic",
            SubscriptionMode.CLUSTERING,
            SubscriptionType.ASYNC
        );
        EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);

        eventMeshGrpcConsumer.init();
        // Register the handler before subscribing so that no message arrives without a listener.
        eventMeshGrpcConsumer.registerListener(handler);
        eventMeshGrpcConsumer.subscribe(Collections.singletonList(subscriptionItem));
        /* ... */
        eventMeshGrpcConsumer.unsubscribe(Collections.singletonList(subscriptionItem));
    }

    @Override
    public Optional<CloudEvent> handle(CloudEvent message) {
        log.info("Message received: {}", message);
        // ASYNC subscription: there is no reply to send back.
        return Optional.empty();
    }

    @Override
    public String getProtocolType() {
        return EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME;
    }
}
```
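To make the `SYNC` rule concrete, here is a minimal, self-contained sketch of how the return value of `handle` could be routed. It does not use the EventMesh SDK: `MsgHook` only mirrors the shape of `ReceiveMsgHook`, while `SubType`, `dispatch`, and `ECHO` are hypothetical names introduced for illustration, and wrapping handler failures in an unchecked exception is an assumption rather than documented SDK behavior.

```java
import java.util.Locale;
import java.util.Optional;

public class HookDispatchSketch {

    /** Mirrors the shape of ReceiveMsgHook; not the SDK interface itself. */
    public interface MsgHook<T> {
        Optional<T> handle(T msg) throws Throwable;

        String getProtocolType();
    }

    /** Hypothetical stand-in for the two subscription types named in the text. */
    public enum SubType { ASYNC, SYNC }

    /**
     * Hypothetical dispatcher: always invokes the hook, but forwards the
     * hook's return value to the caller only when the subscription is SYNC.
     */
    public static <T> Optional<T> dispatch(MsgHook<T> hook, SubType type, T msg) {
        Optional<T> reply;
        try {
            reply = hook.handle(msg);
        } catch (Throwable t) {
            // Assumption: a failing handler is surfaced as an unchecked exception.
            throw new IllegalStateException("handler failed", t);
        }
        return type == SubType.SYNC ? reply : Optional.empty();
    }

    /** Example hook that answers every message with an upper-cased copy. */
    public static final MsgHook<String> ECHO = new MsgHook<String>() {
        @Override
        public Optional<String> handle(String msg) {
            return Optional.of(msg.toUpperCase(Locale.ROOT));
        }

        @Override
        public String getProtocolType() {
            return "text";
        }
    };

    public static void main(String[] args) {
        System.out.println(dispatch(ECHO, SubType.SYNC, "ping"));  // prints Optional[PING]
        System.out.println(dispatch(ECHO, SubType.ASYNC, "ping")); // prints Optional.empty
    }
}
```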
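The `subscribe` method of the `EventMeshGrpcConsumer` class accepts a list of `SubscriptionItem` objects that define the topics to subscribe to, plus an optional callback URL. If a callback URL is provided, the EventMesh runtime sends a POST request containing the message in CloudEvents format to that URL. `SubController.java` implements a Spring Boot controller that receives and parses the callback messages.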
```java
import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;

import java.util.Collections;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

@Component
public class SubService implements InitializingBean {

    // Callback endpoint to which the EventMesh runtime POSTs messages.
    final String url = "http://localhost:8080/callback";

    // Field declaration added so the assignment below compiles; the original snippet elides it.
    private EventMeshGrpcConsumer eventMeshGrpcConsumer;

    @Override
    public void afterPropertiesSet() throws Exception {
        /* ... */
        eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);
        eventMeshGrpcConsumer.init();

        SubscriptionItem subscriptionItem = new SubscriptionItem(
            "eventmesh-async-topic",
            SubscriptionMode.CLUSTERING,
            SubscriptionType.ASYNC
        );

        // Passing the URL selects webhook delivery instead of the stream listener.
        eventMeshGrpcConsumer.subscribe(Collections.singletonList(subscriptionItem), url);
        /* ... */
        eventMeshGrpcConsumer.unsubscribe(Collections.singletonList(subscriptionItem), url);
    }
}
```
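The receiving side of the callback can be sketched without Spring Boot. The example below is not `SubController.java`; it is a minimal stand-in built on the JDK's `com.sun.net.httpserver.HttpServer` (Java 9 or later for `readAllBytes`) that accepts a POST on `/callback` and stores the raw body. The class name, the `RECEIVED` queue, the `roundTrip` helper, the `ok` response body, and the `application/cloudevents+json` content type are all assumptions made for illustration; the response the EventMesh runtime actually expects is not stated in this section. `main` sends itself one request to show the round trip.

```java
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CallbackReceiverSketch {

    /** Raw bodies of the POST requests received so far, in arrival order. */
    public static final BlockingQueue<String> RECEIVED = new LinkedBlockingQueue<>();

    /** Starts a server on 127.0.0.1 (port 0 picks any free port) that handles POST /callback. */
    public static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext("/callback", exchange -> {
            try {
                if (!"POST".equals(exchange.getRequestMethod())) {
                    exchange.sendResponseHeaders(405, -1); // method not allowed, no body
                    return;
                }
                byte[] raw = exchange.getRequestBody().readAllBytes();
                RECEIVED.add(new String(raw, StandardCharsets.UTF_8));
                byte[] ok = "ok".getBytes(StandardCharsets.UTF_8); // assumed response body
                exchange.sendResponseHeaders(200, ok.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(ok);
                }
            } finally {
                exchange.close();
            }
        });
        server.start();
        return server;
    }

    /** Posts one JSON body to a freshly started server and returns what the server stored. */
    public static String roundTrip(String json) {
        try {
            HttpServer server = start(0);
            try {
                int port = server.getAddress().getPort();
                HttpURLConnection conn = (HttpURLConnection)
                    URI.create("http://127.0.0.1:" + port + "/callback").toURL().openConnection();
                conn.setRequestMethod("POST");
                conn.setDoOutput(true);
                conn.setRequestProperty("Content-Type", "application/cloudevents+json");
                try (OutputStream out = conn.getOutputStream()) {
                    out.write(json.getBytes(StandardCharsets.UTF_8));
                }
                if (conn.getResponseCode() != 200) {
                    throw new IOException("unexpected status " + conn.getResponseCode());
                }
                return RECEIVED.poll(5, TimeUnit.SECONDS);
            } finally {
                server.stop(0);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("{\"id\":\"1\",\"type\":\"demo\"}"));
    }
}
```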
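The `EventMeshGrpcProducer` class implements the `publish` method, which accepts the message to be published and an optional timeout value. The message should be an instance of one of the following classes: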
- `org.apache.eventmesh.common.EventMeshMessage`
- `io.cloudevents.CloudEvent`

```java
/* ... */
EventMeshGrpcProducer eventMeshGrpcProducer = new EventMeshGrpcProducer(eventMeshClientConfig);
eventMeshGrpcProducer.init();

Map<String, String> content = new HashMap<>();
content.put("content", "testAsyncMessage");

CloudEvent event = CloudEventBuilder.v1()
    .withId(UUID.randomUUID().toString())
    .withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
    .withSource(URI.create("/"))
    .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
    .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
    .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
    .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
    .build();
eventMeshGrpcProducer.publish(event);
```
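The `EventMeshGrpcProducer` class implements the `requestReply` method, which accepts the message to be published and an optional timeout value. The method returns the message sent back by the consumer. The message should be an instance of one of the following classes: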
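- `org.apache.eventmesh.common.EventMeshMessage`
- `io.cloudevents.CloudEvent`

The `EventMeshGrpcProducer` class overloads the `publish` method with a variant that accepts a list of messages to be published and an optional timeout value. Each message in the list should be an instance of one of the following classes: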
- `org.apache.eventmesh.common.EventMeshMessage`
- `io.cloudevents.CloudEvent`

```java
/* ... */
List<CloudEvent> cloudEventList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
    CloudEvent event = CloudEventBuilder.v1()
        .withId(UUID.randomUUID().toString())
        .withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
        .withSource(URI.create("/"))
        .withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
        .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
        .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
        .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
        .build();

    cloudEventList.add(event);
}

eventMeshGrpcProducer.publish(cloudEventList);
/* ... */
```
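The three producer calls described above (single `publish`, the list overload of `publish`, and `requestReply` with a timeout) can be compared side by side in a small in-memory model. This is a sketch under stated assumptions, not the SDK: `ProducerSketch`, its `sent` list, and the `replier` function are hypothetical, messages are plain strings instead of `CloudEvent` instances, and throwing an exception when the timeout elapses is an assumed behavior that this section does not confirm for `EventMeshGrpcProducer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class ProducerSketch {

    /** Messages accepted by publish, in order; stands in for the event store. */
    public final List<String> sent = new ArrayList<>();

    /** Hypothetical consumer-side logic that answers requestReply calls. */
    private final Function<String, String> replier;

    public ProducerSketch(Function<String, String> replier) {
        this.replier = replier;
    }

    /** Asynchronous style: hand over one message and do not wait for a reply. */
    public void publish(String message) {
        sent.add(message);
    }

    /** Overload with the same name: hand over a batch of messages in one call. */
    public void publish(List<String> messages) {
        sent.addAll(messages);
    }

    /** Synchronous style: send one message and wait up to timeoutMillis for the reply. */
    public String requestReply(String message, long timeoutMillis) {
        CompletableFuture<String> reply = CompletableFuture.supplyAsync(() -> replier.apply(message));
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            reply.cancel(true);
            // Assumption: a missing reply is reported as an exception.
            throw new IllegalStateException("no reply within " + timeoutMillis + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("replier failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        ProducerSketch producer = new ProducerSketch(request -> "reply-to-" + request);
        producer.publish("one");
        producer.publish(List.of("two", "three"));
        System.out.println(producer.sent);                       // prints [one, two, three]
        System.out.println(producer.requestReply("ping", 1000)); // prints reply-to-ping
    }
}
```

The compiler selects between the two `publish` variants by argument type, which is why the batch call in the example above can reuse the method name from the single-message case.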