| /**************************************************************** |
| * Licensed to the Apache Software Foundation (ASF) under one * |
| * or more contributor license agreements. See the NOTICE file * |
| * distributed with this work for additional information * |
| * regarding copyright ownership. The ASF licenses this file * |
| * to you under the Apache License, Version 2.0 (the * |
| * "License"); you may not use this file except in compliance * |
| * with the License. You may obtain a copy of the License at * |
| * * |
| * http://www.apache.org/licenses/LICENSE-2.0 * |
| * * |
| * Unless required by applicable law or agreed to in writing, * |
| * software distributed under the License is distributed on an * |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * |
| * KIND, either express or implied. See the License for the * |
| * specific language governing permissions and limitations * |
| * under the License. * |
| ****************************************************************/ |
| package org.apache.james.postage; |
| |
| import java.io.File; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.james.cli.probe.ServerProbe; |
| import org.apache.james.cli.probe.impl.JmxServerProbe; |
| import org.apache.james.postage.client.POP3Client; |
| import org.apache.james.postage.client.SMTPClient; |
| import org.apache.james.postage.configuration.MailSender; |
| import org.apache.james.postage.configuration.PostageConfiguration; |
| import org.apache.james.postage.configuration.SendProfile; |
| import org.apache.james.postage.execution.SampleController; |
| import org.apache.james.postage.jmx.JVMResourceSampler; |
| import org.apache.james.postage.result.PostageRunnerResult; |
| import org.apache.james.postage.result.PostageRunnerResultImpl; |
| import org.apache.james.postage.smtpserver.SMTPMailSink; |
| |
/**
 * Central controlling class for the testing process. Starts all workers, collects data and stops when the time is up.<br/>
 * Relates to one and only one Scenario section of the configuration file.
 */
| public class PostageRunner implements Runnable { |
| |
| private static Log log = LogFactory.getLog(PostageRunner.class); |
| |
| public static final int PHASE_CREATED = 0; |
| public static final int PHASE_STARTING = 1; |
| public static final int PHASE_RUNNING = 2; |
| public static final int PHASE_ABORTED = 3; |
| public static final int PHASE_COMPLETED = 4; |
| |
| private int currentPhase = PHASE_CREATED; |
| |
| private final PostageConfiguration postageConfiguration; |
| private final PostageRunnerResult results = new PostageRunnerResultImpl(); |
| |
| private POP3Client inboundMailingChecker; |
| private SampleController inboundMailingController; |
| |
| private SMTPMailSink smtpMailSink; |
| private SampleController outboundMailingInterceptorController; |
| |
| private List<SampleController> sendControllers = new ArrayList<SampleController>(); |
| |
| private JVMResourceSampler jvmResourceSampler = null; |
| private SampleController jvmResourceController = null; |
| |
| private int minutesRunning = 0; |
| |
| /** |
| * defines the prefix for every mail id generated by the current runner, so they can be distinguished from |
| * next runs. the runner instance is responsable to initializing this value! |
| * NOTE: this construct becomes a problem when two runners are running in parallel |
| */ |
| private static String messageIdPrefix = null; |
| |
| public static String getMessageIdPrefix() { |
| return messageIdPrefix + "-"; |
| } |
| |
| /** |
| * sends messages to James in two ways: |
| * 1. internal users relay to internal or external users using (inbound) SMTP |
| * 2. external users send mail to internal users using (inbound) SMTP |
| * |
| * the correct mail delivery is checked in two ways: |
| * 1. by checking internal users mails using POP3 |
| * 2. by checking mail to external users by receiving all mail forwarded by James to outbound/forwarded SMTP |
| * |
| * @param postageConfiguration |
| */ |
| public PostageRunner(PostageConfiguration postageConfiguration) { |
| this.messageIdPrefix = "" + System.currentTimeMillis(); |
| |
| this.postageConfiguration = postageConfiguration; |
| |
| int totalMailsPerMin = this.postageConfiguration.getTotalMailsPerMin(); |
| int durationMinutes = this.postageConfiguration.getDurationMinutes(); |
| |
| this.postageConfiguration.addDescriptionItem("mails_per_min", "" + totalMailsPerMin); |
| this.postageConfiguration.addDescriptionItem("totally_running_min", "" + durationMinutes); |
| this.postageConfiguration.addDescriptionItem("totally_mails_target", "" + totalMailsPerMin * durationMinutes); |
| |
| this.results.setEnvironmentDescription(this.postageConfiguration.getDescriptionItems()); |
| } |
| |
| private void execute() { |
| if (this.postageConfiguration != null) this.currentPhase = PHASE_STARTING; |
| |
| // do initialisation, check if all services can be connected |
| try { |
| setupInternalUserAccounts(); |
| setupExternalUserAccounts(); |
| setupInboundMailing(); |
| setupInboundMailingChecker(); |
| setupForwardedMailInterceptor(); |
| setupJMXRemoting(); |
| prepareResultFile(getCanonicalMailResultFileName()); |
| prepareResultFile(getCanonicalJVMStatisticsFileName()); |
| prepareResultFile(getCanonicalErrorsFileName()); |
| } catch (StartupException e) { |
| log.fatal("could not even start the runner successfully", e); |
| return; |
| } |
| |
| |
| this.currentPhase = PHASE_RUNNING; |
| |
| log.info("starting scenario " + this.postageConfiguration.getId()); |
| |
| // fork the timeout controller thread. it issues the oneMinute checkpoint event, too. |
| startTimer(); |
| |
| // start all threads |
| try { |
| recordData(); |
| } catch (Exception e) { |
| log.error("recording data was aborted!", e); |
| } |
| |
| // has to be set by method stopRecording() |
| // this.currentPhase = PHASE_COMPLETED; |
| |
| // writeMatchedMailResults (remaining) collected data |
| log.info("completing by writing data for scenario " + this.postageConfiguration.getId()); |
| writeData(false); |
| } |
| |
| private void prepareResultFile(String canonicalMailResultFileName) { |
| File writeCandidate = new File(canonicalMailResultFileName); |
| if (writeCandidate.exists()) { |
| // rename existing result file from previous run |
| // to something like "result___.cvs.64906993" to make place for new results |
| writeCandidate.renameTo(new File(canonicalMailResultFileName + "." + writeCandidate.lastModified())); |
| } |
| } |
| |
| /** |
| * for checking the running status of this PostageRunner. |
| * @return one of the values PHASE_CREATED (0), PHASE_STARTING (1), PHASE_RUNNING (2), PHASE_ABORTED (3), PHASE_COMPLETED (4) |
| */ |
| public int getCurrentPhase() { |
| return this.currentPhase; |
| } |
| |
| public void run() { |
| execute(); |
| } |
| |
| public PostageRunnerResult getResult() { |
| return this.results; |
| } |
| |
| /** |
| * set up a thread issueing one-minute events and finally shutting down data recording when time has run out. |
| */ |
| private void startTimer() { |
| new Thread( |
| new Runnable() { |
| public void run() { |
| try { |
| int durationMinutes = postageConfiguration.getDurationMinutes(); |
| log.info("running for " + durationMinutes + " minute(s)"); |
| for (int i = 0; i < durationMinutes; i++) { |
| Thread.sleep(60*1000); |
| oneMinuteCheckpoint(); |
| } |
| stopRecording(); |
| } catch (InterruptedException e) { |
| ; // exit |
| } |
| } |
| |
| } |
| ).start(); |
| } |
| |
| /** |
| * called after each fully completed minute in the running phase |
| */ |
| private void oneMinuteCheckpoint() { |
| this.minutesRunning++; |
| log.info("reached checkpoint after " + this.minutesRunning + " of " |
| + this.postageConfiguration.getDurationMinutes() + " minute(s) running."); |
| |
| //TODO do this in a separate thread? |
| writeData(true); |
| } |
| |
| private void stopRecording() { |
| log.info("stopping"); |
| if (this.sendControllers != null) { |
| Iterator<SampleController> iterator = this.sendControllers.iterator(); |
| while (iterator.hasNext()) { |
| SampleController sendController = iterator.next(); |
| sendController.stop(); |
| } |
| } |
| if (this.inboundMailingController != null) this.inboundMailingController.stop(); |
| |
| if (this.outboundMailingInterceptorController != null) this.outboundMailingInterceptorController.stop(); |
| this.currentPhase = PHASE_COMPLETED; |
| } |
| |
| /** |
| * interrupt the runner from outside |
| */ |
| public void terminate() { |
| stopRecording(); |
| this.currentPhase = PHASE_ABORTED; |
| writeData(false); |
| } |
| |
| private void writeData(boolean flushMatchedMailOnly) { |
| logElapsedData(); |
| |
| String filenameMailResult = getCanonicalMailResultFileName(); |
| String filenameJVMStatistics = getCanonicalJVMStatisticsFileName(); |
| String filenameErrors = getCanonicalErrorsFileName(); |
| this.results.writeResults(filenameMailResult, filenameJVMStatistics, filenameErrors, flushMatchedMailOnly); |
| } |
| |
| public String getCanonicalMailResultFileName() { |
| return "postage_mailResults." + this.postageConfiguration.getId() + ".csv"; |
| } |
| |
| public String getCanonicalJVMStatisticsFileName() { |
| return "postage_jvmStatistics." + this.postageConfiguration.getId() + ".csv"; |
| } |
| |
| public String getCanonicalErrorsFileName() { |
| return "postage_errors." + this.postageConfiguration.getId() + ".csv"; |
| } |
| |
| private void logElapsedData() { |
| log.info("unmatched messages: " + this.results.getUnmatchedMails()); |
| log.info("matched messages: " + this.results.getMatchedMails()); |
| log.info("valid matches: " + this.results.getValidMails()); |
| log.info("recorded errors: " + this.results.getErrorCount()); |
| } |
| |
| private void recordData() { |
| |
| Iterator<SampleController> iterator = this.sendControllers.iterator(); |
| while (iterator.hasNext()) { |
| SampleController sendController = iterator.next(); |
| sendController.runThreaded(); |
| } |
| |
| this.inboundMailingController = new SampleController(this.inboundMailingChecker, this.postageConfiguration.getTestserverPOP3FetchesPerMinute()); |
| this.inboundMailingController.runThreaded(); |
| |
| this.outboundMailingInterceptorController = new SampleController(this.smtpMailSink, 10, this.postageConfiguration.getTestserverSMTPForwardingWaitSeconds()); |
| this.outboundMailingInterceptorController.runThreaded(); |
| |
| if (this.jvmResourceSampler != null) { |
| this.jvmResourceController = new SampleController(this.jvmResourceSampler, 4); |
| this.jvmResourceController.runThreaded(); |
| } |
| |
| while(this.currentPhase == PHASE_RUNNING) { |
| try { |
| Thread.sleep(50); |
| } catch (InterruptedException e) { |
| ; // leave |
| } |
| } |
| |
| if (this.currentPhase == PHASE_COMPLETED) { |
| // walk through all internal users and check for un-matched mails |
| log.info("checking all internal accounts for unmatched mail..."); |
| this.inboundMailingChecker.doMatchMailForAllUsers(); |
| log.info("...done checking internal accounts"); |
| } else { |
| // if we didn't COMPLETE, we'd better skip this |
| log.info("skip checking internal accounts for unmatched mail."); |
| } |
| } |
| |
| private void setupExternalUserAccounts() { |
| int externalUserCount = this.postageConfiguration.getExternalUsers().getCount(); |
| String externalUsernamePrefix = this.postageConfiguration.getExternalUsers().getNamePrefix(); |
| |
| ArrayList<String> externalUsers = new ArrayList<String>(); |
| for (int i = 1; i <= externalUserCount; i++) { |
| String username = externalUsernamePrefix + i; |
| externalUsers.add(username); |
| } |
| this.postageConfiguration.getExternalUsers().setExistingUsers(externalUsers); |
| } |
| |
| /** |
| * sets up the profile where mail is send from external users to internal users |
| * @throws StartupException |
| */ |
| private void setupInboundMailing() throws StartupException { |
| if (this.postageConfiguration.getTestserverPortSMTPInbound() <= 0) return; |
| |
| Iterator<SendProfile> profileIterator = this.postageConfiguration.getProfiles().iterator(); |
| while (profileIterator.hasNext()) { |
| SendProfile sendProfile = profileIterator.next(); |
| Iterator<MailSender> mailSenderIterator = sendProfile.mailSenderIterator(); |
| while (mailSenderIterator.hasNext()) { |
| MailSender mailSender = mailSenderIterator.next(); |
| int sendPerMinute = mailSender.getSendPerMinute(); |
| |
| if (sendPerMinute < 1) continue; |
| |
| SMTPClient smtpClient = new SMTPClient(this.postageConfiguration.getTestserverHost(), |
| this.postageConfiguration.getTestserverPortSMTPInbound(), |
| this.postageConfiguration.getInternalUsers(), |
| this.postageConfiguration.getExternalUsers(), |
| this.results, |
| mailSender |
| ); |
| |
| boolean available = smtpClient.checkAvailability(); |
| log.info("availability of inbound mailing " + (available ? "": "NOT ") + "verified"); |
| if (!available) continue; |
| |
| SampleController sendController = new SampleController(smtpClient, sendPerMinute); |
| this.sendControllers.add(sendController); |
| } |
| } |
| |
| } |
| |
| |
| /** |
| * sets up the part for checking accounts via POP3, which are then aligned with sent test mails |
| * @throws StartupException |
| */ |
| private void setupInboundMailingChecker() throws StartupException { |
| if (this.postageConfiguration.getTestserverPortPOP3() <= 0) return; |
| |
| this.inboundMailingChecker = new POP3Client(this.postageConfiguration.getTestserverHost(), |
| this.postageConfiguration.getTestserverPortPOP3(), |
| this.postageConfiguration.getInternalUsers(), |
| this.results |
| ); |
| this.inboundMailingChecker.checkAvailability(); |
| boolean available = this.inboundMailingChecker.checkAvailability(); |
| if (available) { |
| log.info("availability of checking for inbound mailing (POP3) verified"); |
| } |
| } |
| |
| /** |
| * This method makes sure the test accounts exist in the target James Server. |
| * If the account does not already exist then the account is created; |
| * if the account does exist, the method checks configuration |
| * to see if the account should be re-used or removed and re-added. |
| * |
| * @throws StartupException |
| */ |
| private void setupInternalUserAccounts() throws StartupException { |
| |
| try { |
| |
| String host = this.postageConfiguration.getTestserverHost(); |
| int jmxPort = this.postageConfiguration.getTestserverPortJMXRemoting(); |
| int internalUserCount = this.postageConfiguration.getInternalUsers().getCount(); |
| String internalUsernamePrefix = this.postageConfiguration.getInternalUsers().getNamePrefix(); |
| String internalDomain = this.postageConfiguration.getInternalUsers().getDomain(); |
| String internalPassword = this.postageConfiguration.getInternalUsers().getPassword(); |
| |
| // Make connection to the James server via JMX |
| log.info("Connecting to host: " + host + ":" + jmxPort); |
| ServerProbe serverManager = new JmxServerProbe(host, jmxPort); |
| |
| if (! serverManager.containsDomain(internalDomain)) { |
| serverManager.addDomain(internalDomain); |
| } |
| |
| // Collect the set of users already provisioned |
| Set<String> existingUsers = new LinkedHashSet<String>(Arrays.asList(serverManager.listUsers())); |
| log.info("Existing users: " + existingUsers.toString()); |
| |
| // Check all the test accounts we need; if they don't exist then |
| // create them; if they do exist then check if we should reuse or recreate |
| ArrayList<String> internalUsers = new ArrayList<String>(); |
| for (int i = 1; i <= internalUserCount; i++) { |
| String username = internalUsernamePrefix + i; |
| String useremail = username + "@" + internalDomain; |
| if (existingUsers.contains(useremail)) { |
| log.info("user already exists: " + useremail); |
| if (!this.postageConfiguration.isInternalReuseExisting()) { |
| serverManager.removeUser(useremail); |
| serverManager.addUser(useremail, internalPassword); |
| log.info("user deleted and re-created: " + useremail); |
| } |
| serverManager.setPassword(useremail, internalPassword); |
| } else { |
| log.info("Adding user: " + useremail); |
| serverManager.addUser(useremail, internalPassword); |
| } |
| internalUsers.add(username); |
| } |
| |
| this.postageConfiguration.getInternalUsers().setExistingUsers(internalUsers); |
| |
| } catch (Exception e) { |
| throw new StartupException("error setting up internal user accounts", e); |
| } |
| |
| } |
| |
| private void setupForwardedMailInterceptor() throws StartupException { |
| SMTPMailSink smtpMailSink = new SMTPMailSink(); |
| smtpMailSink.setSmtpListenerPort(this.postageConfiguration.getTestserverPortSMTPForwarding()); |
| smtpMailSink.setResults(this.results); |
| try { |
| smtpMailSink.initialize(); |
| } catch (Exception e) { |
| throw new StartupException("failed to setup",e); |
| } |
| this.smtpMailSink = smtpMailSink; |
| log.info("forwarded mail interceptor is set up."); |
| } |
| |
| |
| private void setupJMXRemoting() throws StartupException { |
| int jmxPort = this.postageConfiguration.getTestserverPortJMXRemoting(); |
| if (jmxPort <= 0) { |
| return; |
| } |
| JVMResourceSampler jvmResourceSampler = new JVMResourceSampler("localhost", jmxPort, this.results); |
| try { |
| jvmResourceSampler.connectRemoteJamesJMXServer(); |
| log.info("connected to remote JMX"); |
| this.jvmResourceSampler = jvmResourceSampler; |
| } catch (Exception e) { |
| throw new StartupException("failed to setup JMX remoting for JVM resource sampling", e); |
| } |
| } |
| |
| } |