blob: 4b62e02b447c0b0718bd5571a6892bce8f8687b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.jmeter.visualizers.backend;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.engine.util.NoThreadClone;
import org.apache.jmeter.samplers.Remoteable;
import org.apache.jmeter.samplers.SampleEvent;
import org.apache.jmeter.samplers.SampleListener;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.testelement.AbstractTestElement;
import org.apache.jmeter.testelement.TestElement;
import org.apache.jmeter.testelement.TestStateListener;
import org.apache.jmeter.testelement.property.TestElementProperty;
import org.apache.jmeter.visualizers.backend.graphite.GraphiteBackendListenerClient;
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;
/**
* Async Listener that delegates SampleResult handling to implementations of {@link BackendListenerClient}
* @since 2.13
*/
public class BackendListener extends AbstractTestElement
implements Serializable, SampleListener, TestStateListener, NoThreadClone, Remoteable {
/**
*
*/
private static final class ListenerClientData {
private BackendListenerClient client;
private BlockingQueue<SampleResult> queue;
private AtomicLong queueWaits; // how many times we had to wait to queue a SampleResult
private AtomicLong queueWaitTime; // how long we had to wait (nanoSeconds)
// @GuardedBy("LOCK")
private int instanceCount; // number of active tests
private CountDownLatch latch;
}
/**
*
*/
private static final long serialVersionUID = 8184103677832024335L;
private static final Logger LOGGER = LoggingManager.getLoggerForClass();
/**
* Property key representing the classname of the BackendListenerClient to user.
*/
public static final String CLASSNAME = "classname";
/**
* Queue size
*/
public static final String QUEUE_SIZE = "QUEUE_SIZE";
/**
* Lock used to protect accumulators update + instanceCount update
*/
private static final Object LOCK = new Object();
/**
* Property key representing the arguments for the BackendListenerClient.
*/
public static final String ARGUMENTS = "arguments";
/**
* The BackendListenerClient class used by this sampler.
* Created by testStarted; copied to cloned instances.
*/
private Class<?> clientClass;
public static final String DEFAULT_QUEUE_SIZE = "5000";
// Create unique object as marker for end of queue
private transient static final SampleResult FINAL_SAMPLE_RESULT = new SampleResult();
// Name of the test element. Set up by testStarted().
private transient String myName;
// Holds listenerClientData for this test element
private transient ListenerClientData listenerClientData;
/*
* This is needed for distributed testing where there is 1 instance
* per server. But we need the total to be shared.
*/
//@GuardedBy("LOCK") - needed to ensure consistency between this and instanceCount
private static final Map<String, ListenerClientData> queuesByTestElementName =
new ConcurrentHashMap<>();
/**
* Create a BackendListener.
*/
public BackendListener() {
synchronized (LOCK) {
queuesByTestElementName.clear();
}
setArguments(new Arguments());
}
/*
* Ensure that the required class variables are cloned,
* as this is not currently done by the super-implementation.
*/
@Override
public Object clone() {
BackendListener clone = (BackendListener) super.clone();
clone.clientClass = this.clientClass;
return clone;
}
private Class<?> initClass() {
String name = getClassname().trim();
try {
return Class.forName(name, false, Thread.currentThread().getContextClassLoader());
} catch (Exception e) {
LOGGER.error(whoAmI() + "\tException initialising: " + name, e);
}
return null;
}
/**
* Generate a String identifier of this instance for debugging purposes.
*
* @return a String identifier for this sampler instance
*/
private String whoAmI() {
StringBuilder sb = new StringBuilder();
sb.append(Thread.currentThread().getName());
sb.append("@");
sb.append(Integer.toHexString(hashCode()));
sb.append("-");
sb.append(getName());
return sb.toString();
}
/* (non-Javadoc)
* @see org.apache.jmeter.samplers.SampleListener#sampleOccurred(org.apache.jmeter.samplers.SampleEvent)
*/
@Override
public void sampleOccurred(SampleEvent event) {
Arguments args = getArguments();
BackendListenerContext context = new BackendListenerContext(args);
SampleResult sr = listenerClientData.client.createSampleResult(context, event.getResult());
if(sr == null) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug(getName()+"=>Dropping SampleResult:"+event.getResult());
}
return;
}
try {
if (!listenerClientData.queue.offer(sr)){ // we failed to add the element first time
listenerClientData.queueWaits.incrementAndGet();
long t1 = System.nanoTime();
listenerClientData.queue.put(sr);
long t2 = System.nanoTime();
listenerClientData.queueWaitTime.addAndGet(t2-t1);
}
} catch (Exception err) {
LOGGER.error("sampleOccurred, failed to queue the sample", err);
}
}
/**
* Thread that dequeus data from queue to send it to {@link BackendListenerClient}
*/
private static final class Worker extends Thread {
private final ListenerClientData listenerClientData;
private final BackendListenerContext context;
private final BackendListenerClient backendListenerClient;
private Worker(BackendListenerClient backendListenerClient, Arguments arguments, ListenerClientData listenerClientData){
this.listenerClientData = listenerClientData;
// Allow BackendListenerClient implementations to get access to test element name
arguments.addArgument(TestElement.NAME, getName());
context = new BackendListenerContext(arguments);
this.backendListenerClient = backendListenerClient;
}
@Override
public void run() {
boolean isDebugEnabled = LOGGER.isDebugEnabled();
List<SampleResult> sampleResults = new ArrayList<>(listenerClientData.queue.size());
try {
try {
boolean endOfLoop = false;
while (!endOfLoop) {
if(isDebugEnabled) {
LOGGER.debug("Thread:"+Thread.currentThread().getName()+" taking SampleResult from queue:"+listenerClientData.queue.size());
}
SampleResult sampleResult = listenerClientData.queue.take();
if(isDebugEnabled) {
LOGGER.debug("Thread:"+Thread.currentThread().getName()+" took SampleResult:"+sampleResult+", isFinal:" + (sampleResult==FINAL_SAMPLE_RESULT));
}
while (!(endOfLoop = (sampleResult == FINAL_SAMPLE_RESULT)) && sampleResult != null ) { // try to process as many as possible
sampleResults.add(sampleResult);
if(isDebugEnabled) {
LOGGER.debug("Thread:"+Thread.currentThread().getName()+" polling from queue:"+listenerClientData.queue.size());
}
sampleResult = listenerClientData.queue.poll(); // returns null if nothing on queue currently
if(isDebugEnabled) {
LOGGER.debug("Thread:"+Thread.currentThread().getName()+" took from queue:"+sampleResult+", isFinal:" + (sampleResult==FINAL_SAMPLE_RESULT));
}
}
if(isDebugEnabled) {
LOGGER.debug("Thread:"+Thread.currentThread().getName()+
" exiting with FINAL EVENT:"+(sampleResult == FINAL_SAMPLE_RESULT)
+", null:" + (sampleResult==null));
}
sendToListener(backendListenerClient, context, sampleResults);
if(!endOfLoop) {
LockSupport.parkNanos(100);
}
}
} catch (InterruptedException e) {
// NOOP
}
// We may have been interrupted
sendToListener(backendListenerClient, context, sampleResults);
LOGGER.info("Worker ended");
} finally {
listenerClientData.latch.countDown();
}
}
}
/**
* Send sampleResults to {@link BackendListenerClient}
* @param backendListenerClient {@link BackendListenerClient}
* @param context {@link BackendListenerContext}
* @param sampleResults List of {@link SampleResult}
*/
static void sendToListener(
final BackendListenerClient backendListenerClient,
final BackendListenerContext context,
final List<SampleResult> sampleResults) {
if (sampleResults.size() > 0) {
backendListenerClient.handleSampleResults(sampleResults, context);
sampleResults.clear();
}
}
/**
* Returns reference to {@link BackendListener}
* @param clientClass {@link BackendListenerClient} client class
* @return BackendListenerClient reference.
*/
static BackendListenerClient createBackendListenerClientImpl(Class<?> clientClass) {
if (clientClass == null) { // failed to initialise the class
return new ErrorBackendListenerClient();
}
try {
return (BackendListenerClient) clientClass.newInstance();
} catch (Exception e) {
LOGGER.error("Exception creating: " + clientClass, e);
return new ErrorBackendListenerClient();
}
}
// TestStateListener implementation
/**
* Implements TestStateListener.testStarted()
**/
@Override
public void testStarted() {
testStarted("local"); //$NON-NLS-1$
}
/** Implements TestStateListener.testStarted(String)
**/
@Override
public void testStarted(String host) {
if(LOGGER.isDebugEnabled()){
LOGGER.debug(whoAmI() + "\ttestStarted(" + host + ")");
}
int queueSize;
final String size = getQueueSize();
try {
queueSize = Integer.parseInt(size);
} catch (NumberFormatException nfe) {
LOGGER.warn("Invalid queue size '" + size + "' defaulting to " + DEFAULT_QUEUE_SIZE);
queueSize = Integer.parseInt(DEFAULT_QUEUE_SIZE);
}
synchronized (LOCK) {
myName = getName();
listenerClientData = queuesByTestElementName.get(myName);
if (listenerClientData == null){
// We need to do this to ensure in Distributed testing
// that only 1 instance of BackendListenerClient is used
clientClass = initClass(); // may be null
BackendListenerClient backendListenerClient = createBackendListenerClientImpl(clientClass);
BackendListenerContext context = new BackendListenerContext((Arguments)getArguments().clone());
listenerClientData = new ListenerClientData();
listenerClientData.queue = new ArrayBlockingQueue<>(queueSize);
listenerClientData.queueWaits = new AtomicLong(0L);
listenerClientData.queueWaitTime = new AtomicLong(0L);
listenerClientData.latch = new CountDownLatch(1);
listenerClientData.client = backendListenerClient;
LOGGER.info(getName()+":Starting worker with class:"+clientClass +" and queue capacity:"+getQueueSize());
Worker worker = new Worker(backendListenerClient, (Arguments) getArguments().clone(), listenerClientData);
worker.setDaemon(true);
worker.start();
LOGGER.info(getName()+": Started worker with class:"+clientClass);
try {
backendListenerClient.setupTest(context);
} catch (Exception e) {
throw new java.lang.IllegalStateException("Failed calling setupTest", e);
}
queuesByTestElementName.put(myName, listenerClientData);
}
listenerClientData.instanceCount++;
}
}
/**
* Method called at the end of the test. This is called only on one instance
* of BackendListener. This method will loop through all of the other
* BackendListenerClients which have been registered (automatically in the
* constructor) and notify them that the test has ended, allowing the
* BackendListenerClients to cleanup.
* Implements TestStateListener.testEnded(String)
*/
@Override
public void testEnded(String host) {
synchronized (LOCK) {
ListenerClientData listenerClientData = queuesByTestElementName.get(myName);
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("testEnded called on instance "+myName+"#"+listenerClientData.instanceCount);
}
listenerClientData.instanceCount--;
if (listenerClientData.instanceCount > 0){
// Not the last instance of myName
return;
}
}
try {
listenerClientData.queue.put(FINAL_SAMPLE_RESULT);
} catch (Exception ex) {
LOGGER.warn("testEnded() with exception:"+ex.getMessage(), ex);
}
if (listenerClientData.queueWaits.get() > 0) {
LOGGER.warn("QueueWaits: "+listenerClientData.queueWaits+"; QueueWaitTime: "+listenerClientData.queueWaitTime+
" (nanoseconds), you may need to increase queue capacity, see property 'backend_queue_capacity'");
}
try {
listenerClientData.latch.await();
BackendListenerContext context = new BackendListenerContext(getArguments());
listenerClientData.client.teardownTest(context);
} catch (Exception e) {
throw new java.lang.IllegalStateException("Failed calling teardownTest", e);
}
}
/** Implements TestStateListener.testEnded(String)
**/
@Override
public void testEnded() {
testEnded("local"); //$NON-NLS-1$
}
/**
* A {@link BackendListenerClient} implementation used for error handling. If an
* error occurs while creating the real BackendListenerClient object, it is
* replaced with an instance of this class. Each time a sample occurs with
* this class, the result is marked as a failure so the user can see that
* the test failed.
*/
static class ErrorBackendListenerClient extends AbstractBackendListenerClient {
/**
* Return SampleResult with data on error.
*
* @see BackendListenerClient#handleSampleResults(List, BackendListenerContext)
*/
@Override
public void handleSampleResults(List<SampleResult> sampleResults, BackendListenerContext context) {
LOGGER.warn("ErrorBackendListenerClient#handleSampleResult called, noop");
Thread.yield();
}
}
/* (non-Javadoc)
* @see org.apache.jmeter.samplers.SampleListener#sampleStarted(org.apache.jmeter.samplers.SampleEvent)
*/
@Override
public void sampleStarted(SampleEvent e) {
// NOOP
}
/* (non-Javadoc)
* @see org.apache.jmeter.samplers.SampleListener#sampleStopped(org.apache.jmeter.samplers.SampleEvent)
*/
@Override
public void sampleStopped(SampleEvent e) {
// NOOP
}
/**
* Set the arguments (parameters) for the BackendListenerClient to be executed
* with.
*
* @param args
* the new arguments. These replace any existing arguments.
*/
public void setArguments(Arguments args) {
// Bug 59173 - don't save new default argument
args.removeArgument(GraphiteBackendListenerClient.USE_REGEXP_FOR_SAMPLERS_LIST,
GraphiteBackendListenerClient.USE_REGEXP_FOR_SAMPLERS_LIST_DEFAULT);
setProperty(new TestElementProperty(ARGUMENTS, args));
}
/**
* Get the arguments (parameters) for the BackendListenerClient to be executed
* with.
*
* @return the arguments
*/
public Arguments getArguments() {
return (Arguments) getProperty(ARGUMENTS).getObjectValue();
}
/**
* Sets the Classname of the BackendListenerClient object
*
* @param classname
* the new Classname value
*/
public void setClassname(String classname) {
setProperty(CLASSNAME, classname);
}
/**
* Gets the Classname of the BackendListenerClient object
*
* @return the Classname value
*/
public String getClassname() {
return getPropertyAsString(CLASSNAME);
}
/**
* Sets the queue size
*
* @param queueSize the size of the queue
*
*/
public void setQueueSize(String queueSize) {
setProperty(QUEUE_SIZE, queueSize, DEFAULT_QUEUE_SIZE);
}
/**
* Gets the queue size
*
* @return int queueSize
*/
public String getQueueSize() {
return getPropertyAsString(QUEUE_SIZE, DEFAULT_QUEUE_SIZE);
}
}