blob: 8cb2093778c41f4232d1334f0912817c4ce3f746 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.management.runtime.messaging;
import flex.management.BaseControl;
import flex.management.runtime.AdminConsoleDisplayRegistrar;
import flex.management.runtime.AdminConsoleTypes;
import flex.messaging.MessageBroker;
import flex.messaging.endpoints.AMFEndpoint;
import flex.messaging.endpoints.AbstractEndpoint;
import flex.messaging.endpoints.Endpoint;
import flex.messaging.endpoints.HTTPEndpoint;
import flex.messaging.endpoints.StreamingAMFEndpoint;
import flex.messaging.endpoints.StreamingHTTPEndpoint;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.ArrayList;
import javax.management.ObjectName;
/**
* The <code>MessageBrokerControl</code> class is the MBean implemenation for monitoring
* and managing a <code>MessageBroker</code> at runtime.
*/
public class MessageBrokerControl extends BaseControl implements MessageBrokerControlMBean {
private static final Object classMutex = new Object();
private static final String TYPE = "MessageBroker";
private static int instanceCount = 0;
private String id;
private MessageBroker broker;
private List endpointNames;
private List amfEndpoints;
private List httpEndpoints;
private List enterpriseEndpoints;
private List streamingAmfEndpoints;
private List streamingHttpEndpoints;
private List services;
/**
* Constructs a new <code>MessageBrokerControl</code> instance, assigning its
* backing <code>MessageBroker</code>.
*
* @param broker The <code>MessageBroker</code> managed by this MBean.
*/
public MessageBrokerControl(MessageBroker broker) {
super(null);
this.broker = broker;
endpointNames = new ArrayList();
amfEndpoints = new ArrayList();
httpEndpoints = new ArrayList();
enterpriseEndpoints = new ArrayList();
streamingAmfEndpoints = new ArrayList();
streamingHttpEndpoints = new ArrayList();
services = new ArrayList();
synchronized (classMutex) {
id = TYPE + ++instanceCount;
}
setRegistrar(new AdminConsoleDisplayRegistrar(this));
}
protected void onRegistrationComplete() {
String name = this.getObjectName().getCanonicalName();
getRegistrar().registerObject(AdminConsoleTypes.GENERAL_POLLABLE, name, "FlexSessionCount");
getRegistrar().registerObjects(new int[]{AdminConsoleTypes.GENERAL_POLLABLE, AdminConsoleTypes.GRAPH_BY_POLL_INTERVAL},
name, new String[]{"AMFThroughput", "HTTPThroughput", "EnterpriseThroughput"});
getRegistrar().registerObject(AdminConsoleTypes.GENERAL_SERVER, name, "MaxFlexSessionsInCurrentHour");
}
/*
* (non-Javadoc)
* @see flex.management.BaseControlMBean#getId()
*/
public String getId() {
return id;
}
/*
* (non-Javadoc)
* @see flex.management.BaseControlMBean#getType()
*/
public String getType() {
return TYPE;
}
/*
* (non-Javadoc)
* @see flex.management.runtime.MessageBrokerControlMBean#isRunning()
*/
public Boolean isRunning() {
return Boolean.valueOf(broker.isStarted());
}
/*
* (non-Javadoc)
* @see flex.management.runtime.MessageBrokerControlMBean#getStartTimestamp()
*/
public Date getStartTimestamp() {
return startTimestamp;
}
/*
* (non-Javadoc)
* @see flex.management.runtime.MessageBrokerControlMBean#getEndpoints()
*/
public ObjectName[] getEndpoints() throws IOException {
int size = endpointNames.size();
ObjectName[] endpointNameObjects = new ObjectName[size];
for (int i = 0; i < size; ++i) {
endpointNameObjects[i] = (ObjectName) endpointNames.get(i);
}
return endpointNameObjects;
}
/**
* Adds an <code>Endpoint</code> for an endpoint registered with the backing <code>MessageBroker</code>.
*
* @param value The endpoint <code>Endpoint</code>.
*/
public void addEndpoint(Endpoint value) {
if (value instanceof AMFEndpoint)
amfEndpoints.add(value);
else if (value instanceof HTTPEndpoint)
httpEndpoints.add(value);
else if (value instanceof StreamingAMFEndpoint)
streamingAmfEndpoints.add(value);
else if (value instanceof StreamingHTTPEndpoint)
streamingHttpEndpoints.add(value);
else
enterpriseEndpoints.add(value);
endpointNames.add(value.getControl().getObjectName());
}
/**
* Removes an <code>ObjectName</code> for an endpoint registered with the backing <code>MessageBroker</code>.
*
* @param value The endpoint <code>ObjectName</code>.
*/
public void removeEndpoint(ObjectName value) {
endpointNames.remove(value);
}
/*
* (non-Javadoc)
* @see flex.management.runtime.MessageBrokerControlMBean#getServices()
*/
public ObjectName[] getServices() throws IOException {
int size = services.size();
ObjectName[] serviceNames = new ObjectName[size];
for (int i = 0; i < size; ++i) {
serviceNames[i] = (ObjectName) services.get(i);
}
return serviceNames;
}
/**
* Adds an <code>ObjectName</code> for a service registered with the backing <code>MessageBroker</code>.
*
* @param value The service <code>ObjectName</code>.
*/
public void addService(ObjectName value) {
services.add(value);
}
/**
* Removes an <code>ObjectName</code> for a service registered with the backing <code>MessageBroker</code>.
*
* @param value The service <code>ObjectName</code>.
*/
public void removeService(ObjectName value) {
services.remove(value);
}
/*
* (non-Javadoc)
* @see flex.management.runtime.MessageBrokerControlMBean#getFlexSessionCount()
*/
public Integer getFlexSessionCount() {
return broker.getFlexSessionManager().getFlexSessionCount();
}
/*
* (non-Javadoc)
* @see flex.management.runtime.MessageBrokerControlMBean#getMaxFlexSessionsInCurrentHour()
*/
public Integer getMaxFlexSessionsInCurrentHour() {
return broker.getFlexSessionManager().getMaxFlexSessionsInCurrentHour();
}
/* (non-Javadoc)
* @see flex.management.runtime.messaging.MessageBrokerControlMBean#getRTMPConnectionCount()
*/
public Integer getEnterpriseConnectionCount() throws IOException {
int connections = 0;
/*
for (int i = 0; i < rtmpEndpoints.size(); i++)
{
connections += (((RTMPEndpoint)rtmpEndpoints.get(i)).getConnectionCount());
}
*/
return new Integer(connections);
}
/* (non-Javadoc)
* @see flex.management.runtime.messaging.MessageBrokerControlMBean#getAMFThroughput()
*/
public Long getAMFThroughput() throws IOException {
return new Long(calculateEndpointThroughput(amfEndpoints));
}
/* (non-Javadoc)
* @see flex.management.runtime.messaging.MessageBrokerControlMBean#getHTTPThroughput()
*/
public Long getHTTPThroughput() throws IOException {
return new Long(calculateEndpointThroughput(httpEndpoints));
}
/* (non-Javadoc)
* @see flex.management.runtime.messaging.MessageBrokerControlMBean#getRTMPThroughput()
*/
public Long getEnterpriseThroughput() throws IOException {
return new Long(calculateEndpointThroughput(enterpriseEndpoints));
}
/* (non-Javadoc)
* @see flex.management.runtime.messaging.MessageBrokerControlMBean#getStreamingAMFThroughput()
*/
public Long getStreamingAMFThroughput() throws IOException {
return new Long(calculateEndpointThroughput(streamingAmfEndpoints));
}
/* (non-Javadoc)
* @see flex.management.runtime.messaging.MessageBrokerControlMBean#getStreamingHTTPThroughput()
*/
public Long getStreamingHTTPThroughput() throws IOException {
return new Long(calculateEndpointThroughput(streamingHttpEndpoints));
}
private long calculateEndpointThroughput(List endpoints) {
long throughput = 0;
for (int i = 0; i < endpoints.size(); i++) {
// This method shouldn't be used with Lists containing objects that are not AbstractEndpoints
if (endpoints.get(i) instanceof AbstractEndpoint)
throughput += ((AbstractEndpoint) endpoints.get(i)).getThroughput();
}
return throughput;
}
}