| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.stratos.python.cartridge.agent.test; |
| |
| import org.apache.activemq.broker.BrokerService; |
| import org.apache.commons.exec.*; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.io.output.ByteArrayOutputStream; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.stratos.common.domain.LoadBalancingIPType; |
| import org.apache.stratos.common.threading.StratosThreadPool; |
| import org.apache.stratos.messaging.broker.publish.EventPublisher; |
| import org.apache.stratos.messaging.broker.publish.EventPublisherPool; |
| import org.apache.stratos.messaging.domain.topology.*; |
| import org.apache.stratos.messaging.event.Event; |
| import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent; |
| import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; |
| import org.apache.stratos.messaging.event.topology.MemberInitializedEvent; |
| import org.apache.stratos.messaging.listener.instance.status.InstanceActivatedEventListener; |
| import org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener; |
| import org.apache.stratos.messaging.message.receiver.instance.status.InstanceStatusEventReceiver; |
| import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; |
| import org.apache.stratos.messaging.util.MessagingUtil; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.ServerSocket; |
| import java.util.*; |
| import java.util.concurrent.ExecutorService; |
| |
| import static junit.framework.Assert.assertTrue; |
| |
| @RunWith(Parameterized.class) |
| public class PythonCartridgeAgentTest { |
| |
| private static final Log log = LogFactory.getLog(PythonCartridgeAgentTest.class); |
| |
| private static final String NEW_LINE = System.getProperty("line.separator"); |
| // private static final long TIMEOUT = 1440000; |
| private static final long TIMEOUT = 120000; |
| private static final String CLUSTER_ID = "php.php.domain"; |
| private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-1"; |
| private static final String AUTOSCALING_POLICY_NAME = "autoscaling-policy-1"; |
| private static final String APP_ID = "application-1"; |
| private static final String MEMBER_ID = "php.member-1"; |
| private static final String CLUSTER_INSTANCE_ID = "cluster-1-instance-1"; |
| private static final String NETWORK_PARTITION_ID = "network-partition-1"; |
| private static final String PARTITION_ID = "partition-1"; |
| private static final String TENANT_ID = "-1234"; |
| private static final String SERVICE_NAME = "php"; |
| public static final String SOURCE_PATH = "/tmp/stratos-pca-test-app-path/"; |
| |
| private static List<ServerSocket> serverSocketList; |
| private static Map<String, Executor> executorList; |
| private final ArtifactUpdatedEvent artifactUpdatedEvent; |
| private final Boolean expectedResult; |
| private boolean instanceStarted; |
| private boolean instanceActivated; |
| private ByteArrayOutputStreamLocal outputStream; |
| private boolean eventReceiverInitiated = false; |
| private TopologyEventReceiver topologyEventReceiver; |
| private InstanceStatusEventReceiver instanceStatusEventReceiver; |
| private int cepPort = 7712; |
| private BrokerService broker = new BrokerService(); |
| private static final String ACTIVEMQ_AMQP_BIND_ADDRESS = "tcp://localhost:61617"; |
| private static final String ACTIVEMQ_MQTT_BIND_ADDRESS = "mqtt://localhost:1884"; |
| private static final UUID PYTHON_AGENT_DIR_NAME = UUID.randomUUID(); |
| |
| public PythonCartridgeAgentTest(ArtifactUpdatedEvent artifactUpdatedEvent, Boolean expectedResult) { |
| this.artifactUpdatedEvent = artifactUpdatedEvent; |
| this.expectedResult = expectedResult; |
| } |
| |
| /** |
| * Setup method for test class |
| */ |
| @BeforeClass |
| public static void oneTimeSetUp() { |
| // Set jndi.properties.dir system property for initializing event publishers and receivers |
| System.setProperty("jndi.properties.dir", getResourcesFolderPath()); |
| } |
| |
| /** |
| * Setup method for test method testPythonCartridgeAgent |
| */ |
| @Before |
| public void setup() { |
| serverSocketList = new ArrayList<ServerSocket>(); |
| executorList = new HashMap<String, Executor>(); |
| try { |
| broker.addConnector(ACTIVEMQ_AMQP_BIND_ADDRESS); |
| broker.addConnector(ACTIVEMQ_MQTT_BIND_ADDRESS); |
| broker.setBrokerName("testBroker"); |
| broker.setDataDirectory(PythonCartridgeAgentTest.class.getResource("/").getPath() + |
| File.separator + ".." + File.separator + PYTHON_AGENT_DIR_NAME + File.separator + "activemq-data"); |
| broker.start(); |
| log.info("Broker service started!"); |
| } |
| catch (Exception e) { |
| log.error("Error while setting up broker service", e); |
| } |
| if (!this.eventReceiverInitiated) { |
| ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", 15); |
| topologyEventReceiver = new TopologyEventReceiver(); |
| topologyEventReceiver.setExecutorService(executorService); |
| topologyEventReceiver.execute(); |
| |
| instanceStatusEventReceiver = new InstanceStatusEventReceiver(); |
| instanceStatusEventReceiver.setExecutorService(executorService); |
| instanceStatusEventReceiver.execute(); |
| |
| this.instanceStarted = false; |
| instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() { |
| @Override |
| protected void onEvent(Event event) { |
| log.info("Instance started event received"); |
| instanceStarted = true; |
| } |
| }); |
| |
| this.instanceActivated = false; |
| instanceStatusEventReceiver.addEventListener(new InstanceActivatedEventListener() { |
| @Override |
| protected void onEvent(Event event) { |
| log.info("Instance activated event received"); |
| instanceActivated = true; |
| } |
| }); |
| |
| this.eventReceiverInitiated = true; |
| } |
| // Simulate CEP server socket |
| startServerSocket(cepPort); |
| String agentPath = setupPythonAgent(); |
| log.info("Python agent working directory name: " + PYTHON_AGENT_DIR_NAME); |
| log.info("Starting python cartridge agent..."); |
| this.outputStream = executeCommand( |
| "python " + agentPath + "/agent.py > " + getResourcesFolderPath() + File.separator + ".." + |
| File.separator + PYTHON_AGENT_DIR_NAME + File.separator + "cartridge-agent.log"); |
| } |
| |
| /** |
| * TearDown method for test method testPythonCartridgeAgent |
| */ |
| @After |
| public void tearDown() { |
| for (Map.Entry<String, Executor> entry : executorList.entrySet()) { |
| try { |
| String commandText = entry.getKey(); |
| Executor executor = entry.getValue(); |
| ExecuteWatchdog watchdog = executor.getWatchdog(); |
| if (watchdog != null) { |
| log.info("Terminating process: " + commandText); |
| watchdog.destroyProcess(); |
| } |
| } |
| catch (Exception ignore) { |
| } |
| } |
| for (ServerSocket serverSocket : serverSocketList) { |
| try { |
| log.info("Stopping socket server: " + serverSocket.getLocalSocketAddress()); |
| serverSocket.close(); |
| } |
| catch (IOException ignore) { |
| } |
| } |
| |
| try { |
| log.info("Deleting source checkout folder..."); |
| FileUtils.deleteDirectory(new File(SOURCE_PATH)); |
| } |
| catch (Exception ignore) { |
| |
| } |
| |
| this.instanceStatusEventReceiver.terminate(); |
| this.topologyEventReceiver.terminate(); |
| |
| this.instanceActivated = false; |
| this.instanceStarted = false; |
| try { |
| broker.stop(); |
| } |
| catch (Exception e) { |
| log.error("Error while stopping the broker service", e); |
| } |
| } |
| |
| |
| /** |
| * This method returns a collection of {@link org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent} |
| * objects as parameters to the test |
| * |
| * @return |
| */ |
| @Parameterized.Parameters |
| public static Collection getArtifactUpdatedEventsAsParams() { |
| ArtifactUpdatedEvent publicRepoEvent = createTestArtifactUpdatedEvent(); |
| |
| ArtifactUpdatedEvent privateRepoEvent = createTestArtifactUpdatedEvent(); |
| privateRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/testrepo.git"); |
| privateRepoEvent.setRepoUserName("testapache2211"); |
| privateRepoEvent.setRepoPassword("RExPDGa4GkPJj4kJDzSROQ=="); |
| |
| ArtifactUpdatedEvent privateRepoEvent2 = createTestArtifactUpdatedEvent(); |
| privateRepoEvent2.setRepoURL("https://testapache2211@bitbucket.org/testapache2211/testrepo.git"); |
| privateRepoEvent2.setRepoUserName("testapache2211"); |
| privateRepoEvent2.setRepoPassword("iF7qT+BKKPE3PGV1TeDsJA=="); |
| |
| return Arrays.asList(new Object[][]{ |
| {publicRepoEvent, true}, |
| {privateRepoEvent, true}, |
| {privateRepoEvent2, true} |
| }); |
| |
| // return Arrays.asList(new Object[][]{ |
| // {publicRepoEvent, true} |
| // }); |
| |
| } |
| |
| /** |
| * Creates an {@link org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent} object with a public |
| * repository URL |
| * |
| * @return |
| */ |
| private static ArtifactUpdatedEvent createTestArtifactUpdatedEvent() { |
| ArtifactUpdatedEvent publicRepoEvent = new ArtifactUpdatedEvent(); |
| publicRepoEvent.setClusterId(CLUSTER_ID); |
| publicRepoEvent.setTenantId(TENANT_ID); |
| publicRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/opentestrepo1.git"); |
| return publicRepoEvent; |
| } |
| |
| @Test(timeout = TIMEOUT) |
| public void testPythonCartridgeAgent() { |
| Thread communicatorThread = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| List<String> outputLines = new ArrayList<String>(); |
| while (!outputStream.isClosed()) { |
| List<String> newLines = getNewLines(outputLines, outputStream.toString()); |
| if (newLines.size() > 0) { |
| for (String line : newLines) { |
| if (line.contains("Subscribed to 'topology/#'")) { |
| sleep(1000); |
| // Send complete topology event |
| log.info("Publishing complete topology event..."); |
| Topology topology = createTestTopology(); |
| CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology); |
| publishEvent(completeTopologyEvent); |
| log.info("Complete topology event published"); |
| |
| sleep(3000); |
| // Publish member initialized event |
| log.info("Publishing member initialized event..."); |
| MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent( |
| SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID, |
| PARTITION_ID |
| ); |
| publishEvent(memberInitializedEvent); |
| log.info("Member initialized event published"); |
| |
| // Simulate server socket |
| startServerSocket(8080); |
| } |
| if (line.contains("Artifact repository found")) { |
| // Send artifact updated event |
| publishEvent(artifactUpdatedEvent); |
| } |
| |
| if (line.contains("Exception in thread") || line.contains("ERROR")) { |
| //throw new RuntimeException(line); |
| } |
| log.info(line); |
| } |
| } |
| sleep(100); |
| } |
| } |
| }); |
| |
| communicatorThread.start(); |
| |
| while (!instanceActivated) { |
| // wait until the instance activated event is received. |
| sleep(2000); |
| } |
| |
| assertTrue("Instance started event was not received", instanceStarted); |
| assertTrue("Instance activated event was not received", instanceActivated == this.expectedResult); |
| } |
| |
| /** |
| * Publish messaging event |
| * |
| * @param event |
| */ |
| private void publishEvent(Event event) { |
| String topicName = MessagingUtil.getMessageTopicName(event); |
| EventPublisher eventPublisher = EventPublisherPool.getPublisher(topicName); |
| eventPublisher.publish(event); |
| } |
| |
| /** |
| * Start server socket |
| * |
| * @param port |
| */ |
| private void startServerSocket(final int port) { |
| Thread socketThread = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| ServerSocket serverSocket = new ServerSocket(port); |
| serverSocketList.add(serverSocket); |
| log.info("Server socket started on port: " + port); |
| serverSocket.accept(); |
| } |
| catch (IOException e) { |
| String message = "Could not start server socket: [port] " + port; |
| log.error(message, e); |
| throw new RuntimeException(message, e); |
| } |
| } |
| }); |
| socketThread.start(); |
| } |
| |
| /** |
| * Create test topology |
| * |
| * @return |
| */ |
| private Topology createTestTopology() { |
| Topology topology = new Topology(); |
| Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant); |
| topology.addService(service); |
| |
| Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME, |
| AUTOSCALING_POLICY_NAME, APP_ID); |
| service.addCluster(cluster); |
| |
| Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, |
| CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, |
| System.currentTimeMillis()); |
| |
| member.setDefaultPrivateIP("10.0.0.1"); |
| member.setDefaultPublicIP("20.0.0.1"); |
| Properties properties = new Properties(); |
| properties.setProperty("prop1", "value1"); |
| member.setProperties(properties); |
| member.setStatus(MemberStatus.Created); |
| cluster.addMember(member); |
| |
| return topology; |
| } |
| |
| /** |
| * Return new lines found in the output |
| * |
| * @param currentOutputLines current output lines |
| * @param output output |
| * @return |
| */ |
| private List<String> getNewLines(List<String> currentOutputLines, String output) { |
| List<String> newLines = new ArrayList<String>(); |
| |
| if (StringUtils.isNotBlank(output)) { |
| String[] lines = output.split(NEW_LINE); |
| if (lines != null) { |
| for (String line : lines) { |
| if (!currentOutputLines.contains(line)) { |
| currentOutputLines.add(line); |
| newLines.add(line); |
| } |
| } |
| } |
| } |
| return newLines; |
| } |
| |
| /** |
| * Sleep current thread |
| * |
| * @param time |
| */ |
| private void sleep(long time) { |
| try { |
| Thread.sleep(time); |
| } |
| catch (InterruptedException ignore) { |
| } |
| } |
| |
| /** |
| * Copy python agent distribution to a new folder, extract it and copy sample configuration files |
| * |
| * @return |
| */ |
| private String setupPythonAgent() { |
| try { |
| log.info("Setting up python cartridge agent..."); |
| String srcAgentPath = getResourcesFolderPath() + "/../../src/main/python/cartridge.agent/cartridge.agent"; |
| String destAgentPath = |
| getResourcesFolderPath() + File.separator + ".." + File.separator + PYTHON_AGENT_DIR_NAME + |
| "/cartridge.agent"; |
| FileUtils.copyDirectory(new File(srcAgentPath), new File(destAgentPath)); |
| |
| String srcAgentConfPath = getResourcesFolderPath() + "/agent.conf"; |
| String destAgentConfPath = destAgentPath + "/agent.conf"; |
| FileUtils.copyFile(new File(srcAgentConfPath), new File(destAgentConfPath)); |
| |
| String srcLoggingIniPath = getResourcesFolderPath() + "/logging.ini"; |
| String destLoggingIniPath = destAgentPath + "/logging.ini"; |
| FileUtils.copyFile(new File(srcLoggingIniPath), new File(destLoggingIniPath)); |
| |
| String srcPayloadPath = getResourcesFolderPath() + "/payload"; |
| String destPayloadPath = destAgentPath + "/payload"; |
| FileUtils.copyDirectory(new File(srcPayloadPath), new File(destPayloadPath)); |
| |
| log.info("Changing extension scripts permissions"); |
| File extensionsPath = new File(destAgentPath + "/extensions/bash"); |
| File[] extensions = extensionsPath.listFiles(); |
| for (File extension : extensions) { |
| extension.setExecutable(true); |
| } |
| |
| log.info("Python cartridge agent setup completed"); |
| |
| return destAgentPath; |
| } |
| catch (Exception e) { |
| String message = "Could not copy cartridge agent distribution"; |
| log.error(message, e); |
| throw new RuntimeException(message, e); |
| } |
| } |
| |
| /** |
| * Execute shell command |
| * |
| * @param commandText |
| */ |
| private ByteArrayOutputStreamLocal executeCommand(final String commandText) { |
| final ByteArrayOutputStreamLocal outputStream = new ByteArrayOutputStreamLocal(); |
| try { |
| CommandLine commandline = CommandLine.parse(commandText); |
| DefaultExecutor exec = new DefaultExecutor(); |
| PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream); |
| exec.setWorkingDirectory(new File( |
| getResourcesFolderPath() + File.separator + ".." + File.separator + PYTHON_AGENT_DIR_NAME)); |
| exec.setStreamHandler(streamHandler); |
| ExecuteWatchdog watchdog = new ExecuteWatchdog(TIMEOUT); |
| exec.setWatchdog(watchdog); |
| exec.execute(commandline, new ExecuteResultHandler() { |
| @Override |
| public void onProcessComplete(int i) { |
| log.info(commandText + " process completed"); |
| } |
| |
| @Override |
| public void onProcessFailed(ExecuteException e) { |
| log.error(commandText + " process failed", e); |
| } |
| }); |
| executorList.put(commandText, exec); |
| return outputStream; |
| } |
| catch (Exception e) { |
| log.error(outputStream.toString(), e); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * Get resources folder path |
| * |
| * @return |
| */ |
| private static String getResourcesFolderPath() { |
| String path = PythonCartridgeAgentTest.class.getResource(File.separator).getPath(); |
| return StringUtils.removeEnd(path, File.separator); |
| } |
| |
| /** |
| * Implements ByteArrayOutputStream.isClosed() method |
| */ |
| private class ByteArrayOutputStreamLocal extends ByteArrayOutputStream { |
| private boolean closed; |
| |
| @Override |
| public void close() throws IOException { |
| super.close(); |
| closed = true; |
| } |
| |
| public boolean isClosed() { |
| return closed; |
| } |
| } |
| } |