blob: 0a89ff0239a2d9d913bd5445fe1da889fb6973d6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.htrace.impl;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Semaphore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanId;
import org.apache.htrace.core.TracerId;
import org.apache.htrace.impl.HTracedSpanReceiver.FaultInjector;
import org.apache.htrace.util.TestUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
public class TestHTracedReceiver {
private static final Log LOG = LogFactory.getLog(TestHTracedReceiver.class);
@BeforeClass
public static void beforeClass() {
// Allow setting really small buffer sizes for testing purposes.
// We do not allow setting such small sizes in production.
Conf.BUFFER_SIZE_MIN = 0;
}
@Rule
public TestRule watcher = new TestWatcher() {
protected void starting(Description description) {
LOG.info("*** Starting junit test: " + description.getMethodName());
}
protected void finished(Description description) {
LOG.info("*** Finished junit test: " + description.getMethodName());
}
};
@Test(timeout = 60000)
public void testGetServerVersionJson() throws Exception {
HTracedProcess ht = new HTracedProcess.Builder().build();
try {
String response = ht.getServerVersionJson();
assertTrue(response.contains("ReleaseVersion"));
} finally {
ht.destroy();
}
}
private void waitForSpans(final HTracedProcess ht, Span[] spans)
throws Exception {
waitForSpans(ht, spans, spans.length);
}
private void waitForSpans(final HTracedProcess ht, Span[] spans,
int numSpans) throws Exception {
final LinkedList<SpanId> spanIds = new LinkedList<SpanId>();
for (int i = 0; i < numSpans; i++) {
spanIds.add(spans[i].getSpanId());
}
boolean success = false;
try {
TestUtil.waitFor(new TestUtil.Supplier<Boolean>() {
@Override
public Boolean get() {
for (Iterator<SpanId> iter = spanIds.iterator();
iter.hasNext(); ) {
SpanId spanId = iter.next();
try {
if (ht.getSpan(spanId) == null) {
return false;
}
} catch (InterruptedException e) {
LOG.error("Got InterruptedException while looking for " +
"span ID " + spanId, e);
Thread.currentThread().interrupt();
} catch (Exception e) {
LOG.error("Got error looking for span ID " + spanId, e);
return false;
}
iter.remove();
}
return true;
}
}, 10, 30000);
success = true;
} finally {
if (!success) {
String prefix = "";
StringBuilder idStringBld = new StringBuilder();
for (Iterator<SpanId> iter = spanIds.iterator();
iter.hasNext(); ) {
idStringBld.append(prefix);
idStringBld.append(iter.next());
prefix = ",";
}
LOG.error("Unable to find span IDs " + idStringBld.toString());
}
}
}
/**
* Test that we can send spans via the HRPC interface.
*/
@Test(timeout = 10000) //60000)
public void testSendSpansViaPacked() throws Exception {
final Random rand = new Random(123);
final HTracedProcess ht = new HTracedProcess.Builder().build();
try {
HTraceConfiguration conf = HTraceConfiguration.fromMap(
new HashMap<String, String>() {{
put(TracerId.TRACER_ID_KEY, "testSendSpansViaPacked");
put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
put(Conf.PACKED_KEY, "true");
put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "100");
put(Conf.ERROR_LOG_PERIOD_MS_KEY, "0");
}});
HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
Span[] spans = TestUtil.randomSpans(rand, 10);
for (Span span : spans) {
rcvr.receiveSpan(span);
}
waitForSpans(ht, spans);
rcvr.close();
} finally {
ht.destroy();
}
}
/**
* Test that when the SpanReceiver is closed, we send any spans we have
* buffered via the HRPC interface.
*/
@Test(timeout = 60000)
public void testSendSpansViaPackedAndClose() throws Exception {
final Random rand = new Random(456);
final HTracedProcess ht = new HTracedProcess.Builder().build();
try {
HTraceConfiguration conf = HTraceConfiguration.fromMap(
new HashMap<String, String>() {{
put(TracerId.TRACER_ID_KEY, "testSendSpansViaPackedAndClose");
put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
put(Conf.PACKED_KEY, "true");
put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "60000");
}});
HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
Span[] spans = TestUtil.randomSpans(rand, 10);
for (Span span : spans) {
rcvr.receiveSpan(span);
}
rcvr.close();
waitForSpans(ht, spans);
} finally {
ht.destroy();
}
}
/**
* Test that we can send spans via the REST interface.
*/
@Test(timeout = 60000)
public void testSendSpansViaRest() throws Exception {
final Random rand = new Random(789);
final HTracedProcess ht = new HTracedProcess.Builder().build();
try {
HTraceConfiguration conf = HTraceConfiguration.fromMap(
new HashMap<String, String>() {{
put(TracerId.TRACER_ID_KEY, "testSendSpansViaRest");
put(Conf.ADDRESS_KEY, ht.getHttpAddr());
put(Conf.PACKED_KEY, "false");
put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "100");
}});
HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
Span[] spans = TestUtil.randomSpans(rand, 10);
for (Span span : spans) {
rcvr.receiveSpan(span);
}
waitForSpans(ht, spans);
rcvr.close();
} finally {
ht.destroy();
}
}
/**
* Test that when the SpanReceiver is closed, we send any spans we have
* buffered via the REST interface.
*/
@Test(timeout = 60000)
public void testSendSpansViaRestAndClose() throws Exception {
final Random rand = new Random(321);
final HTracedProcess ht = new HTracedProcess.Builder().build();
try {
HTraceConfiguration conf = HTraceConfiguration.fromMap(
new HashMap<String, String>() {{
put(TracerId.TRACER_ID_KEY, "testSendSpansViaRestAndClose");
put(Conf.ADDRESS_KEY, ht.getHttpAddr());
put(Conf.PACKED_KEY, "false");
put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "60000");
}});
HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf);
Span[] spans = TestUtil.randomSpans(rand, 10);
for (Span span : spans) {
rcvr.receiveSpan(span);
}
rcvr.close();
waitForSpans(ht, spans);
} finally {
ht.destroy();
}
}
private static class Mutable<T> {
private T t;
Mutable(T t) {
this.t = t;
}
void set(T t) {
this.t = t;
}
T get() {
return this.t;
}
}
private static class TestHandleContentLengthTriggerInjector
extends HTracedSpanReceiver.FaultInjector {
final Semaphore threadStartSem = new Semaphore(0);
int contentLengthOnTrigger = 0;
@Override
public synchronized void handleContentLengthTrigger(int len) {
contentLengthOnTrigger = len;
}
@Override
public void handleThreadStart() throws Exception {
threadStartSem.acquire();
}
public synchronized int getContentLengthOnTrigger() {
return contentLengthOnTrigger;
}
}
/**
* Test that filling up one of the buffers causes us to trigger a flush and
* start using the other buffer, when using PackedBufferManager.
* This also tests that PackedBufferManager can correctly handle a buffer
* getting full.
*/
@Test(timeout = 60000)
public void testFullBufferCausesPackedThreadTrigger() throws Exception {
final Random rand = new Random(321);
final HTracedProcess ht = new HTracedProcess.Builder().build();
try {
HTraceConfiguration conf = HTraceConfiguration.fromMap(
new HashMap<String, String>() {{
put(TracerId.TRACER_ID_KEY,
"testFullBufferCausesPackedThreadTrigger");
put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
put(Conf.PACKED_KEY, "true");
put(Conf.BUFFER_SIZE_KEY, "16384");
put(Conf.BUFFER_SEND_TRIGGER_FRACTION_KEY, "0.95");
}});
TestHandleContentLengthTriggerInjector injector =
new TestHandleContentLengthTriggerInjector();
HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
Span[] spans = TestUtil.randomSpans(rand, 47);
for (Span span : spans) {
rcvr.receiveSpan(span);
}
Assert.assertTrue("The wakePostSpansThread should have been " +
"triggered by the spans added so far. " +
"contentLengthOnTrigger = " + injector.getContentLengthOnTrigger(),
injector.getContentLengthOnTrigger() > 16000);
injector.threadStartSem.release();
rcvr.close();
waitForSpans(ht, spans, 45);
} finally {
ht.destroy();
}
}
/**
* Test that filling up one of the buffers causes us to trigger a flush and
* start using the other buffer, when using RestBufferManager.
* This also tests that RestBufferManager can correctly handle a buffer
* getting full.
*/
@Test(timeout = 60000)
public void testFullBufferCausesRestThreadTrigger() throws Exception {
final Random rand = new Random(321);
final HTracedProcess ht = new HTracedProcess.Builder().build();
try {
HTraceConfiguration conf = HTraceConfiguration.fromMap(
new HashMap<String, String>() {{
put(TracerId.TRACER_ID_KEY,
"testFullBufferCausesRestThreadTrigger");
put(Conf.ADDRESS_KEY, ht.getHttpAddr());
put(Conf.PACKED_KEY, "false");
put(Conf.BUFFER_SIZE_KEY, "16384");
put(Conf.BUFFER_SEND_TRIGGER_FRACTION_KEY, "0.95");
}});
TestHandleContentLengthTriggerInjector injector =
new TestHandleContentLengthTriggerInjector();
HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
Span[] spans = TestUtil.randomSpans(rand, 34);
for (Span span : spans) {
rcvr.receiveSpan(span);
}
Assert.assertTrue("The wakePostSpansThread should have been " +
"triggered by the spans added so far. " +
"contentLengthOnTrigger = " + injector.getContentLengthOnTrigger(),
injector.getContentLengthOnTrigger() > 16000);
injector.threadStartSem.release();
rcvr.close();
waitForSpans(ht, spans, 33);
} finally {
ht.destroy();
}
}
/**
* A FaultInjector that causes all flushes to fail until a specified
* number of milliseconds have passed.
*/
private static class TestInjectFlushFaults
extends HTracedSpanReceiver.FaultInjector {
private long remainingFaults;
TestInjectFlushFaults(long remainingFaults) {
this.remainingFaults = remainingFaults;
}
@Override
public synchronized void handleFlush() throws IOException {
if (remainingFaults > 0) {
remainingFaults--;
throw new IOException("Injected IOException into flush " +
"code path.");
}
}
}
/**
* Test that even if the flush fails, the system stays stable and we can
* still close the span receiver.
*/
@Test(timeout = 60000)
public void testPackedThreadHandlesFlushFailure() throws Exception {
final Random rand = new Random(321);
final HTracedProcess ht = new HTracedProcess.Builder().build();
try {
HTraceConfiguration conf = HTraceConfiguration.fromMap(
new HashMap<String, String>() {{
put(TracerId.TRACER_ID_KEY, "testPackedThreadHandlesFlushFailure");
put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
put(Conf.PACKED_KEY, "true");
}});
TestInjectFlushFaults injector = new TestInjectFlushFaults(Long.MAX_VALUE);
HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
Span[] spans = TestUtil.randomSpans(rand, 15);
for (Span span : spans) {
rcvr.receiveSpan(span);
}
rcvr.close();
} finally {
ht.destroy();
}
}
/**
* Test that even if the flush fails, the system stays stable and we can
* still close the span receiver.
*/
@Test(timeout = 60000)
public void testRestThreadHandlesFlushFailure() throws Exception {
final Random rand = new Random(321);
final HTracedProcess ht = new HTracedProcess.Builder().build();
try {
HTraceConfiguration conf = HTraceConfiguration.fromMap(
new HashMap<String, String>() {{
put(TracerId.TRACER_ID_KEY, "testRestThreadHandlesFlushFailure");
put(Conf.ADDRESS_KEY, ht.getHttpAddr());
put(Conf.PACKED_KEY, "false");
}});
TestInjectFlushFaults injector = new TestInjectFlushFaults(Long.MAX_VALUE);
HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
Span[] spans = TestUtil.randomSpans(rand, 15);
for (Span span : spans) {
rcvr.receiveSpan(span);
}
rcvr.close();
} finally {
ht.destroy();
}
}
/**
* A FaultInjector that causes all flushes to fail until a specified
* number of milliseconds have passed.
*/
private static class WaitForFlushes
extends HTracedSpanReceiver.FaultInjector {
final Semaphore flushSem;
WaitForFlushes(int numFlushes) {
this.flushSem = new Semaphore(-numFlushes);
}
@Override
public void handleFlush() throws IOException {
flushSem.release();
}
}
/**
* Test that the packed code works when performing multiple flushes.
*/
@Test(timeout = 60000)
public void testMultiplePackedFlushes() throws Exception {
final Random rand = new Random(123);
final HTracedProcess ht = new HTracedProcess.Builder().build();
try {
HTraceConfiguration conf = HTraceConfiguration.fromMap(
new HashMap<String, String>() {{
put(TracerId.TRACER_ID_KEY, "testMultiplePackedFlushes");
put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
put(Conf.PACKED_KEY, "true");
put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1");
}});
WaitForFlushes injector = new WaitForFlushes(5);
HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
Span[] spans = TestUtil.randomSpans(rand, 3);
while (true) {
for (Span span : spans) {
rcvr.receiveSpan(span);
}
if (injector.flushSem.availablePermits() >= 0) {
break;
}
Thread.sleep(1);
}
waitForSpans(ht, spans, 3);
rcvr.close();
} finally {
ht.destroy();
}
}
/**
* Test that the REST code works when performing multiple flushes.
*/
@Test(timeout = 60000)
public void testMultipleRestFlushes() throws Exception {
final Random rand = new Random(123);
final HTracedProcess ht = new HTracedProcess.Builder().build();
try {
HTraceConfiguration conf = HTraceConfiguration.fromMap(
new HashMap<String, String>() {{
put(TracerId.TRACER_ID_KEY, "testMultipleRestFlushes");
put(Conf.ADDRESS_KEY, ht.getHttpAddr());
put(Conf.PACKED_KEY, "false");
put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1");
}});
WaitForFlushes injector = new WaitForFlushes(5);
HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
Span[] spans = TestUtil.randomSpans(rand, 3);
while (true) {
for (Span span : spans) {
rcvr.receiveSpan(span);
}
if (injector.flushSem.availablePermits() >= 0) {
break;
}
Thread.sleep(1);
}
waitForSpans(ht, spans, 3);
rcvr.close();
} finally {
ht.destroy();
}
}
/**
* Test that the packed code works when performing multiple flushes.
*/
@Test(timeout = 60000)
public void testPackedRetryAfterFlushError() throws Exception {
final Random rand = new Random(123);
final HTracedProcess ht = new HTracedProcess.Builder().build();
try {
HTraceConfiguration conf = HTraceConfiguration.fromMap(
new HashMap<String, String>() {{
put(TracerId.TRACER_ID_KEY, "testPackedRetryAfterFlushError");
put(Conf.ADDRESS_KEY, ht.getHrpcAddr());
put(Conf.PACKED_KEY, "true");
put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1000");
put(Conf.FLUSH_RETRY_DELAYS_KEY, "100,100,100,100,100,100,100");
}});
TestInjectFlushFaults injector = new TestInjectFlushFaults(5);
HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
Span[] spans = TestUtil.randomSpans(rand, 3);
for (Span span : spans) {
rcvr.receiveSpan(span);
}
waitForSpans(ht, spans);
rcvr.close();
} finally {
ht.destroy();
}
}
/**
* Test that the REST code works when performing multiple flushes.
*/
@Test(timeout = 60000)
public void testRestRetryAfterFlushError() throws Exception {
final Random rand = new Random(123);
final HTracedProcess ht = new HTracedProcess.Builder().build();
try {
HTraceConfiguration conf = HTraceConfiguration.fromMap(
new HashMap<String, String>() {{
put(TracerId.TRACER_ID_KEY, "testRestRetryAfterFlushError");
put(Conf.ADDRESS_KEY, ht.getHttpAddr());
put(Conf.PACKED_KEY, "false");
put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1000");
put(Conf.FLUSH_RETRY_DELAYS_KEY, "100,100,100,100,100,100,100");
}});
TestInjectFlushFaults injector = new TestInjectFlushFaults(5);
HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector);
Span[] spans = TestUtil.randomSpans(rand, 3);
for (Span span : spans) {
rcvr.receiveSpan(span);
}
waitForSpans(ht, spans);
rcvr.close();
} finally {
ht.destroy();
}
}
}