| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.service; |
| |
| import junit.framework.Assert; |
| import org.apache.oozie.ErrorCode; |
| import org.apache.oozie.command.CommandException; |
| import org.apache.oozie.command.PreconditionException; |
| import org.apache.oozie.command.XCommand; |
| import org.apache.oozie.test.XTestCase; |
| import org.apache.oozie.util.XCallable; |
| import org.apache.oozie.util.XLog; |
| |
| import com.google.common.collect.Multimap; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| public class TestCallableQueueService extends XTestCase { |
| static AtomicLong EXEC_ORDER = new AtomicLong(); |
| private static XLog log = XLog.getLog(TestCallableQueueService.class); |
| private AtomicLong counter = new AtomicLong(); |
| private CountDownLatch finished = new CountDownLatch(1); |
| |
| public static class MyCallable implements XCallable<Void> { |
| String type; |
| int priority; |
| long executed = 0; |
| int wait; |
| long order; |
| long created = System.currentTimeMillis(); |
| private String name = "myCallable"; |
| private String key = null; |
| |
| public MyCallable() { |
| this(0, 0); |
| } |
| |
| @Override |
| public String getName() { |
| return name; |
| } |
| |
| @Override |
| public String getType() { |
| return type; |
| } |
| |
| public MyCallable(String type, int priority, int wait) { |
| this.type = type; |
| this.priority = priority; |
| this.wait = wait; |
| this.key = name + "_" + UUID.randomUUID(); |
| } |
| |
| public MyCallable(String key, String type, int priority, int wait) { |
| this.type = type; |
| this.priority = priority; |
| this.wait = wait; |
| this.key = key; |
| } |
| |
| public MyCallable(int priority, int wait) { |
| this("type", priority, wait); |
| } |
| |
| @Override |
| public int getPriority() { |
| return this.priority; |
| } |
| |
| @Override |
| public long getCreatedTime() { |
| return created; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("Type:").append(getType()); |
| sb.append(",Priority:").append(getPriority()); |
| sb.append(",Key:").append(getKey()); |
| sb.append(",Wait:").append(wait); |
| return sb.toString(); |
| } |
| |
| public Void call() throws Exception { |
| order = EXEC_ORDER.getAndIncrement(); |
| Thread.sleep(wait); |
| executed = System.currentTimeMillis(); |
| return null; |
| } |
| |
| @Override |
| public String getKey() { |
| return this.key; |
| } |
| |
| @Override |
| public String getEntityKey() { |
| return null; |
| } |
| |
| @Override |
| public void setInterruptMode(boolean mode) { |
| } |
| |
| @Override |
| public boolean inInterruptMode() { |
| return false; |
| } |
| } |
| |
| public static class ExtendedXCommand extends XCommand<Void> { |
| private boolean lockRequired = true; |
| public String lockKey; |
| public long wait; |
| long executed; |
| |
| public ExtendedXCommand(String key, String type, int priority, int wait, String lockKey, boolean lockRequired) { |
| super(key, type, priority, false); |
| this.lockRequired = lockRequired; |
| this.lockKey = lockKey; |
| this.wait = wait; |
| } |
| |
| public ExtendedXCommand(String key, String type, int priority, int wait, String lockKey) { |
| super(key, type, priority, false); |
| this.lockKey = lockKey; |
| this.wait = wait; |
| } |
| |
| @Override |
| protected boolean isLockRequired() { |
| return this.lockRequired; |
| } |
| |
| @Override |
| protected boolean isReQueueRequired() { |
| return false; |
| } |
| |
| @Override |
| public String getEntityKey() { |
| return this.lockKey; |
| } |
| |
| @Override |
| protected void eagerLoadState() { |
| } |
| |
| @Override |
| protected void eagerVerifyPrecondition() throws CommandException { |
| } |
| |
| @Override |
| protected void loadState() { |
| } |
| |
| @Override |
| protected void verifyPrecondition() throws CommandException { |
| } |
| |
| @Override |
| protected Void execute() throws CommandException { |
| if (executed == 0) { |
| try { |
| Thread.sleep(this.wait); |
| } |
| catch (InterruptedException exp) { |
| throw new CommandException(ErrorCode.ETEST, "invalid_id"); |
| } |
| executed = System.currentTimeMillis(); |
| ; |
| } |
| return null; |
| } |
| } |
| |
| @Override |
| protected void setUp() throws Exception { |
| super.setUp(); |
| new Services().init(); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| Services.get().destroy(); |
| super.tearDown(); |
| } |
| |
| public void testQueuing() throws Exception { |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| final MyCallable callable = new MyCallable(); |
| queueservice.queue(callable); |
| waitFor(1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return callable.executed != 0; |
| } |
| }); |
| assertTrue(callable.executed != 0); |
| } |
| |
| public void testDelayedQueuing() throws Exception { |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| final MyCallable callable = new MyCallable(); |
| long scheduled = System.currentTimeMillis(); |
| queueservice.queue(callable, 1000); |
| waitFor(3000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return callable.executed != 0; |
| } |
| }); |
| assertTrue(callable.executed >= scheduled + 1000); |
| } |
| |
| public void testPriorityExecution() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| Services.get().destroy(); |
| setSystemProperty(CallableQueueService.CONF_THREADS, "1"); |
| new Services().init(); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| final MyCallable callable1 = new MyCallable(0, 200); |
| final MyCallable callable2 = new MyCallable(0, 200); |
| final MyCallable callable3 = new MyCallable(0, 200); |
| final MyCallable callableLow = new MyCallable(); |
| final MyCallable callableHigh = new MyCallable(1, 10); |
| |
| queueservice.queue(callable1); |
| queueservice.queue(callable2); |
| queueservice.queue(callable3); |
| queueservice.queue(callableLow); |
| queueservice.queue(callableHigh); |
| |
| waitFor(3000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return callable1.executed != 0 && callable2.executed != 0 && callable3.executed != 0 && |
| callableLow.executed != 0 && callableHigh.executed != 0; |
| } |
| }); |
| assertTrue(callable1.executed >= 0); |
| assertTrue(callable2.executed >= 0); |
| assertTrue(callable3.executed >= 0); |
| assertTrue(callableLow.executed >= 0); |
| assertTrue(callableHigh.executed >= 0); |
| assertTrue(callableHigh.order < callableLow.order); |
| } |
| |
| public void testQueueSerial() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| final MyCallable callable1 = new MyCallable(0, 10); |
| final MyCallable callable2 = new MyCallable(0, 10); |
| final MyCallable callable3 = new MyCallable(0, 10); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| queueservice.queueSerial(Arrays.asList(callable1, callable2, callable3)); |
| waitFor(100, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return callable1.executed != 0 && callable2.executed != 0 && callable3.executed != 0; |
| } |
| }); |
| assertEquals(0, callable1.order); |
| assertEquals(1, callable2.order); |
| assertEquals(2, callable3.order); |
| } |
| |
| public static class CLCallable implements XCallable<Void> { |
| |
| @Override |
| public String getName() { |
| return "name"; |
| } |
| |
| @Override |
| public int getPriority() { |
| return 0; |
| } |
| |
| @Override |
| public String getType() { |
| return "type"; |
| } |
| |
| @Override |
| public String getKey() { |
| return "name" + "_" + UUID.randomUUID(); |
| } |
| |
| @Override |
| public String getEntityKey() { |
| return null; |
| } |
| |
| @Override |
| public long getCreatedTime() { |
| return 0; |
| } |
| |
| @Override |
| public void setInterruptMode(boolean mode) { |
| } |
| |
| @Override |
| public boolean inInterruptMode() { |
| return false; |
| } |
| |
| @Override |
| public Void call() throws Exception { |
| incr(); |
| Thread.sleep(100); |
| decr(); |
| return null; |
| } |
| |
| private static AtomicInteger counter = new AtomicInteger(); |
| private static int max = 0; |
| |
| private void incr() { |
| counter.incrementAndGet(); |
| max = Math.max(max, counter.intValue()); |
| } |
| |
| private void decr() { |
| counter.decrementAndGet(); |
| } |
| |
| public static int getConcurrency() { |
| return max; |
| } |
| |
| public static void resetConcurrency() { |
| max = 0; |
| } |
| |
| } |
| |
| public void testConcurrencyLimit() throws Exception { |
| CLCallable.resetConcurrency(); |
| final CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| for (int i = 0; i < 10; i++) { |
| queueservice.queue(new CLCallable(), 10); |
| } |
| |
| float originalRatio = XTestCase.WAITFOR_RATIO; |
| try{ |
| XTestCase.WAITFOR_RATIO = 1; |
| waitFor(2000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return queueservice.queueSize() == 0; |
| } |
| }); |
| } |
| finally { |
| XTestCase.WAITFOR_RATIO = originalRatio; |
| } |
| |
| System.out.println("Callable Queue Size :" + queueservice.queueSize()); |
| System.out.println("CLCallable Concurrency :" + CLCallable.getConcurrency()); |
| |
| assertTrue(CLCallable.getConcurrency() <= 3); |
| } |
| |
| /** |
| * When using config 'oozie.service.CallableQueueService.callable.next.eligible' true, the next other type of callables |
| * should be invoked when top one in the queue is reached max concurrency. |
| * |
| * @throws Exception |
| */ |
| public void testConcurrencyReachedAndChooseNextEligible() throws Exception { |
| Services.get().destroy(); |
| setSystemProperty(CallableQueueService.CONF_CALLABLE_NEXT_ELIGIBLE, "true"); |
| new Services().init(); |
| |
| CLCallable.resetConcurrency(); |
| final CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| final MyCallable callable1 = new MyCallable(0, 100); |
| final MyCallable callable2 = new MyCallable(0, 100); |
| final MyCallable callable3 = new MyCallable(0, 100); |
| final MyCallable callable4 = new MyCallable(0, 100); |
| final MyCallable callable5 = new MyCallable(0, 100); |
| final MyCallable callable6 = new MyCallable(0, 100); |
| List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3, callable4, callable5, callable6); |
| |
| final MyCallable callableOther = new MyCallable("other", 0, 100); |
| long now = System.currentTimeMillis(); |
| queueservice.queue(callableOther, 15); |
| |
| for (MyCallable c : callables) { |
| queueservice.queue(c, 10); |
| } |
| |
| waitFor(3000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| boolean completed = true; |
| |
| for (MyCallable callable : callables) { |
| completed &= (callable.executed != 0); |
| } |
| |
| completed &= (callableOther.executed != 0); |
| |
| return completed; |
| } |
| }); |
| |
| long last = Long.MIN_VALUE; |
| for (MyCallable c : callables) { |
| System.out.println("Callable C executed :" + c.executed); |
| assertTrue(c.executed != 0); |
| last = Math.max(last, c.executed); |
| } |
| System.out.println("Callable callableOther executed :" + callableOther.executed); |
| |
| assertTrue(callableOther.executed < last); |
| assertTrue(callableOther.executed > (now + 115)); |
| } |
| |
| public void testSerialConcurrencyLimit() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| final MyCallable callable1 = new MyCallable("TestSerialConcurrencyLimit", 0, 100); |
| final MyCallable callable2 = new MyCallable("TestSerialConcurrencyLimit", 0, 100); |
| final MyCallable callable3 = new MyCallable("TestSerialConcurrencyLimit", 0, 100); |
| final MyCallable callable4 = new MyCallable("TestSerialConcurrencyLimit", 0, 100); |
| final MyCallable callable5 = new MyCallable("TestSerialConcurrencyLimit", 0, 100); |
| |
| List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3, callable4, callable5); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| String type = "SerialConcurrencyLimit"; |
| for (MyCallable c : callables) { |
| queueservice.queueSerial(Arrays.asList(c, new MyCallable(type = type + "x", 0, 0))); |
| } |
| |
| waitFor(3000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return callable1.executed != 0 && callable2.executed != 0 && callable3.executed != 0 && |
| callable4.executed != 0 && callable5.executed != 0; |
| } |
| }); |
| |
| long first = Long.MAX_VALUE; |
| for (MyCallable c : callables) { |
| assertTrue(c.executed != 0); |
| first = Math.min(first, c.executed); |
| } |
| |
| int secondBatch = 0; |
| for (MyCallable c : callables) { |
| if (c.executed - first > 0) { |
| secondBatch++; |
| } |
| } |
| assertTrue(secondBatch >= 2); |
| } |
| |
| public void testConcurrency() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| final MyCallable callable1 = new MyCallable("TestConcurrency", 0, 100); |
| final MyCallable callable2 = new MyCallable("TestConcurrency", 0, 100); |
| final MyCallable callable3 = new MyCallable("TestConcurrency", 0, 100); |
| final MyCallable callable4 = new MyCallable("TestConcurrency", 0, 100); |
| final MyCallable callable5 = new MyCallable("TestConcurrency", 0, 100); |
| |
| List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3, callable4, callable5); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| for (MyCallable c : callables) { |
| queueservice.queue(c); |
| } |
| |
| waitFor(3000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return callable1.executed != 0 && callable2.executed != 0 && callable3.executed != 0 && |
| callable4.executed != 0 && callable5.executed != 0; |
| } |
| }); |
| |
| long first = Long.MAX_VALUE; |
| for (MyCallable c : callables) { |
| assertTrue(c.executed != 0); |
| first = Math.min(first, c.executed); |
| } |
| |
| int secondBatch = 0; |
| for (MyCallable c : callables) { |
| if (c.executed - first > 0) { |
| secondBatch++; |
| } |
| } |
| assertTrue(secondBatch >= 2); |
| } |
| |
| public void testQueueUniquenessWithSameKey() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| final MyCallable callable1 = new MyCallable("QueueUniquenessWithSameKey", "QueueUniquenessWithSameKey", 0, 100); |
| final MyCallable callable2 = new MyCallable("QueueUniquenessWithSameKey", "QueueUniquenessWithSameKey", 0, 100); |
| final MyCallable callable3 = new MyCallable("QueueUniquenessWithSameKey", "QueueUniquenessWithSameKey", 0, 100); |
| |
| List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| for (MyCallable c : callables) { |
| queueservice.queue(c); |
| } |
| |
| waitFor(200, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return callable1.executed != 0 && callable2.executed == 0 && callable3.executed == 0; |
| } |
| }); |
| |
| assertTrue(callable1.executed != 0); |
| assertTrue(callable2.executed == 0); |
| assertTrue(callable3.executed == 0); |
| } |
| |
| public void testQueueUniquenessWithSameKeyInComposite() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| final MyCallable callable1 = new MyCallable( |
| "QueueUniquenessWithSameKeyInComposite", "QueueUniquenessWithSameKeyInComposite", 0, 200); |
| final MyCallable callable2 = new MyCallable( |
| "QueueUniquenessWithSameKeyInComposite", "QueueUniquenessWithSameKeyInComposite", 0, 200); |
| final MyCallable callable3 = new MyCallable( |
| "QueueUniquenessWithSameKeyInComposite", "QueueUniquenessWithSameKeyInComposite", 0, 200); |
| |
| List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| String type = "QueueUniquenessWithSameKeyInComposite"; |
| for (MyCallable c : callables) { |
| queueservice.queueSerial(Arrays.asList(c, new MyCallable(type = type + "x", 0, 0)), 200); |
| } |
| |
| waitFor(2000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return callable1.executed != 0 && callable2.executed == 0 && callable3.executed == 0; |
| } |
| }); |
| |
| assertTrue(callable1.executed != 0); |
| assertTrue(callable2.executed == 0); |
| assertTrue(callable3.executed == 0); |
| } |
| |
| public void testQueueUniquenessWithSameKeyInOneComposite() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| final MyCallable callable1 = new MyCallable( |
| "QueueUniquenessWithSameKeyInOneComposite", "QueueUniquenessWithSameKeyInOneComposite", 0, 100); |
| final MyCallable callable2 = new MyCallable( |
| "QueueUniquenessWithSameKeyInOneComposite", "QueueUniquenessWithSameKeyInOneComposite", 0, 100); |
| final MyCallable callable3 = new MyCallable( |
| "QueueUniquenessWithSameKeyInOneComposite", "QueueUniquenessWithSameKeyInOneComposite", 0, 100); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| queueservice.queueSerial(Arrays.asList(callable1, callable2, callable3)); |
| |
| waitFor(200, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return callable1.executed != 0 && callable2.executed == 0 && callable3.executed == 0; |
| } |
| }); |
| |
| assertTrue(callable1.executed != 0); |
| assertTrue(callable2.executed == 0); |
| assertTrue(callable3.executed == 0); |
| } |
| |
| public void testQueueUniquenessWithDiffKey() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| final MyCallable callable1 = new MyCallable("QueueUniquenessWithDiffKey1", "QueueUniquenessWithDiffKey", 0, 100); |
| final MyCallable callable2 = new MyCallable("QueueUniquenessWithDiffKey2", "QueueUniquenessWithDiffKey", 0, 100); |
| final MyCallable callable3 = new MyCallable("QueueUniquenessWithDiffKey3", "QueueUniquenessWithDiffKey", 0, 100); |
| |
| List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| for (MyCallable c : callables) { |
| queueservice.queue(c); |
| } |
| |
| waitFor(200, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return callable1.executed != 0 && callable2.executed != 0 && callable3.executed != 0; |
| } |
| }); |
| |
| assertTrue(callable1.executed != 0); |
| assertTrue(callable2.executed != 0); |
| assertTrue(callable3.executed != 0); |
| } |
| |
| public void testQueueUniquenessWithDiffKeyInComposite() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| final MyCallable callable1 = new MyCallable( |
| "QueueUniquenessWithDiffKeyInComposite1", "QueueUniquenessWithDiffKeyInComposite", 0, 100); |
| final MyCallable callable2 = new MyCallable( |
| "QueueUniquenessWithDiffKeyInComposite2", "QueueUniquenessWithDiffKeyInComposite", 0, 100); |
| final MyCallable callable3 = new MyCallable( |
| "QueueUniquenessWithDiffKeyInComposite3", "QueueUniquenessWithDiffKeyInComposite", 0, 100); |
| |
| List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| String type = "QueueUniquenessWithDiffKeyInComposite"; |
| for (MyCallable c : callables) { |
| queueservice.queueSerial(Arrays.asList(c, new MyCallable(type = type + "x", 0, 0))); |
| } |
| |
| waitFor(200, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return callable1.executed != 0 && callable2.executed != 0 && callable3.executed != 0; |
| } |
| }); |
| |
| assertTrue(callable1.executed != 0); |
| assertTrue(callable2.executed != 0); |
| assertTrue(callable3.executed != 0); |
| } |
| |
| public void testQueueUniquenessWithDiffKeyInOneComposite() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| final MyCallable callable1 = new MyCallable( |
| "QueueUniquenessWithDiffKeyInOneComposite1", "QueueUniquenessWithDiffKeyInOneComposite", 0, 100); |
| final MyCallable callable2 = new MyCallable( |
| "QueueUniquenessWithDiffKeyInOneComposite2", "QueueUniquenessWithDiffKeyInOneComposite", 0, 100); |
| final MyCallable callable3 = new MyCallable( |
| "QueueUniquenessWithDiffKeyInOneComposite3", "QueueUniquenessWithDiffKeyInOneComposite", 0, 100); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| queueservice.queueSerial(Arrays.asList(callable1, callable2, callable3)); |
| |
| waitFor(200, new Predicate() { |
| public boolean evaluate() throws Exception { |
| return callable1.executed != 0 && callable2.executed != 0 && callable3.executed != 0; |
| } |
| }); |
| |
| assertTrue(callable1.executed != 0); |
| assertTrue(callable2.executed != 0); |
| assertTrue(callable3.executed != 0); |
| } |
| |
| /** |
| * Testing the interrupts by introducing an interrupt command within a set |
| * of 10 commands and assuring it will be executed first |
| */ |
| public void testInterrupt() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| Services.get().destroy(); |
| setSystemProperty(CallableQueueService.CONF_THREADS, "1"); |
| setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); |
| new Services().init(); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200, |
| "initialLockKey"); |
| final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>(); |
| for (int i = 0; i < 10; i++) { |
| callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey")); |
| } |
| |
| final ExtendedXCommand intCallable = new ExtendedXCommand("keyInt", "testKill", 0, 200, "lockKey"); |
| |
| queueservice.queue(initialCallable); |
| for (int i = 0; i < 10; i++) { |
| queueservice.queue(callables.get(i)); |
| } |
| |
| queueservice.queue(intCallable); |
| |
| waitFor(3000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0; |
| for (ExtendedXCommand c : callables) { |
| retValue = retValue && c.executed != 0; |
| } |
| return retValue; |
| } |
| }); |
| |
| assertTrue(initialCallable.executed > 0); |
| assertTrue(intCallable.executed > 0); |
| assertTrue(intCallable.executed < callables.get(5).executed); |
| } |
| |
| /* |
| * Introducing an interrupt with different keys and assure it will be |
| * executed in order regardless of the existence of an interrupt command in |
| * the mix. |
| */ |
| public void testInterruptsWithDistinguishedLockKeys() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| Services.get().destroy(); |
| setSystemProperty(CallableQueueService.CONF_THREADS, "1"); |
| setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); |
| new Services().init(); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200, |
| "initialLockKey"); |
| final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>(); |
| for (int i = 0; i < 10; i++) { |
| callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey" + i)); |
| } |
| |
| final ExtendedXCommand intCallable = new ExtendedXCommand("keyInt", "testKill", 0, 100, "lockKey"); |
| |
| queueservice.queue(initialCallable); |
| for (int i = 0; i < 10; i++) { |
| queueservice.queue(callables.get(i)); |
| } |
| |
| queueservice.queue(intCallable); |
| |
| waitFor(6000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0; |
| for (ExtendedXCommand c : callables) { |
| retValue = retValue && c.executed != 0; |
| } |
| return retValue; |
| } |
| }); |
| |
| assertTrue(initialCallable.executed > 0); |
| assertTrue(intCallable.executed > 0); |
| assertTrue(intCallable.executed > callables.get(5).executed); |
| } |
| |
| /* |
| * assuring an interrupt command will be executed before a composite |
| * callable with the same lock key |
| */ |
| public void testInterruptsWithCompositeCallable() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| Services.get().destroy(); |
| setSystemProperty(CallableQueueService.CONF_THREADS, "1"); |
| setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); |
| new Services().init(); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200, |
| "initialLockKey"); |
| final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>(); |
| |
| for (int i = 0; i < 10; i++) { |
| callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey")); |
| } |
| |
| final ExtendedXCommand intCallable = new ExtendedXCommand("key5", "testKill", 0, 200, "lockKey"); |
| |
| queueservice.queue(initialCallable); |
| queueservice.queueSerial((List<? extends XCallable<?>>) (callables), 0); |
| queueservice.queue(intCallable); |
| |
| waitFor(3000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0; |
| for (ExtendedXCommand c : callables) { |
| retValue = retValue && c.executed != 0; |
| } |
| return retValue; |
| } |
| }); |
| |
| assertTrue(initialCallable.executed > 0); |
| assertTrue(intCallable.executed > 0); |
| for (ExtendedXCommand c : callables) { |
| assertTrue(intCallable.executed < c.executed); |
| } |
| } |
| |
| /* |
| * Testing an interrupt commands inside a composite callable Assuring it is |
| * executed before the others |
| */ |
| public void testInterruptsInCompositeCallable() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| Services.get().destroy(); |
| setSystemProperty(CallableQueueService.CONF_THREADS, "1"); |
| setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); |
| new Services().init(); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200, |
| "initialLockKey"); |
| final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>(); |
| |
| for (int i = 0; i < 5; i++) { |
| callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey")); |
| } |
| callables.add(new ExtendedXCommand("key" + 5, "testKill", 1, 100, "lockKey")); |
| for (int i = 6; i < 10; i++) { |
| callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey")); |
| } |
| |
| queueservice.queue(initialCallable); |
| queueservice.queueSerial((List<? extends XCallable<?>>) (callables), 0); |
| |
| waitFor(3000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| boolean retValue = initialCallable.executed != 0; |
| for (ExtendedXCommand c : callables) { |
| retValue = retValue && c.executed != 0; |
| } |
| return retValue; |
| } |
| }); |
| |
| assertTrue(initialCallable.executed > 0); |
| assertTrue(callables.get(1).executed > callables.get(5).executed); |
| } |
| |
| /* |
| * Assuring the interrupts will not be inserted in the map when it reached |
| * the max size |
| */ |
| public void testMaxInterruptMapSize() throws Exception { |
| EXEC_ORDER = new AtomicLong(); |
| Services.get().destroy(); |
| setSystemProperty(CallableQueueService.CONF_THREADS, "1"); |
| setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); |
| setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE, "0"); |
| new Services().init(); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 100, |
| "initialLockKey"); |
| final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>(); |
| for (int i = 0; i < 10; i++) { |
| callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey")); |
| } |
| |
| final ExtendedXCommand intCallable = new ExtendedXCommand("keyInt", "testKill", 0, 100, "lockKey"); |
| |
| queueservice.queue(initialCallable); |
| for (int i = 0; i < 10; i++) { |
| queueservice.queue(callables.get(i)); |
| } |
| |
| queueservice.queue(intCallable); |
| |
| waitFor(5000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0; |
| for (ExtendedXCommand c : callables) { |
| retValue = retValue && c.executed != 0; |
| } |
| return retValue; |
| } |
| }); |
| |
| assertTrue(initialCallable.executed > 0); |
| assertTrue(intCallable.executed > 0); |
| assertTrue(intCallable.executed > callables.get(5).executed); |
| } |
| |
| public void testRemoveUniqueCallables() throws Exception { |
| XCommand<?> command = new XCommand<Object>("Test", "type", 100) { |
| @Override |
| protected boolean isLockRequired() { |
| return false; |
| } |
| |
| @Override |
| public String getEntityKey() { |
| return "TEST"; |
| } |
| |
| @Override |
| protected void loadState() throws CommandException { |
| } |
| |
| @Override |
| protected void verifyPrecondition() throws CommandException, PreconditionException { |
| } |
| |
| @Override |
| protected Object execute() throws CommandException { |
| return null; |
| } |
| }; |
| Services.get().destroy(); |
| setSystemProperty(CallableQueueService.CONF_THREADS, "1"); |
| new Services().init(); |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| List<String> uniquesBefore = queueservice.getUniqueDump(); |
| try { |
| queueservice.queue(command); |
| fail("Expected illegal argument exception: priority = 100"); |
| } |
| catch (Exception e) { |
| assertTrue(e.getCause() != null && e.getCause() instanceof IllegalArgumentException); |
| } |
| List<String> uniquesAfter = queueservice.getUniqueDump(); |
| uniquesAfter.removeAll(uniquesBefore); |
| assertTrue(uniquesAfter.toString(), uniquesAfter.isEmpty()); |
| } |
| |
| public void testMaxConcurrencyReached() throws Exception { |
| Services.get().destroy(); |
| setSystemProperty(CallableQueueService.CONF_QUEUE_SIZE, "100000"); |
| new Services().init(); |
| |
| int partitions = 10; |
| int taskPerPartition = 10000; |
| |
| final int taskCount = partitions * taskPerPartition; |
| |
| List<DummyCallable> callables = new ArrayList<>(taskCount); |
| |
| for (int i = 0; i < partitions; i++) { |
| String type = String.valueOf(i); |
| for (int j = 0; j < taskPerPartition; j++) { |
| String key = type + "_" + UUID.randomUUID().toString(); |
| DummyCallable dc = new DummyCallable(taskCount, key, type, 0, 0); |
| callables.add(dc); |
| } |
| } |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| for (int i = 0; i < taskCount; i++) { |
| queueservice.queue(callables.get(i)); |
| } |
| |
| try { |
| finished.await(100, TimeUnit.SECONDS); |
| } catch (Exception e) { |
| log.error("Error", e); |
| fail("Exception during test: " + e.getMessage()); |
| } |
| |
| assertEquals("Not all callables have been executed", counter.get(), taskCount); |
| } |
| |
| public void testQueueSizeWithDelayedElements() throws InterruptedException { |
| final int taskCount = 10_000; |
| |
| List<DummyCallable> callables = new ArrayList<>(taskCount); |
| for (int i = 0; i < taskCount; i++) { |
| String keyAndType = String.valueOf(i); |
| DummyCallable dc = new DummyCallable(taskCount, keyAndType, keyAndType, 0, 0); |
| callables.add(dc); |
| } |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| for (int i = 0; i < taskCount; i++) { |
| queueservice.queue(callables.get(i), 2000); |
| } |
| |
| int queueSizeAfterSubmission = queueservice.queueSize(); |
| |
| try { |
| finished.await(10, TimeUnit.SECONDS); |
| } catch (Exception e) { |
| log.error("Error", e); |
| fail("Exception during test: " + e.getMessage()); |
| } |
| |
| assertEquals("Queue size after submission", taskCount, queueSizeAfterSubmission); |
| assertEquals("Queue size after execution", 0, queueservice.queueSize()); |
| } |
| |
| public void testQueueSizeAfterNormalSubmission() throws InterruptedException { |
| final int taskCount = 10_000; |
| |
| List<DummyCallable> callables = new ArrayList<>(taskCount); |
| for (int i = 0; i < taskCount; i++) { |
| String keyAndType = String.valueOf(i); |
| DummyCallable dc = new DummyCallable(taskCount, keyAndType, keyAndType, 0, 0); |
| callables.add(dc); |
| } |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| for (int i = 0; i < taskCount; i++) { |
| queueservice.queue(callables.get(i)); |
| } |
| |
| // Not an exact number - it's close to 10,000 but keeps fluctuating |
| // We can still verify that it's larger than a certain number though |
| int queueSizeAfterSubmission = queueservice.queueSize(); |
| try { |
| finished.await(10, TimeUnit.SECONDS); |
| } catch (Exception e) { |
| log.error("Error", e); |
| fail("Exception during test: " + e.getMessage()); |
| } |
| // It's necessary because after finished.await() returns, the last XCallable |
| // could still be running |
| waitFor(1000, new Predicate() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return queueservice.queueSize() == 0; |
| } |
| }); |
| |
| assertTrue("Too few elements in the queue: " + queueSizeAfterSubmission + ", should be >9000", |
| queueSizeAfterSubmission > 9000); |
| assertEquals("Queue size after execution", 0, queueservice.queueSize()); |
| } |
| |
| public void testQueueSizeWhenMaxConcurrencyIsReached() throws InterruptedException { |
| int partitions = 10; |
| int taskPerPartition = 1000; |
| |
| final int taskCount = partitions * taskPerPartition; |
| |
| List<DummyCallable> callables = new ArrayList<>(taskCount); |
| |
| for (int i = 0; i < partitions; i++) { |
| String type = String.valueOf(i); |
| for (int j = 0; j < taskPerPartition; j++) { |
| String key = type + "_" + UUID.randomUUID().toString(); |
| DummyCallable dc = new DummyCallable(taskCount, key, type, 0, 0); |
| callables.add(dc); |
| } |
| } |
| |
| CallableQueueService queueservice = Services.get().get(CallableQueueService.class); |
| |
| for (int i = 0; i < taskCount; i++) { |
| queueservice.queue(callables.get(i)); |
| } |
| |
| // Not an exact number - it's close to 10,000 but keeps fluctuating |
| // We can still verify that it's larger than a certain number though |
| int queueSizeAfterSubmission = queueservice.queueSize(); |
| try { |
| finished.await(10, TimeUnit.SECONDS); |
| } catch (Exception e) { |
| log.error("Error", e); |
| fail("Exception during test: " + e.getMessage()); |
| } |
| // It's necessary because after finished.await() returns, the last XCallable |
| // could still be running |
| waitFor(1000, new Predicate() { |
| @Override |
| public boolean evaluate() throws Exception { |
| return queueservice.queueSize() == 0; |
| } |
| }); |
| |
| assertTrue("Too few elements in the queue: " + queueSizeAfterSubmission + ", should be >9000", |
| queueSizeAfterSubmission > 9000); |
| assertEquals("Queue size after execution", 0, queueservice.queueSize()); |
| } |
| |
| private class DummyCallable extends MyCallable { |
| private final int taskCount; |
| |
| public DummyCallable(int taskCount, String key, String type, int priority, int wait) { |
| super(key, type, priority, wait); |
| this.taskCount = taskCount; |
| } |
| |
| public Void call() throws Exception { |
| if (counter.incrementAndGet() == taskCount) { |
| finished.countDown(); |
| } |
| |
| return null; |
| } |
| } |
| |
| private class BookingCallable extends MyCallable { |
| private final int taskCount; |
| private final Multimap<Integer, Long> executions; |
| |
| public BookingCallable(Multimap<Integer, Long> executions, |
| int taskCount, |
| String key, |
| String type, |
| int priority, |
| int wait) { |
| super(key, type, priority, wait); |
| this.taskCount = taskCount; |
| this.executions = executions; |
| } |
| |
| public Void call() throws Exception { |
| executions.put(getPriority(), System.currentTimeMillis()); |
| if (counter.incrementAndGet() == taskCount) { |
| finished.countDown(); |
| } |
| return null; |
| } |
| } |
| } |