blob: b1134c998d111d567ee6570d2246c7be87c01b5f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
using OpenTelemetry;
using OpenTelemetry.Trace;
using Xunit;
using Xunit.Abstractions;
namespace Apache.Arrow.Adbc.Tests.Tracing
{
public class TracingTests(ITestOutputHelper? outputHelper) : IDisposable
{
private const string SourceTagName = "sourceTagName";
private const string SourceTagValue = "sourceTagValue";
private const string TraceParent = "00-3236da27af79882bd317c4d1c3776982-a3cc9bd52ccd58e6-01";
private readonly ITestOutputHelper? _outputHelper = outputHelper;
private bool _disposed;
[Fact]
internal void CanStartActivity()
{
string activitySourceName = NewName();
Queue<Activity> exportedActivities = new();
using TracerProvider provider = Sdk.CreateTracerProviderBuilder()
.AddSource(activitySourceName)
.AddTestActivityQueueExporter(exportedActivities)
.Build();
var testClass = new TraceProducer(activitySourceName);
testClass.MethodWithNoInstrumentation();
Assert.Empty(exportedActivities);
testClass.MethodWithActivity();
Assert.True(exportedActivities.Count > 0);
long currLength = exportedActivities.Count;
testClass.MethodWithNoInstrumentation();
Assert.Equal(currLength, exportedActivities.Count);
int lineCount = 0;
foreach (var exportedActivity in exportedActivities)
{
lineCount++;
Assert.NotNull(exportedActivity);
Assert.Contains(nameof(TraceProducer.MethodWithActivity), exportedActivity.OperationName);
Assert.DoesNotContain(nameof(TraceProducer.MethodWithNoInstrumentation), exportedActivity.OperationName);
}
Assert.Equal(1, lineCount);
}
[Fact]
internal void CanAddEvent()
{
string activitySourceName = NewName();
Queue<Activity> exportedActivities = new();
using TracerProvider provider = Sdk.CreateTracerProviderBuilder()
.AddSource(activitySourceName)
.AddTestActivityQueueExporter(exportedActivities)
.Build();
var testClass = new TraceProducer(activitySourceName);
testClass.MethodWithNoInstrumentation();
Assert.Empty(exportedActivities);
string eventName = NewName();
testClass.MethodWithEvent(eventName);
Assert.True(exportedActivities.Count > 0);
long currLength = exportedActivities.Count;
testClass.MethodWithNoInstrumentation();
Assert.Equal(currLength, exportedActivities.Count);
int lineCount = 0;
foreach (var exportedActivity in exportedActivities)
{
lineCount++;
Assert.NotNull(exportedActivity);
Assert.Contains(nameof(TraceProducer.MethodWithEvent), exportedActivity.OperationName);
Assert.DoesNotContain(nameof(TraceProducer.MethodWithNoInstrumentation), exportedActivity.OperationName);
Assert.Contains(eventName, exportedActivity.Events.FirstOrDefault().Name);
}
}
[Fact]
internal void CanAddActivityWithDepth()
{
string activitySourceName = NewName();
Queue<Activity> exportedActivities = new();
using TracerProvider provider = Sdk.CreateTracerProviderBuilder()
.AddSource(activitySourceName)
.AddTestActivityQueueExporter(exportedActivities)
.Build();
var testClass = new TraceProducer(activitySourceName);
const int recurseCount = 5;
testClass.MethodWithActivityRecursive(nameof(TraceProducer.MethodWithActivityRecursive), recurseCount);
int lineCount = 0;
foreach (var exportedActivity in exportedActivities)
{
lineCount++;
Assert.NotNull(exportedActivity);
Assert.Contains(nameof(TraceProducer.MethodWithActivityRecursive), exportedActivity.OperationName);
Assert.DoesNotContain(nameof(TraceProducer.MethodWithNoInstrumentation), exportedActivity.OperationName);
Assert.NotNull(exportedActivity);
}
Assert.Equal(recurseCount, lineCount);
}
[Fact]
internal void CanAddTraceParent()
{
string activitySourceName = NewName();
Queue<Activity> exportedActivities = new();
using TracerProvider provider1 = Sdk.CreateTracerProviderBuilder()
.AddSource(activitySourceName)
.AddTestActivityQueueExporter(exportedActivities)
.Build();
var testClass = new TraceProducer(activitySourceName);
testClass.MethodWithNoInstrumentation();
Assert.Empty(exportedActivities);
const string eventNameWithParent = "eventNameWithParent";
const string eventNameWithoutParent = "eventNameWithoutParent";
testClass.MethodWithActivity(eventNameWithoutParent);
Assert.True(exportedActivities.Count() > 0);
const int withParentCountExpected = 10;
for (int i = 0; i < withParentCountExpected; i++)
{
testClass.MethodWithActivity(eventNameWithParent, TraceParent);
}
testClass.MethodWithActivity(eventNameWithoutParent);
Assert.True(exportedActivities.Count() > 0);
int lineCount = 0;
int withParentCount = 0;
int withoutParentCount = 0;
foreach (var exportedActivity in exportedActivities)
{
lineCount++;
Assert.NotNull(exportedActivity);
if (exportedActivity.OperationName.Contains(eventNameWithoutParent))
{
withoutParentCount++;
Assert.Null(exportedActivity.ParentId);
}
else if (exportedActivity.OperationName.Contains(eventNameWithParent))
{
withParentCount++;
Assert.Equal(TraceParent, exportedActivity.ParentId);
}
}
Assert.Equal(2, withoutParentCount);
Assert.Equal(withParentCountExpected, withParentCount);
}
[Fact]
internal void CanListenAndFilterActivitySourceTagsUsingActivityTrace()
{
string activitySourceName = NewName();
Queue<Activity> exportedActivities = new();
using (ActivityListener activityListener = new()
{
ShouldListenTo = source =>
{
return source.Name == activitySourceName
&& source.Tags?.Any(t => t.Key == SourceTagName && t.Value?.Equals(SourceTagValue) == true) == true;
},
Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
ActivityStopped = activity => exportedActivities.Enqueue(activity)
})
{
ActivitySource.AddActivityListener(activityListener);
var testClass = new TraceProducer(activitySourceName);
testClass.MethodWithActivity();
}
Assert.Single(exportedActivities);
}
[Fact]
internal void CanListenAndFilterActivitySourceTagsUsingTracingConnection()
{
string activitySourceName = NewName();
Queue<Activity> exportedActivities = new();
var testClass = new MyTracingConnection(new Dictionary<string, string>(), activitySourceName);
using (ActivityListener activityListener = new()
{
ShouldListenTo = source =>
{
return source.Name == testClass.ActivitySourceName
&& source.Tags?.Any(t => t.Key == SourceTagName && t.Value?.Equals(SourceTagValue) == true) == true;
},
Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
ActivityStopped = activity => exportedActivities.Enqueue(activity)
})
{
ActivitySource.AddActivityListener(activityListener);
testClass.MethodWithActivity();
}
Assert.Single(exportedActivities);
}
[Fact]
internal async Task CanDetectInvalidAsyncCall()
{
string activitySourceName = NewName();
Queue<Activity> exportedActivities = new();
var testClass = new MyTracingConnection(new Dictionary<string, string>(), activitySourceName);
using (ActivityListener activityListener = new()
{
ShouldListenTo = source =>
{
return source.Name == testClass.ActivitySourceName
&& source.Tags?.Any(t => t.Key == SourceTagName && t.Value?.Equals(SourceTagValue) == true) == true;
},
Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
ActivityStopped = activity => exportedActivities.Enqueue(activity)
})
{
ActivitySource.AddActivityListener(activityListener);
await Assert.ThrowsAnyAsync<InvalidOperationException>(testClass.MethodWithInvalidAsyncTraceActivity1);
await Assert.ThrowsAnyAsync<InvalidOperationException>(testClass.MethodWithInvalidAsyncTraceActivity2);
await Assert.ThrowsAnyAsync<InvalidOperationException>(async () => await testClass.MethodWithInvalidAsyncTraceActivity3());
await Assert.ThrowsAnyAsync<InvalidOperationException>(async () => await testClass.MethodWithInvalidAsyncTraceActivity4());
await Assert.ThrowsAnyAsync<InvalidOperationException>(testClass.MethodWithInvalidAsyncTraceActivity5);
}
}
internal static string NewName() => Guid.NewGuid().ToString().Replace("-", "").ToLower();
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
}
_disposed = true;
}
}
public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
private class TraceProducer : IDisposable
{
private readonly ActivityTrace _trace;
private bool _isDisposed;
internal TraceProducer(string? activitySourceName = default, string? traceParent = default)
{
IEnumerable<KeyValuePair<string, object?>>? tags = [new(SourceTagName, SourceTagValue)];
_trace = new ActivityTrace(activitySourceName, traceParent: traceParent, tags: tags);
}
internal void MethodWithNoInstrumentation()
{
}
internal void MethodWithActivity()
{
_trace.TraceActivity(_ => { });
}
internal void MethodWithActivity(string activityName, string? traceParent = default)
{
_trace.TraceActivity(activity => { }, activityName: activityName, traceParent: traceParent);
}
internal void MethodWithActivityRecursive(string activityName, int recurseCount)
{
_trace.TraceActivity(_ =>
{
recurseCount--;
if (recurseCount > 0)
{
MethodWithActivityRecursive(activityName, recurseCount);
}
}, activityName: activityName + recurseCount.ToString());
}
internal void MethodWithEvent(string eventName)
{
_trace.TraceActivity((activity) => activity?.AddEvent(eventName));
}
internal void MethodWithAllProperties(
string activityName,
string eventName,
IReadOnlyList<KeyValuePair<string, object?>> tags,
string traceParent)
{
_trace.TraceActivity(activity =>
{
foreach (KeyValuePair<string, object?> tag in tags)
{
activity?.AddTag(tag.Key, tag.Value)
.AddBaggage(tag.Key, tag.Value?.ToString());
}
activity?.AddEvent(eventName, tags)
.AddLink(traceParent, tags);
}, activityName: activityName, traceParent: traceParent);
}
protected virtual void Dispose(bool disposing)
{
if (!_isDisposed && disposing)
{
_trace.Dispose();
_isDisposed = true;
}
}
public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
private class MyTracingConnection(IReadOnlyDictionary<string, string> properties, string assemblyName) : TracingConnection(properties)
{
public override string AssemblyVersion => "1.0.0";
public override string AssemblyName { get; } = assemblyName;
public override IEnumerable<KeyValuePair<string, object?>>? GetActivitySourceTags(IReadOnlyDictionary<string, string> properties)
{
return [new KeyValuePair<string, object?>(SourceTagName, SourceTagValue)];
}
public void MethodWithActivity()
{
this.TraceActivity(activity =>
{
activity?.AddTag("exampleTag", "exampleValue")
.AddBaggage("exampleBaggage", "exampleBaggageValue")
.AddEvent("exampleEvent", [new KeyValuePair<string, object?>("eventTag", "eventValue")])
.AddLink(TraceParent, [new KeyValuePair<string, object?>("linkTag", "linkValue")]);
});
}
public async Task<bool> MethodWithInvalidAsyncTraceActivity1()
{
// This method is intended to demonstrate incorrect usage of TraceActivity with async methods.
return await this.TraceActivity(async activity =>
{
await Task.Delay(1);
return true;
});
}
public async Task MethodWithInvalidAsyncTraceActivity2()
{
// This method is intended to demonstrate incorrect usage of TraceActivity with async methods.
await this.TraceActivity(async activity =>
{
await Task.Delay(1);
return;
});
}
public async ValueTask<bool> MethodWithInvalidAsyncTraceActivity3()
{
// This method is intended to demonstrate incorrect usage of TraceActivity with async methods.
return await this.TraceActivity(async activity =>
{
await Task.Delay(1);
return true;
});
}
public async ValueTask MethodWithInvalidAsyncTraceActivity4()
{
// This method is intended to demonstrate incorrect usage of TraceActivity with async methods.
await this.TraceActivity(async activity =>
{
await Task.Delay(1);
return;
});
}
public async Task<bool> MethodWithInvalidAsyncTraceActivity5()
{
// This method is intended to demonstrate incorrect usage of TraceActivity with async methods.
return await this.TraceActivity(async activity =>
{
await Task.Delay(1);
return await new AwaitableBool();
});
}
public class AwaitableBool
{
public BoolAwaiter GetAwaiter()
{
return new BoolAwaiter();
}
public class BoolAwaiter : INotifyCompletion
{
public bool IsCompleted => throw new NotImplementedException();
public bool GetResult()
{
throw new NotImplementedException();
}
public void OnCompleted(Action continuation)
{
throw new NotImplementedException();
}
}
}
public override AdbcStatement CreateStatement() => throw new NotImplementedException();
public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? catalogPattern, string? dbSchemaPattern, string? tableNamePattern, IReadOnlyList<string>? tableTypes, string? columnNamePattern) => throw new NotImplementedException();
public override Schema GetTableSchema(string? catalog, string? dbSchema, string tableName) => throw new NotImplementedException();
public override IArrowArrayStream GetTableTypes() => throw new NotImplementedException();
}
internal class ActivityQueueExporter(Queue<Activity> exportedActivities) : BaseExporter<Activity>
{
private Queue<Activity> ExportedActivities { get; } = exportedActivities;
public override ExportResult Export(in Batch<Activity> batch)
{
foreach (Activity activity in batch)
{
ExportedActivities.Enqueue(activity);
}
return ExportResult.Success;
}
}
}
public static class AdbcMemoryTestExporterExtensions
{
public static TracerProviderBuilder AddTestActivityQueueExporter(this TracerProviderBuilder builder, Queue<Activity> queue)
{
return builder.AddProcessor(sp => new SimpleActivityExportProcessor(new TracingTests.ActivityQueueExporter(queue)));
}
}
}