blob: 7e03e7b40ea68b5bde6fa6f6962d19be5615dd04 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.actions
import groovy.util.logging.Slf4j
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.service.ServiceOperations
import org.apache.slider.server.appmaster.SliderAppMaster
import org.apache.slider.server.appmaster.state.AppState
import org.apache.slider.server.services.workflow.ServiceThreadFactory
import org.apache.slider.server.services.workflow.WorkflowExecutorService
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
@Slf4j
//@CompileStatic
class TestActions {
QueueService queues;
WorkflowExecutorService<ExecutorService> executorService;
@Before
void createService() {
queues = new QueueService();
def conf = new Configuration()
queues.init(conf)
queues.start();
executorService = new WorkflowExecutorService<>("AmExecutor",
Executors.newCachedThreadPool(
new ServiceThreadFactory("AmExecutor", true)));
executorService.init(conf)
executorService.start();
}
@After
void destroyService() {
ServiceOperations.stop(executorService);
ServiceOperations.stop(queues);
}
@Test
public void testBasicService() throws Throwable {
queues.start();
}
@Test
public void testDelayLogic() throws Throwable {
ActionNoteExecuted action = new ActionNoteExecuted("", 1000)
long now = System.currentTimeMillis();
def delay = action.getDelay(TimeUnit.MILLISECONDS)
assert delay >= 800
assert delay <= 1800
ActionNoteExecuted a2 = new ActionNoteExecuted("a2", 10000)
assert action.compareTo(a2) < 0
assert a2.compareTo(action) > 0
assert action.compareTo(action)== 0
}
@Test
public void testActionDelayedExecutorTermination() throws Throwable {
long start = System.currentTimeMillis()
ActionStopQueue stopAction = new ActionStopQueue(1000);
queues.scheduledActions.add(stopAction);
queues.run();
AsyncAction take = queues.actionQueue.take();
assert take == stopAction
long stop = System.currentTimeMillis();
assert stop - start > 500
assert stop - start < 1500
}
@Test
public void testImmediateQueue() throws Throwable {
ActionNoteExecuted noteExecuted = new ActionNoteExecuted("executed", 0)
queues.put(noteExecuted)
queues.put(new ActionStopQueue(0))
QueueExecutor ex = new QueueExecutor(queues)
ex.run();
assert queues.actionQueue.empty
assert noteExecuted.executed.get()
}
@Test
public void testActionOrdering() throws Throwable {
ActionNoteExecuted note1 = new ActionNoteExecuted("note1", 500)
def stop = new ActionStopQueue(1500)
ActionNoteExecuted note2 = new ActionNoteExecuted("note2", 800)
List<AsyncAction> actions = [note1, stop, note2]
Collections.sort(actions)
assert actions[0] == note1
assert actions[1] == note2
assert actions[2] == stop
}
@Test
public void testDelayedQueueWithReschedule() throws Throwable {
ActionNoteExecuted note1 = new ActionNoteExecuted("note1", 500)
def stop = new ActionStopQueue(1500)
ActionNoteExecuted note2 = new ActionNoteExecuted("note2", 800)
assert note2.compareTo(stop) < 0
assert note1.nanos < note2.nanos
assert note2.nanos < stop.nanos
queues.schedule(note1)
queues.schedule(note2)
queues.schedule(stop)
// async to sync expected to run in order
runQueuesToCompletion()
assert note1.executed.get()
assert note2.executed.get()
}
public void runQueuesToCompletion() {
queues.run();
assert queues.scheduledActions.empty
assert !queues.actionQueue.empty
QueueExecutor ex = new QueueExecutor(queues)
ex.run();
// flush all stop commands from the queue
queues.flushActionQueue(ActionStopQueue.class)
assert queues.actionQueue.empty
}
@Test
public void testRenewedActionFiresOnceAtLeast() throws Throwable {
ActionNoteExecuted note1 = new ActionNoteExecuted("note1", 500)
RenewingAction renewer = new RenewingAction(
note1,
500,
100,
TimeUnit.MILLISECONDS,
3)
queues.schedule(renewer);
def stop = new ActionStopQueue(4, TimeUnit.SECONDS)
queues.schedule(stop);
// this runs all the delayed actions FIRST, so can't be used
// to play tricks of renewing actions ahead of the stop action
runQueuesToCompletion()
assert renewer.executionCount == 1
assert note1.executionCount == 1
// assert the renewed item is back in
assert queues.scheduledActions.contains(renewer)
}
@Test
public void testRenewingActionOperations() throws Throwable {
ActionNoteExecuted note1 = new ActionNoteExecuted("note1", 500)
RenewingAction renewer = new RenewingAction(
note1,
100,
100,
TimeUnit.MILLISECONDS,
3)
queues.renewing("note", renewer)
assert queues.removeRenewingAction("note")
queues.stop()
queues.waitForServiceToStop(10000)
}
public class ActionNoteExecuted extends AsyncAction {
public final AtomicBoolean executed = new AtomicBoolean(false);
public final AtomicLong executionTimeNanos = new AtomicLong()
private final AtomicLong executionCount = new AtomicLong()
public ActionNoteExecuted(String text, int delay) {
super(text, delay);
}
@Override
public void execute(
SliderAppMaster appMaster,
QueueAccess queueService,
AppState appState) throws Exception {
log.info("Executing $name");
executed.set(true);
executionTimeNanos.set(System.nanoTime())
executionCount.incrementAndGet()
log.info(this.toString())
synchronized (this) {
this.notify();
}
}
@Override
String toString() {
return super.toString() +
" executed=${executed.get()}; count=${executionCount.get()};"
}
long getExecutionCount() {
return executionCount.get()
}
}
}