| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.sling.event.it; |
| |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Dictionary; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Hashtable; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.sling.discovery.TopologyEvent; |
| import org.apache.sling.discovery.TopologyEvent.Type; |
| import org.apache.sling.discovery.TopologyEventListener; |
| import org.apache.sling.discovery.TopologyView; |
| import org.apache.sling.event.impl.jobs.config.ConfigurationConstants; |
| import org.apache.sling.event.jobs.Job; |
| import org.apache.sling.event.jobs.JobManager; |
| import org.apache.sling.event.jobs.NotificationConstants; |
| import org.apache.sling.event.jobs.QueueConfiguration; |
| import org.apache.sling.event.jobs.consumer.JobConsumer; |
| import org.apache.sling.testing.tools.sling.TimeoutsProvider; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.ops4j.pax.exam.junit.PaxExam; |
| import org.osgi.framework.InvalidSyntaxException; |
| import org.osgi.framework.ServiceReference; |
| import org.osgi.framework.ServiceRegistration; |
| import org.osgi.service.event.Event; |
| import org.osgi.service.event.EventHandler; |
| |
| @RunWith(PaxExam.class) |
| public class ChaosTest extends AbstractJobHandlingTest { |
| |
| /** Duration for firing jobs in seconds. */ |
| private static final long DURATION = 1 * 60; |
| |
| private static final int NUM_ORDERED_THREADS = 3; |
| private static final int NUM_PARALLEL_THREADS = 6; |
| private static final int NUM_ROUND_THREADS = 6; |
| |
| private static final int NUM_ORDERED_TOPICS = 2; |
| private static final int NUM_PARALLEL_TOPICS = 8; |
| private static final int NUM_ROUND_TOPICS = 8; |
| |
| private static final String ORDERED_TOPIC_PREFIX = "sling/chaos/ordered/"; |
| private static final String PARALLEL_TOPIC_PREFIX = "sling/chaos/parallel/"; |
| private static final String ROUND_TOPIC_PREFIX = "sling/chaos/round/"; |
| |
| private static final String[] ORDERED_TOPICS = new String[NUM_ORDERED_TOPICS]; |
| private static final String[] PARALLEL_TOPICS = new String[NUM_PARALLEL_TOPICS]; |
| private static final String[] ROUND_TOPICS = new String[NUM_ROUND_TOPICS]; |
| |
| static { |
| for(int i=0; i<NUM_ORDERED_TOPICS; i++) { |
| ORDERED_TOPICS[i] = ORDERED_TOPIC_PREFIX + String.valueOf(i); |
| } |
| for(int i=0; i<NUM_PARALLEL_TOPICS; i++) { |
| PARALLEL_TOPICS[i] = PARALLEL_TOPIC_PREFIX + String.valueOf(i); |
| } |
| for(int i=0; i<NUM_ROUND_TOPICS; i++) { |
| ROUND_TOPICS[i] = ROUND_TOPIC_PREFIX + String.valueOf(i); |
| } |
| } |
| |
| @Override |
| @Before |
| public void setup() throws IOException { |
| super.setup(); |
| |
| // create ordered test queue |
| final org.osgi.service.cm.Configuration orderedConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null); |
| final Dictionary<String, Object> orderedProps = new Hashtable<String, Object>(); |
| orderedProps.put(ConfigurationConstants.PROP_NAME, "chaos-ordered"); |
| orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name()); |
| orderedProps.put(ConfigurationConstants.PROP_TOPICS, ORDERED_TOPICS); |
| orderedProps.put(ConfigurationConstants.PROP_RETRIES, 2); |
| orderedProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L); |
| orderedConfig.update(orderedProps); |
| |
| // create round robin test queue |
| final org.osgi.service.cm.Configuration rrConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null); |
| final Dictionary<String, Object> rrProps = new Hashtable<String, Object>(); |
| rrProps.put(ConfigurationConstants.PROP_NAME, "chaos-roundrobin"); |
| rrProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN.name()); |
| rrProps.put(ConfigurationConstants.PROP_TOPICS, ROUND_TOPICS); |
| rrProps.put(ConfigurationConstants.PROP_RETRIES, 2); |
| rrProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L); |
| rrProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, 5); |
| rrConfig.update(rrProps); |
| |
| this.sleep(1000L); |
| } |
| |
| @Override |
| @After |
| public void cleanup() { |
| super.cleanup(); |
| } |
| |
| /** |
| * Setup consumers |
| */ |
| private void setupJobConsumers() { |
| for(int i=0; i<NUM_ORDERED_TOPICS; i++) { |
| this.registerJobConsumer(ORDERED_TOPICS[i], |
| |
| new JobConsumer() { |
| |
| @Override |
| public JobResult process(final Job job) { |
| return JobResult.OK; |
| } |
| }); |
| } |
| for(int i=0; i<NUM_PARALLEL_TOPICS; i++) { |
| this.registerJobConsumer(PARALLEL_TOPICS[i], |
| |
| new JobConsumer() { |
| |
| @Override |
| public JobResult process(final Job job) { |
| return JobResult.OK; |
| } |
| }); |
| } |
| for(int i=0; i<NUM_ROUND_TOPICS; i++) { |
| this.registerJobConsumer(ROUND_TOPICS[i], |
| |
| new JobConsumer() { |
| |
| @Override |
| public JobResult process(final Job job) { |
| return JobResult.OK; |
| } |
| }); |
| } |
| } |
| |
| private static final class CreateJobThread extends Thread { |
| |
| private final String[] topics; |
| |
| private final JobManager jobManager; |
| |
| private final Random random = new Random(); |
| |
| final Map<String, AtomicLong> created; |
| |
| final AtomicLong finishedThreads; |
| |
| public CreateJobThread(final JobManager jobManager, |
| final String[] topics, |
| final Map<String, AtomicLong> created, |
| final AtomicLong finishedThreads) { |
| this.topics = topics; |
| this.jobManager = jobManager; |
| this.created = created; |
| this.finishedThreads = finishedThreads; |
| } |
| |
| @Override |
| public void run() { |
| int index = 0; |
| final long startTime = System.currentTimeMillis(); |
| final long endTime = startTime + DURATION * 1000; |
| while ( System.currentTimeMillis() < endTime ) { |
| final String topic = topics[index]; |
| if ( jobManager.addJob(topic, null) != null ) { |
| created.get(topic).incrementAndGet(); |
| |
| index++; |
| if ( index == topics.length ) { |
| index = 0; |
| } |
| } |
| final int sleepTime = random.nextInt(200); |
| try { |
| Thread.sleep(sleepTime); |
| } catch ( final InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| finishedThreads.incrementAndGet(); |
| } |
| |
| } |
| |
| /** |
| * Setup job creation threads |
| */ |
| private void setupJobCreationThreads(final List<Thread> threads, |
| final JobManager jobManager, |
| final Map<String, AtomicLong> created, |
| final AtomicLong finishedThreads) { |
| for(int i=0;i<NUM_ORDERED_THREADS;i++) { |
| threads.add(new CreateJobThread(jobManager, ORDERED_TOPICS, created, finishedThreads)); |
| } |
| for(int i=0;i<NUM_PARALLEL_THREADS;i++) { |
| threads.add(new CreateJobThread(jobManager, PARALLEL_TOPICS, created, finishedThreads)); |
| } |
| for(int i=0;i<NUM_ROUND_THREADS;i++) { |
| threads.add(new CreateJobThread(jobManager, ROUND_TOPICS, created, finishedThreads)); |
| } |
| } |
| |
| /** |
| * Setup chaos thread(s) |
| * |
| * Chaos is right now created by sending topology changing/changed events randomly |
| */ |
| private void setupChaosThreads(final List<Thread> threads, |
| final AtomicLong finishedThreads) { |
| final List<TopologyView> views = new ArrayList<TopologyView>(); |
| // register topology listener |
| final ServiceRegistration<TopologyEventListener> reg = this.bc.registerService(TopologyEventListener.class, new TopologyEventListener() { |
| |
| @Override |
| public void handleTopologyEvent(final TopologyEvent event) { |
| if ( event.getType() == Type.TOPOLOGY_INIT ) { |
| views.add(event.getNewView()); |
| } |
| } |
| }, null); |
| while ( views.isEmpty() ) { |
| this.sleep(10); |
| } |
| reg.unregister(); |
| final TopologyView view = views.get(0); |
| |
| try { |
| final Collection<ServiceReference<TopologyEventListener>> refs = this.bc.getServiceReferences(TopologyEventListener.class, null); |
| assertNotNull(refs); |
| assertFalse(refs.isEmpty()); |
| TopologyEventListener found = null; |
| for(final ServiceReference<TopologyEventListener> ref : refs) { |
| final TopologyEventListener listener = this.bc.getService(ref); |
| if ( listener != null && listener.getClass().getName().equals("org.apache.sling.event.impl.jobs.config.TopologyHandler") ) { |
| found = listener; |
| break; |
| } |
| bc.ungetService(ref); |
| } |
| assertNotNull(found); |
| final TopologyEventListener tel = found; |
| |
| threads.add(new Thread() { |
| |
| private final Random random = new Random(); |
| |
| @Override |
| public void run() { |
| final long startTime = System.currentTimeMillis(); |
| // this thread runs 30 seconds longer than the job creation thread |
| final long endTime = startTime + (DURATION +30) * 1000; |
| while ( System.currentTimeMillis() < endTime ) { |
| final int sleepTime = random.nextInt(25) + 15; |
| try { |
| Thread.sleep(sleepTime * 1000); |
| } catch ( final InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| } |
| tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_CHANGING, view, null)); |
| final int changingTime = random.nextInt(20) + 3; |
| try { |
| Thread.sleep(changingTime * 1000); |
| } catch ( final InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| } |
| tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_CHANGED, view, view)); |
| } |
| tel.getClass().getName(); |
| finishedThreads.incrementAndGet(); |
| } |
| }); |
| } catch (InvalidSyntaxException e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| @Test(timeout=DURATION * 16000L) |
| public void testDoChaos() throws Exception { |
| final JobManager jobManager = this.getJobManager(); |
| |
| // setup added, created and finished map |
| // added and finished are filled by notifications |
| // created is filled by the threads starting jobs |
| final Map<String, AtomicLong> added = new HashMap<String, AtomicLong>(); |
| final Map<String, AtomicLong> created = new HashMap<String, AtomicLong>(); |
| final Map<String, AtomicLong> finished = new HashMap<String, AtomicLong>(); |
| final List<String> topics = new ArrayList<String>(); |
| for(int i=0;i<NUM_ORDERED_TOPICS;i++) { |
| added.put(ORDERED_TOPICS[i], new AtomicLong()); |
| created.put(ORDERED_TOPICS[i], new AtomicLong()); |
| finished.put(ORDERED_TOPICS[i], new AtomicLong()); |
| topics.add(ORDERED_TOPICS[i]); |
| } |
| for(int i=0;i<NUM_PARALLEL_TOPICS;i++) { |
| added.put(PARALLEL_TOPICS[i], new AtomicLong()); |
| created.put(PARALLEL_TOPICS[i], new AtomicLong()); |
| finished.put(PARALLEL_TOPICS[i], new AtomicLong()); |
| topics.add(PARALLEL_TOPICS[i]); |
| } |
| for(int i=0;i<NUM_ROUND_TOPICS;i++) { |
| added.put(ROUND_TOPICS[i], new AtomicLong()); |
| created.put(ROUND_TOPICS[i], new AtomicLong()); |
| finished.put(ROUND_TOPICS[i], new AtomicLong()); |
| topics.add(ROUND_TOPICS[i]); |
| } |
| |
| final List<Thread> threads = new ArrayList<Thread>(); |
| final AtomicLong finishedThreads = new AtomicLong(); |
| |
| this.registerEventHandler("org/apache/sling/event/notification/job/*", |
| new EventHandler() { |
| |
| @Override |
| public void handleEvent(final Event event) { |
| final String topic = (String) event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC); |
| if ( NotificationConstants.TOPIC_JOB_FINISHED.equals(event.getTopic())) { |
| finished.get(topic).incrementAndGet(); |
| } else if ( NotificationConstants.TOPIC_JOB_ADDED.equals(event.getTopic())) { |
| added.get(topic).incrementAndGet(); |
| } |
| } |
| }); |
| |
| // setup job consumers |
| this.setupJobConsumers(); |
| |
| // setup job creation tests |
| this.setupJobCreationThreads(threads, jobManager, created, finishedThreads); |
| |
| this.setupChaosThreads(threads, finishedThreads); |
| |
| System.out.println("Starting threads..."); |
| // start threads |
| for(final Thread t : threads) { |
| t.setDaemon(true); |
| t.start(); |
| } |
| |
| System.out.println("Sleeping for " + DURATION + " seconds to wait for threads to finish..."); |
| // for sure we can sleep for the duration |
| this.sleep(DURATION * 1000); |
| |
| System.out.println("Polling for threads to finish..."); |
| // wait until threads are finished |
| while ( finishedThreads.get() < threads.size() ) { |
| this.sleep(100); |
| } |
| |
| System.out.println("Waiting for job handling to finish..."); |
| final Set<String> allTopics = new HashSet<String>(topics); |
| while ( !allTopics.isEmpty() ) { |
| final Iterator<String> iter = allTopics.iterator(); |
| while ( iter.hasNext() ) { |
| final String topic = iter.next(); |
| if ( finished.get(topic).get() == created.get(topic).get() ) { |
| iter.remove(); |
| } |
| } |
| this.sleep(100); |
| } |
| /* We could try to enable this with Oak again - but right now JR observation handler is too |
| * slow. |
| System.out.println("Checking notifications..."); |
| for(final String topic : topics) { |
| assertEquals("Checking topic " + topic, created.get(topic).get(), added.get(topic).get()); |
| } |
| */ |
| |
| } |
| } |