| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.broker; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.net.UnknownHostException; |
| import java.security.Provider; |
| import java.security.Security; |
| import java.util.*; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.RejectedExecutionHandler; |
| import java.util.concurrent.SynchronousQueue; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import javax.annotation.PostConstruct; |
| import javax.annotation.PreDestroy; |
| import javax.management.InstanceNotFoundException; |
| import javax.management.MalformedObjectNameException; |
| import javax.management.ObjectName; |
| |
| import org.apache.activemq.ActiveMQConnectionMetaData; |
| import org.apache.activemq.ConfigurationException; |
| import org.apache.activemq.Service; |
| import org.apache.activemq.advisory.AdvisoryBroker; |
| import org.apache.activemq.broker.cluster.ConnectionSplitBroker; |
| import org.apache.activemq.broker.jmx.AnnotatedMBean; |
| import org.apache.activemq.broker.jmx.BrokerMBeanSupport; |
| import org.apache.activemq.broker.jmx.BrokerView; |
| import org.apache.activemq.broker.jmx.ConnectorView; |
| import org.apache.activemq.broker.jmx.ConnectorViewMBean; |
| import org.apache.activemq.broker.jmx.HealthView; |
| import org.apache.activemq.broker.jmx.HealthViewMBean; |
| import org.apache.activemq.broker.jmx.JmsConnectorView; |
| import org.apache.activemq.broker.jmx.JobSchedulerView; |
| import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; |
| import org.apache.activemq.broker.jmx.Log4JConfigView; |
| import org.apache.activemq.broker.jmx.ManagedRegionBroker; |
| import org.apache.activemq.broker.jmx.ManagementContext; |
| import org.apache.activemq.broker.jmx.NetworkConnectorView; |
| import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; |
| import org.apache.activemq.broker.jmx.ProxyConnectorView; |
| import org.apache.activemq.broker.region.CompositeDestinationInterceptor; |
| import org.apache.activemq.broker.region.Destination; |
| import org.apache.activemq.broker.region.DestinationFactory; |
| import org.apache.activemq.broker.region.DestinationFactoryImpl; |
| import org.apache.activemq.broker.region.DestinationInterceptor; |
| import org.apache.activemq.broker.region.RegionBroker; |
| import org.apache.activemq.broker.region.policy.PolicyMap; |
| import org.apache.activemq.broker.region.virtual.MirroredQueue; |
| import org.apache.activemq.broker.region.virtual.VirtualDestination; |
| import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; |
| import org.apache.activemq.broker.region.virtual.VirtualTopic; |
| import org.apache.activemq.broker.scheduler.JobSchedulerStore; |
| import org.apache.activemq.broker.scheduler.SchedulerBroker; |
| import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore; |
| import org.apache.activemq.command.ActiveMQDestination; |
| import org.apache.activemq.command.ActiveMQQueue; |
| import org.apache.activemq.command.BrokerId; |
| import org.apache.activemq.command.ProducerInfo; |
| import org.apache.activemq.filter.DestinationFilter; |
| import org.apache.activemq.network.DiscoveryNetworkConnector; |
| import org.apache.activemq.network.NetworkConnector; |
| import org.apache.activemq.network.jms.JmsConnector; |
| import org.apache.activemq.openwire.OpenWireFormat; |
| import org.apache.activemq.proxy.ProxyConnector; |
| import org.apache.activemq.security.MessageAuthorizationPolicy; |
| import org.apache.activemq.selector.SelectorParser; |
| import org.apache.activemq.store.JournaledStore; |
| import org.apache.activemq.store.PListStore; |
| import org.apache.activemq.store.PersistenceAdapter; |
| import org.apache.activemq.store.PersistenceAdapterFactory; |
| import org.apache.activemq.store.memory.MemoryPersistenceAdapter; |
| import org.apache.activemq.thread.Scheduler; |
| import org.apache.activemq.thread.TaskRunnerFactory; |
| import org.apache.activemq.transport.TransportFactorySupport; |
| import org.apache.activemq.transport.TransportServer; |
| import org.apache.activemq.transport.vm.VMTransportFactory; |
| import org.apache.activemq.usage.PercentLimitUsage; |
| import org.apache.activemq.usage.StoreUsage; |
| import org.apache.activemq.usage.SystemUsage; |
| import org.apache.activemq.util.BrokerSupport; |
| import org.apache.activemq.util.DefaultIOExceptionHandler; |
| import org.apache.activemq.util.IOExceptionHandler; |
| import org.apache.activemq.util.IOExceptionSupport; |
| import org.apache.activemq.util.IOHelper; |
| import org.apache.activemq.util.InetAddressUtil; |
| import org.apache.activemq.util.ServiceStopper; |
| import org.apache.activemq.util.StoreUtil; |
| import org.apache.activemq.util.ThreadPoolUtils; |
| import org.apache.activemq.util.TimeUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.slf4j.MDC; |
| |
| /** |
| * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a |
| * number of transport connectors, network connectors and a bunch of properties |
| * which can be used to configure the broker as its lazily created. |
| * |
| * @org.apache.xbean.XBean |
| */ |
| public class BrokerService implements Service { |
| public static final String DEFAULT_PORT = "61616"; |
| public static final String LOCAL_HOST_NAME; |
| public static final String BROKER_VERSION; |
| public static final String DEFAULT_BROKER_NAME = "localhost"; |
| public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; |
| public static final long DEFAULT_START_TIMEOUT = 600000L; |
| public static final int MAX_SCHEDULER_REPEAT_ALLOWED = 1000; |
| |
| private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class); |
| |
| @SuppressWarnings("unused") |
| private static final long serialVersionUID = 7353129142305630237L; |
| |
| private boolean useJmx = true; |
| private boolean enableStatistics = true; |
| private boolean persistent = true; |
| private boolean populateJMSXUserID; |
| private boolean useAuthenticatedPrincipalForJMSXUserID; |
| private boolean populateUserNameInMBeans; |
| private long mbeanInvocationTimeout = 0; |
| |
| private boolean useShutdownHook = true; |
| private boolean useLoggingForShutdownErrors; |
| private boolean shutdownOnMasterFailure; |
| private boolean shutdownOnSlaveFailure; |
| private boolean waitForSlave; |
| private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT; |
| private boolean passiveSlave; |
| private String brokerName = DEFAULT_BROKER_NAME; |
| private File dataDirectoryFile; |
| private File tmpDataDirectory; |
| private Broker broker; |
| private BrokerView adminView; |
| private ManagementContext managementContext; |
| private ObjectName brokerObjectName; |
| private TaskRunnerFactory taskRunnerFactory; |
| private TaskRunnerFactory persistenceTaskRunnerFactory; |
| private SystemUsage systemUsage; |
| private SystemUsage producerSystemUsage; |
| private SystemUsage consumerSystemUsage; |
| private PersistenceAdapter persistenceAdapter; |
| private PersistenceAdapterFactory persistenceFactory; |
| protected DestinationFactory destinationFactory; |
| private MessageAuthorizationPolicy messageAuthorizationPolicy; |
| private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<>(); |
| private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<>(); |
| private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<>(); |
| private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<>(); |
| private final List<Service> services = new ArrayList<>(); |
| private transient Thread shutdownHook; |
| private String[] transportConnectorURIs; |
| private String[] networkConnectorURIs; |
| private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges |
| // to other jms messaging systems |
| private boolean deleteAllMessagesOnStartup; |
| private boolean advisorySupport = true; |
| private boolean anonymousProducerAdvisorySupport = false; |
| private URI vmConnectorURI; |
| private String defaultSocketURIString; |
| private PolicyMap destinationPolicy; |
| private final AtomicBoolean started = new AtomicBoolean(false); |
| private final AtomicBoolean stopped = new AtomicBoolean(false); |
| private final AtomicBoolean stopping = new AtomicBoolean(false); |
| private final AtomicBoolean preShutdownHooksInvoked = new AtomicBoolean(false); |
| private BrokerPlugin[] plugins; |
| private boolean keepDurableSubsActive = true; |
| private boolean enableMessageExpirationOnActiveDurableSubs = false; |
| private boolean useVirtualTopics = true; |
| private boolean useMirroredQueues = false; |
| private boolean useTempMirroredQueues = true; |
| /** |
| * Whether or not virtual destination subscriptions should cause network demand |
| */ |
| private boolean useVirtualDestSubs = false; |
| /** |
| * Whether or not the creation of destinations that match virtual destinations |
| * should cause network demand |
| */ |
| private boolean useVirtualDestSubsOnCreation = false; |
| private BrokerId brokerId; |
| private volatile DestinationInterceptor[] destinationInterceptors; |
| private ActiveMQDestination[] destinations; |
| private PListStore tempDataStore; |
| private int persistenceThreadPriority = Thread.MAX_PRIORITY; |
| private boolean useLocalHostBrokerName; |
| private final CountDownLatch stoppedLatch = new CountDownLatch(1); |
| private final CountDownLatch startedLatch = new CountDownLatch(1); |
| private Broker regionBroker; |
| private int producerSystemUsagePortion = 60; |
| private int consumerSystemUsagePortion = 40; |
| private boolean splitSystemUsageForProducersConsumers; |
| private boolean monitorConnectionSplits = false; |
| private int taskRunnerPriority = Thread.NORM_PRIORITY; |
| private boolean dedicatedTaskRunner; |
| private boolean cacheTempDestinations = false;// useful for failover |
| private int timeBeforePurgeTempDestinations = 5000; |
| private final List<Runnable> shutdownHooks = new ArrayList<>(); |
| private boolean systemExitOnShutdown; |
| private int systemExitOnShutdownExitCode; |
| private SslContext sslContext; |
| private boolean forceStart = false; |
| private IOExceptionHandler ioExceptionHandler; |
| private boolean schedulerSupport = false; |
| private int maxSchedulerRepeatAllowed = MAX_SCHEDULER_REPEAT_ALLOWED; |
| private File schedulerDirectoryFile; |
| private Scheduler scheduler; |
| private ThreadPoolExecutor executor; |
| private int schedulePeriodForDestinationPurge= 0; |
| private int maxPurgedDestinationsPerSweep = 0; |
| private int schedulePeriodForDiskUsageCheck = 0; |
| private int diskUsageCheckRegrowThreshold = -1; |
| private boolean adjustUsageLimits = true; |
| private BrokerContext brokerContext; |
| private boolean networkConnectorStartAsync = false; |
| private boolean allowTempAutoCreationOnSend; |
| private JobSchedulerStore jobSchedulerStore; |
| private final AtomicLong totalConnections = new AtomicLong(); |
| private final AtomicInteger currentConnections = new AtomicInteger(); |
| |
| private long offlineDurableSubscriberTimeout = -1; |
| private long offlineDurableSubscriberTaskSchedule = 300000; |
| private DestinationFilter virtualConsumerDestinationFilter; |
| |
| private final AtomicBoolean persistenceAdapterStarted = new AtomicBoolean(false); |
| private Throwable startException = null; |
| private boolean startAsync = false; |
| private Date startDate; |
| private boolean slave = true; |
| |
| private boolean restartAllowed = true; |
| private boolean restartRequested = false; |
| private boolean rejectDurableConsumers = false; |
| private boolean rollbackOnlyOnAsyncException = true; |
| |
| private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; |
| private final List<Runnable> preShutdownHooks = new CopyOnWriteArrayList<>(); |
| |
| static { |
| |
| try { |
| ClassLoader loader = BrokerService.class.getClassLoader(); |
| Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider"); |
| Provider bouncycastle = (Provider) clazz.getDeclaredConstructor().newInstance(); |
| Integer bouncyCastlePosition = Integer.getInteger("org.apache.activemq.broker.BouncyCastlePosition"); |
| int ret; |
| if (bouncyCastlePosition != null) { |
| ret = Security.insertProviderAt(bouncycastle, bouncyCastlePosition); |
| } else { |
| ret = Security.addProvider(bouncycastle); |
| } |
| LOG.info("Loaded the Bouncy Castle security provider at position: {}", ret); |
| } catch(Throwable e) { |
| // No BouncyCastle found so we use the default Java Security Provider |
| } |
| |
| String localHostName = "localhost"; |
| try { |
| localHostName = InetAddressUtil.getLocalHostName(); |
| } catch (UnknownHostException e) { |
| LOG.error("Failed to resolve localhost"); |
| } |
| LOCAL_HOST_NAME = localHostName; |
| |
| String version = null; |
| try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) { |
| if (in != null) { |
| try(InputStreamReader isr = new InputStreamReader(in); |
| BufferedReader reader = new BufferedReader(isr)) { |
| version = reader.readLine(); |
| } |
| } |
| } catch (IOException ie) { |
| LOG.warn("Error reading broker version", ie); |
| } |
| BROKER_VERSION = version; |
| } |
| |
| @Override |
| public String toString() { |
| return "BrokerService[" + getBrokerName() + "]"; |
| } |
| |
| private String getBrokerVersion() { |
| String version = ActiveMQConnectionMetaData.PROVIDER_VERSION; |
| if (version == null) { |
| version = BROKER_VERSION; |
| } |
| |
| return version; |
| } |
| |
| /** |
| * Adds a new transport connector for the given bind address |
| * |
| * @return the newly created and added transport connector |
| */ |
| public TransportConnector addConnector(String bindAddress) throws Exception { |
| return addConnector(new URI(bindAddress)); |
| } |
| |
| /** |
| * Adds a new transport connector for the given bind address |
| * |
| * @return the newly created and added transport connector |
| */ |
| public TransportConnector addConnector(URI bindAddress) throws Exception { |
| return addConnector(createTransportConnector(bindAddress)); |
| } |
| |
| /** |
| * Adds a new transport connector for the given TransportServer transport |
| * |
| * @return the newly created and added transport connector |
| */ |
| public TransportConnector addConnector(TransportServer transport) throws Exception { |
| return addConnector(new TransportConnector(transport)); |
| } |
| |
| /** |
| * Adds a new transport connector |
| * |
| * @return the transport connector |
| */ |
| public TransportConnector addConnector(TransportConnector connector) throws Exception { |
| transportConnectors.add(connector); |
| return connector; |
| } |
| |
| /** |
| * Stops and removes a transport connector from the broker. |
| * |
| * @return true if the connector has been previously added to the broker |
| */ |
| public boolean removeConnector(TransportConnector connector) throws Exception { |
| boolean rc = transportConnectors.remove(connector); |
| if (rc) { |
| unregisterConnectorMBean(connector); |
| } |
| return rc; |
| } |
| |
| /** |
| * Adds a new network connector using the given discovery address |
| * |
| * @return the newly created and added network connector |
| */ |
| public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { |
| return addNetworkConnector(new URI(discoveryAddress)); |
| } |
| |
| /** |
| * Adds a new proxy connector using the given bind address |
| * |
| * @return the newly created and added network connector |
| */ |
| public ProxyConnector addProxyConnector(String bindAddress) throws Exception { |
| return addProxyConnector(new URI(bindAddress)); |
| } |
| |
| /** |
| * Adds a new network connector using the given discovery address |
| * |
| * @return the newly created and added network connector |
| */ |
| public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { |
| NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); |
| return addNetworkConnector(connector); |
| } |
| |
| /** |
| * Adds a new proxy connector using the given bind address |
| * |
| * @return the newly created and added network connector |
| */ |
| public ProxyConnector addProxyConnector(URI bindAddress) throws Exception { |
| ProxyConnector connector = new ProxyConnector(); |
| connector.setBind(bindAddress); |
| connector.setRemote(new URI("fanout:multicast://default")); |
| return addProxyConnector(connector); |
| } |
| |
| /** |
| * Adds a new network connector to connect this broker to a federated |
| * network |
| */ |
| public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { |
| connector.setBrokerService(this); |
| connector.setLocalUri(getVmConnectorURI()); |
| // Set a connection filter so that the connector does not establish loop |
| // back connections. |
| connector.setConnectionFilter(location -> { |
| List<TransportConnector> transportConnectors = getTransportConnectors(); |
| for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) { |
| try { |
| TransportConnector tc = iter.next(); |
| if (location.equals(tc.getConnectUri())) { |
| return false; |
| } |
| } catch (Throwable e) { |
| // no-op |
| } |
| } |
| return true; |
| }); |
| networkConnectors.add(connector); |
| return connector; |
| } |
| |
| /** |
| * Removes the given network connector without stopping it. The caller |
| * should call {@link NetworkConnector#stop()} to close the connector |
| */ |
| public boolean removeNetworkConnector(NetworkConnector connector) { |
| boolean answer = networkConnectors.remove(connector); |
| if (answer) { |
| unregisterNetworkConnectorMBean(connector); |
| } |
| return answer; |
| } |
| |
| public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception { |
| URI uri = getVmConnectorURI(); |
| connector.setLocalUri(uri); |
| proxyConnectors.add(connector); |
| if (isUseJmx()) { |
| registerProxyConnectorMBean(connector); |
| } |
| return connector; |
| } |
| |
| public JmsConnector addJmsConnector(JmsConnector connector) throws Exception { |
| connector.setBrokerService(this); |
| jmsConnectors.add(connector); |
| if (isUseJmx()) { |
| registerJmsConnectorMBean(connector); |
| } |
| return connector; |
| } |
| |
| /** |
| * Adds a {@link Runnable} hook that will be invoked before the |
| * broker is stopped. This allows performing cleanup actions |
| * before the broker is stopped. The hook should not throw |
| * exceptions or block. |
| */ |
| public final void addPreShutdownHook(final Runnable hook) { |
| preShutdownHooks.add(hook); |
| } |
| |
| public JmsConnector removeJmsConnector(JmsConnector connector) { |
| if (jmsConnectors.remove(connector)) { |
| return connector; |
| } |
| return null; |
| } |
| |
| public void masterFailed() { |
| if (shutdownOnMasterFailure) { |
| LOG.error("The Master has failed ... shutting down"); |
| try { |
| stop(); |
| } catch (Exception e) { |
| LOG.error("Failed to stop for master failure", e); |
| } |
| } else { |
| LOG.warn("Master Failed - starting all connectors"); |
| try { |
| startAllConnectors(); |
| broker.nowMasterBroker(); |
| } catch (Exception e) { |
| LOG.error("Failed to startAllConnectors", e); |
| } |
| } |
| } |
| |
| public String getUptime() { |
| long delta = getUptimeMillis(); |
| |
| if (delta == 0) { |
| return "not started"; |
| } |
| |
| return TimeUtils.printDuration(delta); |
| } |
| |
| public long getUptimeMillis() { |
| if (startDate == null) { |
| return 0; |
| } |
| |
| return new Date().getTime() - startDate.getTime(); |
| } |
| |
| public boolean isStarted() { |
| return started.get() && startedLatch.getCount() == 0; |
| } |
| |
| /** |
| * Forces a start of the broker. |
| * By default a BrokerService instance that was |
| * previously stopped using BrokerService.stop() cannot be restarted |
| * using BrokerService.start(). |
| * This method enforces a restart. |
| * It is not recommended to force a restart of the broker and will not work |
| * for most but some very trivial broker configurations. |
| * For restarting a broker instance we recommend to first call stop() on |
| * the old instance and then recreate a new BrokerService instance. |
| * |
| * @param force - if true enforces a restart. |
| */ |
| public void start(boolean force) throws Exception { |
| forceStart = force; |
| stopped.set(false); |
| started.set(false); |
| start(); |
| } |
| |
| // Service interface |
| // ------------------------------------------------------------------------- |
| |
| protected boolean shouldAutostart() { |
| return true; |
| } |
| |
| /** |
| * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions |
| * |
| * delegates to autoStart, done to prevent backwards incompatible signature change |
| */ |
| @PostConstruct |
| private void postConstruct() { |
| try { |
| autoStart(); |
| } catch (Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| /** |
| * |
| * @org.apache.xbean.InitMethod |
| */ |
| public void autoStart() throws Exception { |
| if(shouldAutostart()) { |
| start(); |
| } |
| } |
| |
| @Override |
| public void start() throws Exception { |
| if (stopped.get() || !started.compareAndSet(false, true)) { |
| // lets just ignore redundant start() calls |
| // as its way too easy to not be completely sure if start() has been |
| // called or not with the gazillion of different configuration |
| // mechanisms |
| // throw new IllegalStateException("Already started."); |
| return; |
| } |
| |
| setStartException(null); |
| stopping.set(false); |
| preShutdownHooksInvoked.set(false); |
| startDate = new Date(); |
| MDC.put("activemq.broker", brokerName); |
| |
| try { |
| checkMemorySystemUsageLimits(); |
| if (systemExitOnShutdown && useShutdownHook) { |
| throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)"); |
| } |
| processHelperProperties(); |
| if (isUseJmx()) { |
| // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and |
| // we cannot cleanup clear that during shutdown of the broker. |
| MDC.remove("activemq.broker"); |
| try { |
| startManagementContext(); |
| for (NetworkConnector connector : getNetworkConnectors()) { |
| registerNetworkConnectorMBean(connector); |
| } |
| } finally { |
| MDC.put("activemq.broker", brokerName); |
| } |
| } |
| |
| // in jvm master slave, lets not publish over existing broker till we get the lock |
| final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance(); |
| if (brokerRegistry.lookup(getBrokerName()) == null) { |
| brokerRegistry.bind(getBrokerName(), BrokerService.this); |
| } |
| startPersistenceAdapter(startAsync); |
| startBroker(startAsync); |
| brokerRegistry.bind(getBrokerName(), BrokerService.this); |
| } catch (Exception e) { |
| LOG.error("Failed to start Apache ActiveMQ ({}, {})", getBrokerName(), brokerId, e); |
| try { |
| if (!stopped.get()) { |
| stop(); |
| } |
| } catch (Exception ex) { |
| LOG.warn("Failed to stop broker after failure in start. This exception will be ignored", ex); |
| } |
| throw e; |
| } finally { |
| MDC.remove("activemq.broker"); |
| } |
| } |
| |
| private void startPersistenceAdapter(boolean async) throws Exception { |
| if (async) { |
| new Thread("Persistence Adapter Starting Thread") { |
| @Override |
| public void run() { |
| try { |
| doStartPersistenceAdapter(); |
| } catch (Throwable e) { |
| setStartException(e); |
| } finally { |
| synchronized (persistenceAdapterStarted) { |
| persistenceAdapterStarted.set(true); |
| persistenceAdapterStarted.notifyAll(); |
| } |
| } |
| } |
| }.start(); |
| } else { |
| doStartPersistenceAdapter(); |
| } |
| } |
| |
| private void doStartPersistenceAdapter() throws Exception { |
| PersistenceAdapter persistenceAdapterToStart = getPersistenceAdapter(); |
| if (persistenceAdapterToStart == null) { |
| checkStartException(); |
| throw new ConfigurationException("Cannot start null persistence adapter"); |
| } |
| persistenceAdapterToStart.setUsageManager(getProducerSystemUsage()); |
| persistenceAdapterToStart.setBrokerName(getBrokerName()); |
| LOG.info("Using Persistence Adapter: {}", persistenceAdapterToStart); |
| if (deleteAllMessagesOnStartup) { |
| deleteAllMessages(); |
| } |
| persistenceAdapterToStart.start(); |
| |
| getTempDataStore(); |
| if (tempDataStore != null) { |
| try { |
| // start after we have the store lock |
| tempDataStore.start(); |
| } catch (Exception e) { |
| RuntimeException exception = new RuntimeException( |
| "Failed to start temp data store: " + tempDataStore, e); |
| LOG.error(exception.getLocalizedMessage(), e); |
| throw exception; |
| } |
| } |
| |
| getJobSchedulerStore(); |
| if (jobSchedulerStore != null) { |
| try { |
| jobSchedulerStore.start(); |
| } catch (Exception e) { |
| RuntimeException exception = new RuntimeException( |
| "Failed to start job scheduler store: " + jobSchedulerStore, e); |
| LOG.error(exception.getLocalizedMessage(), e); |
| throw exception; |
| } |
| } |
| } |
| |
| private void startBroker(boolean async) throws Exception { |
| if (async) { |
| new Thread("Broker Starting Thread") { |
| @Override |
| public void run() { |
| try { |
| synchronized (persistenceAdapterStarted) { |
| if (!persistenceAdapterStarted.get()) { |
| persistenceAdapterStarted.wait(); |
| } |
| } |
| doStartBroker(); |
| } catch (Throwable t) { |
| setStartException(t); |
| } |
| } |
| }.start(); |
| } else { |
| doStartBroker(); |
| } |
| } |
| |
| private void doStartBroker() throws Exception { |
| checkStartException(); |
| startDestinations(); |
| addShutdownHook(); |
| |
| broker = getBroker(); |
| brokerId = broker.getBrokerId(); |
| |
| // need to log this after creating the broker so we have its id and name |
| LOG.info("Apache ActiveMQ {} ({}, {}) is starting", getBrokerVersion(), getBrokerName(), brokerId); |
| broker.start(); |
| |
| if (isUseJmx()) { |
| if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) { |
| // try to restart management context |
| // typical for slaves that use the same ports as master |
| managementContext.stop(); |
| startManagementContext(); |
| } |
| ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; |
| managedBroker.setContextBroker(broker); |
| adminView.setBroker(managedBroker); |
| } |
| |
| if (ioExceptionHandler == null) { |
| setIoExceptionHandler(new DefaultIOExceptionHandler()); |
| } |
| |
| if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) { |
| ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString()); |
| Log4JConfigView log4jConfigView = new Log4JConfigView(); |
| AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName); |
| } |
| |
| startAllConnectors(); |
| |
| LOG.info("Apache ActiveMQ {} ({}, {}) started", getBrokerVersion(), getBrokerName(), brokerId); |
| LOG.info("For help or more information please see: http://activemq.apache.org"); |
| |
| getBroker().brokerServiceStarted(); |
| checkStoreSystemUsageLimits(); |
| startedLatch.countDown(); |
| getBroker().nowMasterBroker(); |
| } |
| |
| /** |
| * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions |
| * |
| * delegates to stop, done to prevent backwards incompatible signature change |
| */ |
| @PreDestroy |
| private void preDestroy () { |
| try { |
| stop(); |
| } catch (Exception ex) { |
| throw new RuntimeException(); |
| } |
| } |
| |
| /** |
| * |
| * @org.apache.xbean.DestroyMethod |
| */ |
| @Override |
| public void stop() throws Exception { |
| final ServiceStopper stopper = new ServiceStopper(); |
| |
| //The preShutdownHooks need to run before stopping.compareAndSet() |
| //so there is a separate AtomicBoolean so the hooks only run once |
| //We want to make sure the hooks are run before stop is initialized |
| //including setting the stopping variable - See AMQ-6706 |
| if (preShutdownHooksInvoked.compareAndSet(false, true)) { |
| for (Runnable hook : preShutdownHooks) { |
| try { |
| hook.run(); |
| } catch (Throwable e) { |
| stopper.onException(hook, e); |
| } |
| } |
| } |
| |
| if (!stopping.compareAndSet(false, true)) { |
| LOG.trace("Broker already stopping/stopped"); |
| return; |
| } |
| |
| setStartException(new BrokerStoppedException("Stop invoked")); |
| MDC.put("activemq.broker", brokerName); |
| |
| if (systemExitOnShutdown) { |
| new Thread(() -> System.exit(systemExitOnShutdownExitCode)).start(); |
| } |
| |
| LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", getBrokerVersion(), getBrokerName(), brokerId); |
| |
| removeShutdownHook(); |
| if (this.scheduler != null) { |
| this.scheduler.stop(); |
| this.scheduler = null; |
| } |
| if (services != null) { |
| for (Service service : services) { |
| stopper.stop(service); |
| } |
| } |
| stopAllConnectors(stopper); |
| this.slave = true; |
| // remove any VMTransports connected |
| // this has to be done after services are stopped, |
| // to avoid timing issue with discovery (spinning up a new instance) |
| BrokerRegistry.getInstance().unbind(getBrokerName()); |
| VMTransportFactory.stopped(getBrokerName()); |
| if (broker != null) { |
| stopper.stop(broker); |
| broker = null; |
| } |
| |
| if (jobSchedulerStore != null) { |
| jobSchedulerStore.stop(); |
| jobSchedulerStore = null; |
| } |
| if (tempDataStore != null) { |
| tempDataStore.stop(); |
| tempDataStore = null; |
| } |
| try { |
| stopper.stop(getPersistenceAdapter()); |
| persistenceAdapter = null; |
| if (isUseJmx()) { |
| stopper.stop(managementContext); |
| managementContext = null; |
| } |
| // Clear SelectorParser cache to free memory |
| SelectorParser.clearCache(); |
| } finally { |
| started.set(false); |
| stopped.set(true); |
| stoppedLatch.countDown(); |
| } |
| |
| if (this.taskRunnerFactory != null) { |
| this.taskRunnerFactory.shutdown(); |
| this.taskRunnerFactory = null; |
| } |
| if (this.executor != null) { |
| ThreadPoolUtils.shutdownNow(executor); |
| this.executor = null; |
| } |
| |
| this.destinationInterceptors = null; |
| this.destinationFactory = null; |
| |
| if (startDate != null) { |
| LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", getBrokerVersion(), getBrokerName(), brokerId, getUptime()); |
| } |
| LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", getBrokerVersion(), getBrokerName(), brokerId); |
| |
| synchronized (shutdownHooks) { |
| for (Runnable hook : shutdownHooks) { |
| try { |
| hook.run(); |
| } catch (Throwable e) { |
| stopper.onException(hook, e); |
| } |
| } |
| } |
| |
| MDC.remove("activemq.broker"); |
| |
| // and clear start date |
| startDate = null; |
| |
| stopper.throwFirstException(); |
| } |
| |
| public boolean checkQueueSize(String queueName) { |
| long count = 0; |
| long queueSize; |
| Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap(); |
| for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) { |
| if (entry.getKey().isQueue()) { |
| if (entry.getValue().getName().matches(queueName)) { |
| queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount(); |
| count += queueSize; |
| if (queueSize > 0) { |
| LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize); |
| } |
| } |
| } |
| } |
| return count == 0; |
| } |
| |
| /** |
| * This method (both connectorName and queueName are using regex to match) |
| * 1. stop the connector (supposed the user input the connector which the |
| * clients connect to) 2. to check whether there is any pending message on |
| * the queues defined by queueName 3. supposedly, after stop the connector, |
| * client should failover to other broker and pending messages should be |
| * forwarded. if no pending messages, the method finally call stop to stop |
| * the broker. |
| */ |
| public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception { |
| if (isUseJmx()) { |
| if (connectorName == null || queueName == null || timeout <= 0) { |
| throw new Exception( |
| "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully."); |
| } |
| if (pollInterval <= 0) { |
| pollInterval = 30; |
| } |
| LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", |
| connectorName, queueName, timeout, pollInterval); |
| TransportConnector connector; |
| for (TransportConnector transportConnector : transportConnectors) { |
| connector = transportConnector; |
| if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) { |
| connector.stop(); |
| } |
| } |
| long start = System.currentTimeMillis(); |
| while (System.currentTimeMillis() - start < timeout * 1000) { |
| // check quesize until it gets zero |
| if (checkQueueSize(queueName)) { |
| stop(); |
| break; |
| } else { |
| Thread.sleep(pollInterval * 1000); |
| } |
| } |
| if (stopped.get()) { |
| LOG.info("Successfully stop the broker."); |
| } else { |
| LOG.info("There is still pending message on the queue. Please check and stop the broker manually."); |
| } |
| } |
| } |
| |
| /** |
| * A helper method to block the caller thread until the broker has been |
| * stopped |
| */ |
| public void waitUntilStopped() { |
| while (isStarted() && !stopped.get()) { |
| try { |
| stoppedLatch.await(); |
| } catch (InterruptedException e) { |
| // ignore |
| } |
| } |
| } |
| |
| public boolean isStopped() { |
| return stopped.get(); |
| } |
| |
| /** |
| * A helper method to block the caller thread until the broker has fully started |
| * @return boolean true if wait succeeded false if broker was not started or was stopped |
| */ |
| public boolean waitUntilStarted() { |
| return waitUntilStarted(DEFAULT_START_TIMEOUT); |
| } |
| |
| /** |
| * A helper method to block the caller thread until the broker has fully started |
| * |
| * @param timeout |
| * the amount of time to wait before giving up and returning false. |
| * |
| * @return boolean true if wait succeeded false if broker was not started or was stopped |
| */ |
| public boolean waitUntilStarted(long timeout) { |
| boolean waitSucceeded = isStarted(); |
| long expiration = Math.max(0, timeout + System.currentTimeMillis()); |
| while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) { |
| try { |
| if (getStartException() != null) { |
| return waitSucceeded; |
| } |
| waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException ignore) { |
| } |
| } |
| return waitSucceeded; |
| } |
| |
| // Properties |
| // ------------------------------------------------------------------------- |
| /** |
| * Returns the message broker |
| */ |
| public Broker getBroker() throws Exception { |
| if (broker == null) { |
| checkStartException(); |
| broker = createBroker(); |
| } |
| return broker; |
| } |
| |
| /** |
| * Returns the administration view of the broker; used to create and destroy |
| * resources such as queues and topics. Note this method returns null if JMX |
| * is disabled. |
| */ |
| public BrokerView getAdminView() throws Exception { |
| if (adminView == null) { |
| // force lazy creation |
| getBroker(); |
| } |
| return adminView; |
| } |
| |
| public void setAdminView(BrokerView adminView) { |
| this.adminView = adminView; |
| } |
| |
| public String getBrokerName() { |
| return brokerName; |
| } |
| |
| |
| private static final String brokerNameReplacedCharsRegExp = "[^a-zA-Z0-9\\.\\_\\-\\:]"; |
| |
| /** |
| * Sets the name of this broker; which must be unique in the network. |
| */ |
| public void setBrokerName(String brokerName) { |
| if (brokerName == null) { |
| throw new NullPointerException("The broker name cannot be null"); |
| } |
| String str = brokerName.replaceAll(brokerNameReplacedCharsRegExp, "_"); |
| if (!str.equals(brokerName)) { |
| LOG.error("Broker Name: {} contained illegal characters matching regExp: {} - replaced with {}", brokerName, brokerNameReplacedCharsRegExp, str); |
| } |
| this.brokerName = str.trim(); |
| } |
| |
| public PersistenceAdapterFactory getPersistenceFactory() { |
| return persistenceFactory; |
| } |
| |
| public File getDataDirectoryFile() { |
| if (dataDirectoryFile == null) { |
| dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory()); |
| } |
| return dataDirectoryFile; |
| } |
| |
| public File getBrokerDataDirectory() { |
| String brokerDir = getBrokerName(); |
| return new File(getDataDirectoryFile(), brokerDir); |
| } |
| |
| /** |
| * Sets the directory in which the data files will be stored by default for |
| * the JDBC and Journal persistence adaptors. |
| * |
| * @param dataDirectory |
| * the directory to store data files |
| */ |
| public void setDataDirectory(String dataDirectory) { |
| setDataDirectoryFile(new File(dataDirectory)); |
| } |
| |
| /** |
| * Sets the directory in which the data files will be stored by default for |
| * the JDBC and Journal persistence adaptors. |
| * |
| * @param dataDirectoryFile |
| * the directory to store data files |
| */ |
| public void setDataDirectoryFile(File dataDirectoryFile) { |
| this.dataDirectoryFile = dataDirectoryFile; |
| } |
| |
| /** |
| * @return the tmpDataDirectory |
| */ |
| public File getTmpDataDirectory() { |
| if (tmpDataDirectory == null) { |
| tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage"); |
| } |
| return tmpDataDirectory; |
| } |
| |
| /** |
| * @param tmpDataDirectory |
| * the tmpDataDirectory to set |
| */ |
| public void setTmpDataDirectory(File tmpDataDirectory) { |
| this.tmpDataDirectory = tmpDataDirectory; |
| } |
| |
| public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { |
| this.persistenceFactory = persistenceFactory; |
| } |
| |
| public void setDestinationFactory(DestinationFactory destinationFactory) { |
| this.destinationFactory = destinationFactory; |
| } |
| |
| public boolean isPersistent() { |
| return persistent; |
| } |
| |
| /** |
| * Sets whether or not persistence is enabled or disabled. |
| * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" |
| */ |
| public void setPersistent(boolean persistent) { |
| this.persistent = persistent; |
| } |
| |
| public boolean isPopulateJMSXUserID() { |
| return populateJMSXUserID; |
| } |
| |
| /** |
| * Sets whether or not the broker should populate the JMSXUserID header. |
| */ |
| public void setPopulateJMSXUserID(boolean populateJMSXUserID) { |
| this.populateJMSXUserID = populateJMSXUserID; |
| } |
| |
| public SystemUsage getSystemUsage() { |
| try { |
| if (systemUsage == null) { |
| |
| systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore()); |
| systemUsage.setExecutor(getExecutor()); |
| systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB |
| systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB |
| systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB |
| systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB |
| addService(this.systemUsage); |
| } |
| return systemUsage; |
| } catch (IOException e) { |
| LOG.error("Cannot create SystemUsage", e); |
| throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e); |
| } |
| } |
| |
| public void setSystemUsage(SystemUsage memoryManager) { |
| if (this.systemUsage != null) { |
| removeService(this.systemUsage); |
| } |
| this.systemUsage = memoryManager; |
| if (this.systemUsage.getExecutor()==null) { |
| this.systemUsage.setExecutor(getExecutor()); |
| } |
| addService(this.systemUsage); |
| } |
| |
| /** |
| * @return the consumerUsageManager |
| */ |
| public SystemUsage getConsumerSystemUsage() { |
| if (this.consumerSystemUsage == null) { |
| if (splitSystemUsageForProducersConsumers) { |
| this.consumerSystemUsage = new SystemUsage(getSystemUsage(), "Consumer"); |
| float portion = consumerSystemUsagePortion / 100f; |
| this.consumerSystemUsage.getMemoryUsage().setUsagePortion(portion); |
| addService(this.consumerSystemUsage); |
| } else { |
| consumerSystemUsage = getSystemUsage(); |
| } |
| } |
| return this.consumerSystemUsage; |
| } |
| |
| /** |
| * @param consumerSystemUsage |
| * the storeSystemUsage to set |
| */ |
| public void setConsumerSystemUsage(SystemUsage consumerSystemUsage) { |
| if (this.consumerSystemUsage != null) { |
| removeService(this.consumerSystemUsage); |
| } |
| this.consumerSystemUsage = consumerSystemUsage; |
| addService(this.consumerSystemUsage); |
| } |
| |
| /** |
| * @return the producerUsageManager |
| */ |
| public SystemUsage getProducerSystemUsage() { |
| if (producerSystemUsage == null) { |
| if (splitSystemUsageForProducersConsumers) { |
| producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); |
| float portion = producerSystemUsagePortion / 100f; |
| producerSystemUsage.getMemoryUsage().setUsagePortion(portion); |
| addService(producerSystemUsage); |
| } else { |
| producerSystemUsage = getSystemUsage(); |
| } |
| } |
| return producerSystemUsage; |
| } |
| |
| /** |
| * @param producerUsageManager |
| * the producerUsageManager to set |
| */ |
| public void setProducerSystemUsage(SystemUsage producerUsageManager) { |
| if (this.producerSystemUsage != null) { |
| removeService(this.producerSystemUsage); |
| } |
| this.producerSystemUsage = producerUsageManager; |
| addService(this.producerSystemUsage); |
| } |
| |
| public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException { |
| if (persistenceAdapter == null && !hasStartException()) { |
| persistenceAdapter = createPersistenceAdapter(); |
| configureService(persistenceAdapter); |
| this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); |
| } |
| return persistenceAdapter; |
| } |
| |
| /** |
| * Sets the persistence adaptor implementation to use for this broker |
| */ |
| public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { |
| if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) { |
| LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter); |
| return; |
| } |
| this.persistenceAdapter = persistenceAdapter; |
| configureService(this.persistenceAdapter); |
| this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); |
| } |
| |
| public TaskRunnerFactory getTaskRunnerFactory() { |
| if (this.taskRunnerFactory == null) { |
| this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000, |
| isDedicatedTaskRunner()); |
| this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader()); |
| } |
| return this.taskRunnerFactory; |
| } |
| |
| public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { |
| this.taskRunnerFactory = taskRunnerFactory; |
| } |
| |
| public TaskRunnerFactory getPersistenceTaskRunnerFactory() { |
| if (taskRunnerFactory == null) { |
| persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, |
| true, 1000, isDedicatedTaskRunner()); |
| } |
| return persistenceTaskRunnerFactory; |
| } |
| |
| public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) { |
| this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory; |
| } |
| |
| public boolean isUseJmx() { |
| return useJmx; |
| } |
| |
| public boolean isEnableStatistics() { |
| return enableStatistics; |
| } |
| |
| /** |
| * Sets whether or not the Broker's services enable statistics or not. |
| */ |
| public void setEnableStatistics(boolean enableStatistics) { |
| this.enableStatistics = enableStatistics; |
| } |
| |
| /** |
| * Sets whether or not the Broker's services should be exposed into JMX or |
| * not. |
| * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" |
| */ |
| public void setUseJmx(boolean useJmx) { |
| this.useJmx = useJmx; |
| } |
| |
| public ObjectName getBrokerObjectName() throws MalformedObjectNameException { |
| if (brokerObjectName == null) { |
| brokerObjectName = createBrokerObjectName(); |
| } |
| return brokerObjectName; |
| } |
| |
| /** |
| * Sets the JMX ObjectName for this broker |
| */ |
| public void setBrokerObjectName(ObjectName brokerObjectName) { |
| this.brokerObjectName = brokerObjectName; |
| } |
| |
| public ManagementContext getManagementContext() { |
| if (managementContext == null) { |
| checkStartException(); |
| managementContext = new ManagementContext(); |
| } |
| return managementContext; |
| } |
| |
| synchronized private void checkStartException() { |
| if (startException != null) { |
| throw new BrokerStoppedException(startException); |
| } |
| } |
| |
| synchronized private boolean hasStartException() { |
| return startException != null; |
| } |
| |
| synchronized private void setStartException(Throwable t) { |
| startException = t; |
| } |
| |
| public void setManagementContext(ManagementContext managementContext) { |
| this.managementContext = managementContext; |
| } |
| |
| public NetworkConnector getNetworkConnectorByName(String connectorName) { |
| for (NetworkConnector connector : networkConnectors) { |
| if (connector.getName().equals(connectorName)) { |
| return connector; |
| } |
| } |
| return null; |
| } |
| |
| public String[] getNetworkConnectorURIs() { |
| return networkConnectorURIs; |
| } |
| |
| public void setNetworkConnectorURIs(String[] networkConnectorURIs) { |
| this.networkConnectorURIs = networkConnectorURIs; |
| } |
| |
| public TransportConnector getConnectorByName(String connectorName) { |
| for (TransportConnector connector : transportConnectors) { |
| if (connector.getName().equals(connectorName)) { |
| return connector; |
| } |
| } |
| return null; |
| } |
| |
| public Map<String, String> getTransportConnectorURIsAsMap() { |
| Map<String, String> answer = new HashMap<>(); |
| for (TransportConnector connector : transportConnectors) { |
| try { |
| URI uri = connector.getConnectUri(); |
| if (uri != null) { |
| String scheme = uri.getScheme(); |
| if (scheme != null) { |
| answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString()); |
| } |
| } |
| } catch (Exception e) { |
| LOG.debug("Failed to read URI to build transportURIsAsMap", e); |
| } |
| } |
| return answer; |
| } |
| |
| public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){ |
| ProducerBrokerExchange result = null; |
| |
| for (TransportConnector connector : transportConnectors) { |
| for (TransportConnection tc: connector.getConnections()){ |
| result = tc.getProducerBrokerExchangeIfExists(producerInfo); |
| if (result !=null){ |
| return result; |
| } |
| } |
| } |
| return result; |
| } |
| |
| public String[] getTransportConnectorURIs() { |
| return transportConnectorURIs; |
| } |
| |
| public void setTransportConnectorURIs(String[] transportConnectorURIs) { |
| this.transportConnectorURIs = transportConnectorURIs; |
| } |
| |
| /** |
| * @return Returns the jmsBridgeConnectors. |
| */ |
| public JmsConnector[] getJmsBridgeConnectors() { |
| return jmsBridgeConnectors; |
| } |
| |
| /** |
| * @param jmsConnectors |
| * The jmsBridgeConnectors to set. |
| */ |
| public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) { |
| this.jmsBridgeConnectors = jmsConnectors; |
| } |
| |
| public Service[] getServices() { |
| return services.toArray(new Service[0]); |
| } |
| |
| /** |
| * Sets the services associated with this broker. |
| */ |
| public void setServices(Service[] services) { |
| this.services.clear(); |
| if (services != null) { |
| Collections.addAll(this.services, services); |
| } |
| } |
| |
| /** |
| * Adds a new service so that it will be started as part of the broker |
| * lifecycle |
| */ |
| public void addService(Service service) { |
| services.add(service); |
| } |
| |
| public void removeService(Service service) { |
| services.remove(service); |
| } |
| |
| public boolean isUseLoggingForShutdownErrors() { |
| return useLoggingForShutdownErrors; |
| } |
| |
| /** |
| * Sets whether or not we should use commons-logging when reporting errors |
| * when shutting down the broker |
| */ |
| public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) { |
| this.useLoggingForShutdownErrors = useLoggingForShutdownErrors; |
| } |
| |
| public boolean isUseShutdownHook() { |
| return useShutdownHook; |
| } |
| |
| /** |
| * Sets whether or not we should use a shutdown handler to close down the |
| * broker cleanly if the JVM is terminated. It is recommended you leave this |
| * enabled. |
| */ |
| public void setUseShutdownHook(boolean useShutdownHook) { |
| this.useShutdownHook = useShutdownHook; |
| } |
| |
| public boolean isAdvisorySupport() { |
| return advisorySupport; |
| } |
| |
| /** |
| * Allows the support of advisory messages to be disabled for performance |
| * reasons. |
| * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" |
| */ |
| public void setAdvisorySupport(boolean advisorySupport) { |
| this.advisorySupport = advisorySupport; |
| } |
| |
| public boolean isAnonymousProducerAdvisorySupport() { |
| return anonymousProducerAdvisorySupport; |
| } |
| |
| public void setAnonymousProducerAdvisorySupport(boolean anonymousProducerAdvisorySupport) { |
| this.anonymousProducerAdvisorySupport = anonymousProducerAdvisorySupport; |
| } |
| |
| public List<TransportConnector> getTransportConnectors() { |
| return new ArrayList<>(transportConnectors); |
| } |
| |
| /** |
| * Sets the transport connectors which this broker will listen on for new |
| * clients |
| * |
| * @org.apache.xbean.Property |
| * nestedType="org.apache.activemq.broker.TransportConnector" |
| */ |
| public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception { |
| for (TransportConnector connector : transportConnectors) { |
| addConnector(connector); |
| } |
| } |
| |
| public TransportConnector getTransportConnectorByName(String name){ |
| for (TransportConnector transportConnector : transportConnectors){ |
| if (name.equals(transportConnector.getName())){ |
| return transportConnector; |
| } |
| } |
| return null; |
| } |
| |
| public TransportConnector getTransportConnectorByScheme(String scheme){ |
| for (TransportConnector transportConnector : transportConnectors){ |
| if (scheme.equals(transportConnector.getUri().getScheme())){ |
| return transportConnector; |
| } |
| } |
| return null; |
| } |
| |
| public List<NetworkConnector> getNetworkConnectors() { |
| return new ArrayList<>(networkConnectors); |
| } |
| |
| public List<ProxyConnector> getProxyConnectors() { |
| return new ArrayList<>(proxyConnectors); |
| } |
| |
| /** |
| * Sets the network connectors which this broker will use to connect to |
| * other brokers in a federated network |
| * |
| * @org.apache.xbean.Property |
| * nestedType="org.apache.activemq.network.NetworkConnector" |
| */ |
| public void setNetworkConnectors(List<?> networkConnectors) throws Exception { |
| for (Object connector : networkConnectors) { |
| addNetworkConnector((NetworkConnector) connector); |
| } |
| } |
| |
| /** |
| * Sets the network connectors which this broker will use to connect to |
| * other brokers in a federated network |
| */ |
| public void setProxyConnectors(List<?> proxyConnectors) throws Exception { |
| for (Object connector : proxyConnectors) { |
| addProxyConnector((ProxyConnector) connector); |
| } |
| } |
| |
| public PolicyMap getDestinationPolicy() { |
| return destinationPolicy; |
| } |
| |
| /** |
| * Sets the destination specific policies available either for exact |
| * destinations or for wildcard areas of destinations. |
| */ |
| public void setDestinationPolicy(PolicyMap policyMap) { |
| this.destinationPolicy = policyMap; |
| } |
| |
| public BrokerPlugin[] getPlugins() { |
| return plugins; |
| } |
| |
| /** |
| * Sets a number of broker plugins to install such as for security |
| * authentication or authorization |
| */ |
| public void setPlugins(BrokerPlugin[] plugins) { |
| this.plugins = plugins; |
| } |
| |
| public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { |
| return messageAuthorizationPolicy; |
| } |
| |
| /** |
| * Sets the policy used to decide if the current connection is authorized to |
| * consume a given message |
| */ |
| public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { |
| this.messageAuthorizationPolicy = messageAuthorizationPolicy; |
| } |
| |
| /** |
| * Delete all messages from the persistent store |
| */ |
| public void deleteAllMessages() throws IOException { |
| getPersistenceAdapter().deleteAllMessages(); |
| } |
| |
| public boolean isDeleteAllMessagesOnStartup() { |
| return deleteAllMessagesOnStartup; |
| } |
| |
| /** |
| * Sets whether or not all messages are deleted on startup - mostly only |
| * useful for testing. |
| * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" |
| */ |
| public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) { |
| this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; |
| } |
| |
| public URI getVmConnectorURI() { |
| if (vmConnectorURI == null) { |
| try { |
| vmConnectorURI = new URI("vm://" + getBrokerName()); |
| } catch (URISyntaxException e) { |
| LOG.error("Badly formed URI from {}", getBrokerName(), e); |
| } |
| } |
| return vmConnectorURI; |
| } |
| |
| public void setVmConnectorURI(URI vmConnectorURI) { |
| this.vmConnectorURI = vmConnectorURI; |
| } |
| |
| public String getDefaultSocketURIString() { |
| if (started.get()) { |
| if (this.defaultSocketURIString == null) { |
| for (TransportConnector tc:this.transportConnectors) { |
| String result = null; |
| try { |
| result = tc.getPublishableConnectString(); |
| } catch (Exception e) { |
| LOG.warn("Failed to get the ConnectURI for {}", tc, e); |
| } |
| if (result != null) { |
| // find first publishable uri |
| if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { |
| this.defaultSocketURIString = result; |
| break; |
| } else { |
| // or use the first defined |
| if (this.defaultSocketURIString == null) { |
| this.defaultSocketURIString = result; |
| } |
| } |
| } |
| } |
| |
| } |
| return this.defaultSocketURIString; |
| } |
| return null; |
| } |
| |
| /** |
| * @return Returns the shutdownOnMasterFailure. |
| */ |
| public boolean isShutdownOnMasterFailure() { |
| return shutdownOnMasterFailure; |
| } |
| |
| /** |
| * @param shutdownOnMasterFailure |
| * The shutdownOnMasterFailure to set. |
| */ |
| public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) { |
| this.shutdownOnMasterFailure = shutdownOnMasterFailure; |
| } |
| |
| public boolean isKeepDurableSubsActive() { |
| return keepDurableSubsActive; |
| } |
| |
| public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { |
| this.keepDurableSubsActive = keepDurableSubsActive; |
| } |
| |
| public boolean isEnableMessageExpirationOnActiveDurableSubs() { |
| return enableMessageExpirationOnActiveDurableSubs; |
| } |
| |
| public void setEnableMessageExpirationOnActiveDurableSubs(boolean enableMessageExpirationOnActiveDurableSubs) { |
| this.enableMessageExpirationOnActiveDurableSubs = enableMessageExpirationOnActiveDurableSubs; |
| } |
| |
| public boolean isUseVirtualTopics() { |
| return useVirtualTopics; |
| } |
| |
| /** |
| * Sets whether or not <a |
| * href="http://activemq.apache.org/virtual-destinations.html">Virtual |
| * Topics</a> should be supported by default if they have not been |
| * explicitly configured. |
| */ |
| public void setUseVirtualTopics(boolean useVirtualTopics) { |
| this.useVirtualTopics = useVirtualTopics; |
| } |
| |
| public DestinationInterceptor[] getDestinationInterceptors() { |
| return destinationInterceptors; |
| } |
| |
| public boolean isUseMirroredQueues() { |
| return useMirroredQueues; |
| } |
| |
| /** |
| * Sets whether or not <a |
| * href="http://activemq.apache.org/mirrored-queues.html">Mirrored |
| * Queues</a> should be supported by default if they have not been |
| * explicitly configured. |
| */ |
| public void setUseMirroredQueues(boolean useMirroredQueues) { |
| this.useMirroredQueues = useMirroredQueues; |
| } |
| |
| /** |
| * Sets the destination interceptors to use |
| */ |
| public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) { |
| this.destinationInterceptors = destinationInterceptors; |
| } |
| |
| public ActiveMQDestination[] getDestinations() { |
| return destinations; |
| } |
| |
| /** |
| * Sets the destinations which should be loaded/created on startup |
| */ |
| public void setDestinations(ActiveMQDestination[] destinations) { |
| this.destinations = destinations; |
| } |
| |
| /** |
| * @return the tempDataStore |
| */ |
| public synchronized PListStore getTempDataStore() { |
| if (tempDataStore == null && !hasStartException()) { |
| if (!isPersistent()) { |
| return null; |
| } |
| |
| try { |
| PersistenceAdapter pa = getPersistenceAdapter(); |
| if( pa!=null && pa instanceof PListStore) { |
| return (PListStore) pa; |
| } |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| |
| try { |
| String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl"; |
| this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).getDeclaredConstructor().newInstance(); |
| this.tempDataStore.setDirectory(getTmpDataDirectory()); |
| configureService(tempDataStore); |
| } catch (ClassNotFoundException e) { |
| throw new RuntimeException("Kahadb class PListStoreImpl not found. Add activemq-kahadb jar or set persistent to false on BrokerService.", e); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| return tempDataStore; |
| } |
| |
| /** |
| * @param tempDataStore |
| * the tempDataStore to set |
| */ |
| public void setTempDataStore(PListStore tempDataStore) { |
| this.tempDataStore = tempDataStore; |
| if (tempDataStore != null) { |
| if (tmpDataDirectory == null) { |
| tmpDataDirectory = tempDataStore.getDirectory(); |
| } else if (tempDataStore.getDirectory() == null) { |
| tempDataStore.setDirectory(tmpDataDirectory); |
| } |
| } |
| configureService(tempDataStore); |
| } |
| |
| public int getPersistenceThreadPriority() { |
| return persistenceThreadPriority; |
| } |
| |
| public void setPersistenceThreadPriority(int persistenceThreadPriority) { |
| this.persistenceThreadPriority = persistenceThreadPriority; |
| } |
| |
| /** |
| * @return the useLocalHostBrokerName |
| */ |
| public boolean isUseLocalHostBrokerName() { |
| return this.useLocalHostBrokerName; |
| } |
| |
| /** |
| * @param useLocalHostBrokerName |
| * the useLocalHostBrokerName to set |
| */ |
| public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) { |
| this.useLocalHostBrokerName = useLocalHostBrokerName; |
| if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName.equals(DEFAULT_BROKER_NAME)) { |
| brokerName = LOCAL_HOST_NAME; |
| } |
| } |
| |
| /** |
| * Looks up and lazily creates if necessary the destination for the given |
| * JMS name |
| */ |
| public Destination getDestination(ActiveMQDestination destination) throws Exception { |
| return getBroker().addDestination(getAdminConnectionContext(), destination,false); |
| } |
| |
| public void removeDestination(ActiveMQDestination destination) throws Exception { |
| getBroker().removeDestination(getAdminConnectionContext(), destination, 0); |
| } |
| |
| public int getProducerSystemUsagePortion() { |
| return producerSystemUsagePortion; |
| } |
| |
| public void setProducerSystemUsagePortion(int producerSystemUsagePortion) { |
| this.producerSystemUsagePortion = producerSystemUsagePortion; |
| } |
| |
| public int getConsumerSystemUsagePortion() { |
| return consumerSystemUsagePortion; |
| } |
| |
| public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) { |
| this.consumerSystemUsagePortion = consumerSystemUsagePortion; |
| } |
| |
| public boolean isSplitSystemUsageForProducersConsumers() { |
| return splitSystemUsageForProducersConsumers; |
| } |
| |
| public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) { |
| this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers; |
| } |
| |
| public boolean isMonitorConnectionSplits() { |
| return monitorConnectionSplits; |
| } |
| |
| public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { |
| this.monitorConnectionSplits = monitorConnectionSplits; |
| } |
| |
| public int getTaskRunnerPriority() { |
| return taskRunnerPriority; |
| } |
| |
| public void setTaskRunnerPriority(int taskRunnerPriority) { |
| this.taskRunnerPriority = taskRunnerPriority; |
| } |
| |
| public boolean isDedicatedTaskRunner() { |
| return dedicatedTaskRunner; |
| } |
| |
| public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { |
| this.dedicatedTaskRunner = dedicatedTaskRunner; |
| } |
| |
| public boolean isCacheTempDestinations() { |
| return cacheTempDestinations; |
| } |
| |
| public void setCacheTempDestinations(boolean cacheTempDestinations) { |
| this.cacheTempDestinations = cacheTempDestinations; |
| } |
| |
| public int getTimeBeforePurgeTempDestinations() { |
| return timeBeforePurgeTempDestinations; |
| } |
| |
| public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) { |
| this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations; |
| } |
| |
| public boolean isUseTempMirroredQueues() { |
| return useTempMirroredQueues; |
| } |
| |
| public void setUseTempMirroredQueues(boolean useTempMirroredQueues) { |
| this.useTempMirroredQueues = useTempMirroredQueues; |
| } |
| |
| public synchronized JobSchedulerStore getJobSchedulerStore() { |
| |
| // If support is off don't allow any scheduler even is user configured their own. |
| if (!isSchedulerSupport()) { |
| return null; |
| } |
| |
| // If the user configured their own we use it even if persistence is disabled since |
| // we don't know anything about their implementation. |
| if (jobSchedulerStore == null && !hasStartException()) { |
| |
| if (!isPersistent()) { |
| this.jobSchedulerStore = new InMemoryJobSchedulerStore(); |
| configureService(jobSchedulerStore); |
| return this.jobSchedulerStore; |
| } |
| |
| try { |
| PersistenceAdapter pa = getPersistenceAdapter(); |
| if (pa != null) { |
| this.jobSchedulerStore = pa.createJobSchedulerStore(); |
| jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); |
| configureService(jobSchedulerStore); |
| return this.jobSchedulerStore; |
| } |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } catch (UnsupportedOperationException ex) { |
| // It's ok if the store doesn't implement a scheduler. |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| |
| try { |
| PersistenceAdapter pa = getPersistenceAdapter(); |
| if (pa != null && pa instanceof JobSchedulerStore) { |
| this.jobSchedulerStore = (JobSchedulerStore) pa; |
| configureService(jobSchedulerStore); |
| return this.jobSchedulerStore; |
| } |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| |
| // Load the KahaDB store as a last resort, this only works if KahaDB is |
| // included at runtime, otherwise this will fail. User should disable |
| // scheduler support if this fails. |
| try { |
| String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; |
| PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).getDeclaredConstructor().newInstance(); |
| jobSchedulerStore = adaptor.createJobSchedulerStore(); |
| jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); |
| configureService(jobSchedulerStore); |
| LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile()); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| return jobSchedulerStore; |
| } |
| |
| public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) { |
| this.jobSchedulerStore = jobSchedulerStore; |
| configureService(jobSchedulerStore); |
| } |
| |
| // |
| // Implementation methods |
| // ------------------------------------------------------------------------- |
| /** |
| * Handles any lazy-creation helper properties which are added to make |
| * things easier to configure inside environments such as Spring |
| */ |
| protected void processHelperProperties() throws Exception { |
| if (transportConnectorURIs != null) { |
| for (String uri : transportConnectorURIs) { |
| addConnector(uri); |
| } |
| } |
| if (networkConnectorURIs != null) { |
| for (String uri : networkConnectorURIs) { |
| addNetworkConnector(uri); |
| } |
| } |
| if (jmsBridgeConnectors != null) { |
| for (JmsConnector jmsBridgeConnector : jmsBridgeConnectors) { |
| addJmsConnector(jmsBridgeConnector); |
| } |
| } |
| } |
| |
| /** |
| * Check that the store usage limit is not greater than max usable |
| * space and adjust if it is |
| */ |
| protected void checkStoreUsageLimits() throws Exception { |
| final SystemUsage usage = getSystemUsage(); |
| |
| if (getPersistenceAdapter() != null) { |
| PersistenceAdapter adapter = getPersistenceAdapter(); |
| checkUsageLimit(adapter.getDirectory(), usage.getStoreUsage(), usage.getStoreUsage().getPercentLimit()); |
| |
| long maxJournalFileSize = 0; |
| long storeLimit = usage.getStoreUsage().getLimit(); |
| |
| if (adapter instanceof JournaledStore) { |
| maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength(); |
| } |
| |
| if (storeLimit > 0 && storeLimit < maxJournalFileSize) { |
| LOG.error("Store limit is {} mb, whilst the max journal file size for the store is {} mb, the store will not accept any data when used.", |
| (storeLimit / (1024 * 1024)), (maxJournalFileSize / (1024 * 1024))); |
| |
| } |
| } |
| } |
| |
| /** |
| * Check that temporary usage limit is not greater than max usable |
| * space and adjust if it is |
| */ |
| protected void checkTmpStoreUsageLimits() throws Exception { |
| final SystemUsage usage = getSystemUsage(); |
| |
| File tmpDir = getTmpDataDirectory(); |
| |
| if (tmpDir != null) { |
| checkUsageLimit(tmpDir, usage.getTempUsage(), usage.getTempUsage().getPercentLimit()); |
| |
| if (isPersistent()) { |
| long maxJournalFileSize; |
| |
| PListStore store = usage.getTempUsage().getStore(); |
| if (store != null && store instanceof JournaledStore) { |
| maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength(); |
| } else { |
| maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH; |
| } |
| long storeLimit = usage.getTempUsage().getLimit(); |
| |
| if (storeLimit > 0 && storeLimit < maxJournalFileSize) { |
| LOG.error("Temporary Store limit {} mb, whilst the max journal file size for the temporary store is {} mb, the temp store will not accept any data when used.", |
| (storeLimit / (1024 * 1024)), (maxJournalFileSize / (1024 * 1024))); |
| } |
| } |
| } |
| } |
| |
| protected void checkUsageLimit(File dir, PercentLimitUsage<?> storeUsage, int percentLimit) throws ConfigurationException { |
| if (dir != null) { |
| dir = StoreUtil.findParentDirectory(dir); |
| String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store"; |
| long storeLimit = storeUsage.getLimit(); |
| long storeCurrent = storeUsage.getUsage(); |
| long totalSpace = storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getTotalSpace(); |
| long totalUsableSpace = (storeUsage.getTotal() > 0 ? storeUsage.getTotal() : dir.getUsableSpace()) + storeCurrent; |
| if (totalUsableSpace < 0 || totalSpace < 0) { |
| LOG.error("File system space reported by {} was negative, possibly a huge file system, set a sane usage.total to provide some guidance", dir); |
| final String message = "File system space reported by: " + dir + " was negative, possibly a huge file system, set a sane usage.total to provide some guidance"; |
| throw new ConfigurationException(message); |
| } |
| //compute byte value of the percent limit |
| long bytePercentLimit = totalSpace * percentLimit / 100; |
| int oneMeg = 1024 * 1024; |
| |
| //Check if the store limit is less than the percent Limit that was set and also |
| //the usable space...this means we can grow the store larger |
| //Changes in partition size (total space) as well as changes in usable space should |
| //be detected here |
| if (diskUsageCheckRegrowThreshold > -1 && percentLimit > 0 |
| && storeUsage.getTotal() == 0 |
| && storeLimit < bytePercentLimit && storeLimit < totalUsableSpace){ |
| |
| // set the limit to be bytePercentLimit or usableSpace if |
| // usableSpace is less than the percentLimit |
| long newLimit = Math.min(bytePercentLimit, totalUsableSpace); |
| |
| //To prevent changing too often, check threshold |
| if (newLimit - storeLimit >= diskUsageCheckRegrowThreshold) { |
| LOG.info("Usable disk space has been increased, attempting to regrow {} limit to {}% of the partition size", |
| storeName, percentLimit); |
| storeUsage.setLimit(newLimit); |
| LOG.info("storeUsage limit has been increase to {}% ({} mb) of the partition size.", |
| (newLimit * 100 / totalSpace), (newLimit / oneMeg)); |
| } |
| |
| //check if the limit is too large for the amount of usable space |
| } else if (storeLimit > totalUsableSpace) { |
| final String message = storeName + " limit is " + storeLimit / oneMeg |
| + " mb (current store usage is " + storeCurrent / oneMeg |
| + " mb). The data directory: " + dir.getAbsolutePath() |
| + " only has " + totalUsableSpace / oneMeg |
| + " mb of usable space."; |
| |
| if (!isAdjustUsageLimits()) { |
| LOG.error(message); |
| throw new ConfigurationException(message); |
| } |
| |
| if (percentLimit > 0) { |
| LOG.warn("{} limit has been set to {}% ({} mb) of the partition size but there is not enough usable space." + |
| "The current store limit (which may have been adjusted by a previous usage limit check) is set to ({} mb) " + |
| "but only {}% ({} mb) is available - resetting limit", |
| storeName, |
| percentLimit, |
| (bytePercentLimit / oneMeg), |
| (storeLimit / oneMeg), |
| (totalUsableSpace * 100 / totalSpace), |
| (totalUsableSpace / oneMeg)); |
| } else { |
| LOG.warn("{} - resetting to maximum available disk space: {} mb", message, (totalUsableSpace / oneMeg)); |
| } |
| storeUsage.setLimit(totalUsableSpace); |
| } |
| } |
| } |
| |
| /** |
| * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to |
| * update store and temporary store limits if the amount of available space |
| * plus current store size is less than the existing configured limit |
| */ |
| protected void scheduleDiskUsageLimitsCheck() throws IOException { |
| if (schedulePeriodForDiskUsageCheck > 0 && |
| (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) { |
| Runnable diskLimitCheckTask = () -> { |
| try { |
| checkStoreUsageLimits(); |
| } catch (Throwable e) { |
| LOG.error("Failed to check persistent disk usage limits", e); |
| } |
| |
| try { |
| checkTmpStoreUsageLimits(); |
| } catch (Throwable e) { |
| LOG.error("Failed to check temporary store usage limits", e); |
| } |
| }; |
| scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck); |
| } |
| } |
| |
| protected void checkMemorySystemUsageLimits() throws Exception { |
| final SystemUsage usage = getSystemUsage(); |
| long memLimit = usage.getMemoryUsage().getLimit(); |
| long jvmLimit = Runtime.getRuntime().maxMemory(); |
| |
| if (memLimit > jvmLimit) { |
| final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024) |
| + "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024); |
| |
| if (adjustUsageLimits) { |
| usage.getMemoryUsage().setPercentOfJvmHeap(70); |
| LOG.warn("{} mb - resetting to 70% of maximum available: {}", |
| message, (usage.getMemoryUsage().getLimit() / (1024 * 1024))); |
| } else { |
| LOG.error(message); |
| throw new ConfigurationException(message); |
| } |
| } |
| } |
| |
| protected void checkStoreSystemUsageLimits() throws Exception { |
| final SystemUsage usage = getSystemUsage(); |
| |
| //Check the persistent store and temp store limits if they exist |
| //and schedule a periodic check to update disk limits if |
| //schedulePeriodForDiskLimitCheck is set |
| checkStoreUsageLimits(); |
| checkTmpStoreUsageLimits(); |
| scheduleDiskUsageLimitsCheck(); |
| |
| if (getJobSchedulerStore() != null) { |
| JobSchedulerStore scheduler = getJobSchedulerStore(); |
| File schedulerDir = scheduler.getDirectory(); |
| if (schedulerDir != null) { |
| |
| String schedulerDirPath = schedulerDir.getAbsolutePath(); |
| if (!schedulerDir.isAbsolute()) { |
| schedulerDir = new File(schedulerDirPath); |
| } |
| |
| while (schedulerDir != null && !schedulerDir.isDirectory()) { |
| schedulerDir = schedulerDir.getParentFile(); |
| } |
| long schedulerLimit = usage.getJobSchedulerUsage().getLimit(); |
| long dirFreeSpace = 0; |
| if (schedulerDir != null) { |
| dirFreeSpace = schedulerDir.getUsableSpace(); |
| } |
| if (schedulerLimit > dirFreeSpace) { |
| LOG.warn("Job Scheduler Store limit is {} mb, whilst the data directory: {} " + |
| "only has {} mb of usage space - resetting to {} mb.", |
| schedulerLimit / (1024 * 1024), |
| schedulerDir.getAbsolutePath(), |
| dirFreeSpace / (1024 * 1024), |
| dirFreeSpace / (1042 * 1024)); |
| usage.getJobSchedulerUsage().setLimit(dirFreeSpace); |
| } |
| } |
| } |
| } |
| |
| public void stopAllConnectors(ServiceStopper stopper) { |
| for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { |
| NetworkConnector connector = iter.next(); |
| unregisterNetworkConnectorMBean(connector); |
| stopper.stop(connector); |
| } |
| for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { |
| ProxyConnector connector = iter.next(); |
| stopper.stop(connector); |
| } |
| for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { |
| JmsConnector connector = iter.next(); |
| stopper.stop(connector); |
| } |
| for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { |
| TransportConnector connector = iter.next(); |
| try { |
| unregisterConnectorMBean(connector); |
| } catch (IOException e) { |
| } |
| stopper.stop(connector); |
| } |
| } |
| |
| protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { |
| try { |
| ObjectName objectName = createConnectorObjectName(connector); |
| connector = connector.asManagedConnector(getManagementContext(), objectName); |
| ConnectorViewMBean view = new ConnectorView(connector); |
| AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); |
| return connector; |
| } catch (Throwable e) { |
| throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e); |
| } |
| } |
| |
| protected void unregisterConnectorMBean(TransportConnector connector) throws IOException { |
| if (isUseJmx()) { |
| try { |
| ObjectName objectName = createConnectorObjectName(connector); |
| getManagementContext().unregisterMBean(objectName); |
| } catch (Throwable e) { |
| throw IOExceptionSupport.create( |
| "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e); |
| } |
| } |
| } |
| |
| protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { |
| return adaptor; |
| } |
| |
| protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { |
| if (isUseJmx()) {} |
| } |
| |
| private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { |
| return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName()); |
| } |
| |
| public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { |
| NetworkConnectorViewMBean view = new NetworkConnectorView(connector); |
| try { |
| ObjectName objectName = createNetworkConnectorObjectName(connector); |
| connector.setObjectName(objectName); |
| AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); |
| } catch (Throwable e) { |
| throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); |
| } |
| } |
| |
| public ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { |
| return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName()); |
| } |
| |
| public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException { |
| return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport); |
| } |
| |
| protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { |
| if (isUseJmx()) { |
| try { |
| ObjectName objectName = createNetworkConnectorObjectName(connector); |
| getManagementContext().unregisterMBean(objectName); |
| } catch (Exception e) { |
| LOG.warn("Network Connector could not be unregistered from JMX due {}. This exception is ignored.", e.getMessage(), e); |
| } |
| } |
| } |
| |
| protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { |
| ProxyConnectorView view = new ProxyConnectorView(connector); |
| try { |
| ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName()); |
| AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); |
| } catch (Throwable e) { |
| throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); |
| } |
| } |
| |
| protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { |
| JmsConnectorView view = new JmsConnectorView(connector); |
| try { |
| ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName()); |
| AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); |
| } catch (Throwable e) { |
| throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * Factory method to create a new broker |
| * |
| * @throws Exception |
| */ |
| protected Broker createBroker() throws Exception { |
| regionBroker = createRegionBroker(); |
| Broker broker = addInterceptors(regionBroker); |
| // Add a filter that will stop access to the broker once stopped |
| broker = new MutableBrokerFilter(broker) { |
| Broker old; |
| |
| @Override |
| public void stop() throws Exception { |
| old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { |
| // Just ignore additional stop actions. |
| @Override |
| public void stop() throws Exception { |
| } |
| }); |
| old.stop(); |
| } |
| |
| @Override |
| public void start() throws Exception { |
| if (forceStart && old != null) { |
| this.next.set(old); |
| } |
| getNext().start(); |
| } |
| }; |
| return broker; |
| } |
| |
| /** |
| * Factory method to create the core region broker onto which interceptors |
| * are added |
| * |
| * @throws Exception |
| */ |
| protected Broker createRegionBroker() throws Exception { |
| if (destinationInterceptors == null) { |
| destinationInterceptors = createDefaultDestinationInterceptor(); |
| } |
| configureServices(destinationInterceptors); |
| DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); |
| if (destinationFactory == null) { |
| destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter()); |
| } |
| return createRegionBroker(destinationInterceptor); |
| } |
| |
| protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException { |
| RegionBroker regionBroker; |
| if (isUseJmx()) { |
| try { |
| regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), |
| getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); |
| } catch(MalformedObjectNameException me){ |
| LOG.warn("Cannot create ManagedRegionBroker due {}", me.getMessage(), me); |
| throw new IOException(me); |
| } |
| } else { |
| regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, |
| destinationInterceptor,getScheduler(),getExecutor()); |
| } |
| destinationFactory.setRegionBroker(regionBroker); |
| regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); |
| regionBroker.setBrokerName(getBrokerName()); |
| regionBroker.getDestinationStatistics().setEnabled(enableStatistics); |
| regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend()); |
| if (brokerId != null) { |
| regionBroker.setBrokerId(brokerId); |
| } |
| return regionBroker; |
| } |
| |
| /** |
| * Create the default destination interceptor |
| */ |
| protected DestinationInterceptor[] createDefaultDestinationInterceptor() { |
| List<DestinationInterceptor> answer = new ArrayList<>(); |
| if (isUseVirtualTopics()) { |
| VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); |
| VirtualTopic virtualTopic = new VirtualTopic(); |
| virtualTopic.setName("VirtualTopic.>"); |
| VirtualDestination[] virtualDestinations = { virtualTopic }; |
| interceptor.setVirtualDestinations(virtualDestinations); |
| answer.add(interceptor); |
| } |
| if (isUseMirroredQueues()) { |
| MirroredQueue interceptor = new MirroredQueue(); |
| answer.add(interceptor); |
| } |
| DestinationInterceptor[] array = new DestinationInterceptor[answer.size()]; |
| answer.toArray(array); |
| return array; |
| } |
| |
| /** |
| * Strategy method to add interceptors to the broker |
| * |
| * @throws IOException |
| */ |
| protected Broker addInterceptors(Broker broker) throws Exception { |
| if (isSchedulerSupport()) { |
| SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore()); |
| sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed); |
| if (isUseJmx()) { |
| JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); |
| try { |
| ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName()); |
| AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); |
| this.adminView.setJMSJobScheduler(objectName); |
| } catch (Throwable e) { |
| throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: " |
| + e.getMessage(), e); |
| } |
| } |
| broker = sb; |
| } |
| if (isUseJmx()) { |
| HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker()); |
| try { |
| ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName()); |
| AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName); |
| } catch (Throwable e) { |
| throw IOExceptionSupport.create("Status MBean could not be registered in JMX: " |
| + e.getMessage(), e); |
| } |
| } |
| if (isAdvisorySupport()) { |
| broker = new AdvisoryBroker(broker); |
| } |
| broker = new CompositeDestinationBroker(broker); |
| broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); |
| if (isPopulateJMSXUserID()) { |
| UserIDBroker userIDBroker = new UserIDBroker(broker); |
| userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID()); |
| broker = userIDBroker; |
| } |
| if (isMonitorConnectionSplits()) { |
| broker = new ConnectionSplitBroker(broker); |
| } |
| if (plugins != null) { |
| for (int i = 0; i < plugins.length; i++) { |
| BrokerPlugin plugin = plugins[i]; |
| broker = plugin.installPlugin(broker); |
| } |
| } |
| return broker; |
| } |
| |
| protected PersistenceAdapter createPersistenceAdapter() throws IOException { |
| if (isPersistent()) { |
| PersistenceAdapterFactory fac = getPersistenceFactory(); |
| if (fac != null) { |
| return fac.createPersistenceAdapter(); |
| } else { |
| try { |
| String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; |
| PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); |
| File dir = new File(getBrokerDataDirectory(),"KahaDB"); |
| adaptor.setDirectory(dir); |
| return adaptor; |
| } catch (Throwable e) { |
| throw IOExceptionSupport.create(e); |
| } |
| } |
| } else { |
| return new MemoryPersistenceAdapter(); |
| } |
| } |
| |
| protected ObjectName createBrokerObjectName() throws MalformedObjectNameException { |
| return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName()); |
| } |
| |
| protected TransportConnector createTransportConnector(URI brokerURI) throws Exception { |
| TransportServer transport = TransportFactorySupport.bind(this, brokerURI); |
| return new TransportConnector(transport); |
| } |
| |
| /** |
| * Extracts the port from the options |
| */ |
| protected Object getPort(Map<?,?> options) { |
| Object port = options.get("port"); |
| if (port == null) { |
| port = DEFAULT_PORT; |
| LOG.warn("No port specified so defaulting to: {}", port); |
| } |
| return port; |
| } |
| |
| protected void addShutdownHook() { |
| if (useShutdownHook) { |
| shutdownHook = new Thread("ActiveMQ ShutdownHook") { |
| @Override |
| public void run() { |
| containerShutdown(); |
| } |
| }; |
| Runtime.getRuntime().addShutdownHook(shutdownHook); |
| } |
| } |
| |
| protected void removeShutdownHook() { |
| if (shutdownHook != null) { |
| try { |
| Runtime.getRuntime().removeShutdownHook(shutdownHook); |
| } catch (Exception e) { |
| LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e); |
| } |
| } |
| } |
| |
| /** |
| * Sets hooks to be executed when broker shut down |
| * |
| * @org.apache.xbean.Property |
| */ |
| public void setShutdownHooks(List<Runnable> hooks) throws Exception { |
| for (Runnable hook : hooks) { |
| addShutdownHook(hook); |
| } |
| } |
| |
| /** |
| * Causes a clean shutdown of the container when the VM is being shut down |
| */ |
| protected void containerShutdown() { |
| try { |
| stop(); |
| } catch (IOException e) { |
| Throwable linkedException = e.getCause(); |
| if (linkedException != null) { |
| logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException); |
| } else { |
| logError("Failed to shut down: " + e, e); |
| } |
| if (!useLoggingForShutdownErrors) { |
| e.printStackTrace(System.err); |
| } |
| } catch (Exception e) { |
| logError("Failed to shut down: " + e, e); |
| } |
| } |
| |
| protected void logError(String message, Throwable e) { |
| if (useLoggingForShutdownErrors) { |
| LOG.error("Failed to shut down", e); |
| } else { |
| System.err.println("Failed to shut down: " + e); |
| } |
| } |
| |
| /** |
| * Starts any configured destinations on startup |
| */ |
| protected void startDestinations() throws Exception { |
| if (destinations != null) { |
| ConnectionContext adminConnectionContext = getAdminConnectionContext(); |
| for (int i = 0; i < destinations.length; i++) { |
| ActiveMQDestination destination = destinations[i]; |
| getBroker().addDestination(adminConnectionContext, destination,true); |
| } |
| } |
| if (isUseVirtualTopics()) { |
| startVirtualConsumerDestinations(); |
| } |
| } |
| |
| /** |
| * Returns the broker's administration connection context used for |
| * configuring the broker at startup |
| */ |
| public ConnectionContext getAdminConnectionContext() throws Exception { |
| return BrokerSupport.getConnectionContext(getBroker()); |
| } |
| |
| protected void startManagementContext() throws Exception { |
| getManagementContext().setBrokerName(brokerName); |
| getManagementContext().start(); |
| adminView = new BrokerView(this, null); |
| ObjectName objectName = getBrokerObjectName(); |
| AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName); |
| } |
| |
| /** |
| * Start all transport and network connections, proxies and bridges |
| * |
| * @throws Exception |
| */ |
| public void startAllConnectors() throws Exception { |
| final Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations(); |
| List<TransportConnector> al = new ArrayList<>(); |
| for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { |
| TransportConnector connector = iter.next(); |
| al.add(startTransportConnector(connector)); |
| } |
| if (al.size() > 0) { |
| // let's clear the transportConnectors list and replace it with |
| // the started transportConnector instances |
| this.transportConnectors.clear(); |
| setTransportConnectors(al); |
| } |
| this.slave = false; |
| if (!stopped.get()) { |
| ThreadPoolExecutor networkConnectorStartExecutor = null; |
| if (isNetworkConnectorStartAsync()) { |
| // spin up as many threads as needed |
| networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, |
| 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), |
| new ThreadFactory() { |
| int count=0; |
| @Override |
| public Thread newThread(Runnable runnable) { |
| Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++)); |
| thread.setDaemon(true); |
| return thread; |
| } |
| }); |
| } |
| |
| for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { |
| final NetworkConnector connector = iter.next(); |
| connector.setLocalUri(getVmConnectorURI()); |
| startNetworkConnector(connector, durableDestinations, networkConnectorStartExecutor); |
| } |
| if (networkConnectorStartExecutor != null) { |
| // executor done when enqueued tasks are complete |
| ThreadPoolUtils.shutdown(networkConnectorStartExecutor); |
| } |
| |
| for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { |
| ProxyConnector connector = iter.next(); |
| connector.start(); |
| } |
| for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { |
| JmsConnector connector = iter.next(); |
| connector.start(); |
| } |
| for (Service service : services) { |
| configureService(service); |
| service.start(); |
| } |
| } |
| } |
| |
| public void startNetworkConnector(final NetworkConnector connector, |
| final ThreadPoolExecutor networkConnectorStartExecutor) throws Exception { |
| startNetworkConnector(connector, getBroker().getDurableDestinations(), networkConnectorStartExecutor); |
| } |
| |
| public void startNetworkConnector(final NetworkConnector connector, |
| final Set<ActiveMQDestination> durableDestinations, |
| final ThreadPoolExecutor networkConnectorStartExecutor) throws Exception { |
| connector.setBrokerName(getBrokerName()); |
| //set the durable destinations to match the broker if not set on the connector |
| if (connector.getDurableDestinations() == null) { |
| connector.setDurableDestinations(durableDestinations); |
| } |
| String defaultSocketURI = getDefaultSocketURIString(); |
| if (defaultSocketURI != null) { |
| connector.setBrokerURL(defaultSocketURI); |
| } |
| //If using the runtime plugin to start a network connector then the mbean needs |
| //to be added, under normal start it will already exist so check for InstanceNotFoundException |
| if (isUseJmx()) { |
| ObjectName networkMbean = createNetworkConnectorObjectName(connector); |
| try { |
| getManagementContext().getObjectInstance(networkMbean); |
| } catch (InstanceNotFoundException e) { |
| LOG.debug("Network connector MBean {} not found, registering", networkMbean); |
| registerNetworkConnectorMBean(connector); |
| } |
| } |
| if (networkConnectorStartExecutor != null) { |
| networkConnectorStartExecutor.execute(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| LOG.info("Async start of {}", connector); |
| connector.start(); |
| } catch(Exception e) { |
| LOG.error("Async start of network connector: {} failed", connector, e); |
| } |
| } |
| }); |
| } else { |
| connector.start(); |
| } |
| } |
| |
| public TransportConnector startTransportConnector(TransportConnector connector) throws Exception { |
| connector.setBrokerService(this); |
| connector.setTaskRunnerFactory(getTaskRunnerFactory()); |
| MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy(); |
| if (policy != null) { |
| connector.setMessageAuthorizationPolicy(policy); |
| } |
| if (isUseJmx()) { |
| connector = registerConnectorMBean(connector); |
| } |
| connector.getStatistics().setEnabled(enableStatistics); |
| connector.start(); |
| return connector; |
| } |
| |
| /** |
| * Perform any custom dependency injection |
| */ |
| protected void configureServices(Object[] services) { |
| for (Object service : services) { |
| configureService(service); |
| } |
| } |
| |
| /** |
| * Perform any custom dependency injection |
| */ |
| protected void configureService(Object service) { |
| if (service instanceof BrokerServiceAware) { |
| BrokerServiceAware serviceAware = (BrokerServiceAware) service; |
| serviceAware.setBrokerService(this); |
| } |
| } |
| |
| public void handleIOException(IOException exception) { |
| if (ioExceptionHandler != null) { |
| ioExceptionHandler.handle(exception); |
| } else { |
| LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception); |
| } |
| } |
| |
| protected void startVirtualConsumerDestinations() throws Exception { |
| checkStartException(); |
| ConnectionContext adminConnectionContext = getAdminConnectionContext(); |
| Set<ActiveMQDestination> destinations = destinationFactory.getDestinations(); |
| DestinationFilter filter = getVirtualTopicConsumerDestinationFilter(); |
| if (!destinations.isEmpty()) { |
| for (ActiveMQDestination destination : destinations) { |
| if (filter.matches(destination) == true) { |
| broker.addDestination(adminConnectionContext, destination, false); |
| } |
| } |
| } |
| } |
| |
| private DestinationFilter getVirtualTopicConsumerDestinationFilter() { |
| // created at startup, so no sync needed |
| if (virtualConsumerDestinationFilter == null) { |
| Set <ActiveMQQueue> consumerDestinations = new HashSet<>(); |
| if (destinationInterceptors != null) { |
| for (DestinationInterceptor interceptor : destinationInterceptors) { |
| if (interceptor instanceof VirtualDestinationInterceptor) { |
| VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor; |
| for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) { |
| if (virtualDestination instanceof VirtualTopic) { |
| consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); |
| } |
| if (isUseVirtualDestSubs()) { |
| try { |
| broker.virtualDestinationAdded(getAdminConnectionContext(), virtualDestination); |
| LOG.debug("Adding virtual destination: {}", virtualDestination); |
| } catch (Exception e) { |
| LOG.warn("Could not fire virtual destination consumer advisory", e); |
| } |
| } |
| } |
| } |
| } |
| } |
| ActiveMQQueue filter = new ActiveMQQueue(); |
| filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{})); |
| virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter); |
| } |
| return virtualConsumerDestinationFilter; |
| } |
| |
| protected synchronized ThreadPoolExecutor getExecutor() { |
| if (this.executor == null) { |
| this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { |
| |
| private long i = 0; |
| |
| @Override |
| public Thread newThread(Runnable runnable) { |
| this.i++; |
| Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i); |
| thread.setDaemon(true); |
| thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { |
| @Override |
| public void uncaughtException(final Thread t, final Throwable e) { |
| LOG.error("Error in thread '{}'", t.getName(), e); |
| } |
| }); |
| return thread; |
| } |
| }, new RejectedExecutionHandler() { |
| @Override |
| public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { |
| try { |
| if (!executor.getQueue().offer(r, 60, TimeUnit.SECONDS)) { |
| throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); |
| } |
| } catch (InterruptedException e) { |
| throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); |
| } |
| } |
| }); |
| } |
| return this.executor; |
| } |
| |
| public synchronized Scheduler getScheduler() { |
| if (this.scheduler==null) { |
| this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler"); |
| try { |
| this.scheduler.start(); |
| } catch (Exception e) { |
| LOG.error("Failed to start Scheduler", e); |
| } |
| } |
| return this.scheduler; |
| } |
| |
| public Broker getRegionBroker() { |
| return regionBroker; |
| } |
| |
| public void setRegionBroker(Broker regionBroker) { |
| this.regionBroker = regionBroker; |
| } |
| |
| public final void removePreShutdownHook(final Runnable hook) { |
| preShutdownHooks.remove(hook); |
| } |
| |
| public void addShutdownHook(Runnable hook) { |
| synchronized (shutdownHooks) { |
| shutdownHooks.add(hook); |
| } |
| } |
| |
| public void removeShutdownHook(Runnable hook) { |
| synchronized (shutdownHooks) { |
| shutdownHooks.remove(hook); |
| } |
| } |
| |
| public boolean isSystemExitOnShutdown() { |
| return systemExitOnShutdown; |
| } |
| |
| /** |
| * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" |
| */ |
| public void setSystemExitOnShutdown(boolean systemExitOnShutdown) { |
| this.systemExitOnShutdown = systemExitOnShutdown; |
| } |
| |
| public int getSystemExitOnShutdownExitCode() { |
| return systemExitOnShutdownExitCode; |
| } |
| |
| public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) { |
| this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode; |
| } |
| |
| public SslContext getSslContext() { |
| return sslContext; |
| } |
| |
| public void setSslContext(SslContext sslContext) { |
| this.sslContext = sslContext; |
| } |
| |
| public boolean isShutdownOnSlaveFailure() { |
| return shutdownOnSlaveFailure; |
| } |
| |
| /** |
| * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" |
| */ |
| public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) { |
| this.shutdownOnSlaveFailure = shutdownOnSlaveFailure; |
| } |
| |
| public boolean isWaitForSlave() { |
| return waitForSlave; |
| } |
| |
| /** |
| * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" |
| */ |
| public void setWaitForSlave(boolean waitForSlave) { |
| this.waitForSlave = waitForSlave; |
| } |
| |
| public long getWaitForSlaveTimeout() { |
| return this.waitForSlaveTimeout; |
| } |
| |
| public void setWaitForSlaveTimeout(long waitForSlaveTimeout) { |
| this.waitForSlaveTimeout = waitForSlaveTimeout; |
| } |
| |
| /** |
| * Get the passiveSlave |
| * @return the passiveSlave |
| */ |
| public boolean isPassiveSlave() { |
| return this.passiveSlave; |
| } |
| |
| /** |
| * Set the passiveSlave |
| * @param passiveSlave the passiveSlave to set |
| * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" |
| */ |
| public void setPassiveSlave(boolean passiveSlave) { |
| this.passiveSlave = passiveSlave; |
| } |
| |
| /** |
| * override the Default IOException handler, called when persistence adapter |
| * has experiences File or JDBC I/O Exceptions |
| * |
| * @param ioExceptionHandler |
| */ |
| public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) { |
| configureService(ioExceptionHandler); |
| this.ioExceptionHandler = ioExceptionHandler; |
| } |
| |
| public IOExceptionHandler getIoExceptionHandler() { |
| return ioExceptionHandler; |
| } |
| |
| /** |
| * @return the schedulerSupport |
| */ |
| public boolean isSchedulerSupport() { |
| return this.schedulerSupport; |
| } |
| |
| /** |
| * @param schedulerSupport the schedulerSupport to set |
| * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" |
| */ |
| public void setSchedulerSupport(boolean schedulerSupport) { |
| this.schedulerSupport = schedulerSupport; |
| } |
| |
| /** |
| * @return the schedulerDirectory |
| */ |
| public File getSchedulerDirectoryFile() { |
| if (this.schedulerDirectoryFile == null) { |
| this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler"); |
| } |
| return schedulerDirectoryFile; |
| } |
| |
| /** |
| * @param schedulerDirectory the schedulerDirectory to set |
| */ |
| public void setSchedulerDirectoryFile(File schedulerDirectory) { |
| this.schedulerDirectoryFile = schedulerDirectory; |
| } |
| |
| public void setSchedulerDirectory(String schedulerDirectory) { |
| setSchedulerDirectoryFile(new File(schedulerDirectory)); |
| } |
| |
| public int getSchedulePeriodForDestinationPurge() { |
| return this.schedulePeriodForDestinationPurge; |
| } |
| |
| public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) { |
| this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge; |
| } |
| |
| /** |
| * @param schedulePeriodForDiskUsageCheck |
| */ |
| public void setSchedulePeriodForDiskUsageCheck( |
| int schedulePeriodForDiskUsageCheck) { |
| this.schedulePeriodForDiskUsageCheck = schedulePeriodForDiskUsageCheck; |
| } |
| |
| public int getDiskUsageCheckRegrowThreshold() { |
| return diskUsageCheckRegrowThreshold; |
| } |
| |
| /** |
| * @param diskUsageCheckRegrowThreshold |
| * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" |
| */ |
| public void setDiskUsageCheckRegrowThreshold(int diskUsageCheckRegrowThreshold) { |
| this.diskUsageCheckRegrowThreshold = diskUsageCheckRegrowThreshold; |
| } |
| |
| public int getMaxPurgedDestinationsPerSweep() { |
| return this.maxPurgedDestinationsPerSweep; |
| } |
| |
| public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) { |
| this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep; |
| } |
| |
| public BrokerContext getBrokerContext() { |
| return brokerContext; |
| } |
| |
| public void setBrokerContext(BrokerContext brokerContext) { |
| this.brokerContext = brokerContext; |
| } |
| |
| public void setBrokerId(String brokerId) { |
| this.brokerId = new BrokerId(brokerId); |
| } |
| |
| public boolean isUseAuthenticatedPrincipalForJMSXUserID() { |
| return useAuthenticatedPrincipalForJMSXUserID; |
| } |
| |
| public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) { |
| this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID; |
| } |
| |
| /** |
| * Should MBeans that support showing the Authenticated User Name information have this |
| * value filled in or not. |
| * |
| * @return true if user names should be exposed in MBeans |
| */ |
| public boolean isPopulateUserNameInMBeans() { |
| return this.populateUserNameInMBeans; |
| } |
| |
| /** |
| * Sets whether Authenticated User Name information is shown in MBeans that support this field. |
| * @param value if MBeans should expose user name information. |
| */ |
| public void setPopulateUserNameInMBeans(boolean value) { |
| this.populateUserNameInMBeans = value; |
| } |
| |
| /** |
| * Gets the time in Milliseconds that an invocation of an MBean method will wait before |
| * failing. The default value is to wait forever (zero). |
| * |
| * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). |
| */ |
| public long getMbeanInvocationTimeout() { |
| return mbeanInvocationTimeout; |
| } |
| |
| /** |
| * Gets the time in Milliseconds that an invocation of an MBean method will wait before |
| * failing. The default value is to wait forever (zero). |
| * |
| * @param mbeanInvocationTimeout |
| * timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). |
| */ |
| public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) { |
| this.mbeanInvocationTimeout = mbeanInvocationTimeout; |
| } |
| |
| public boolean isNetworkConnectorStartAsync() { |
| return networkConnectorStartAsync; |
| } |
| |
| public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) { |
| this.networkConnectorStartAsync = networkConnectorStartAsync; |
| } |
| |
| public boolean isAllowTempAutoCreationOnSend() { |
| return allowTempAutoCreationOnSend; |
| } |
| |
| /** |
| * enable if temp destinations need to be propagated through a network when |
| * advisorySupport==false. This is used in conjunction with the policy |
| * gcInactiveDestinations for matching temps so they can get removed |
| * when inactive |
| * |
| * @param allowTempAutoCreationOnSend |
| */ |
| public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) { |
| this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; |
| } |
| |
| public long getOfflineDurableSubscriberTimeout() { |
| return offlineDurableSubscriberTimeout; |
| } |
| |
| public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) { |
| this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout; |
| } |
| |
| public long getOfflineDurableSubscriberTaskSchedule() { |
| return offlineDurableSubscriberTaskSchedule; |
| } |
| |
| public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) { |
| this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule; |
| } |
| |
| public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) { |
| return isUseVirtualTopics() && destination.isQueue() && |
| getVirtualTopicConsumerDestinationFilter().matches(destination); |
| } |
| |
| synchronized public Throwable getStartException() { |
| return startException; |
| } |
| |
| public boolean isStartAsync() { |
| return startAsync; |
| } |
| |
| public void setStartAsync(boolean startAsync) { |
| this.startAsync = startAsync; |
| } |
| |
| public boolean isSlave() { |
| return this.slave; |
| } |
| |
| public boolean isStopping() { |
| return this.stopping.get(); |
| } |
| |
| /** |
| * @return true if the broker allowed to restart on shutdown. |
| */ |
| public boolean isRestartAllowed() { |
| return restartAllowed; |
| } |
| |
| /** |
| * Sets if the broker allowed to restart on shutdown. |
| */ |
| public void setRestartAllowed(boolean restartAllowed) { |
| this.restartAllowed = restartAllowed; |
| } |
| |
| /** |
| * A lifecycle manager of the BrokerService should |
| * inspect this property after a broker shutdown has occurred |
| * to find out if the broker needs to be re-created and started |
| * again. |
| * |
| * @return true if the broker wants to be restarted after it shuts down. |
| */ |
| public boolean isRestartRequested() { |
| return restartRequested; |
| } |
| |
| public void requestRestart() { |
| this.restartRequested = true; |
| } |
| |
| public int getStoreOpenWireVersion() { |
| return storeOpenWireVersion; |
| } |
| |
| public void setStoreOpenWireVersion(int storeOpenWireVersion) { |
| this.storeOpenWireVersion = storeOpenWireVersion; |
| } |
| |
| /** |
| * @return the current number of connections on this Broker. |
| */ |
| public int getCurrentConnections() { |
| return this.currentConnections.get(); |
| } |
| |
| /** |
| * @return the total number of connections this broker has handled since startup. |
| */ |
| public long getTotalConnections() { |
| return this.totalConnections.get(); |
| } |
| |
| public void incrementCurrentConnections() { |
| this.currentConnections.incrementAndGet(); |
| } |
| |
| public void decrementCurrentConnections() { |
| this.currentConnections.decrementAndGet(); |
| } |
| |
| public void incrementTotalConnections() { |
| this.totalConnections.incrementAndGet(); |
| } |
| |
| public boolean isRejectDurableConsumers() { |
| return rejectDurableConsumers; |
| } |
| |
| public void setRejectDurableConsumers(boolean rejectDurableConsumers) { |
| this.rejectDurableConsumers = rejectDurableConsumers; |
| } |
| |
| public boolean isUseVirtualDestSubs() { |
| return useVirtualDestSubs; |
| } |
| |
| public void setUseVirtualDestSubs( |
| boolean useVirtualDestSubs) { |
| this.useVirtualDestSubs = useVirtualDestSubs; |
| } |
| |
| public boolean isUseVirtualDestSubsOnCreation() { |
| return useVirtualDestSubsOnCreation; |
| } |
| |
| public void setUseVirtualDestSubsOnCreation( |
| boolean useVirtualDestSubsOnCreation) { |
| this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation; |
| } |
| |
| public boolean isAdjustUsageLimits() { |
| return adjustUsageLimits; |
| } |
| |
| public void setAdjustUsageLimits(boolean adjustUsageLimits) { |
| this.adjustUsageLimits = adjustUsageLimits; |
| } |
| |
| public void setRollbackOnlyOnAsyncException(boolean rollbackOnlyOnAsyncException) { |
| this.rollbackOnlyOnAsyncException = rollbackOnlyOnAsyncException; |
| } |
| |
| public boolean isRollbackOnlyOnAsyncException() { |
| return rollbackOnlyOnAsyncException; |
| } |
| |
| public int getMaxSchedulerRepeatAllowed() { |
| return maxSchedulerRepeatAllowed; |
| } |
| |
| public void setMaxSchedulerRepeatAllowed(int maxSchedulerRepeatAllowed) { |
| this.maxSchedulerRepeatAllowed = maxSchedulerRepeatAllowed; |
| } |
| } |