// blob: f7d3840e097d9a3cee6e461167af0cbe66d15a20 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.htrace.impl;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcTestUtils;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
import org.apache.htrace.HTraceConfiguration;
import org.apache.htrace.Span;
import org.apache.htrace.SpanReceiver;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceCreator;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestFlumeSpanReceiver {
private static final Log LOG = LogFactory.getLog(TestFlumeSpanReceiver.class);
private static final String ROOT_SPAN_DESC = "ROOT";
private SpanReceiver spanReceiver;
private Server flumeServer;
private TraceCreator traceCreator;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
@Test
public void testSimpleTraces() throws FlumeException,
EventDeliveryException, IOException {
AvroHandler avroHandler = null;
List<Span> spans = null;
try {
avroHandler = new AvroHandler();
startReceiver(null, avroHandler);
spans = new ArrayList<Span>();
Span rootSpan = new MilliSpan.Builder().
description(ROOT_SPAN_DESC).
traceId(1).
spanId(100).
processId("test").
begin(System.currentTimeMillis()).
build();
Span innerOne = rootSpan.child("Some good work");
Span innerTwo = innerOne.child("Some more good work");
innerTwo.stop();
spans.add(innerTwo);
innerOne.stop();
spans.add(innerOne);
rootSpan.addKVAnnotation("foo".getBytes(), "bar".getBytes());
rootSpan.addTimelineAnnotation("timeline");
rootSpan.stop();
spans.add(rootSpan);
} finally {
stopReceiver();
}
List<AvroFlumeEvent> events = avroHandler.getAllEvents();
Assert.assertEquals(spans.size(), events.size());
for (int i = 0; i < spans.size(); i ++) {
String json = new String(events.get(i).getBody().array(), Charset.forName("UTF-8"));
Assert.assertTrue(json.contains(spans.get(i).getDescription()));
}
}
@Test
public void testConcurrency() throws FlumeException,
EventDeliveryException, IOException {
try {
Map<String, String> extraConf = new HashMap<String, String>();
extraConf.put(FlumeSpanReceiver.NUM_THREADS_KEY, "5");
startReceiver(extraConf, new RpcTestUtils.OKAvroHandler());
traceCreator.createThreadedTrace();
} finally {
stopReceiver();
}
}
@Test
public void testResilience() throws FlumeException,
EventDeliveryException, IOException {
try {
startReceiver(null, new RpcTestUtils.FailedAvroHandler());
traceCreator.createThreadedTrace();
} finally {
stopReceiver();
}
}
private void startReceiver(Map<String, String> extraConf, AvroSourceProtocol avroHandler) {
// Start Flume server
Assert.assertNull(flumeServer);
flumeServer = RpcTestUtils.startServer(avroHandler);
// Create and configure span receiver
Map<String, String> conf = new HashMap<String, String>();
conf.put(FlumeSpanReceiver.FLUME_HOSTNAME_KEY, "127.0.0.1");
conf.put(FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort()));
if (extraConf != null) {
conf.putAll(extraConf);
}
spanReceiver = new FlumeSpanReceiver(HTraceConfiguration.fromMap(conf));
// Create trace creator, it will register our receiver
traceCreator = new TraceCreator(spanReceiver);
}
private void stopReceiver() throws IOException {
// Close span receiver
if (spanReceiver != null) {
Trace.removeReceiver(spanReceiver);
spanReceiver.close();
spanReceiver = null;
}
// Close Flume server
if (flumeServer != null) {
RpcTestUtils.stopServer(flumeServer);
flumeServer = null;
}
}
private static class AvroHandler implements AvroSourceProtocol {
private ArrayList<AvroFlumeEvent> all_events = new ArrayList<AvroFlumeEvent>();
public List<AvroFlumeEvent> getAllEvents() {
return new ArrayList<AvroFlumeEvent>(all_events);
}
@Override
public Status append(AvroFlumeEvent event) throws AvroRemoteException {
all_events.add(event);
return Status.OK;
}
@Override
public Status appendBatch(List<AvroFlumeEvent> events) throws
AvroRemoteException {
all_events.addAll(events);
return Status.OK;
}
}
}