# OpenMessaging Example
[OpenMessaging](https://openmessaging.github.io/) establishes industry guidelines and messaging and streaming specifications that provide a common framework for finance, e-commerce, IoT, and big-data applications. Its design principles are cloud orientation, simplicity, flexibility, and language independence in distributed heterogeneous environments. Conformance to these specifications makes it possible to develop heterogeneous messaging applications across all major platforms and operating systems.

RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha. The following examples demonstrate how to access RocketMQ through the OpenMessaging API.

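Every example on this page obtains its access point from a string of the form `openmessaging:rocketmq://IP1:9876,IP2:9876/namespace`. The sketch below shows one way to read that string, assuming the layout `openmessaging:<driver>://<endpoint>[,<endpoint>...]/<namespace>` as inferred from the example value. The `AccessPointUrl` class and its `parse` method are hypothetical helpers written with the JDK only; they are not part of the OpenMessaging or RocketMQ API, and the real factory may validate the string differently.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of OpenMessaging or RocketMQ.
final class AccessPointUrl {
    final String driver;          // e.g. "rocketmq"
    final List<String> endpoints; // e.g. ["IP1:9876", "IP2:9876"]
    final String namespace;       // e.g. "namespace"

    private AccessPointUrl(String driver, List<String> endpoints, String namespace) {
        this.driver = driver;
        this.endpoints = endpoints;
        this.namespace = namespace;
    }

    // Assumed layout: openmessaging:<driver>://<endpoint>[,<endpoint>...]/<namespace>
    static AccessPointUrl parse(String url) {
        final String prefix = "openmessaging:";
        if (!url.startsWith(prefix)) {
            throw new IllegalArgumentException("Not an OpenMessaging URL: " + url);
        }
        String rest = url.substring(prefix.length());
        int schemeEnd = rest.indexOf("://");
        if (schemeEnd <= 0) {
            throw new IllegalArgumentException("Missing driver in URL: " + url);
        }
        // The first '/' after "://" separates the endpoint list from the namespace.
        int pathStart = rest.indexOf('/', schemeEnd + 3);
        if (pathStart < 0) {
            throw new IllegalArgumentException("Missing namespace in URL: " + url);
        }
        String driver = rest.substring(0, schemeEnd);
        List<String> endpoints =
            Arrays.asList(rest.substring(schemeEnd + 3, pathStart).split(","));
        String namespace = rest.substring(pathStart + 1);
        return new AccessPointUrl(driver, endpoints, namespace);
    }

    public static void main(String[] args) {
        AccessPointUrl ap = parse("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        System.out.printf("driver=%s endpoints=%s namespace=%s%n",
            ap.driver, ap.endpoints, ap.namespace);
    }
}
```

In the examples, `IP1` and `IP2` are placeholders for real server addresses, and several addresses can be listed separated by commas.
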
## OMSProducer
The following example shows how to send messages to a RocketMQ broker using synchronous, asynchronous, or one-way transmission.

```
// Imports assume the OpenMessaging 0.1.0-alpha package layout.
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.Producer;
import io.openmessaging.Promise;
import io.openmessaging.PromiseListener;
import io.openmessaging.SendResult;
import java.nio.charset.Charset;

public class OMSProducer {
    public static void main(String[] args) {
        // Obtain an access point for the RocketMQ driver; replace IP1 and IP2 with real addresses.
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        producer.startup();
        System.out.printf("Producer startup OK%n");

        // Synchronous send: blocks until the send result is available.
        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }

        // Asynchronous send: the listener is notified when the send completes or fails.
        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }

                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }

        // One-way send: no send result is returned to the caller.
        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }

        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}
```
## OMSPullConsumer
Use the OMS PullConsumer to poll messages from a specified queue.

```
// Imports assume the OpenMessaging 0.1.0-alpha package layout.
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PullConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;

public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        // Bind the pull consumer to the queue and set its consumer group.
        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        consumer.startup();
        System.out.printf("Consumer startup OK%n");

        // Poll a single message and acknowledge it by its message ID.
        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }

        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}
```
## OMSPushConsumer
Attach an OMS PushConsumer to a specified queue and consume messages through a MessageListener.

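```
// Imports assume the OpenMessaging 0.1.0-alpha package layout.
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.rocketmq.domain.NonStandardKeys;

public class OMSPushConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PushConsumer consumer = messagingAccessPoint
            .createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        // Release the consumer and the access point when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        // Register the listener that handles and acknowledges each message.
        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
            @Override
            public void onMessage(final Message message, final ReceivedMessageContext context) {
                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
                context.ack();
            }
        });

        // Start the consumer after attaching the listener; the original snippet omitted this call.
        consumer.startup();
        System.out.printf("Consumer startup OK%n");
    }
}
```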