/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.apollo.broker
import _root_.java.io.File
import _root_.java.lang.String
import org.fusesource.hawtdispatch._
import org.fusesource.hawtbuf._
import collection.JavaConversions
import JavaConversions._
import security._
import org.apache.activemq.apollo.broker.web._
import collection.mutable.{HashSet, LinkedHashMap, HashMap}
import org.apache.activemq.apollo.util._
import org.fusesource.hawtbuf.AsciiBuffer._
import CollectionsSupport._
import FileSupport._
import management.ManagementFactory
import org.apache.activemq.apollo.dto._
import javax.management.ObjectName
import org.fusesource.hawtdispatch.TaskTracker._
import java.util.concurrent.TimeUnit
import security.SecuredResource.BrokerKind
import reflect.BeanProperty
/**
* <p>
* Implemented by broker factory providers discovered via the ClassFinder. Each
* provider creates a Broker from a configuration URI it understands and returns
* null for URIs it cannot handle.
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait BrokerFactoryTrait {
def createBroker(brokerURI:String):Broker
}
/**
* <p>
* The BrokerFactory creates Broker objects from a URI.
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object BrokerFactory {
val finder = new ClassFinder[BrokerFactoryTrait]("META-INF/services/org.apache.activemq.apollo/broker-factory.index",classOf[BrokerFactoryTrait])
def createBroker(uri:String):Broker = {
if( uri == null ) {
return null
}
finder.singletons.foreach { provider=>
val broker = provider.createBroker(uri)
if( broker!=null ) {
return broker;
}
}
throw new IllegalArgumentException("Unknown broker uri: "+uri)
}
}
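// Usage sketch (illustrative, not part of the original file): creating and starting
// a broker from a configuration URI. The "xml:" scheme and file path below are
// assumptions; the schemes actually supported depend on which BrokerFactoryTrait
// providers are listed in the broker-factory.index files on the classpath.
//
//   val broker = BrokerFactory.createBroker("xml:file:./etc/apollo.xml")
//   broker.start(^{ println("broker started") })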
object BufferConversions {
implicit def toAsciiBuffer(value:String) = new AsciiBuffer(value)
implicit def toUTF8Buffer(value:String) = new UTF8Buffer(value)
implicit def fromAsciiBuffer(value:AsciiBuffer) = value.toString
implicit def fromUTF8Buffer(value:UTF8Buffer) = value.toString
implicit def toAsciiBuffer(value:Buffer) = value.ascii
implicit def toUTF8Buffer(value:Buffer) = value.utf8
}
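// Usage sketch: importing these implicits lets plain Strings flow into APIs that
// expect hawtbuf buffers and back again without explicit wrapping, e.g.:
//
//   import BufferConversions._
//   val name: AsciiBuffer = "default"   // via toAsciiBuffer(String)
//   val text: String = name             // via fromAsciiBuffer(AsciiBuffer)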
/**
* <p>
* Tracks the Broker instances running in this JVM and, when the
* "hawtdispatch.profile" system property is set, periodically logs hawtdispatch
* queue metrics while at least one broker is running.
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object BrokerRegistry extends Log {
private val brokers = HashSet[Broker]()
@volatile
private var monitor_session = 0
def list():Array[Broker] = this.synchronized {
brokers.toArray
}
def add(broker:Broker) = this.synchronized {
val rc = brokers.add(broker)
if(rc && brokers.size==1 && java.lang.Boolean.getBoolean("hawtdispatch.profile")) {
// start monitoring when the first broker starts..
monitor_session += 1
monitor_hawtdispatch(monitor_session)
}
rc
}
def remove(broker:Broker) = this.synchronized {
val rc = brokers.remove(broker)
if(rc && brokers.size==0 && java.lang.Boolean.getBoolean("hawtdispatch.profile")) {
// stop monitoring when the last broker stops..
monitor_session += 1
}
rc
}
def monitor_hawtdispatch(session_id:Int):Unit = {
import collection.JavaConversions._
import java.util.concurrent.TimeUnit._
getGlobalQueue().after(1, SECONDS) {
if( session_id == monitor_session ) {
val m = Dispatch.metrics.toList.flatMap{x=>
if( x.totalWaitTimeNS > MILLISECONDS.toNanos(10) || x.totalRunTimeNS > MILLISECONDS.toNanos(10) ) {
Some(x)
} else {
None
}
}
if( !m.isEmpty ) {
info("-- hawtdispatch metrics -----------------------\n"+m.mkString("\n"))
}
monitor_hawtdispatch(session_id)
}
}
}
}
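// Note: the periodic metrics dump above only runs when the JVM is started with
// -Dhawtdispatch.profile=true. Bumping monitor_session in add()/remove() makes a
// timer scheduled by an earlier session stop rescheduling itself once the last
// broker is removed.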
object Broker extends Log {
val BLOCKABLE_THREAD_POOL = ApolloThreadPool.INSTANCE
private val SERVICE_TIMEOUT = 1000*5;
def class_loader:ClassLoader = ClassFinder.class_loader
val version = using(getClass().getResourceAsStream("version.txt")) { source=>
read_text(source).trim
}
def capture(command:String*) = {
import ProcessSupport._
try {
system(command:_*) match {
case(0, out, _) => Some(new String(out).trim)
case _ => None
}
} catch {
case _ => None
}
}
val os = {
val os = System.getProperty("os.name")
val rc = os +" "+System.getProperty("os.version")
// Try to get a better version from the OS itself..
val los = os.toLowerCase()
if( los.startsWith("linux") ) {
capture("lsb_release", "-sd").map("%s (%s)".format(rc, _)).getOrElse(rc)
} else {
rc
}
}
val jvm = {
val vendor = System.getProperty("java.vendor")
val version = System.getProperty("java.version")
val vm = System.getProperty("java.vm.name")
"%s %s (%s)".format(vm, version, vendor)
}
val max_fd_limit = {
if( System.getProperty("os.name").toLowerCase().startsWith("windows") ) {
None
} else {
val mbean_server = ManagementFactory.getPlatformMBeanServer()
mbean_server.getAttribute(new ObjectName("java.lang:type=OperatingSystem"), "MaxFileDescriptorCount") match {
case x:java.lang.Long=> Some(x.longValue)
case _ => None
}
}
}
}
/**
* <p>
* A Broker is the parent object of all services associated with the server side
* of a message passing system. It keeps track of all running connections,
* virtual hosts and associated messaging destinations.
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class Broker() extends BaseService with SecuredResource {
import Broker._
@BeanProperty
var tmp: File = _
@BeanProperty
var config: BrokerDTO = new BrokerDTO
config.virtual_hosts.add({
val rc = new VirtualHostDTO
rc.id = "default"
rc.host_names.add("localhost")
rc
})
config.connectors.add({
val rc = new AcceptingConnectorDTO()
rc.id = "default"
rc.bind = "tcp://0.0.0.0:0"
rc
})
var default_virtual_host: VirtualHost = null
val virtual_hosts = LinkedHashMap[AsciiBuffer, VirtualHost]()
val virtual_hosts_by_hostname = new LinkedHashMap[AsciiBuffer, VirtualHost]()
val connectors = LinkedHashMap[String, Connector]()
val connections = LinkedHashMap[Long, BrokerConnection]()
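// Serial dispatch queue used to serialize access to the broker's mutable state;
// update() runs configuration changes on this queue.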
val dispatch_queue = createQueue("broker")
def id = "default"
val connection_id_counter = new LongCounter
var key_storage:KeyStorage = _
var web_server:WebServer = _
@volatile
var now = System.currentTimeMillis()
var config_log:Log = Log(new MemoryLogger(Broker.log))
var audit_log:Log = Broker
var security_log:Log = Broker
var connection_log:Log = Broker
var console_log:Log = Broker
var services = Map[String, (CustomServiceDTO, Service)]()
override def toString() = "broker: "+id
var authenticator:Authenticator = _
var authorizer = Authorizer()
def resource_kind = SecuredResource.BrokerKind
/**
* Validates and then applies the configuration.
*/
def update(config: BrokerDTO, on_completed:Runnable) = dispatch_queue {
dispatch_queue.assertExecuting()
this.config = config
val tracker = new LoggingTracker("broker reconfiguration", console_log, SERVICE_TIMEOUT)
if( service_state.is_started ) {
apply_update(tracker)
}
tracker.callback(on_completed)
}
override def _start(on_completed:Runnable) = {
// create the runtime objects from the config
init_logs
log_versions
check_file_limit
BrokerRegistry.add(this)
schedule_now_update
schedule_virtualhost_maintenance
val tracker = new LoggingTracker("broker startup", console_log, SERVICE_TIMEOUT)
apply_update(tracker)
tracker.callback(on_completed)
}
def _stop(on_completed:Runnable): Unit = {
val tracker = new LoggingTracker("broker shutdown", console_log, SERVICE_TIMEOUT)
// Stop the services...
services.values.foreach( x=>
tracker.stop(x._2)
)
services = Map()
// Stop accepting connections..
connectors.values.foreach( x=>
tracker.stop(x)
)
connectors.clear()
// stop the connections..
connections.valuesIterator.foreach { connection=>
tracker.stop(connection)
}
connections.clear()
// Shutdown the virtual host services
virtual_hosts.valuesIterator.foreach( x=>
tracker.stop(x)
)
virtual_hosts.clear()
virtual_hosts_by_hostname.clear()
Option(web_server).foreach(tracker.stop(_))
web_server = null
BrokerRegistry.remove(this)
tracker.callback(on_completed)
}
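// Periodically refreshes the cached `now` timestamp (every 100ms) for as long as
// the broker is starting or started; each run re-schedules the next one.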
def schedule_now_update:Unit = dispatch_queue.after(100, TimeUnit.MILLISECONDS) {
if( service_state.is_starting_or_started ) {
now = System.currentTimeMillis
schedule_now_update
}
}
def schedule_virtualhost_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
if( service_state.is_started ) {
val active_connections = connections.keySet
virtual_hosts.values.foreach { host=>
host.dispatch_queue {
if(host.service_state.is_started) {
host.router.remove_temp_destinations(active_connections)
}
}
}
schedule_virtualhost_maintenance
}
}
protected def init_logs = {
import OptionSupport._
// Configure the logging categories...
val log_category = config.log_category.getOrElse(new LogCategoryDTO)
val base_category = "org.apache.activemq.apollo.log."
security_log = Log(log_category.security.getOrElse(base_category + "security"))
audit_log = Log(log_category.audit.getOrElse(base_category + "audit"))
connection_log = Log(log_category.connection.getOrElse(base_category + "connection"))
console_log = Log(log_category.console.getOrElse(base_category + "console"))
}
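// Reconciles the running virtual hosts, connectors, custom services and web admin
// server against the current configuration: entries that disappeared are stopped,
// entries present in both are updated (or re-created when their DTO type changed),
// and newly configured entries are created and started.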
protected def apply_update(tracker:LoggingTracker) {
import OptionSupport._
init_logs
key_storage = if (config.key_storage != null) {
new KeyStorage(config.key_storage)
} else {
null
}
if (config.authentication != null && config.authentication.enabled.getOrElse(true)) {
authenticator = new JaasAuthenticator(config.authentication, security_log)
authorizer=Authorizer(this)
} else {
authenticator = null
authorizer=Authorizer()
}
val host_config_by_id = HashMap[AsciiBuffer, VirtualHostDTO]()
config.virtual_hosts.foreach{ value =>
host_config_by_id += ascii(value.id) -> value
}
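// CollectionsSupport.diff(prev, next) partitions the two key sets so the blocks
// below can reconcile the running state against the new config. Sketch of the
// assumed contract (the helper itself lives in apollo-util, not in this file):
//
//   added   == next -- prev     // configured now, not running yet
//   updated == prev & next      // running and still configured
//   removed == prev -- next     // running but no longer configured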
diff(virtual_hosts.keySet.toSet, host_config_by_id.keySet.toSet) match { case (added, updated, removed) =>
removed.foreach { id =>
for( host <- virtual_hosts.remove(id) ) {
host.config.host_names.foreach { name =>
virtual_hosts_by_hostname.remove(ascii(name))
}
tracker.stop(host)
}
}
updated.foreach { id=>
for( host <- virtual_hosts.get(id); config <- host_config_by_id.get(id) ) {
host.config.host_names.foreach { name =>
virtual_hosts_by_hostname.remove(ascii(name))
}
if( host.config.getClass == config.getClass ) {
host.update(config, tracker.task("update: "+host))
config.host_names.foreach { name =>
virtual_hosts_by_hostname += ascii(name) -> host
}
} else {
// The dto type changed.. so we have to re-create the virtual host.
val on_completed = tracker.task("recreate virtual host: "+id)
host.stop(^{
val host = VirtualHostFactory.create(this, config)
if( host == null ) {
console_log.warn("Could not create virtual host: "+config.id);
on_completed.run()
} else {
config.host_names.foreach { name =>
virtual_hosts_by_hostname += ascii(name) -> host
}
host.start(on_completed)
}
})
}
}
}
added.foreach { id=>
for( config <- host_config_by_id.get(id) ) {
val host = VirtualHostFactory.create(this, config)
if( host == null ) {
console_log.warn("Could not create virtual host: "+config.id);
} else {
virtual_hosts += ascii(config.id) -> host
// add all the host names of the virtual host to the virtual_hosts_by_hostname map..
config.host_names.foreach { name =>
virtual_hosts_by_hostname += ascii(name) -> host
}
tracker.start(host)
}
}
}
}
// first defined host is the default virtual host
config.virtual_hosts.headOption.map(x=>ascii(x.id)).foreach { id =>
default_virtual_host = virtual_hosts.get(id).getOrElse(null)
}
val connector_config_by_id = HashMap[String, ConnectorTypeDTO]()
config.connectors.foreach{ value =>
connector_config_by_id += value.id -> value
}
diff(connectors.keySet.toSet, connector_config_by_id.keySet.toSet) match { case (added, updated, removed) =>
removed.foreach { id =>
for( connector <- connectors.remove(id) ) {
tracker.stop(connector)
}
}
updated.foreach { id=>
for( connector <- connectors.get(id); config <- connector_config_by_id.get(id) ) {
if( connector.config.getClass == config.getClass ) {
connector.update(config, tracker.task("update: "+connector))
} else {
// The dto type changed.. so we have to re-create the connector.
val on_completed = tracker.task("recreate connector: "+id)
connector.stop(^{
val connector = ConnectorFactory.create(this, config)
if( connector == null ) {
console_log.warn("Could not create connector: "+config.id);
on_completed.run()
} else {
connectors += config.id -> connector
connector.start(on_completed)
}
})
}
}
}
added.foreach { id=>
for( config <- connector_config_by_id.get(id) ) {
val connector = ConnectorFactory.create(this, config)
if( connector == null ) {
console_log.warn("Could not create connector: "+config.id);
} else {
connectors += config.id -> connector
tracker.start(connector)
}
}
}
}
val t:Seq[(String, CustomServiceDTO)] = asScalaBuffer(config.services).map(x => (x.id ->x) )
val services_config = Map(t: _*)
diff(services.keySet, services_config.keySet) match { case (added, updated, removed) =>
removed.foreach { id =>
for( service <- services.get(id) ) {
services -= id
tracker.stop(service._2)
}
}
// Handle the updates.
updated.foreach { id=>
for( new_dto <- services_config.get(id); (old_dto, service) <- services.get(id) ) {
if( new_dto != old_dto ) {
// a restart is needed..
val task = tracker.task("restart "+service)
service.stop(dispatch_queue.runnable {
// create with the new config..
val service = CustomServiceFactory.create(this, new_dto)
if( service == null ) {
console_log.warn("Could not create service: "+new_dto.id);
task.run()
} else {
// start it again..
services += new_dto.id -> (new_dto, service)
service.start(task)
}
})
}
}
}
// Create the new services..
added.foreach { id=>
for( dto <- services_config.get(id) ) {
val service = CustomServiceFactory.create(this, dto)
if( service == null ) {
console_log.warn("Could not create service: "+dto.id);
} else {
services += dto.id -> (dto, service)
tracker.start(service)
}
}
}
}
if( !config.web_admins.isEmpty ) {
if ( web_server!=null ) {
web_server.update(tracker.task("update: "+web_server))
} else {
web_server = WebServerFactory.create(this)
if (web_server==null) {
warn("Could not start admistration interface.")
} else {
tracker.start(web_server)
}
}
} else {
if( web_server!=null ) {
tracker.stop(web_server)
web_server = null
}
}
}
private def log_versions = {
val location_info = Option(System.getProperty("apollo.home")).map { home=>
" (at: "+new File(home).getCanonicalPath+")"
}.getOrElse("")
console_log.info("OS : %s", os)
console_log.info("JVM : %s", jvm)
console_log.info("Apollo : %s%s", Broker.version, location_info)
}
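// Estimates how many file descriptors the configured connectors may need and warns
// when the limit reported by the OperatingSystem MXBean looks too low.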
private def check_file_limit:Unit = {
max_fd_limit match {
case Some(limit) =>
console_log.info("OS is restricting the open file limit to: %s", limit)
var min_limit = 500 // rough estimate.. perhaps we could do better?
config.connectors.foreach { connector=>
import OptionSupport._
min_limit += connector.connection_limit.getOrElse(10000)
}
if( limit < min_limit ) {
console_log.warn("Please increase the process file limit using 'ulimit -n %d' or configure lower connection limits on the broker connectors.", min_limit)
}
case None =>
}
}
def get_virtual_host(name: AsciiBuffer) = dispatch_queue ! {
virtual_hosts_by_hostname.getOrElse(name, null)
}
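// Note: `dispatch_queue ! { ... }` evaluates the block on the broker's dispatch
// queue and, in the hawtdispatch Scala DSL, returns a future for the result, so
// callers on other threads can safely look up hosts. Usage sketch (assumption:
// how you block on or compose the future depends on the hawtdispatch version):
//
//   val host = broker.get_virtual_host(ascii("localhost")).await()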
def get_default_virtual_host = dispatch_queue ! {
default_virtual_host
}
//useful for testing
def get_connect_address = {
Option(config.client_address).getOrElse(first_accepting_connector.get.transport_server.getConnectAddress)
}
def get_socket_address = {
first_accepting_connector.get.socket_address
}
def first_accepting_connector = connectors.values.find(_.isInstanceOf[AcceptingConnector]).map(_.asInstanceOf[AcceptingConnector])
}