blob: 1826553bd4f0fe8519d7bb722e2886f4dc335399 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.apollo.openwire
import command._
import org.apache.activemq.apollo.dto.DestinationDTO
import org.apache.activemq.apollo.broker.security.SecurityContext
import collection.mutable.HashMap
import DestinationConverter._
import support.advisory.AdvisorySupport
import scala.util.continuations._
import org.apache.activemq.apollo.util._
import java.util.Map.Entry
import org.apache.activemq.apollo.broker._
import org.fusesource.hawtdispatch._
import org.fusesource.hawtbuf.UTF8Buffer
/**
* <p>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object DestinationAdvisoryRouterListenerFactory extends RouterListenerFactory {
def create(router: Router) = new DestinationAdvisoryRouterListener(router)
}
object DestinationAdvisoryRouterListener extends Log {
final val ID_GENERATOR = new IdGenerator
}
/**
* <p>
* A listener to Route events which implements Destination advisories
* which are needed
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class DestinationAdvisoryRouterListener(router: Router) extends RouterListener {
import DestinationAdvisoryRouterListener._
final val destination_advisories = HashMap[ActiveMQDestination, Delivery]()
final val advisoryProducerId = new ProducerId
final val messageIdGenerator = new LongSequenceGenerator
advisoryProducerId.setConnectionId(new UTF8Buffer(ID_GENERATOR.generateId))
class ProducerRoute extends DeliveryProducerRoute(router) {
val sink_switcher = new MutableSink[Delivery]
val overflow_sink = new OverflowSink(sink_switcher)
override protected def on_connected = {
sink_switcher.downstream = Some(this)
}
override def dispatch_queue = router.virtual_host.dispatch_queue
}
var producerRoutes = new LRUCache[List[DestinationDTO], ProducerRoute](10) {
override def onCacheEviction(eldest: Entry[List[DestinationDTO], ProducerRoute]) = {
router.disconnect(eldest.getKey.toArray, eldest.getValue)
}
}
def on_create(dest: DomainDestination, security: SecurityContext) = {
val ow_destination = to_activemq_destination(Array(dest.destination_dto))
if (ow_destination!=null && !AdvisorySupport.isAdvisoryTopic(ow_destination)) {
destination_advisories.getOrElseUpdate(ow_destination, {
var info = new DestinationInfo(null, DestinationInfo.ADD_OPERATION_TYPE, ow_destination)
val topic = AdvisorySupport.getDestinationAdvisoryTopic(ow_destination);
val advisory = create_advisory_delivery(topic, info)
send(advisory)
advisory
})
}
}
def on_destroy(dest: DomainDestination, security: SecurityContext) = {
val destination = to_activemq_destination(Array(dest.destination_dto))
if (destination!=null && !AdvisorySupport.isAdvisoryTopic(destination)) {
for (info <- destination_advisories.remove(destination)) {
var info = new DestinationInfo(null, DestinationInfo.REMOVE_OPERATION_TYPE, destination)
val topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
send(create_advisory_delivery(topic, info));
}
}
}
def on_bind(dest: DomainDestination, consumer: DeliveryConsumer, security: SecurityContext) = {
val destination = to_activemq_destination(Array(dest.destination_dto))
if (destination!=null && AdvisorySupport.isDestinationAdvisoryTopic(destination) && !destination_advisories.isEmpty) {
// replay the destination advisories..
val producer = new ProducerRoute {
override def on_connected = {
overflow_sink.refiller = ^{
// once the sink is not overflowed.. then we can disconnect
if(!overflow_sink.overflowed) {
unbind(consumer::Nil)
overflow_sink.refiller = NOOP
}
}
overflow_sink.refiller.run()
super.on_connected
}
}
producer.bind(consumer::Nil)
producer.connected()
for( info <- destination_advisories.values ) {
producer.overflow_sink.offer(info)
}
}
}
def on_unbind(dest: DomainDestination, consumer: DeliveryConsumer, persistent: Boolean) = {
}
def on_connect(dest: DomainDestination, producer: BindableDeliveryProducer, security: SecurityContext) = {
}
def on_disconnect(dest: DomainDestination, producer: BindableDeliveryProducer) = {
}
def close = {
import collection.JavaConversions._
for (entry <- producerRoutes.entrySet()) {
router.disconnect(entry.getKey.toArray, entry.getValue)
}
producerRoutes.clear
}
def create_advisory_delivery(topic: ActiveMQTopic, command: Command) = {
// advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
// val id = getBrokerId() != null ? getBrokerId().getValue(): "NOT_SET";
// advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
val message = new ActiveMQMessage()
message.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, "NOT_SET");
// val url = getBrokerService().getVmConnectorURI().toString();
// if (getBrokerService().getDefaultSocketURIString() != null) {
// url = getBrokerService().getDefaultSocketURIString();
// }
// advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
//set the data structure
message.setDataStructure(command);
message.setPersistent(false);
message.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE_BUFFER);
message.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
// message.setTargetConsumerId(targetConsumerId);
message.setDestination(topic);
message.setResponseRequired(false);
message.setProducerId(advisoryProducerId);
val delivery = new Delivery
delivery.message = new OpenwireMessage(message)
delivery.size = message.getSize
delivery
}
def send(delivery:Delivery): Unit = {
val message = delivery.message.asInstanceOf[OpenwireMessage].message
val dest: Array[DestinationDTO] = to_destination_dto(message.getDestination,null)
val key = dest.toList
val route = producerRoutes.get(key) match {
case null =>
// create the producer route...
val route = new ProducerRoute
producerRoutes.put(key, route)
reset {
val rc = router.connect(dest, route, null)
rc match {
case Some(failure) =>
warn("Could not connect to advisory topic: " + message.getDestination)
case None =>
}
}
route
case route => route
}
route.overflow_sink.offer(delivery)
}
}