blob: 6d9db79e95abcefd687fd876eec42a2134774db6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.testing;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.CountDownLatch;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.util.Pointer;
import org.junit.Test;
public class TestCountDownLatchInjection extends BaseTestQuery {
private static final UserSession session = UserSession.Builder.newBuilder()
.withCredentials(UserCredentials.newBuilder()
.setUserName("foo")
.build())
.withUserProperties(UserProperties.getDefaultInstance())
.withOptionManager(bits[0].getContext().getOptionManager())
.build();
/**
* Class whose methods we want to simulate count down latches at run-time for testing
* purposes. The class must have access to {@link org.apache.drill.exec.ops.QueryContext} or
* {@link FragmentContextImpl}.
*/
private static class DummyClass {
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(DummyClass.class);
private final QueryContext context;
private final CountDownLatch latch;
private final int count;
public DummyClass(final QueryContext context, final CountDownLatch latch, final int count) {
this.context = context;
this.latch = latch;
this.count = count;
}
public static final String LATCH_NAME = "<<latch>>";
/**
* Method that initializes and waits for "count" number of count down (from those many threads)
*/
public long initAndWait() throws InterruptedException {
// ... code ...
injector.getLatch(context.getExecutionControls(), LATCH_NAME).initialize(count);
// ... code ...
latch.countDown(); // trigger threads spawn
final long startTime = System.currentTimeMillis();
// simulated wait for "count" threads to count down on the same latch
injector.getLatch(context.getExecutionControls(), LATCH_NAME).await();
final long endTime = System.currentTimeMillis();
// ... code ...
return (endTime - startTime);
}
public void countDown() {
// ... code ...
injector.getLatch(context.getExecutionControls(), LATCH_NAME).countDown();
// ... code ...
}
}
private static class ThreadCreator extends Thread {
private final DummyClass dummyClass;
private final ExtendedLatch latch;
private final int count;
private final Pointer<Long> countingDownTime;
public ThreadCreator(final DummyClass dummyClass, final ExtendedLatch latch, final int count,
final Pointer<Long> countingDownTime) {
this.dummyClass = dummyClass;
this.latch = latch;
this.count = count;
this.countingDownTime = countingDownTime;
}
@Override
public void run() {
latch.awaitUninterruptibly();
final long startTime = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
(new Thread() {
@Override
public void run() {
dummyClass.countDown();
}
}).start();
}
final long endTime = System.currentTimeMillis();
countingDownTime.value = (endTime - startTime);
}
}
@Test // test would hang if the correct init, wait and countdowns did not happen, and the test timeout mechanism will
// catch that case
public void latchInjected() throws InterruptedException {
final int threads = 10;
final ExtendedLatch trigger = new ExtendedLatch(1);
final Pointer<Long> countingDownTime = new Pointer<>();
final String controls = Controls.newBuilder()
.addLatch(DummyClass.class, DummyClass.LATCH_NAME)
.build();
ControlsInjectionUtil.setControls(session, controls);
final QueryContext queryContext = new QueryContext(session, bits[0].getContext(), QueryId.getDefaultInstance());
final DummyClass dummyClass = new DummyClass(queryContext, trigger, threads);
(new ThreadCreator(dummyClass, trigger, threads, countingDownTime)).start();
final long timeSpentWaiting;
try {
timeSpentWaiting = dummyClass.initAndWait();
} catch (final InterruptedException e) {
fail("Thread should not be interrupted; there is no deliberate attempt.");
return;
}
while (countingDownTime.value == null) {
Thread.sleep(100L);
}
assertTrue(timeSpentWaiting >= countingDownTime.value);
try {
queryContext.close();
} catch (final Exception e) {
fail("Failed to close query context: " + e);
}
}
}