blob: b8969cd0a6abd1cb5c59332ced7982b94ea97d34 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.apollo.web.resources;
import java.{lang => jl}
import org.fusesource.hawtdispatch._
import scala.collection.Iterable
import org.apache.activemq.apollo.util.path.PathParser
import org.apache.activemq.apollo.util._
import javax.ws.rs._
import core.{Response, Context}
import javax.ws.rs.core.Response.Status._
import management.ManagementFactory
import javax.management.ObjectName
import javax.management.openmbean.CompositeData
import org.josql.{QueryResults, Query}
import java.util.regex.Pattern
import java.util.{Collections, ArrayList}
import org.apache.activemq.apollo.broker._
import org.apache.activemq.apollo.dto._
import javax.ws.rs.core.MediaType._
import javax.servlet.http.HttpServletResponse
import FutureResult._
import com.wordnik.swagger.annotations.{ApiOperation, Api}
import language.implicitConversions
import org.fusesource.hawtbuf.Buffer
import org.apache.commons.codec.binary.Base64
;
@Path( "/api/json/broker")
@Api(value = "/api/json/broker",
listingPath = "/api/docs/broker")
@Produces(Array("application/json"))
class BrokerResourceJSON extends BrokerResource
@Path( "/api/docs/broker{ext:(\\.json)?}")
@Api(value = "/api/json/broker",
listingPath = "/api/docs/broker",
listingClass = "org.apache.activemq.apollo.web.resources.BrokerResourceJSON")
class BrokerResourceHelp extends HelpResourceJSON
@Path("/broker")
@Produces(Array(APPLICATION_JSON, APPLICATION_XML, TEXT_XML, "text/html;qs=5"))
class BrokerResourceHTML extends BrokerResource
/**
* <p>
* The RuntimeResource resource manages access to the runtime state of a broker. It is used
* to see the status of the broker and to apply management operations against the broker.
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class BrokerResource() extends Resource {
import Resource._
def SessionResource() = {
val rc = new SessionResource()
rc.setHttpRequest(http_request)
rc
}
@Deprecated()
@GET
@Path("/whoami")
def whoami() = SessionResource().whoami()
@Deprecated()
@Produces(Array("text/html;qs=5"))
@GET
@Path("/signin")
def get_signin_html(@Context response:HttpServletResponse,
@QueryParam("username") username:String,
@QueryParam("password") password:String,
@QueryParam("target") target:String
): ErrorDTO = SessionResource().get_signin_html(response, username, password, target)
@Deprecated()
@POST
@Path("/signin")
def post_signin(@Context response:HttpServletResponse,
@FormParam("username") username:String,
@FormParam("password") password:String):Boolean = SessionResource().post_signin(response, username, password)
@Deprecated()
@Produces(Array("text/html"))
@GET @Path("/signout")
def signout_html():String = SessionResource().signout_html()
@Deprecated()
@Produces(Array(APPLICATION_JSON, APPLICATION_XML, TEXT_XML))
@GET @Path("/signout")
def signout():String = SessionResource().signout()
@GET
@ApiOperation(value = "Returns a BrokerStatusDTO which contains summary information about the broker and the JVM")
def get_broker(@QueryParam("connections") connections:Boolean):BrokerStatusDTO = {
with_broker { broker =>
monitoring(broker) {
val result = new BrokerStatusDTO
result.id = broker.id
result.jvm_metrics = create_jvm_metrics
result.current_time = now
result.state = broker.service_state.toString
result.state_since = broker.service_state.since
result.version = Broker.version
result.home_location = Option(System.getProperty("apollo.home")).getOrElse(null)
result.base_location = Option(System.getProperty("apollo.base")).getOrElse(null)
result.version = Broker.version
result.connection_counter = broker.connection_id_counter.get()
result.connected = broker.connections.size
broker.virtual_hosts.values.foreach{ host=>
result.virtual_hosts.add( host.id )
}
broker.connectors.values.foreach{ c=>
result.connectors.add( c.id )
val status = c.status
status match {
case status:ConnectorStatusDTO =>
result.messages_sent += status.messages_sent
result.messages_received += status.messages_received
result.read_counter += status.read_counter
result.write_counter += status.write_counter
}
}
// only include the connection list if it was requested.
if( !connections ) {
result.connections = null;
} else {
broker.connections.foreach { case (id,connection) =>
result.connections.add( new LongIdLabeledDTO(id, connection.transport.getRemoteAddress.toString ) )
}
}
result
}
}
}
private def create_jvm_metrics = {
val rc = new JvmMetricsDTO
rc.jvm_name = Broker.jvm
implicit def to_object_name(value:String):ObjectName = new ObjectName(value)
implicit def to_long(value:AnyRef):Long = value.asInstanceOf[java.lang.Long].longValue()
implicit def to_int(value:AnyRef):Int = value.asInstanceOf[java.lang.Integer].intValue()
implicit def to_double(value:AnyRef):Double = value.asInstanceOf[java.lang.Double].doubleValue()
def attempt(func: => Unit) = {
try {
func
} catch {
case _:Throwable => // ignore
}
}
val mbean_server = ManagementFactory.getPlatformMBeanServer()
attempt( rc.uptime = mbean_server.getAttribute("java.lang:type=Runtime", "Uptime") )
attempt( rc.start_time = mbean_server.getAttribute("java.lang:type=Runtime", "StartTime") )
attempt( rc.runtime_name = mbean_server.getAttribute("java.lang:type=Runtime", "Name").toString )
rc.os_name = Broker.os
attempt( rc.os_arch = mbean_server.getAttribute("java.lang:type=OperatingSystem", "Arch").toString )
attempt( rc.os_fd_open = mbean_server.getAttribute("java.lang:type=OperatingSystem", "OpenFileDescriptorCount"))
rc.os_fd_max = Broker.max_fd_limit.getOrElse(0)
attempt( rc.os_memory_total = mbean_server.getAttribute("java.lang:type=OperatingSystem", "TotalPhysicalMemorySize") )
attempt( rc.os_memory_free = mbean_server.getAttribute("java.lang:type=OperatingSystem", "FreePhysicalMemorySize") )
attempt( rc.os_swap_free = mbean_server.getAttribute("java.lang:type=OperatingSystem", "FreeSwapSpaceSize") )
attempt( rc.os_swap_free = mbean_server.getAttribute("java.lang:type=OperatingSystem", "TotalSwapSpaceSize") )
attempt( rc.os_load_average = mbean_server.getAttribute("java.lang:type=OperatingSystem", "SystemLoadAverage") )
attempt( rc.os_cpu_time = mbean_server.getAttribute("java.lang:type=OperatingSystem", "ProcessCpuTime") )
attempt( rc.os_processors = mbean_server.getAttribute("java.lang:type=OperatingSystem", "AvailableProcessors") )
attempt( rc.classes_loaded = mbean_server.getAttribute("java.lang:type=ClassLoading", "LoadedClassCount") )
attempt( rc.classes_unloaded = mbean_server.getAttribute("java.lang:type=ClassLoading", "UnloadedClassCount") )
attempt( rc.threads_peak = mbean_server.getAttribute("java.lang:type=Threading", "PeakThreadCount") )
attempt( rc.threads_current = mbean_server.getAttribute("java.lang:type=Threading", "ThreadCount") )
def memory_metrics(data:CompositeData) = {
val rc = new MemoryMetricsDTO
rc.alloc = data.get("committed").asInstanceOf[java.lang.Long].longValue()
rc.used = data.get("used").asInstanceOf[java.lang.Long].longValue()
rc.max = data.get("max").asInstanceOf[java.lang.Long].longValue()
rc
}
attempt( rc.heap_memory = memory_metrics(mbean_server.getAttribute("java.lang:type=Memory", "HeapMemoryUsage").asInstanceOf[CompositeData]) )
attempt( rc.non_heap_memory = memory_metrics(mbean_server.getAttribute("java.lang:type=Memory", "NonHeapMemoryUsage").asInstanceOf[CompositeData]) )
rc
}
@GET
@Path("/queue-metrics")
@ApiOperation(value = "Returns an AggregateDestMetricsDTO holding the summary of all the queue metrics")
def get_queue_metrics(): AggregateDestMetricsDTO = {
val rc:AggregateDestMetricsDTO = with_broker { broker =>
monitoring(broker) {
get_queue_metrics(broker)
}
}
rc.current_time = now
rc
}
@GET
@Path("/topic-metrics")
@ApiOperation(value = "Returns an AggregateDestMetricsDTO holding the summary of all the topic metrics")
def get_topic_metrics(): AggregateDestMetricsDTO = {
val rc:AggregateDestMetricsDTO = with_broker { broker =>
monitoring(broker) {
get_topic_metrics(broker)
}
}
rc.current_time = now
rc
}
@GET
@Path("/dsub-metrics")
@ApiOperation(value = "Returns an AggregateDestMetricsDTO holding the summary of all the durable subscription metrics")
def get_dsub_metrics(): AggregateDestMetricsDTO = {
val rc:AggregateDestMetricsDTO = with_broker { broker =>
monitoring(broker) {
get_dsub_metrics(broker)
}
}
rc.current_time = now
rc
}
def aggregate(queue:AggregateDestMetricsDTO, topic:AggregateDestMetricsDTO, dsub:AggregateDestMetricsDTO):AggregateDestMetricsDTO = {
// zero out the enqueue stats on the dsubs since they will already be accounted for in the topic
// stats.
dsub.enqueue_item_counter = 0
dsub.enqueue_size_counter = 0
dsub.enqueue_ts = 0
val rc = aggregate_dest_metrics(List(queue, dsub))
DestinationMetricsSupport.add_destination_metrics(rc, topic)
rc.objects += topic.objects
rc.current_time = now
rc
}
@GET
@Path("/dest-metrics")
@ApiOperation(value = "Returns an AggregateDestMetricsDTO holding the summary of all the destination metrics")
def get_dest_metrics(): AggregateDestMetricsDTO = {
aggregate(get_queue_metrics(), get_topic_metrics(), get_dsub_metrics())
}
def aggregate_dest_metrics(metrics:Iterable[DestMetricsDTO]):AggregateDestMetricsDTO = {
metrics.foldLeft(new AggregateDestMetricsDTO){ (to, from)=>
DestinationMetricsSupport.add_destination_metrics(to, from)
if( from.isInstanceOf[AggregateDestMetricsDTO] ) {
to.objects += from.asInstanceOf[AggregateDestMetricsDTO].objects
} else {
to.objects += 1
}
to
}
}
def get_queue_metrics(broker:Broker):FutureResult[AggregateDestMetricsDTO] = {
val metrics = sync_all(broker.virtual_hosts.values) { host =>
get_queue_metrics(host)
}
metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option)) ))
}
def get_queue_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = host.get_queue_metrics
def get_topic_metrics(broker:Broker):FutureResult[AggregateDestMetricsDTO] = {
val metrics = sync_all(broker.virtual_hosts.values) { host =>
get_topic_metrics(host)
}
metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option)) ))
}
def get_topic_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = host.get_topic_metrics
def get_dsub_metrics(broker:Broker):FutureResult[AggregateDestMetricsDTO] = {
val metrics = sync_all(broker.virtual_hosts.values) { host =>
get_dsub_metrics(host)
}
metrics.map( x=> Success(aggregate_dest_metrics(x.flatMap(_.success_option)) ))
}
def get_dsub_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = host.get_dsub_metrics
@GET @Path("/virtual-hosts")
@Produces(Array(APPLICATION_JSON))
@ApiOperation(value = "Returns an DataPageDTO holding all the virtual hosts")
def virtual_host(@QueryParam("f") f:java.util.List[String], @QueryParam("q") q:String,
@QueryParam("p") p:java.lang.Integer, @QueryParam("ps") ps:java.lang.Integer, @QueryParam("o") o:java.util.List[String] ):DataPageDTO = {
with_broker { broker =>
monitoring(broker) {
val records = broker.virtual_hosts.values.map { value =>
Success(status(value))
}
FutureResult(narrow(classOf[VirtualHostStatusDTO], records, f, q, p, ps, o))
}
}
}
@GET @Path("/virtual-hosts/{id}")
@ApiOperation(value = "Returns a VirtualHostStatusDTO holding the status of the requested virtual host")
def virtual_host(@PathParam("id") id : String):VirtualHostStatusDTO = {
with_virtual_host(id) { host =>
monitoring(host) {
status(host)
}
}
}
def status(host: VirtualHost): VirtualHostStatusDTO = {
val result = new VirtualHostStatusDTO
result.id = host.id
result.state = host.service_state.toString
result.state_since = host.service_state.since
result.store = host.store != null
result.host_names = host.config.host_names
val router: LocalRouter = host
router.local_queue_domain.destinations.foreach { node =>
result.queues.add(node.id)
}
router.local_topic_domain.destinations.foreach { node =>
result.topics.add(node.id)
}
router.local_dsub_domain.destination_by_id.keys.foreach { id =>
result.dsubs.add(id)
}
result
}
@GET @Path("/virtual-hosts/{id}/queue-metrics")
@ApiOperation(value = "Aggregates the messaging metrics for all the queue destinations")
def virtual_host_queue_metrics(@PathParam("id") id : String): AggregateDestMetricsDTO = {
val rc:AggregateDestMetricsDTO = with_virtual_host(id) { host =>
monitoring(host) {
get_queue_metrics(host)
}
}
rc.current_time = now
rc
}
@GET @Path("/virtual-hosts/{id}/topic-metrics")
@ApiOperation(value = "Aggregates the messaging metrics for all the topic destinations")
def virtual_host_topic_metrics(@PathParam("id") id : String): AggregateDestMetricsDTO = {
val rc:AggregateDestMetricsDTO = with_virtual_host(id) { host =>
monitoring(host) {
get_topic_metrics(host)
}
}
rc.current_time = now
rc
}
@GET @Path("/virtual-hosts/{id}/dsub-metrics")
@ApiOperation(value = "Aggregates the messaging metrics for all the durable subscription destinations")
def virtual_host_dsub_metrics(@PathParam("id") id : String): AggregateDestMetricsDTO = {
val rc:AggregateDestMetricsDTO = with_virtual_host(id) { host =>
monitoring(host) {
get_dsub_metrics(host)
}
}
rc.current_time = now
rc
}
@GET @Path("/virtual-hosts/{id}/dest-metrics")
@ApiOperation(value = "Aggregates the messaging metrics for all the destinations")
def virtual_host_dest_metrics(@PathParam("id") id : String): AggregateDestMetricsDTO = {
with_virtual_host(id) { host =>
monitoring(host) {
host.get_dest_metrics
}
}
}
@GET @Path("/virtual-hosts/{id}/store")
@ApiOperation(value = "Gets metrics about the status of the message store used by the {host} virtual host.")
def store(@PathParam("id") id : String):StoreStatusDTO = {
with_virtual_host(id) { host =>
monitoring(host) {
if(host.store!=null) {
val rc = FutureResult[StoreStatusDTO]()
host.store.get_store_status { status =>
rc(Success(status))
}
rc
} else {
result(NOT_FOUND)
}
}
}
}
@POST @Path("/virtual-hosts/{id}/store/action/compact")
@ApiOperation(value = "Compacts the store.")
@Produces(Array("text/html;qs=5"))
def store_compact(@PathParam("id") id : String):Unit = {
with_virtual_host(id) { host =>
admining(host) {
if(host.store!=null) {
val rc = FutureResult[Unit]()
host.store.compact {
rc(Success(()))
}
rc
} else {
result(NOT_FOUND)
}
}
}
}
class JosqlHelper {
def get(o:AnyRef, name:String):AnyRef = {
def invoke(o:AnyRef, name:String):Option[AnyRef] = {
try {
if(name.endsWith("()")) {
Option(o.getClass().getMethod(name.stripSuffix("()")).invoke(o))
} else {
Option(o.getClass().getField(name).get(o))
}
} catch {
case e:Throwable =>
None
}
}
var parts = name.split(Pattern.quote("."))
parts.foldLeft(Option(o)) { case(memo, field)=>
memo.flatMap(invoke(_, field))
}.getOrElse(null)
}
def NOT(o:AnyRef):AnyRef = not(o)
def Not(o:AnyRef):AnyRef = not(o)
def not(o:AnyRef):AnyRef = {
o match {
case java.lang.Boolean.TRUE => java.lang.Boolean.FALSE
case java.lang.Boolean.FALSE => java.lang.Boolean.TRUE
case null => java.lang.Boolean.TRUE
case _ => java.lang.Boolean.FALSE
}
}
}
def narrow[T](kind:Class[T], x:Iterable[Result[T, Throwable]], f:java.util.List[String], q:String, p:java.lang.Integer, ps:java.lang.Integer, o:java.util.List[String]) = {
import collection.JavaConversions._
try {
var records = x.toSeq.flatMap(_.success_option)
val page_size = if( ps !=null ) ps.intValue() else 100
val page = if( p !=null ) p.intValue() else 0
val query = new Query
query.addFunctionHandler(new JosqlHelper)
val fields = if (f.isEmpty) "*" else f.toList.map("get(:_currobj, \""+_+"\")").mkString(",")
val where_clause = if (q != null) q else "1=1"
val orderby_clause = if (o.isEmpty) "" else " ORDER BY "+o.toList.mkString(",")
query.parse("SELECT " + fields + " FROM " + kind.getName + " WHERE "+ where_clause+orderby_clause+" LIMIT "+((page_size*page)+1)+", "+page_size)
val headers = if (f.isEmpty) seqAsJavaList(List("*")) else f
val query_result: QueryResults = query.execute(records)
val list = query_result.getResults
val rc = new DataPageDTO
rc.page = page
rc.page_size = page_size
def total_pages(x:Int,y:Int) = if(x==0) 1 else { x/y + (if ( x%y == 0 ) 0 else 1) }
rc.total_pages = total_pages(query_result.getWhereResults.length, rc.page_size)
rc.total_rows = query_result.getWhereResults.length
rc.headers = headers
rc.rows = list
Success(rc)
} catch {
case e:Throwable =>
Failure(create_result(BAD_REQUEST, e.getMessage))
}
}
@GET @Path("/virtual-hosts/{id}/topics")
@ApiOperation(value = "Gets a list of all the topics that exist on the broker.")
@Produces(Array(APPLICATION_JSON))
def topics(@PathParam("id") id : String, @QueryParam("f") f:java.util.List[String],
@QueryParam("q") q:String, @QueryParam("p") p:java.lang.Integer, @QueryParam("ps") ps:java.lang.Integer, @QueryParam("o") o:java.util.List[String] ):DataPageDTO = {
with_virtual_host(id) { host =>
val router: LocalRouter = host
val records = Future.all {
router.local_topic_domain.destination_by_id.values.map { value =>
monitoring(value) {
value.status(false, false)
}
}
}
val rc:FutureResult[DataPageDTO] = records.map(narrow(classOf[TopicStatusDTO], _, f, q, p, ps, o))
rc
}
}
@GET @Path("/virtual-hosts/{id}/topics/{name:.*}")
@ApiOperation(value = "Gets the status of the named topic.")
def topic(@PathParam("id") id : String, @PathParam("name") name : String,
@QueryParam("producers") producers:Boolean,
@QueryParam("consumers") consumers:Boolean):TopicStatusDTO = {
with_virtual_host(id) { host =>
val router:LocalRouter = host
val node = router.local_topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
monitoring(node) {
node.status(producers, consumers)
}
}
}
@PUT @Path("/virtual-hosts/{id}/topics/{name:.*}")
@Produces(Array(APPLICATION_JSON, APPLICATION_XML,TEXT_XML))
@ApiOperation(value = "Creates the named topic.")
def topic_create(@PathParam("id") id : String, @PathParam("name") name : String) = ok {
with_virtual_host(id) { host =>
val rc = FutureResult[Null]()
authenticate(host.broker.authenticator) { security_context =>
val address = (new DestinationParser).decode_single_destination("topic:"+name, null)
host.local_topic_domain.get_or_create_destination(address, security_context).failure_option match {
case Some(x) => rc.set(Failure(new Exception(x)))
case _ =>
}
rc.set(Success(null))
}
rc
}
}
@DELETE @Path("/virtual-hosts/{id}/topics/{name:.*}")
@Produces(Array(APPLICATION_JSON, APPLICATION_XML,TEXT_XML))
@ApiOperation(value = "Deletes the named topic.")
def topic_delete(@PathParam("id") id : String, @PathParam("name") name : String) = ok {
with_virtual_host(id) { host =>
val router: LocalRouter = host
val node = router.local_topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
admining(node) {
node.delete.map(result(NOT_MODIFIED, _))
}
}
}
@POST @Path("/virtual-hosts/{id}/topics/{name:.*}/action/delete")
@Produces(Array("text/html;qs=5"))
def post_topic_delete_and_redirect(@PathParam("id") id : String, @PathParam("name") name : String) = {
if_ok(topic_delete(id, name)) {
result(strip_resolve("../../.."))
}
}
@GET @Path("/virtual-hosts/{id}/topic-queues/{name:.*}/{qid}")
@ApiOperation(value = "Gets the status of a topic consumer queue.")
def topic(@PathParam("id") id : String,@PathParam("name") name : String, @PathParam("qid") qid : Long,
@QueryParam("entries") entries:Boolean,
@QueryParam("producers") producers:Boolean,
@QueryParam("consumers") consumers:Boolean):QueueStatusDTO = {
with_virtual_host(id) { host =>
val router:LocalRouter = host
val node = router.local_topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
val queue =router.queues_by_store_id.get(qid).getOrElse(result(NOT_FOUND))
monitoring(node) {
sync(queue) {
queue.status(entries, producers, consumers)
}
}
}
}
@GET @Path("/virtual-hosts/{id}/topics/{name:.*}/messages")
@ApiOperation(value = "Gets a list of recent messages sent to the topic.")
def topic_messages(@PathParam("id") id : String, @PathParam("name") name : String,
@QueryParam("from") _from:java.lang.Long,
@QueryParam("max") _max:java.lang.Long,
@QueryParam("max_body") _max_body:java.lang.Integer):BrowsePageDTO = {
var from = OptionSupport(_from).getOrElse(0L)
var max = OptionSupport(_max).getOrElse(100L)
var max_body = OptionSupport(_max_body).getOrElse(0)
with_virtual_host(id) { host =>
val rc = FutureResult[BrowsePageDTO]()
val router: LocalRouter = host
val node = router.local_topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
monitoring(node) {
node.browse(from, None, max) { browse_result =>
val page = new BrowsePageDTO()
for( entry <- browse_result.entries ) {
page.messages.add(message_convert(max_body, entry))
}
page.first_seq = browse_result.first_seq
page.last_seq = browse_result.last_seq
page.total_messages = browse_result.total_entries
rc.set(Success(page))
}
}
rc
}
}
@GET @Path("/virtual-hosts/{id}/queues")
@ApiOperation(value = "Gets a list of all the queues that exist on the broker.")
@Produces(Array(APPLICATION_JSON))
def queues(@PathParam("id") id : String, @QueryParam("f") f:java.util.List[String],
@QueryParam("q") q:String, @QueryParam("p") p:java.lang.Integer, @QueryParam("ps") ps:java.lang.Integer, @QueryParam("o") o:java.util.List[String] ):DataPageDTO = {
with_virtual_host(id) { host =>
val router: LocalRouter = host
val values: Iterable[Queue] = router.local_queue_domain.destination_by_id.values
val records = sync_all(values) { value =>
status(value, false, false , false)
}
val rc:FutureResult[DataPageDTO] = records.map(narrow(classOf[QueueStatusDTO], _, f, q, p, ps, o))
rc
}
}
@GET @Path("/virtual-hosts/{id}/queues/{name:.*}")
@ApiOperation(value = "Gets the status of the named queue.")
def queue(@PathParam("id") id : String, @PathParam("name") name : String,
@QueryParam("entries") entries:Boolean,
@QueryParam("producers") producers:Boolean,
@QueryParam("consumers") consumers:Boolean ):QueueStatusDTO = {
with_virtual_host(id) { host =>
val router: LocalRouter = host
val node = router.local_queue_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
sync(node) {
status(node, entries, producers, consumers)
}
}
}
@GET @Path("/virtual-hosts/{id}/queues/{name:.*}/messages")
@ApiOperation(value = "Gets a list of messages that exist on the queue.")
def queue_messages(@PathParam("id") id : String, @PathParam("name") name : String,
@QueryParam("from") _from:java.lang.Long,
@QueryParam("to") _to:java.lang.Long,
@QueryParam("max") _max:java.lang.Long,
@QueryParam("max_body") _max_body:java.lang.Integer):BrowsePageDTO = {
var from = OptionSupport(_from).getOrElse(0L)
var to = OptionSupport(_to)
var max = OptionSupport(_max).getOrElse(100L)
var max_body = OptionSupport(_max_body).getOrElse(0)
with_virtual_host(id) { host =>
val rc = FutureResult[BrowsePageDTO]()
val router: LocalRouter = host
val node = router.local_queue_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
monitoring(node) {
node.browse(from, to, max) { browe_result =>
val page = new BrowsePageDTO()
for( entry <- browe_result.entries ) {
page.messages.add(message_convert(max_body, entry))
}
page.first_seq = browe_result.first_seq
page.last_seq = browe_result.last_seq
page.total_messages = browe_result.total_entries
rc.set(Success(page))
}
}
rc
}
}
@DELETE @Path("/virtual-hosts/{id}/queues/{name:.*}")
@Produces(Array(APPLICATION_JSON, APPLICATION_XML,TEXT_XML))
@ApiOperation(value = "Deletes the named queue.")
def queue_delete(@PathParam("id") id : String, @PathParam("name") name : String) = ok {
with_virtual_host(id) { host =>
val router: LocalRouter = host
val node = router.local_queue_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
admining(node) {
host.dispatch_queue {
router._destroy_queue(node)
}
}
}
}
@PUT @Path("/virtual-hosts/{id}/queues/{name:.*}")
@Produces(Array(APPLICATION_JSON, APPLICATION_XML,TEXT_XML))
@ApiOperation(value = "Creates the named queue.")
def queue_create(@PathParam("id") id : String, @PathParam("name") name : String) = ok {
with_virtual_host(id) { host =>
val rc = FutureResult[Null]()
authenticate(host.broker.authenticator) { security_context =>
val address = (new DestinationParser).decode_single_destination("queue:"+name, null)
host.local_queue_domain.get_or_create_destination(address, security_context).failure_option match {
case Some(x) => rc.set(Failure(new Exception(x)))
case _ =>
}
rc.set(Success(null))
}
rc
}
}
@POST @Path("/virtual-hosts/{id}/queues/{name:.*}/action/delete")
@Produces(Array("text/html;qs=5"))
def post_queue_delete_and_redirect(@PathParam("id") id : String, @PathParam("name") name : String) = {
if_ok(queue_delete(id, name)) {
result(strip_resolve("../../.."))
}
}
private def base64(buffer:Buffer) =
new String(Base64.encodeBase64(buffer.toByteArray), "UTF-8");
private def message_convert(max_body:Int, value:(EntryStatusDTO, Delivery)): MessageStatusDTO = {
val (entry, delivery) = value
val rc = new MessageStatusDTO
rc.codec = delivery.message.codec.id()
rc.headers = delivery.message.headers_as_json;
rc.expiration = delivery.expiration
rc.persistent = delivery.persistent
rc.entry = entry
if( max_body > 0 ) {
val body = new Buffer(delivery.message.getBodyAs(classOf[Buffer]))
if( body.length > max_body) {
body.length = max_body
rc.body_truncated = true
}
rc.base64_body = base64(body)
}
rc
}
@GET @Path("/virtual-hosts/{id}/dsubs")
@ApiOperation(value = "Gets a list of all the durable subscriptions that exist on the broker.")
@Produces(Array(APPLICATION_JSON))
def durable_subscriptions(@PathParam("id") id : String, @QueryParam("f") f:java.util.List[String],
@QueryParam("q") q:String, @QueryParam("p") p:java.lang.Integer, @QueryParam("ps") ps:java.lang.Integer, @QueryParam("o") o:java.util.List[String] ):DataPageDTO = {
with_virtual_host(id) { host =>
val router: LocalRouter = host
val values: Iterable[Queue] = router.local_dsub_domain.destination_by_id.values
val records = sync_all(values) { value =>
status(value, false, false, false)
}
val rc:FutureResult[DataPageDTO] = records.map(narrow(classOf[QueueStatusDTO], _, f, q, p, ps, o))
rc
}
}
@GET @Path("/virtual-hosts/{id}/dsubs/{name:.*}")
@ApiOperation(value = "Gets the status of the named durable subscription.")
def durable_subscription(@PathParam("id") id : String, @PathParam("name") name : String,
@QueryParam("entries") entries:Boolean,
@QueryParam("producers") producers:Boolean,
@QueryParam("consumers") consumers:Boolean):QueueStatusDTO = {
with_virtual_host(id) { host =>
val router:LocalRouter = host
val node = router.local_dsub_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
sync(node) {
status(node, entries, producers, consumers)
}
}
}
@DELETE @Path("/virtual-hosts/{id}/dsubs/{name:.*}")
@ApiOperation(value = "Deletes the named virtual host.")
@Produces(Array(APPLICATION_JSON, APPLICATION_XML,TEXT_XML))
def dsub_delete(@PathParam("id") id : String, @PathParam("name") name : String) = ok {
with_virtual_host(id) { host =>
val router: LocalRouter = host
val node = router.local_dsub_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
admining(node) {
host.dispatch_queue {
router._destroy_queue(node)
}
}
}
}
@POST @Path("/virtual-hosts/{id}/dsubs/{name:.*}/action/delete")
@Produces(Array("text/html;qs=5"))
def post_dsub_delete_and_redirect(@PathParam("id") id : String, @PathParam("name") name : String) = {
if_ok(dsub_delete(id, name)) {
result(strip_resolve("../../.."))
}
}
@GET @Path("/virtual-hosts/{id}/dsubs/{name:.*}/messages")
@ApiOperation(value = "Gets a list of the messages that exist on the durable sub.")
def dsub_messages(@PathParam("id") id : String, @PathParam("name") name : String,
@QueryParam("from") _from:java.lang.Long,
@QueryParam("to") _to:java.lang.Long,
@QueryParam("max") _max:java.lang.Long,
@QueryParam("max_body") _max_body:java.lang.Integer):BrowsePageDTO = {
var from = OptionSupport(_from).getOrElse(0L)
var to = OptionSupport(_to)
var max = OptionSupport(_max).getOrElse(100L)
var max_body = OptionSupport(_max_body).getOrElse(0)
with_virtual_host(id) { host =>
val rc = FutureResult[BrowsePageDTO]()
val router: LocalRouter = host
val node = router.local_dsub_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
monitoring(node) {
node.browse(from, to, max) { browse_result =>
val page = new BrowsePageDTO()
for( entry <- browse_result.entries ) {
page.messages.add(message_convert(max_body, entry))
}
page.first_seq = browse_result.first_seq
page.last_seq = browse_result.last_seq
page.total_messages = browse_result.total_entries
rc.set(Success(page))
}
}
rc
}
}
private def decode_path(name:String) = {
try {
LocalRouter.destination_parser.decode_path(name)
} catch {
case x:PathParser.PathException => result(NOT_FOUND)
}
}
def status(q:Queue, entries:Boolean=false, producers:Boolean, consumers:Boolean) = monitoring(q) {
q.status(entries, producers, consumers)
}
@GET @Path("/connectors")
@ApiOperation(value = "Gets a paginated view of all the the connectors .")
@Produces(Array(APPLICATION_JSON))
def connectors(@QueryParam("f") f:java.util.List[String], @QueryParam("q") q:String,
@QueryParam("p") p:java.lang.Integer, @QueryParam("ps") ps:java.lang.Integer, @QueryParam("o") o:java.util.List[String] ):DataPageDTO = {
with_broker { broker =>
monitoring(broker) {
val records = broker.connectors.values.map { value =>
Success(value.status)
}
FutureResult(narrow(classOf[ServiceStatusDTO], records, f, q, p, ps, o))
}
}
}
@GET @Path("/connectors/{id}")
@ApiOperation(value = "Gets the status of the specified connector.")
def connector(@PathParam("id") id : String, @QueryParam("connections") connections:Boolean):ServiceStatusDTO = {
with_connector(id) { connector =>
monitoring(connector.broker) {
val rc = connector.status
// only include the connection list if it was requested.
rc match {
case rc:ConnectorStatusDTO=>
if( !connections ) {
rc.connections = null
}
case _ =>
}
rc
}
}
}
@POST @Path("/connectors/{id}/action/stop")
@ApiOperation(value = "Stops a connector.")
def post_connector_stop(@PathParam("id") id : String) = ok {
with_connector(id) { connector =>
admining(connector.broker) {
connector.stop(NOOP)
}
}
result(strip_resolve(".."))
}
@POST @Path("/connectors/{id}/action/start")
@ApiOperation(value = "Starts a connector.")
def post_connector_start(@PathParam("id") id : String) = ok {
with_connector(id) { connector =>
admining(connector.broker) {
connector.start(NOOP)
}
}
result(strip_resolve(".."))
}
@GET
@Path("/connection-metrics")
@ApiOperation(value = "Aggregates connection metreics for all the connectors.")
def get_connection_metrics(): AggregateConnectionMetricsDTO = {
val f = new ArrayList[String]()
f.add("read_counter")
f.add("write_counter")
f.add("subscription_count")
val rs = connections(f, null, null, Integer.MAX_VALUE, Collections.emptyList())
val rc = new AggregateConnectionMetricsDTO
import collection.JavaConversions._
for( row <- rs.rows ) {
val info = row.asInstanceOf[java.util.List[_]]
def read(index:Int) = try {
info.get(index).asInstanceOf[java.lang.Number].longValue()
} catch {
case e:Throwable => 0L
}
rc.read_counter += read(0)
rc.write_counter += read(1)
rc.subscription_count += read(2)
}
rc.objects = rs.rows.length
rc.current_time = System.currentTimeMillis()
rc
}
@GET @Path("/connections")
@ApiOperation(value = "A paged view of all the connections.")
@Produces(Array(APPLICATION_JSON))
def connections(@QueryParam("f") f:java.util.List[String], @QueryParam("q") q:String,
@QueryParam("p") p:java.lang.Integer, @QueryParam("ps") ps:java.lang.Integer, @QueryParam("o") o:java.util.List[String] ):DataPageDTO = {
with_broker { broker =>
monitoring(broker) {
val records = sync_all(broker.connections.values) { value =>
value.get_connection_status(false)
}
val rc:FutureResult[DataPageDTO] = records.map(narrow(classOf[ConnectionStatusDTO], _, f, q, p, ps, o))
rc
}
}
}
@GET @Path("/connections/{id}")
@ApiOperation(value = "Gets that status of a connection.")
def connection(@PathParam("id") id : Long, @QueryParam("debug") debug:Boolean):ConnectionStatusDTO = {
with_connection(id){ connection=>
monitoring(connection.connector.broker) {
sync(connection) {
connection.get_connection_status(debug)
}
}
}
}
@DELETE @Path("/connections/{id}")
@ApiOperation(value = "Disconnect a connection from the broker.")
@Produces(Array(APPLICATION_JSON, APPLICATION_XML,TEXT_XML))
def connection_delete(@PathParam("id") id : Long) = ok {
with_connection(id){ connection=>
admining(connection.connector.broker) {
connection.stop(NOOP)
}
}
}
@POST @Path("/connections/{id}/action/delete")
@ApiOperation(value = "Disconnect a connection from the broker.")
@Produces(Array("text/html;qs=5"))
def post_connection_delete_and_redirect(@PathParam("id") id : Long) = {
if_ok(connection_delete(id)) {
result(strip_resolve("../../.."))
}
}
@POST
@Path("/action/shutdown")
@ApiOperation(value = "Shutsdown the JVM")
def command_shutdown = ok {
info("JVM shutdown requested via web interface")
with_broker { broker =>
admining(broker) {
// do the the exit async so that we don't
// kill the current request.
Broker.BLOCKABLE_THREAD_POOL.apply {
Thread.sleep(200)
System.exit(0)
}
}
}
}
@GET
@Path("/hawtdispatch/profile")
@ApiOperation(value="Enables or disables profiling")
def hawtdispatch_profile(@QueryParam("enabled") enabled : Boolean):String = {
with_broker { broker =>
admining(broker) {
Dispatch.profile(enabled)
""
}
}
}
@GET
@Path("/hawtdispatch/metrics")
@ApiOperation(value="Enables or disables profiling")
def hawtdispatch_metrics():Array[DispatchQueueMetrics] = {
with_broker { broker =>
monitoring(broker) {
val m = Dispatch.metrics()
m.toArray(new Array[Metrics](m.size())).sortWith{ case (l,r)=> l.totalRunTimeNS > r.totalRunTimeNS }.map { x =>
val rc = new DispatchQueueMetrics
rc.queue = x.queue.getLabel
rc.duration = x.durationNS
rc.waiting = (x.enqueued - x.dequeued).max(0)
rc.wait_time_max = x.maxWaitTimeNS
rc.wait_time_total = x.totalWaitTimeNS
rc.execute_time_max = x.maxRunTimeNS
rc.execute_time_total = x.totalRunTimeNS
rc.executed = x.dequeued
rc
}
}
}
}
}