blob: 61ef0513333bd141ec921a960ce194003ea7864f [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.postage;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.james.cli.probe.ServerProbe;
import org.apache.james.cli.probe.impl.JmxServerProbe;
import org.apache.james.postage.client.POP3Client;
import org.apache.james.postage.client.SMTPClient;
import org.apache.james.postage.configuration.MailSender;
import org.apache.james.postage.configuration.PostageConfiguration;
import org.apache.james.postage.configuration.SendProfile;
import org.apache.james.postage.execution.SampleController;
import org.apache.james.postage.jmx.JVMResourceSampler;
import org.apache.james.postage.result.PostageRunnerResult;
import org.apache.james.postage.result.PostageRunnerResultImpl;
import org.apache.james.postage.smtpserver.SMTPMailSink;
/**
* Central controlling class for the testing process. starts all workers, collects data and stops when time is out.<br/>
* relates to one and only one Scenario section from the configuration file.
*/
public class PostageRunner implements Runnable {
private static Log log = LogFactory.getLog(PostageRunner.class);
public static final int PHASE_CREATED = 0;
public static final int PHASE_STARTING = 1;
public static final int PHASE_RUNNING = 2;
public static final int PHASE_ABORTED = 3;
public static final int PHASE_COMPLETED = 4;
private int currentPhase = PHASE_CREATED;
private final PostageConfiguration postageConfiguration;
private final PostageRunnerResult results = new PostageRunnerResultImpl();
private POP3Client inboundMailingChecker;
private SampleController inboundMailingController;
private SMTPMailSink smtpMailSink;
private SampleController outboundMailingInterceptorController;
private List<SampleController> sendControllers = new ArrayList<SampleController>();
private JVMResourceSampler jvmResourceSampler = null;
private SampleController jvmResourceController = null;
private int minutesRunning = 0;
/**
* defines the prefix for every mail id generated by the current runner, so they can be distinguished from
* next runs. the runner instance is responsable to initializing this value!
* NOTE: this construct becomes a problem when two runners are running in parallel
*/
private static String messageIdPrefix = null;
public static String getMessageIdPrefix() {
return messageIdPrefix + "-";
}
/**
* sends messages to James in two ways:
* 1. internal users relay to internal or external users using (inbound) SMTP
* 2. external users send mail to internal users using (inbound) SMTP
*
* the correct mail delivery is checked in two ways:
* 1. by checking internal users mails using POP3
* 2. by checking mail to external users by receiving all mail forwarded by James to outbound/forwarded SMTP
*
* @param postageConfiguration
*/
public PostageRunner(PostageConfiguration postageConfiguration) {
this.messageIdPrefix = "" + System.currentTimeMillis();
this.postageConfiguration = postageConfiguration;
int totalMailsPerMin = this.postageConfiguration.getTotalMailsPerMin();
int durationMinutes = this.postageConfiguration.getDurationMinutes();
this.postageConfiguration.addDescriptionItem("mails_per_min", "" + totalMailsPerMin);
this.postageConfiguration.addDescriptionItem("totally_running_min", "" + durationMinutes);
this.postageConfiguration.addDescriptionItem("totally_mails_target", "" + totalMailsPerMin * durationMinutes);
this.results.setEnvironmentDescription(this.postageConfiguration.getDescriptionItems());
}
private void execute() {
if (this.postageConfiguration != null) this.currentPhase = PHASE_STARTING;
// do initialisation, check if all services can be connected
try {
setupInternalUserAccounts();
setupExternalUserAccounts();
setupInboundMailing();
setupInboundMailingChecker();
setupForwardedMailInterceptor();
setupJMXRemoting();
prepareResultFile(getCanonicalMailResultFileName());
prepareResultFile(getCanonicalJVMStatisticsFileName());
prepareResultFile(getCanonicalErrorsFileName());
} catch (StartupException e) {
log.fatal("could not even start the runner successfully", e);
return;
}
this.currentPhase = PHASE_RUNNING;
log.info("starting scenario " + this.postageConfiguration.getId());
// fork the timeout controller thread. it issues the oneMinute checkpoint event, too.
startTimer();
// start all threads
try {
recordData();
} catch (Exception e) {
log.error("recording data was aborted!", e);
}
// has to be set by method stopRecording()
// this.currentPhase = PHASE_COMPLETED;
// writeMatchedMailResults (remaining) collected data
log.info("completing by writing data for scenario " + this.postageConfiguration.getId());
writeData(false);
}
private void prepareResultFile(String canonicalMailResultFileName) {
File writeCandidate = new File(canonicalMailResultFileName);
if (writeCandidate.exists()) {
// rename existing result file from previous run
// to something like "result___.cvs.64906993" to make place for new results
writeCandidate.renameTo(new File(canonicalMailResultFileName + "." + writeCandidate.lastModified()));
}
}
/**
* for checking the running status of this PostageRunner.
* @return one of the values PHASE_CREATED (0), PHASE_STARTING (1), PHASE_RUNNING (2), PHASE_ABORTED (3), PHASE_COMPLETED (4)
*/
public int getCurrentPhase() {
return this.currentPhase;
}
public void run() {
execute();
}
public PostageRunnerResult getResult() {
return this.results;
}
/**
* set up a thread issueing one-minute events and finally shutting down data recording when time has run out.
*/
private void startTimer() {
new Thread(
new Runnable() {
public void run() {
try {
int durationMinutes = postageConfiguration.getDurationMinutes();
log.info("running for " + durationMinutes + " minute(s)");
for (int i = 0; i < durationMinutes; i++) {
Thread.sleep(60*1000);
oneMinuteCheckpoint();
}
stopRecording();
} catch (InterruptedException e) {
; // exit
}
}
}
).start();
}
/**
* called after each fully completed minute in the running phase
*/
private void oneMinuteCheckpoint() {
this.minutesRunning++;
log.info("reached checkpoint after " + this.minutesRunning + " of "
+ this.postageConfiguration.getDurationMinutes() + " minute(s) running.");
//TODO do this in a separate thread?
writeData(true);
}
private void stopRecording() {
log.info("stopping");
if (this.sendControllers != null) {
Iterator<SampleController> iterator = this.sendControllers.iterator();
while (iterator.hasNext()) {
SampleController sendController = iterator.next();
sendController.stop();
}
}
if (this.inboundMailingController != null) this.inboundMailingController.stop();
if (this.outboundMailingInterceptorController != null) this.outboundMailingInterceptorController.stop();
this.currentPhase = PHASE_COMPLETED;
}
/**
* interrupt the runner from outside
*/
public void terminate() {
stopRecording();
this.currentPhase = PHASE_ABORTED;
writeData(false);
}
private void writeData(boolean flushMatchedMailOnly) {
logElapsedData();
String filenameMailResult = getCanonicalMailResultFileName();
String filenameJVMStatistics = getCanonicalJVMStatisticsFileName();
String filenameErrors = getCanonicalErrorsFileName();
this.results.writeResults(filenameMailResult, filenameJVMStatistics, filenameErrors, flushMatchedMailOnly);
}
public String getCanonicalMailResultFileName() {
return "postage_mailResults." + this.postageConfiguration.getId() + ".csv";
}
public String getCanonicalJVMStatisticsFileName() {
return "postage_jvmStatistics." + this.postageConfiguration.getId() + ".csv";
}
public String getCanonicalErrorsFileName() {
return "postage_errors." + this.postageConfiguration.getId() + ".csv";
}
private void logElapsedData() {
log.info("unmatched messages: " + this.results.getUnmatchedMails());
log.info("matched messages: " + this.results.getMatchedMails());
log.info("valid matches: " + this.results.getValidMails());
log.info("recorded errors: " + this.results.getErrorCount());
}
private void recordData() {
Iterator<SampleController> iterator = this.sendControllers.iterator();
while (iterator.hasNext()) {
SampleController sendController = iterator.next();
sendController.runThreaded();
}
this.inboundMailingController = new SampleController(this.inboundMailingChecker, this.postageConfiguration.getTestserverPOP3FetchesPerMinute());
this.inboundMailingController.runThreaded();
this.outboundMailingInterceptorController = new SampleController(this.smtpMailSink, 10, this.postageConfiguration.getTestserverSMTPForwardingWaitSeconds());
this.outboundMailingInterceptorController.runThreaded();
if (this.jvmResourceSampler != null) {
this.jvmResourceController = new SampleController(this.jvmResourceSampler, 4);
this.jvmResourceController.runThreaded();
}
while(this.currentPhase == PHASE_RUNNING) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
; // leave
}
}
if (this.currentPhase == PHASE_COMPLETED) {
// walk through all internal users and check for un-matched mails
log.info("checking all internal accounts for unmatched mail...");
this.inboundMailingChecker.doMatchMailForAllUsers();
log.info("...done checking internal accounts");
} else {
// if we didn't COMPLETE, we'd better skip this
log.info("skip checking internal accounts for unmatched mail.");
}
}
private void setupExternalUserAccounts() {
int externalUserCount = this.postageConfiguration.getExternalUsers().getCount();
String externalUsernamePrefix = this.postageConfiguration.getExternalUsers().getNamePrefix();
ArrayList<String> externalUsers = new ArrayList<String>();
for (int i = 1; i <= externalUserCount; i++) {
String username = externalUsernamePrefix + i;
externalUsers.add(username);
}
this.postageConfiguration.getExternalUsers().setExistingUsers(externalUsers);
}
/**
* sets up the profile where mail is send from external users to internal users
* @throws StartupException
*/
private void setupInboundMailing() throws StartupException {
if (this.postageConfiguration.getTestserverPortSMTPInbound() <= 0) return;
Iterator<SendProfile> profileIterator = this.postageConfiguration.getProfiles().iterator();
while (profileIterator.hasNext()) {
SendProfile sendProfile = profileIterator.next();
Iterator<MailSender> mailSenderIterator = sendProfile.mailSenderIterator();
while (mailSenderIterator.hasNext()) {
MailSender mailSender = mailSenderIterator.next();
int sendPerMinute = mailSender.getSendPerMinute();
if (sendPerMinute < 1) continue;
SMTPClient smtpClient = new SMTPClient(this.postageConfiguration.getTestserverHost(),
this.postageConfiguration.getTestserverPortSMTPInbound(),
this.postageConfiguration.getInternalUsers(),
this.postageConfiguration.getExternalUsers(),
this.results,
mailSender
);
boolean available = smtpClient.checkAvailability();
log.info("availability of inbound mailing " + (available ? "": "NOT ") + "verified");
if (!available) continue;
SampleController sendController = new SampleController(smtpClient, sendPerMinute);
this.sendControllers.add(sendController);
}
}
}
/**
* sets up the part for checking accounts via POP3, which are then aligned with sent test mails
* @throws StartupException
*/
private void setupInboundMailingChecker() throws StartupException {
if (this.postageConfiguration.getTestserverPortPOP3() <= 0) return;
this.inboundMailingChecker = new POP3Client(this.postageConfiguration.getTestserverHost(),
this.postageConfiguration.getTestserverPortPOP3(),
this.postageConfiguration.getInternalUsers(),
this.results
);
this.inboundMailingChecker.checkAvailability();
boolean available = this.inboundMailingChecker.checkAvailability();
if (available) {
log.info("availability of checking for inbound mailing (POP3) verified");
}
}
/**
* This method makes sure the test accounts exist in the target James Server.
* If the account does not already exist then the account is created;
* if the account does exist, the method checks configuration
* to see if the account should be re-used or removed and re-added.
*
* @throws StartupException
*/
private void setupInternalUserAccounts() throws StartupException {
try {
String host = this.postageConfiguration.getTestserverHost();
int jmxPort = this.postageConfiguration.getTestserverPortJMXRemoting();
int internalUserCount = this.postageConfiguration.getInternalUsers().getCount();
String internalUsernamePrefix = this.postageConfiguration.getInternalUsers().getNamePrefix();
String internalDomain = this.postageConfiguration.getInternalUsers().getDomain();
String internalPassword = this.postageConfiguration.getInternalUsers().getPassword();
// Make connection to the James server via JMX
log.info("Connecting to host: " + host + ":" + jmxPort);
ServerProbe serverManager = new JmxServerProbe(host, jmxPort);
if (! serverManager.containsDomain(internalDomain)) {
serverManager.addDomain(internalDomain);
}
// Collect the set of users already provisioned
Set<String> existingUsers = new LinkedHashSet<String>(Arrays.asList(serverManager.listUsers()));
log.info("Existing users: " + existingUsers.toString());
// Check all the test accounts we need; if they don't exist then
// create them; if they do exist then check if we should reuse or recreate
ArrayList<String> internalUsers = new ArrayList<String>();
for (int i = 1; i <= internalUserCount; i++) {
String username = internalUsernamePrefix + i;
String useremail = username + "@" + internalDomain;
if (existingUsers.contains(useremail)) {
log.info("user already exists: " + useremail);
if (!this.postageConfiguration.isInternalReuseExisting()) {
serverManager.removeUser(useremail);
serverManager.addUser(useremail, internalPassword);
log.info("user deleted and re-created: " + useremail);
}
serverManager.setPassword(useremail, internalPassword);
} else {
log.info("Adding user: " + useremail);
serverManager.addUser(useremail, internalPassword);
}
internalUsers.add(username);
}
this.postageConfiguration.getInternalUsers().setExistingUsers(internalUsers);
} catch (Exception e) {
throw new StartupException("error setting up internal user accounts", e);
}
}
private void setupForwardedMailInterceptor() throws StartupException {
SMTPMailSink smtpMailSink = new SMTPMailSink();
smtpMailSink.setSmtpListenerPort(this.postageConfiguration.getTestserverPortSMTPForwarding());
smtpMailSink.setResults(this.results);
try {
smtpMailSink.initialize();
} catch (Exception e) {
throw new StartupException("failed to setup",e);
}
this.smtpMailSink = smtpMailSink;
log.info("forwarded mail interceptor is set up.");
}
private void setupJMXRemoting() throws StartupException {
int jmxPort = this.postageConfiguration.getTestserverPortJMXRemoting();
if (jmxPort <= 0) {
return;
}
JVMResourceSampler jvmResourceSampler = new JVMResourceSampler("localhost", jmxPort, this.results);
try {
jvmResourceSampler.connectRemoteJamesJMXServer();
log.info("connected to remote JMX");
this.jvmResourceSampler = jvmResourceSampler;
} catch (Exception e) {
throw new StartupException("failed to setup JMX remoting for JVM resource sampling", e);
}
}
}