blob: 7f6319ba8dc579dc939dec8266d4619764d5c707 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* $Id$ */
package org.apache.fop.threading;
import java.io.File;
import java.io.OutputStream;
import java.text.DecimalFormat;
import java.util.Collections;
import java.util.List;
import javax.xml.transform.Source;
import javax.xml.transform.Templates;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamSource;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.CountingOutputStream;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.fop.activity.ContainerUtil;
import org.apache.fop.activity.Initializable;
import org.apache.fop.configuration.Configurable;
import org.apache.fop.configuration.Configuration;
import org.apache.fop.configuration.ConfigurationException;
/**
 * Testbed for multi-threading tests. The class can run a configurable set of tasks a number of
 * times in a configurable number of threads to easily reproduce multi-threading issues.
 * <p>
 * Every worker thread runs the complete task list {@code repeat} times against one shared
 * {@link Processor} instance; the outcome of each task is collected and printed to
 * {@code System.out} once all workers have finished.
 */
public class FOPTestbed
        implements Configurable, Initializable {

    private static final Log LOG = LogFactory.getLog(FOPTestbed.class);

    /** Number of times each worker thread runs through the whole task list. */
    private int repeat;

    /** Task definitions read from the configuration; only read by the workers. */
    private final List<TaskDef> taskList = new java.util.ArrayList<TaskDef>();

    /** Number of worker threads to start. */
    private int threads;

    /** Directory receiving the generated files (unused when writing to /dev/null). */
    private File outputDir;

    /** Configuration subtree used to create and configure the processor. */
    private Configuration fopCfg;

    /** The single processor instance shared by all worker threads. */
    private Processor foprocessor;

    /** If true, output is discarded instead of being written to {@link #outputDir}. */
    private boolean writeToDevNull;

    /**
     * Source of task numbers (and thereby of output file names). It is incremented concurrently
     * by all workers, so it must be atomic: a plain {@code int++} could hand the same number to
     * two tasks which would then write to the same file.
     */
    private final java.util.concurrent.atomic.AtomicInteger counter
            = new java.util.concurrent.atomic.AtomicInteger();

    /** Outcome of every executed task; appended to concurrently by the workers. */
    private final List<Result> results
            = Collections.synchronizedList(new java.util.LinkedList<Result>());

    /** {@inheritDoc} */
    public void configure(Configuration configuration) throws ConfigurationException {
        this.threads = configuration.getChild("threads").getValueAsInteger(10);
        this.outputDir = new File(configuration.getChild("output-dir").getValue());
        this.writeToDevNull = configuration.getChild("devnull").getValueAsBoolean(false);

        Configuration tasks = configuration.getChild("tasks");
        this.repeat = tasks.getAttributeAsInteger("repeat", 1);
        for (Configuration entry : tasks.getChildren("task")) {
            this.taskList.add(new TaskDef(entry));
        }

        this.fopCfg = configuration.getChild("processor");
    }

    /** {@inheritDoc} */
    public void initialize() throws Exception {
        this.foprocessor = createFOProcessor();
    }

    /**
     * Starts the stress test: creates and starts the configured number of worker threads, waits
     * until all of them have ended and then prints a report.
     * <p>
     * If the calling thread is interrupted while waiting, this method still waits for all
     * workers, but the interrupt status is set again before it returns.
     */
    public void doStressTest() {
        LOG.info("Starting stress test...");
        long start = System.currentTimeMillis();
        this.counter.set(0);
        // Start from a clean slate so that a repeated run does not report stale results.
        this.results.clear();

        //Initialize threads
        ThreadGroup workerGroup = new ThreadGroup("FOP workers");
        List<Thread> threadList = new java.util.ArrayList<Thread>();
        for (int ti = 0; ti < this.threads; ti++) {
            threadList.add(new Thread(workerGroup, new TaskRunner(), "Worker- " + ti));
        }

        //Start threads
        for (Thread thread : threadList) {
            thread.start();
        }

        //Wait for threads to end
        joinAll(threadList);

        long duration = System.currentTimeMillis() - start;
        report(duration);
    }

    /**
     * Blocks until every given thread has terminated. An interrupt received while waiting does
     * not abort the wait; it is remembered and the interrupt status is restored afterwards.
     * @param workers the threads to wait for
     */
    private static void joinAll(List<Thread> workers) {
        boolean interrupted = false;
        try {
            for (Thread worker : workers) {
                boolean joined = false;
                while (!joined) {
                    try {
                        worker.join();
                        joined = true;
                    } catch (InterruptedException ie) {
                        // join() cleared the flag; remember it and keep waiting for this worker
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Prints one line per task result followed by a summary to {@code System.out}.
     * Called after all workers have been joined, so the result list is no longer modified.
     * @param duration the overall duration of the stress test in milliseconds
     */
    private void report(long duration) {
        int count = this.results.size();
        int failures = 0;
        long bytesWritten = 0;
        System.out.println("Report on " + count + " tasks:");
        for (Result res : this.results) {
            long elapsed = res.end - res.start;
            if (res.failure != null) {
                System.out.println("FAIL: " + elapsed + " " + res.task);
                System.out.println("  -> " + res.failure.getMessage());
                failures++;
            } else {
                System.out.println("good: " + elapsed + " " + res.filesize
                        + " " + res.task);
                bytesWritten += res.filesize;
            }
        }
        System.out.println("Stress test duration: " + duration + "ms");
        if (failures > 0) {
            System.out.println(failures + " failures of " + count + " documents!!!");
        } else {
            float mb = 1024f * 1024f;
            // A run that finishes within the same millisecond (e.g. no tasks configured) has
            // duration 0; clamp the divisor so the long division cannot throw.
            long divisor = Math.max(1L, duration);
            System.out.println("Bytes written: " + (bytesWritten / mb) + " MB, "
                    + (bytesWritten * 1000 / divisor) + " bytes / sec");
            System.out.println("NO failures with " + count + " documents.");
        }
    }

    /**
     * Worker body: runs the whole task list {@code repeat} times. The first task that throws
     * ends the worker; the exception is logged.
     */
    private class TaskRunner implements Runnable {

        @Override
        public void run() {
            try {
                for (int r = 0; r < repeat; r++) {
                    for (TaskDef def : taskList) {
                        // getAndIncrement() gives every task a unique number across all threads
                        final Task task = new Task(def, counter.getAndIncrement(), foprocessor);
                        task.execute();
                    }
                }
            } catch (Exception e) {
                LOG.error("Thread ended with an exception", e);
            }
        }

    }

    /**
     * Creates a new FOProcessor. The implementation class is taken from the "class" attribute
     * of the processor configuration; the instance is configured and initialized before it is
     * returned.
     * @return the newly created instance
     * @throws RuntimeException if the processor cannot be instantiated, configured or initialized
     */
    public Processor createFOProcessor() {
        try {
            Class<?> clazz = Class.forName(this.fopCfg.getAttribute("class",
                    "org.apache.fop.threading.FOProcessorImpl"));
            Processor fop = (Processor) clazz.getDeclaredConstructor().newInstance();
            ContainerUtil.configure(fop, this.fopCfg);
            ContainerUtil.initialize(fop);
            return fop;
        } catch (Exception e) {
            throw new RuntimeException("Error creating FO Processor", e);
        }
    }

    /**
     * Definition of one task: either an FO file, or an XML file with an optional XSLT
     * stylesheet. Does not depend on the enclosing testbed instance, hence static.
     */
    private static class TaskDef {

        private String fo;
        private String xml;
        private String xslt;
        private Templates templates;

        /**
         * Creates a task definition for an FO file.
         * @param fo the name of the FO file
         */
        public TaskDef(String fo) {
            this.fo = fo;
        }

        /**
         * Creates a task definition from a "task" configuration element. If no "fo" attribute
         * is present, the "xml" attribute is required and an optional "xslt" stylesheet is
         * compiled up-front.
         * @param cfg the configuration element
         * @throws ConfigurationException if a required attribute is missing or the stylesheet
         *          cannot be compiled
         */
        public TaskDef(Configuration cfg) throws ConfigurationException {
            this.fo = cfg.getAttribute("fo", null);
            if (this.fo != null) {
                return;
            }
            this.xml = cfg.getAttribute("xml");
            this.xslt = cfg.getAttribute("xslt", null);
            if (this.xslt != null) {
                TransformerFactory factory = TransformerFactory.newInstance();
                Source xsltSource = new StreamSource(new File(xslt));
                try {
                    this.templates = factory.newTemplates(xsltSource);
                } catch (TransformerConfigurationException tce) {
                    throw new ConfigurationException("Invalid XSLT", tce);
                }
            }
        }

        /** @return the FO file name, or null if this task is XML-based */
        public String getFO() {
            return this.fo;
        }

        /** @return the XML file name, or null if this task is FO-based */
        public String getXML() {
            return this.xml;
        }

        /** @return the compiled stylesheet, or null if none was configured */
        public Templates getTemplates() {
            return this.templates;
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.fo != null) {
                sb.append("fo=").append(this.fo);
            } else {
                sb.append("xml=").append(this.xml);
                sb.append(" xslt=").append(this.xslt);
            }
            return sb.toString();
        }

    }

    /** One execution of a task definition, identified by a run-wide unique number. */
    private class Task {

        private final TaskDef def;
        private final int num;
        private final Processor fop;

        /**
         * @param def the definition of the task to run
         * @param num the task number, used to build the output file name
         * @param fop the processor to run the task with
         */
        public Task(TaskDef def, int num, Processor fop) {
            this.def = def;
            this.num = num;
            this.fop = fop;
        }

        /**
         * Runs the task and records a {@link Result} (success with byte count, or failure).
         * @throws Exception if processing or writing the output fails; the failure has already
         *          been recorded when the exception is rethrown
         */
        public void execute() throws Exception {
            LOG.info("Processing: " + def);
            long start = System.currentTimeMillis();
            try {
                // DecimalFormat is not thread-safe, so each task uses its own instance
                DecimalFormat df = new DecimalFormat("00000");
                File outfile = new File(outputDir, df.format(num) + fop.getTargetFileExtension());
                OutputStream out;
                if (writeToDevNull) {
                    out = new NullOutputStream();
                } else {
                    out = new java.io.FileOutputStream(outfile);
                    out = new java.io.BufferedOutputStream(out);
                }
                CountingOutputStream cout = new CountingOutputStream(out);
                try {
                    Source src;
                    Templates templates;
                    if (def.getFO() != null) {
                        src = new StreamSource(new File(def.getFO()));
                        templates = null;
                    } else {
                        src = new StreamSource(new File(def.getXML()));
                        templates = def.getTemplates();
                    }
                    fop.process(src, templates, cout);
                    // Close explicitly on the success path so that a failing final flush is
                    // reported as a failure instead of being swallowed by closeQuietly().
                    cout.close();
                } finally {
                    IOUtils.closeQuietly(cout);
                }
                results.add(new Result(def, start, System.currentTimeMillis(),
                        cout.getByteCount()));
            } catch (Exception e) {
                results.add(new Result(def, start, System.currentTimeMillis(), e));
                throw e;
            }
        }
    }

    /** Outcome of one task execution: timing plus either the output size or the failure. */
    private static class Result {

        private TaskDef task;
        private long start;
        private long end;
        private long filesize;
        private Throwable failure;

        /**
         * Creates a result for a successful task.
         * @param task the task definition
         * @param start start time in milliseconds
         * @param end end time in milliseconds
         * @param filesize number of bytes written
         */
        public Result(TaskDef task, long start, long end, long filesize) {
            this(task, start, end, null);
            this.filesize = filesize;
        }

        /**
         * Creates a result for a failed task.
         * @param task the task definition
         * @param start start time in milliseconds
         * @param end end time in milliseconds
         * @param failure the cause of the failure (null marks a success)
         */
        public Result(TaskDef task, long start, long end, Throwable failure) {
            this.task = task;
            this.start = start;
            this.end = end;
            this.failure = failure;
        }
    }

}