| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.artemis.core.server.impl; |
| |
| import org.apache.activemq.artemis.api.core.Pair; |
| import org.apache.activemq.artemis.core.config.Configuration; |
| import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration; |
| import org.apache.activemq.artemis.core.persistence.StorageManager; |
| import org.apache.activemq.artemis.core.postoffice.PostOffice; |
| import org.apache.activemq.artemis.core.server.ActiveMQComponent; |
| import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; |
| import org.apache.activemq.artemis.core.server.ConnectorService; |
| import org.apache.activemq.artemis.core.server.ConnectorServiceFactory; |
| import org.apache.activemq.artemis.core.server.ServiceRegistry; |
| import org.apache.activemq.artemis.utils.ConfigurationHelper; |
| |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.Set; |
| import java.util.concurrent.ScheduledExecutorService; |
| |
| /** |
| * ConnectorsService will pool some resource for updates, e.g. Twitter, then the changes are picked |
| * and converted into a ServerMessage for a given destination (queue). |
| * <p> |
| * It may also listen to a queue, and forward them (e.g. messages arriving at the queue are picked |
| * and tweeted to some Twitter account). |
| */ |
| public final class ConnectorsService implements ActiveMQComponent { |
| |
| private final StorageManager storageManager; |
| |
| private final PostOffice postOffice; |
| |
| private final ScheduledExecutorService scheduledPool; |
| |
| private boolean isStarted = false; |
| |
| private final Configuration configuration; |
| |
| private final Set<ConnectorService> connectors = new HashSet<>(); |
| |
| private final ServiceRegistry serviceRegistry; |
| |
| public ConnectorsService(final Configuration configuration, |
| final StorageManager storageManager, |
| final ScheduledExecutorService scheduledPool, |
| final PostOffice postOffice, |
| final ServiceRegistry serviceRegistry) { |
| this.configuration = configuration; |
| this.storageManager = storageManager; |
| this.scheduledPool = scheduledPool; |
| this.postOffice = postOffice; |
| this.serviceRegistry = serviceRegistry; |
| } |
| |
| @Override |
| public void start() throws Exception { |
| Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServiceFactories = serviceRegistry.getConnectorServices(configuration.getConnectorServiceConfigurations()); |
| |
| for (Pair<ConnectorServiceFactory, ConnectorServiceConfiguration> pair : connectorServiceFactories) { |
| createService(pair.getB(), pair.getA()); |
| } |
| |
| for (ConnectorService connector : connectors) { |
| try { |
| connector.start(); |
| } |
| catch (Throwable e) { |
| ActiveMQServerLogger.LOGGER.errorStartingConnectorService(e, connector.getName()); |
| } |
| } |
| isStarted = true; |
| } |
| |
| public void createService(ConnectorServiceConfiguration info, ConnectorServiceFactory factory) { |
| if (info.getParams() != null) { |
| Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), info.getParams().keySet()); |
| if (!invalid.isEmpty()) { |
| ActiveMQServerLogger.LOGGER.connectorKeysInvalid(ConfigurationHelper.stringSetToCommaListString(invalid)); |
| return; |
| } |
| } |
| |
| Set<String> invalid = ConfigurationHelper.checkKeysExist(factory.getRequiredProperties(), info.getParams().keySet()); |
| if (!invalid.isEmpty()) { |
| ActiveMQServerLogger.LOGGER.connectorKeysMissing(ConfigurationHelper.stringSetToCommaListString(invalid)); |
| return; |
| } |
| ConnectorService connectorService = factory.createConnectorService(info.getConnectorName(), info.getParams(), storageManager, postOffice, scheduledPool); |
| connectors.add(connectorService); |
| } |
| |
| @Override |
| public void stop() throws Exception { |
| if (!isStarted) { |
| return; |
| } |
| for (ConnectorService connector : connectors) { |
| try { |
| connector.stop(); |
| } |
| catch (Throwable e) { |
| ActiveMQServerLogger.LOGGER.errorStoppingConnectorService(e, connector.getName()); |
| } |
| } |
| connectors.clear(); |
| isStarted = false; |
| } |
| |
| @Override |
| public boolean isStarted() { |
| return isStarted; |
| } |
| |
| public Set<ConnectorService> getConnectors() { |
| return connectors; |
| } |
| } |