blob: 49be35c129bae29b1e577f739abd7baa3d457c4c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.integration.vertx;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ConnectorService;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.eventbus.ReplyException;
import org.vertx.java.core.eventbus.impl.PingMessage;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.platform.PlatformLocator;
import org.vertx.java.platform.PlatformManager;
import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
class IncomingVertxEventHandler implements ConnectorService {
private final String connectorName;
private final String queueName;
private final int port;
private final String host;
private final int quorumSize;
private final String haGroup;
private final String vertxAddress;
private EventBus eventBus;
private PlatformManager platformManager;
private EventHandler handler;
private final StorageManager storageManager;
private final PostOffice postOffice;
private boolean isStarted = false;
IncomingVertxEventHandler(String connectorName,
Map<String, Object> configuration,
StorageManager storageManager,
PostOffice postOffice) {
this.connectorName = connectorName;
this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null, configuration);
this.port = ConfigurationHelper.getIntProperty(VertxConstants.PORT, 0, configuration);
this.host = ConfigurationHelper.getStringProperty(VertxConstants.HOST, "localhost", configuration);
this.quorumSize = ConfigurationHelper.getIntProperty(VertxConstants.VERTX_QUORUM_SIZE, -1, configuration);
this.haGroup = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_HA_GROUP, "activemq", configuration);
this.vertxAddress = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_ADDRESS, "org.apache.activemq", configuration);
this.storageManager = storageManager;
this.postOffice = postOffice;
}
@Override
public void start() throws Exception {
if (this.isStarted) {
return;
}
System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName());
if (quorumSize != -1) {
platformManager = PlatformLocator.factory.createPlatformManager(port, host, quorumSize, haGroup);
}
else {
platformManager = PlatformLocator.factory.createPlatformManager(port, host);
}
eventBus = platformManager.vertx().eventBus();
Binding b = postOffice.getBinding(new SimpleString(queueName));
if (b == null) {
throw new Exception(connectorName + ": queue " + queueName + " not found");
}
handler = new EventHandler();
eventBus.registerHandler(vertxAddress, handler);
isStarted = true;
ActiveMQVertxLogger.LOGGER.debug(connectorName + ": started");
}
@Override
public void stop() throws Exception {
if (!isStarted) {
return;
}
eventBus.unregisterHandler(vertxAddress, handler);
platformManager.stop();
System.clearProperty("vertx.clusterManagerFactory");
isStarted = false;
ActiveMQVertxLogger.LOGGER.debug(connectorName + ": stopped");
}
@Override
public boolean isStarted() {
return isStarted;
}
@Override
public String getName() {
return connectorName;
}
private class EventHandler implements Handler<Message<?>> {
@Override
public void handle(Message<?> message) {
ServerMessage msg = new ServerMessageImpl(storageManager.generateID(), VertxConstants.INITIAL_MESSAGE_BUFFER_SIZE);
msg.setAddress(new SimpleString(queueName));
msg.setDurable(true);
msg.encodeMessageIDToBuffer();
String replyAddress = message.replyAddress();
if (replyAddress != null) {
msg.putStringProperty(VertxConstants.VERTX_MESSAGE_REPLYADDRESS, replyAddress);
}
// it'd be better that Message expose its type information
int type = getMessageType(message);
msg.putIntProperty(VertxConstants.VERTX_MESSAGE_TYPE, type);
manualEncodeVertxMessageBody(msg.getBodyBuffer(), message.body(), type);
try {
postOffice.route(msg, null, false);
}
catch (Exception e) {
ActiveMQVertxLogger.LOGGER.error("failed to route msg " + msg, e);
}
}
private void manualEncodeVertxMessageBody(ActiveMQBuffer bodyBuffer, Object body, int type) {
switch (type) {
case VertxConstants.TYPE_BOOLEAN:
bodyBuffer.writeBoolean(((Boolean) body));
break;
case VertxConstants.TYPE_BUFFER:
Buffer buff = (Buffer) body;
int len = buff.length();
bodyBuffer.writeInt(len);
bodyBuffer.writeBytes(((Buffer) body).getBytes());
break;
case VertxConstants.TYPE_BYTEARRAY:
byte[] bytes = (byte[]) body;
bodyBuffer.writeInt(bytes.length);
bodyBuffer.writeBytes(bytes);
break;
case VertxConstants.TYPE_BYTE:
bodyBuffer.writeByte((byte) body);
break;
case VertxConstants.TYPE_CHARACTER:
bodyBuffer.writeChar((Character) body);
break;
case VertxConstants.TYPE_DOUBLE:
bodyBuffer.writeDouble((double) body);
break;
case VertxConstants.TYPE_FLOAT:
bodyBuffer.writeFloat((Float) body);
break;
case VertxConstants.TYPE_INT:
bodyBuffer.writeInt((Integer) body);
break;
case VertxConstants.TYPE_LONG:
bodyBuffer.writeLong((Long) body);
break;
case VertxConstants.TYPE_SHORT:
bodyBuffer.writeShort((Short) body);
break;
case VertxConstants.TYPE_STRING:
case VertxConstants.TYPE_PING:
bodyBuffer.writeString((String) body);
break;
case VertxConstants.TYPE_JSON_OBJECT:
bodyBuffer.writeString(((JsonObject) body).encode());
break;
case VertxConstants.TYPE_JSON_ARRAY:
bodyBuffer.writeString(((JsonArray) body).encode());
break;
case VertxConstants.TYPE_REPLY_FAILURE:
ReplyException except = (ReplyException) body;
bodyBuffer.writeInt(except.failureType().toInt());
bodyBuffer.writeInt(except.failureCode());
bodyBuffer.writeString(except.getMessage());
break;
default:
throw new IllegalArgumentException("Invalid body type: " + type);
}
}
private int getMessageType(Message<?> message) {
Object body = message.body();
if (message instanceof PingMessage) {
return VertxConstants.TYPE_PING;
}
else if (body instanceof Buffer) {
return VertxConstants.TYPE_BUFFER;
}
else if (body instanceof Boolean) {
return VertxConstants.TYPE_BOOLEAN;
}
else if (body instanceof byte[]) {
return VertxConstants.TYPE_BYTEARRAY;
}
else if (body instanceof Byte) {
return VertxConstants.TYPE_BYTE;
}
else if (body instanceof Character) {
return VertxConstants.TYPE_CHARACTER;
}
else if (body instanceof Double) {
return VertxConstants.TYPE_DOUBLE;
}
else if (body instanceof Float) {
return VertxConstants.TYPE_FLOAT;
}
else if (body instanceof Integer) {
return VertxConstants.TYPE_INT;
}
else if (body instanceof Long) {
return VertxConstants.TYPE_LONG;
}
else if (body instanceof Short) {
return VertxConstants.TYPE_SHORT;
}
else if (body instanceof String) {
return VertxConstants.TYPE_STRING;
}
else if (body instanceof JsonArray) {
return VertxConstants.TYPE_JSON_ARRAY;
}
else if (body instanceof JsonObject) {
return VertxConstants.TYPE_JSON_OBJECT;
}
else if (body instanceof ReplyException) {
return VertxConstants.TYPE_REPLY_FAILURE;
}
throw new IllegalArgumentException("Type not supported: " + message);
}
}
@Override
public String toString() {
return "[IncomingVertxEventHandler(" + connectorName + "), queueName: " + queueName + " host: " + host + " port: " + port + " vertxAddress: " + vertxAddress + "]";
}
}