The Apache Iggy C# SDK provides a comprehensive client library for interacting with Iggy message streaming servers. It offers a modern, async-first API with support for multiple transport protocols and comprehensive message streaming capabilities.
Install the NuGet package:
dotnet add package Apache.Iggy
The SDK supports two transport protocols:
The SDK is built around the IIggyClient interface. To create a client instance:
var loggerFactory = LoggerFactory.Create(builder => { builder .AddFilter("Apache.Iggy", LogLevel.Information) .AddConsole(); }); var client = IggyClientFactory.CreateClient(new IggyClientConfigurator { BaseAddress = "127.0.0.1:8090", Protocol = Protocol.Tcp }, loggerFactory); await client.ConnectAsync();
The ILoggerFactory is required and used throughout the SDK for diagnostics and debugging.
The IggyClientConfigurator provides comprehensive configuration options:
var client = IggyClientFactory.CreateClient(new IggyClientConfigurator { BaseAddress = "127.0.0.1:8090", Protocol = Protocol.Tcp, // Buffer sizes (optional, default: 4096) ReceiveBufferSize = 4096, SendBufferSize = 4096, // TLS/SSL configuration TlsSettings = new TlsConfiguration { Enabled = true, Hostname = "iggy", CertificatePath = "/path/to/cert" }, // Automatic reconnection with exponential backoff ReconnectionSettings = new ReconnectionSettings { Enabled = true, MaxRetries = 3, // 0 = infinite retries InitialDelay = TimeSpan.FromSeconds(5), MaxDelay = TimeSpan.FromSeconds(30), WaitAfterReconnect = TimeSpan.FromSeconds(1), UseExponentialBackoff = true, BackoffMultiplier = 2.0 }, // Auto-login after connection AutoLoginSettings = new AutoLoginSettings { Enabled = true, Username = "your_username", Password = "your_password" } }, loggerFactory); await client.ConnectAsync();
Begin by using the root account (note: the root account cannot be removed or updated):
var response = await client.LoginUser("iggy", "iggy");
Create new users with customizable permissions:
var permissions = new Permissions { Global = new GlobalPermissions { ManageServers = true, ManageUsers = true, ManageStreams = true, ManageTopics = true, PollMessages = true, ReadServers = true, ReadStreams = true, ReadTopics = true, ReadUsers = true, SendMessages = true } }; await client.CreateUser("test_user", "secure_password", UserStatus.Active, permissions); // Login with the new user var loginResponse = await client.LoginUser("test_user", "secure_password");
Create and use Personal Access Tokens (PAT) for programmatic access:
// Create a PAT var patResponse = await client.CreatePersonalAccessTokenAsync("api-token", 3600); // Login with PAT await client.LoginWithPersonalAccessToken(patResponse.Token);
await client.CreateStreamAsync("my-stream");
You can reference streams by either numeric ID or name:
var streamById = Identifier.Numeric(0); var streamByName = Identifier.String("my-stream");
Every stream contains topics for organizing messages:
var streamId = Identifier.String("my-stream"); await client.CreateTopicAsync( streamId, name: "my-topic", partitionsCount: 3, compressionAlgorithm: CompressionAlgorithm.None, replicationFactor: 1, messageExpiry: 0, // 0 = never expire maxTopicSize: 0 // 0 = unlimited );
Note: Stream and topic names use hyphens instead of spaces. Iggy automatically replaces spaces with hyphens.
Send messages using the publisher interface:
var streamId = Identifier.String("my-stream"); var topicId = Identifier.String("my-topic"); var messages = new List<Message> { new(Guid.NewGuid(), "Hello, Iggy!"u8.ToArray()), new(1, "Another message"u8.ToArray()) }; await client.SendMessagesAsync( streamId, topicId, Partitioning.None(), // balanced partitioning messages );
Control which partition receives each message:
// Balanced partitioning (default) Partitioning.None() // Send to specific partition Partitioning.PartitionId(1) // Key-based partitioning (string) Partitioning.EntityIdString("user-123") // Key-based partitioning (integer) Partitioning.EntityIdInt(12345) // Key-based partitioning (GUID) Partitioning.EntityIdGuid(Guid.NewGuid())
Add custom headers to messages with typed values:
var headers = new Dictionary<HeaderKey, HeaderValue> { { new HeaderKey { Value = "correlation_id" }, HeaderValue.FromString("req-123") }, { new HeaderKey { Value = "priority" }, HeaderValue.FromInt32(1) }, { new HeaderKey { Value = "timeout" }, HeaderValue.FromInt64(5000) }, { new HeaderKey { Value = "confidence" }, HeaderValue.FromFloat(0.95f) }, { new HeaderKey { Value = "is_urgent" }, HeaderValue.FromBool(true) }, { new HeaderKey { Value = "request_id" }, HeaderValue.FromGuid(Guid.NewGuid()) } }; var messages = new List<Message> { new(Guid.NewGuid(), "Message with headers"u8.ToArray(), headers) }; await client.SendMessagesAsync( streamId, topicId, Partitioning.PartitionId(1), messages );
Coordinate message consumption across multiple consumers:
var groupResponse = await client.CreateConsumerGroupAsync( Identifier.String("my-stream"), Identifier.String("my-topic"), "my-consumer-group" );
Note: Join/Leave operations are only supported on TCP protocol and will throw FeatureUnavailableException on HTTP.
// Join a consumer group await client.JoinConsumerGroupAsync( Identifier.String("my-stream"), Identifier.String("my-topic"), Identifier.Numeric(1) // consumer ID ); // Leave a consumer group await client.LeaveConsumerGroupAsync( Identifier.String("my-stream"), Identifier.String("my-topic"), Identifier.Numeric(1) // consumer ID );
Fetch a batch of messages:
var polledMessages = await client.PollMessagesAsync(new MessageFetchRequest { StreamId = streamId, TopicId = topicId, Consumer = Consumer.New(1), // or Consumer.Group("my-consumer-group") for consumer group Count = 10, PartitionId = 0, // optional, null for consumer group PollingStrategy = PollingStrategy.Next(), AutoCommit = true }); foreach (var message in polledMessages.Messages) { Console.WriteLine($"Message: {Encoding.UTF8.GetString(message.Payload)}"); }
Control where message consumption starts:
// Start from a specific offset PollingStrategy.Offset(1000) // Start from a specific timestamp (microseconds since epoch) PollingStrategy.Timestamp(1699564800000000) // Start from the first message PollingStrategy.First() // Start from the last message PollingStrategy.Last() // Start from the next unread message PollingStrategy.Next()
Store the current consumer position:
await client.StoreOffsetAsync( Identifier.String("my-stream"), Identifier.String("my-topic"), Identifier.Numeric(1), // consumer ID 0, // partition ID 42 // offset value );
Get the current stored offset:
var offsetInfo = await client.GetOffsetAsync( Identifier.String("my-stream"), Identifier.String("my-topic"), Identifier.Numeric(1), // consumer ID 0 // partition ID ); Console.WriteLine($"Current offset: {offsetInfo.Offset}");
Clear stored offsets:
await client.DeleteOffsetAsync( Identifier.String("my-stream"), Identifier.String("my-topic"), Identifier.Numeric(1), // consumer ID 0 // partition ID );
Get cluster metadata and node information:
var metadata = await client.GetClusterMetadataAsync();
Retrieve server performance metrics:
var stats = await client.GetStatsAsync();
Verify server connectivity:
await client.PingAsync();
Get information about connected clients:
var clients = await client.GetClientsAsync(); var currentClient = await client.GetMeAsync();
Subscribe to connection events:
// Subscribe to connection events client.SubscribeConnectionEvents(async connectionState => { Console.WriteLine($"Current connection state: {connectionState.CurrentState}"); await SaveConnectionStateLog(connectionState.CurrentState); }); // Unsubscribe client.UnsubscribeConnectionEvents(handler);
High-level publisher with background sending, retries, and encryption:
var publisher = IggyPublisherBuilder.Create( client, Identifier.String("my-stream"), Identifier.String("my-topic") ) .WithBackgroundSending(enabled: true, batchSize: 100) .WithRetry(maxAttempts: 3) .Build(); await publisher.InitAsync(); var messages = new List<Message> { new(Guid.NewGuid(), "Message 1"u8.ToArray()), new(0, "Message 2"u8.ToArray()) }; await publisher.SendMessages(messages); // Wait for all messages to be sent await publisher.WaitUntilAllSends(); await publisher.DisposeAsync();
For automatic object serialization, use the typed variant:
class OrderSerializer : ISerializer<Order> { public byte[] Serialize(Order data) => Encoding.UTF8.GetBytes(JsonSerializer.Serialize(data)); } var publisher = IggyPublisherBuilder<Order>.Create( client, Identifier.String("orders-stream"), Identifier.String("orders-topic"), new OrderSerializer() ).Build(); await publisher.InitAsync(); await publisher.SendAsync(new List<Order> { /* ... */ });
High-level consumer with automatic offset management and consumer groups:
var consumer = IggyConsumerBuilder.Create( client, Identifier.String("my-stream"), Identifier.String("my-topic"), Consumer.New(1) ) .WithPollingStrategy(PollingStrategy.Next()) .WithBatchSize(10) .WithAutoCommitMode(AutoCommitMode.Auto) .Build(); await consumer.InitAsync(); await foreach (var message in consumer.ReceiveAsync()) { var payload = Encoding.UTF8.GetString(message.Message.Payload); Console.WriteLine($"Offset {message.CurrentOffset}: {payload}"); }
For consumer groups (load-balanced across multiple consumers):
var consumer = IggyConsumerBuilder.Create( client, Identifier.String("my-stream"), Identifier.String("my-topic"), Consumer.Group("my-group") ) .WithConsumerGroup("my-group", createIfNotExists: true) .WithPollingStrategy(PollingStrategy.Next()) .WithAutoCommitMode(AutoCommitMode.AfterReceive) .Build(); await consumer.InitAsync(); await foreach (var message in consumer.ReceiveAsync()) { Console.WriteLine($"Partition {message.PartitionId}: {message.Message.Payload}"); } await consumer.DisposeAsync();
For automatic deserialization:
class OrderDeserializer : IDeserializer<OrderEvent> { public OrderEvent Deserialize(byte[] data) => JsonSerializer.Deserialize<OrderEvent>(Encoding.UTF8.GetString(data))!; } var consumer = IggyConsumerBuilder<OrderEvent>.Create( client, Identifier.String("orders-stream"), Identifier.String("orders-topic"), Consumer.Group("order-processors"), new OrderDeserializer() ) .WithAutoCommitMode(AutoCommitMode.Auto) .Build(); await consumer.InitAsync(); await foreach (var message in consumer.ReceiveDeserializedAsync()) { if (message.Status == MessageStatus.Success) { Console.WriteLine($"Order: {message.Data?.OrderId}"); } }
The SDK provides the following main interfaces:
Additionally, builder-based APIs are available:
Examples are located in examples/csharp/ in root iggy directory.
cargo run --bin iggy-server
dotnet run -c Release --project Iggy_SDK.Examples.GettingStarted.Producer
dotnet run -c Release --project Iggy_SDK.Examples.GettingStarted.Consumer
Integration tests are located in Iggy_SDK.Tests.Integration/. Tests can run against:
IGGY_SERVER_HOST environment variable)cargo build docker build --no-cache -f core/server/Dockerfile --platform linux/amd64 --target runtime-prebuilt --build-arg PREBUILT_IGGY_SERVER=target/debug/iggy-server --build-arg PREBUILT_IGGY_CLI=target/debug/iggy -t local-iggy-server .
dotnet build foreign/csharp/Iggy_SDK.Tests.Integration
export IGGY_SERVER_DOCKER_IMAGE=local-iggy-server dotnet test foreign/csharp/Iggy_SDK.Tests.Integration --no-build --verbosity diagnostic
ASP.NET Core Dependency Injection