| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /* $Id$ */ |
| |
| package org.apache.fop.threading; |
| |
| import java.io.File; |
| import java.io.OutputStream; |
| import java.text.DecimalFormat; |
| import java.util.Collections; |
| import java.util.List; |
| |
| import javax.xml.transform.Source; |
| import javax.xml.transform.Templates; |
| import javax.xml.transform.TransformerConfigurationException; |
| import javax.xml.transform.TransformerFactory; |
| import javax.xml.transform.stream.StreamSource; |
| |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.io.output.CountingOutputStream; |
| import org.apache.commons.io.output.NullOutputStream; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| |
| import org.apache.fop.activity.ContainerUtil; |
| import org.apache.fop.activity.Initializable; |
| import org.apache.fop.configuration.Configurable; |
| import org.apache.fop.configuration.Configuration; |
| import org.apache.fop.configuration.ConfigurationException; |
| |
| /** |
| * Testbed for multi-threading tests. The class can run a configurable set of task a number of |
| * times in a configurable number of threads to easily reproduce multi-threading issues. |
| */ |
| public class FOPTestbed |
| implements Configurable, Initializable { |
| |
| private static final Log LOG = LogFactory.getLog(FOPTestbed.class); |
| |
| private int repeat; |
| private List taskList = new java.util.ArrayList(); |
| private int threads; |
| private File outputDir; |
| private Configuration fopCfg; |
| private Processor foprocessor; |
| private boolean writeToDevNull; |
| |
| private int counter; |
| |
| private List results = Collections.synchronizedList(new java.util.LinkedList()); |
| |
| /** {@inheritDoc} */ |
| public void configure(Configuration configuration) throws ConfigurationException { |
| this.threads = configuration.getChild("threads").getValueAsInteger(10); |
| this.outputDir = new File(configuration.getChild("output-dir").getValue()); |
| this.writeToDevNull = configuration.getChild("devnull").getValueAsBoolean(false); |
| Configuration tasks = configuration.getChild("tasks"); |
| this.repeat = tasks.getAttributeAsInteger("repeat", 1); |
| Configuration[] entries = tasks.getChildren("task"); |
| for (Configuration entry : entries) { |
| this.taskList.add(new TaskDef(entry)); |
| } |
| this.fopCfg = configuration.getChild("processor"); |
| } |
| |
| /** {@inheritDoc} */ |
| public void initialize() throws Exception { |
| this.foprocessor = createFOProcessor(); |
| } |
| |
| /** |
| * Starts the stress test. |
| */ |
| public void doStressTest() { |
| LOG.info("Starting stress test..."); |
| long start = System.currentTimeMillis(); |
| this.counter = 0; |
| |
| //Initialize threads |
| ThreadGroup workerGroup = new ThreadGroup("FOP workers"); |
| List threadList = new java.util.LinkedList(); |
| for (int ti = 0; ti < this.threads; ti++) { |
| TaskRunner runner = new TaskRunner(); |
| // ContainerUtil.enableLogging(runner, logger); |
| Thread thread = new Thread(workerGroup, runner, "Worker- " + ti); |
| threadList.add(thread); |
| } |
| |
| //Start threads |
| for (Object aThreadList : threadList) { |
| ((Thread) aThreadList).start(); |
| } |
| |
| //Wait for threads to end |
| while (threadList.size() > 0) { |
| Thread t = (Thread)threadList.get(0); |
| if (!t.isAlive()) { |
| threadList.remove(0); |
| continue; |
| } |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException ie) { |
| //ignore |
| } |
| } |
| long duration = System.currentTimeMillis() - start; |
| |
| report(duration); |
| } |
| |
| private void report(long duration) { |
| int count = this.results.size(); |
| int failures = 0; |
| long bytesWritten = 0; |
| System.out.println("Report on " + count + " tasks:"); |
| for (Object result : this.results) { |
| Result res = (Result) result; |
| if (res.failure != null) { |
| System.out.println("FAIL: " + (res.end - res.start) + " " + res.task); |
| System.out.println(" -> " + res.failure.getMessage()); |
| failures++; |
| } else { |
| System.out.println("good: " + (res.end - res.start) + " " + res.filesize |
| + " " + res.task); |
| bytesWritten += res.filesize; |
| } |
| } |
| System.out.println("Stress test duration: " + duration + "ms"); |
| if (failures > 0) { |
| System.out.println(failures + " failures of " + count + " documents!!!"); |
| } else { |
| float mb = 1024f * 1024f; |
| System.out.println("Bytes written: " + (bytesWritten / mb) + " MB, " |
| + (bytesWritten * 1000 / duration) + " bytes / sec"); |
| System.out.println("NO failures with " + count + " documents."); |
| } |
| } |
| |
| private class TaskRunner implements Runnable { |
| |
| public void run() { |
| try { |
| for (int r = 0; r < repeat; r++) { |
| for (Object aTaskList : taskList) { |
| TaskDef def = (TaskDef) aTaskList; |
| final Task task = new Task(def, counter++, foprocessor); |
| // ContainerUtil.enableLogging(task, logger); |
| task.execute(); |
| } |
| } |
| } catch (Exception e) { |
| LOG.error("Thread ended with an exception", e); |
| } |
| } |
| |
| } |
| |
| /** |
| * Creates a new FOProcessor. |
| * @return the newly created instance |
| */ |
| public Processor createFOProcessor() { |
| try { |
| Class clazz = Class.forName(this.fopCfg.getAttribute("class", |
| "org.apache.fop.threading.FOProcessorImpl")); |
| Processor fop = (Processor)clazz.getDeclaredConstructor().newInstance(); |
| // ContainerUtil.enableLogging(fop, logger); |
| ContainerUtil.configure(fop, this.fopCfg); |
| ContainerUtil.initialize(fop); |
| return fop; |
| } catch (Exception e) { |
| throw new RuntimeException("Error creating FO Processor", e); |
| } |
| } |
| |
| |
| private class TaskDef { |
| private String fo; |
| private String xml; |
| private String xslt; |
| private Templates templates; |
| |
| public TaskDef(String fo) { |
| this.fo = fo; |
| } |
| |
| public TaskDef(Configuration cfg) throws ConfigurationException { |
| this.fo = cfg.getAttribute("fo", null); |
| if (this.fo == null) { |
| this.xml = cfg.getAttribute("xml"); |
| this.xslt = cfg.getAttribute("xslt", null); |
| if (this.xslt != null) { |
| TransformerFactory factory = TransformerFactory.newInstance(); |
| Source xsltSource = new StreamSource(new File(xslt)); |
| try { |
| this.templates = factory.newTemplates(xsltSource); |
| } catch (TransformerConfigurationException tce) { |
| throw new ConfigurationException("Invalid XSLT", tce); |
| } |
| } |
| } |
| } |
| |
| public String getFO() { |
| return this.fo; |
| } |
| |
| public String getXML() { |
| return this.xml; |
| } |
| |
| public Templates getTemplates() { |
| return this.templates; |
| } |
| |
| public String toString() { |
| StringBuffer sb = new StringBuffer(); |
| if (this.fo != null) { |
| sb.append("fo="); |
| sb.append(this.fo); |
| } else { |
| sb.append("xml="); |
| sb.append(this.xml); |
| sb.append(" xslt="); |
| sb.append(this.xslt); |
| } |
| return sb.toString(); |
| } |
| } |
| |
| |
| private class Task { |
| |
| private TaskDef def; |
| private int num; |
| private Processor fop; |
| |
| public Task(TaskDef def, int num, Processor fop) { |
| this.def = def; |
| this.num = num; |
| this.fop = fop; |
| } |
| |
| |
| public void execute() throws Exception { |
| LOG.info("Processing: " + def); |
| long start = System.currentTimeMillis(); |
| try { |
| DecimalFormat df = new DecimalFormat("00000"); |
| File outfile = new File(outputDir, df.format(num) + fop.getTargetFileExtension()); |
| OutputStream out; |
| if (writeToDevNull) { |
| out = new NullOutputStream(); |
| } else { |
| out = new java.io.FileOutputStream(outfile); |
| out = new java.io.BufferedOutputStream(out); |
| } |
| CountingOutputStream cout = new CountingOutputStream(out); |
| try { |
| Source src; |
| Templates templates; |
| |
| if (def.getFO() != null) { |
| src = new StreamSource(new File(def.getFO())); |
| templates = null; |
| } else { |
| src = new StreamSource(new File(def.getXML())); |
| templates = def.getTemplates(); |
| } |
| fop.process(src, templates, cout); |
| } finally { |
| IOUtils.closeQuietly(cout); |
| } |
| results.add(new Result(def, start, System.currentTimeMillis(), |
| cout.getByteCount())); |
| } catch (Exception e) { |
| results.add(new Result(def, start, System.currentTimeMillis(), e)); |
| throw e; |
| } |
| } |
| } |
| |
| private static class Result { |
| |
| private TaskDef task; |
| private long start; |
| private long end; |
| private long filesize; |
| private Throwable failure; |
| |
| public Result(TaskDef task, long start, long end, long filesize) { |
| this(task, start, end, null); |
| this.filesize = filesize; |
| } |
| |
| public Result(TaskDef task, long start, long end, Throwable failure) { |
| this.task = task; |
| this.start = start; |
| this.end = end; |
| this.failure = failure; |
| } |
| } |
| |
| } |