C# SDK for Iggy

Overview

The Apache Iggy C# SDK is a client library for Iggy message streaming servers. It offers an async-first API over multiple transport protocols and covers stream and topic management, publishing, consuming, offsets, and user administration.

Getting Started

Installation

Install the NuGet package:

dotnet add package Apache.Iggy

Supported Protocols

The SDK supports two transport protocols:

  • TCP - Binary protocol for optimal performance and lower latency (recommended)
  • HTTP - RESTful JSON API for stateless operations

Creating a Client

The SDK is built around the IIggyClient interface. To create a client instance:

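// LoggerFactory and LogLevel come from Microsoft.Extensions.Logging;
// AddConsole() requires the Microsoft.Extensions.Logging.Console package.
var loggerFactory = LoggerFactory.Create(builder =>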
{
    builder
        .AddFilter("Apache.Iggy", LogLevel.Information)
        .AddConsole();
});

var client = IggyClientFactory.CreateClient(new IggyClientConfigurator
{
    BaseAddress = "127.0.0.1:8090",
    Protocol = Protocol.Tcp
}, loggerFactory);

await client.ConnectAsync();

An ILoggerFactory is required. The SDK uses it throughout for diagnostic and debug logging.

Configuration

The IggyClientConfigurator exposes options for buffer sizes, TLS, automatic reconnection, and auto-login:

var client = IggyClientFactory.CreateClient(new IggyClientConfigurator
{
    BaseAddress = "127.0.0.1:8090",
    Protocol = Protocol.Tcp,

    // Buffer sizes (optional, default: 4096)
    ReceiveBufferSize = 4096,
    SendBufferSize = 4096,

    // TLS/SSL configuration
    TlsSettings = new TlsConfiguration
    {
        Enabled = true,
        Hostname = "iggy",
        CertificatePath = "/path/to/cert"
    },

    // Automatic reconnection with exponential backoff
    ReconnectionSettings = new ReconnectionSettings
    {
        Enabled = true,
        MaxRetries = 3,              // 0 = infinite retries
        InitialDelay = TimeSpan.FromSeconds(5),
        MaxDelay = TimeSpan.FromSeconds(30),
        WaitAfterReconnect = TimeSpan.FromSeconds(1),
        UseExponentialBackoff = true,
        BackoffMultiplier = 2.0
    },

    // Auto-login after connection
    AutoLoginSettings = new AutoLoginSettings
    {
        Enabled = true,
        Username = "your_username",
        Password = "your_password"
    }
}, loggerFactory);

await client.ConnectAsync();
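To make the reconnection settings more concrete, the following is a minimal sketch of how an exponential backoff schedule could be derived from InitialDelay, BackoffMultiplier, and MaxDelay. The formula (initial delay multiplied by the multiplier raised to the attempt number, capped at the maximum) is an assumption about typical backoff behavior, not taken from the SDK source, and BackoffDelay is a hypothetical helper. Four attempts are computed here only to show the cap taking effect.

```csharp
using System;
using System.Collections.Generic;

// Sketch only: the formula below is an assumption, not the SDK's implementation.
// delay = initialDelay * multiplier^attempt, capped at maxDelay
static TimeSpan BackoffDelay(int attempt, TimeSpan initialDelay, TimeSpan maxDelay, double multiplier)
{
    var seconds = initialDelay.TotalSeconds * Math.Pow(multiplier, attempt);
    return TimeSpan.FromSeconds(Math.Min(seconds, maxDelay.TotalSeconds));
}

var delays = new List<TimeSpan>();
for (var i = 0; i < 4; i++)
{
    // Same values as in the configuration above: 5s initial, 30s max, multiplier 2.0
    delays.Add(BackoffDelay(i, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(30), 2.0));
}

// Delays are 5s, 10s, 20s, and then 30s (40s capped at MaxDelay)
Console.WriteLine(string.Join(", ", delays));
```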

Authentication

User Login

Start by logging in with the root account (the root account cannot be removed or updated):

var response = await client.LoginUser("iggy", "iggy");

Creating Users

Create new users with customizable permissions:

var permissions = new Permissions
{
    Global = new GlobalPermissions
    {
        ManageServers = true,
        ManageUsers = true,
        ManageStreams = true,
        ManageTopics = true,
        PollMessages = true,
        ReadServers = true,
        ReadStreams = true,
        ReadTopics = true,
        ReadUsers = true,
        SendMessages = true
    }
};

await client.CreateUser("test_user", "secure_password", UserStatus.Active, permissions);

// Login with the new user
var loginResponse = await client.LoginUser("test_user", "secure_password");

Personal Access Tokens

Create and use Personal Access Tokens (PAT) for programmatic access:

// Create a PAT
var patResponse = await client.CreatePersonalAccessTokenAsync("api-token", 3600);

// Login with PAT
await client.LoginWithPersonalAccessToken(patResponse.Token);

Streams and Topics

Creating Streams

await client.CreateStreamAsync("my-stream");

You can reference streams by either numeric ID or name:

var streamById = Identifier.Numeric(0);
var streamByName = Identifier.String("my-stream");

Creating Topics

Every stream contains topics for organizing messages:

var streamId = Identifier.String("my-stream");

await client.CreateTopicAsync(
    streamId,
    name: "my-topic",
    partitionsCount: 3,
    compressionAlgorithm: CompressionAlgorithm.None,
    replicationFactor: 1,
    messageExpiry: 0,  // 0 = never expire
    maxTopicSize: 0    // 0 = unlimited
);

Note: Stream and topic names use hyphens instead of spaces. Iggy automatically replaces spaces with hyphens.

Publishing Messages

Sending Messages

Send messages using the publisher interface:

var streamId = Identifier.String("my-stream");
var topicId = Identifier.String("my-topic");

var messages = new List<Message>
{
    new(Guid.NewGuid(), "Hello, Iggy!"u8.ToArray()),
    new(1, "Another message"u8.ToArray())
};

await client.SendMessagesAsync(
    streamId,
    topicId,
    Partitioning.None(),  // balanced partitioning
    messages
);

Partitioning Strategies

Control which partition receives each message:

// Balanced partitioning (default)
Partitioning.None()

// Send to specific partition
Partitioning.PartitionId(1)

// Key-based partitioning (string)
Partitioning.EntityIdString("user-123")

// Key-based partitioning (integer)
Partitioning.EntityIdInt(12345)

// Key-based partitioning (GUID)
Partitioning.EntityIdGuid(Guid.NewGuid())
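Key-based partitioning is useful because the same key always lands on the same partition, which preserves ordering per entity. The sketch below illustrates the general idea with a stable hash reduced modulo the partition count. The FNV-1a hash and the PartitionForKey helper are assumptions chosen for illustration; the hash function Iggy actually applies to entity IDs is not described here.

```csharp
using System;
using System.Text;

// FNV-1a 32-bit hash: stable across processes, unlike string.GetHashCode()
static uint Fnv1a(byte[] data)
{
    unchecked
    {
        var hash = 2166136261u;
        foreach (var b in data)
        {
            hash ^= b;
            hash *= 16777619u;
        }
        return hash;
    }
}

// Hypothetical helper: maps a key to a partition index in [0, partitionsCount)
static uint PartitionForKey(string key, uint partitionsCount) =>
    Fnv1a(Encoding.UTF8.GetBytes(key)) % partitionsCount;

var first = PartitionForKey("user-123", 3);
var second = PartitionForKey("user-123", 3);

// Both calls yield the same index because the hash depends only on the key bytes
Console.WriteLine($"user-123 -> partition index {first} (repeat: {second})");
```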

User-Defined Headers

Add custom headers to messages with typed values:

var headers = new Dictionary<HeaderKey, HeaderValue>
{
    { new HeaderKey { Value = "correlation_id" }, HeaderValue.FromString("req-123") },
    { new HeaderKey { Value = "priority" }, HeaderValue.FromInt32(1) },
    { new HeaderKey { Value = "timeout" }, HeaderValue.FromInt64(5000) },
    { new HeaderKey { Value = "confidence" }, HeaderValue.FromFloat(0.95f) },
    { new HeaderKey { Value = "is_urgent" }, HeaderValue.FromBool(true) },
    { new HeaderKey { Value = "request_id" }, HeaderValue.FromGuid(Guid.NewGuid()) }
};

var messages = new List<Message>
{
    new(Guid.NewGuid(), "Message with headers"u8.ToArray(), headers)
};

await client.SendMessagesAsync(
    streamId,
    topicId,
    Partitioning.PartitionId(1),
    messages
);

Consumer Groups

Creating Consumer Groups

Coordinate message consumption across multiple consumers:

var groupResponse = await client.CreateConsumerGroupAsync(
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    "my-consumer-group"
);
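A consumer group spreads the partitions of a topic across its members so that each partition is read by one member at a time. The following sketch shows one simple way such a distribution could look, using round-robin assignment. The AssignPartitions helper and the round-robin policy are assumptions for illustration only; the server decides the real assignment.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical round-robin assignment of partition indexes to group members
static Dictionary<int, List<int>> AssignPartitions(int partitionsCount, int membersCount)
{
    var result = new Dictionary<int, List<int>>();
    for (var m = 0; m < membersCount; m++)
    {
        result[m] = new List<int>();
    }
    for (var p = 0; p < partitionsCount; p++)
    {
        result[p % membersCount].Add(p);
    }
    return result;
}

// Three partitions shared by two members: member 0 gets two, member 1 gets one
var assignment = AssignPartitions(partitionsCount: 3, membersCount: 2);
foreach (var entry in assignment)
{
    Console.WriteLine($"member {entry.Key}: partitions {string.Join(", ", entry.Value)}");
}
```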

Joining and Leaving Groups

Note: Join and leave operations are supported only over the TCP protocol. Over HTTP they throw FeatureUnavailableException.

// Join a consumer group
await client.JoinConsumerGroupAsync(
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Identifier.Numeric(1)  // consumer group ID
);

// Leave a consumer group
await client.LeaveConsumerGroupAsync(
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Identifier.Numeric(1)  // consumer group ID
);

Consuming Messages

Fetching Messages

Fetch a batch of messages:

var polledMessages = await client.PollMessagesAsync(new MessageFetchRequest
{
    StreamId = streamId,
    TopicId = topicId,
    Consumer = Consumer.New(1), // or Consumer.Group("my-consumer-group") for consumer group
    Count = 10,
    PartitionId = 0, // optional, null for consumer group
    PollingStrategy = PollingStrategy.Next(),
    AutoCommit = true
});

// Payloads are raw bytes; decode them as UTF-8 text (requires `using System.Text;`)
foreach (var message in polledMessages.Messages)
{
    Console.WriteLine($"Message: {Encoding.UTF8.GetString(message.Payload)}");
}
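Message payloads are plain byte arrays, so text has to be encoded when sending and decoded when receiving. The sketch below uses only the .NET standard library (no Iggy types) to show the round trip behind the "..."u8.ToArray() and Encoding.UTF8.GetString calls used in these examples. UTF-8 literals require C# 11 or later.

```csharp
using System;
using System.Text;

// A u8 literal is a ReadOnlySpan<byte> of UTF-8 bytes; ToArray() copies it into a byte[]
byte[] payload = "Hello, Iggy!"u8.ToArray();

// Equivalent encoding for strings that are only known at runtime
byte[] samePayload = Encoding.UTF8.GetBytes("Hello, Iggy!");

// Decoding on the consumer side restores the original text
string decoded = Encoding.UTF8.GetString(payload);
Console.WriteLine($"{payload.Length} bytes -> {decoded}");
```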

Polling Strategies

Control where message consumption starts:

// Start from a specific offset
PollingStrategy.Offset(1000)

// Start from a specific timestamp (microseconds since epoch)
PollingStrategy.Timestamp(1699564800000000)

// Start from the first message
PollingStrategy.First()

// Start from the last message
PollingStrategy.Last()

// Start from the next unread message
PollingStrategy.Next()
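The timestamp strategy expects microseconds since the Unix epoch, which .NET does not expose directly. A minimal sketch of the conversion, using only the standard library, is shown below. ToUnixMicroseconds is a hypothetical helper name, and returning ulong is an assumption about what is convenient to pass to PollingStrategy.Timestamp.

```csharp
using System;

// Microseconds since the Unix epoch for a given instant.
// ToUnixTimeMilliseconds() has millisecond precision; multiplying by 1000 yields microseconds.
static ulong ToUnixMicroseconds(DateTimeOffset instant) =>
    (ulong)instant.ToUnixTimeMilliseconds() * 1000UL;

var startFrom = new DateTimeOffset(2023, 11, 9, 21, 20, 0, TimeSpan.Zero);
ulong micros = ToUnixMicroseconds(startFrom);

Console.WriteLine(micros); // 1699564800000000, the value used in the example above
```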

Offset Management

Storing Offsets

Store the current consumer position:

await client.StoreOffsetAsync(
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Identifier.Numeric(1),  // consumer ID
    0,                      // partition ID
    42                      // offset value
);

Retrieving Offsets

Get the current stored offset:

var offsetInfo = await client.GetOffsetAsync(
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Identifier.Numeric(1),  // consumer ID
    0                       // partition ID
);

Console.WriteLine($"Current offset: {offsetInfo.Offset}");

Deleting Offsets

Clear stored offsets:

await client.DeleteOffsetAsync(
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Identifier.Numeric(1),  // consumer ID
    0                       // partition ID
);

System Operations

Cluster Information

Get cluster metadata and node information:

var metadata = await client.GetClusterMetadataAsync();

Server Statistics

Retrieve server performance metrics:

var stats = await client.GetStatsAsync();

Health Checks

Verify server connectivity:

await client.PingAsync();

Client Information

Get information about connected clients:

var clients = await client.GetClientsAsync();
var currentClient = await client.GetMeAsync();

Event Subscription

Subscribe to connection events:

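// Subscribe to connection events
client.SubscribeConnectionEvents(async connectionState =>
{
    Console.WriteLine($"Current connection state: {connectionState.CurrentState}");

    // SaveConnectionStateLog is an application-defined method, not part of the SDK
    await SaveConnectionStateLog(connectionState.CurrentState);
});

// Unsubscribe: `handler` must be the same delegate instance that was passed to
// SubscribeConnectionEvents, so store the callback in a variable when subscribing
client.UnsubscribeConnectionEvents(handler);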
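Unsubscribing takes a handler reference, and in .NET a delegate can only be removed if it is the same instance (or an equal delegate) that was registered. The sketch below demonstrates this with a plain list of callbacks standing in for the client's subscriber list. The list and the string state parameter are assumptions for illustration; how the SDK stores handlers internally is not shown in this document.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for a subscriber list (assumption, not the SDK's internal structure)
var subscribers = new List<Func<string, Task>>();

// Keep the delegate in a variable so the same instance can be removed later
Func<string, Task> handler = state =>
{
    Console.WriteLine($"Current connection state: {state}");
    return Task.CompletedTask;
};
subscribers.Add(handler);

// A freshly written lambda is a different delegate, so nothing is removed
bool removedCopy = subscribers.Remove(state => Task.CompletedTask);

// Passing the stored instance removes the subscription
bool removedOriginal = subscribers.Remove(handler);

Console.WriteLine($"copy removed: {removedCopy}, original removed: {removedOriginal}");
```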

Advanced: IggyPublisher

High-level publisher with background sending, retries, and encryption:

var publisher = IggyPublisherBuilder.Create(
    client,
    Identifier.String("my-stream"),
    Identifier.String("my-topic")
)
.WithBackgroundSending(enabled: true, batchSize: 100)
.WithRetry(maxAttempts: 3)
.Build();

await publisher.InitAsync();

var messages = new List<Message>
{
    new(Guid.NewGuid(), "Message 1"u8.ToArray()),
    new(0, "Message 2"u8.ToArray())
};

await publisher.SendMessages(messages);

// Wait for all messages to be sent
await publisher.WaitUntilAllSends();
await publisher.DisposeAsync();
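The batchSize argument suggests that queued messages are grouped before being sent. As a rough illustration of what grouping by a batch size of 100 means, the sketch below splits 250 placeholder payloads using Enumerable.Chunk from the standard library (.NET 6 or later). This is not the SDK's background sender, whose internals are not described here.

```csharp
using System;
using System.Linq;

// 250 placeholder payloads standing in for queued messages
var queued = Enumerable.Range(1, 250).Select(i => $"Message {i}").ToList();

// Chunk splits the sequence into arrays of at most batchSize items
const int batchSize = 100;
var batchSizes = queued.Chunk(batchSize).Select(batch => batch.Length).ToList();

Console.WriteLine(string.Join(", ", batchSizes)); // 100, 100, 50
```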

For automatic object serialization, use the typed variant:

// Requires `using System.Text;` and `using System.Text.Json;`
class OrderSerializer : ISerializer<Order>
{
    public byte[] Serialize(Order data) =>
        Encoding.UTF8.GetBytes(JsonSerializer.Serialize(data));
}

var publisher = IggyPublisherBuilder<Order>.Create(
    client,
    Identifier.String("orders-stream"),
    Identifier.String("orders-topic"),
    new OrderSerializer()
).Build();

await publisher.InitAsync();
await publisher.SendAsync(new List<Order> { /* ... */ });

Advanced: IggyConsumer

High-level consumer with automatic offset management and consumer groups:

var consumer = IggyConsumerBuilder.Create(
    client,
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Consumer.New(1)
)
.WithPollingStrategy(PollingStrategy.Next())
.WithBatchSize(10)
.WithAutoCommitMode(AutoCommitMode.Auto)
.Build();

await consumer.InitAsync();

await foreach (var message in consumer.ReceiveAsync())
{
    var payload = Encoding.UTF8.GetString(message.Message.Payload);
    Console.WriteLine($"Offset {message.CurrentOffset}: {payload}");
}

For consumer groups (load-balanced across multiple consumers):

var consumer = IggyConsumerBuilder.Create(
    client,
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Consumer.Group("my-group")
)
.WithConsumerGroup("my-group", createIfNotExists: true)
.WithPollingStrategy(PollingStrategy.Next())
.WithAutoCommitMode(AutoCommitMode.AfterReceive)
.Build();

await consumer.InitAsync();

await foreach (var message in consumer.ReceiveAsync())
{
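    // Decode the payload; interpolating the byte[] directly would print "System.Byte[]"
    var payload = Encoding.UTF8.GetString(message.Message.Payload);
    Console.WriteLine($"Partition {message.PartitionId}: {payload}");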
}

await consumer.DisposeAsync();

For automatic deserialization:

// Requires `using System.Text;` and `using System.Text.Json;`
class OrderDeserializer : IDeserializer<OrderEvent>
{
    public OrderEvent Deserialize(byte[] data) =>
        JsonSerializer.Deserialize<OrderEvent>(Encoding.UTF8.GetString(data))!;
}

var consumer = IggyConsumerBuilder<OrderEvent>.Create(
    client,
    Identifier.String("orders-stream"),
    Identifier.String("orders-topic"),
    Consumer.Group("order-processors"),
    new OrderDeserializer()
)
.WithAutoCommitMode(AutoCommitMode.Auto)
.Build();

await consumer.InitAsync();

await foreach (var message in consumer.ReceiveDeserializedAsync())
{
    if (message.Status == MessageStatus.Success)
    {
        Console.WriteLine($"Order: {message.Data?.OrderId}");
    }
}
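The serializer and deserializer shown in this document go through an intermediate string. As an alternative sketch, System.Text.Json can write and read UTF-8 bytes directly. The example below uses a dictionary as a stand-in for an order object so that it runs on its own; the keys and values are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Stand-in for an order object; a real application would use its own type
var order = new Dictionary<string, string>
{
    ["OrderId"] = "A-1001",
    ["Customer"] = "user-123"
};

// Serialize straight to UTF-8 bytes, skipping the intermediate string
byte[] payload = JsonSerializer.SerializeToUtf8Bytes(order);

// Deserialize directly from the UTF-8 bytes
var restored = JsonSerializer.Deserialize<Dictionary<string, string>>(payload)!;
Console.WriteLine(restored["OrderId"]);
```

The same two calls could form the bodies of Serialize and Deserialize in custom serializer classes.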

API Reference

The SDK provides the following main interfaces:

  • IIggyClient - Main client interface (aggregates all features)
  • IIggyPublisher - High-level message publishing interface
  • IIggyConsumer - High-level message consumption interface
  • IIggyStream - Stream management
  • IIggyTopic - Topic management
  • IIggyOffset - Offset management
  • IIggyConsumerGroup - Consumer group operations
  • IIggyPartition - Partition operations
  • IIggyUsers - User and authentication management
  • IIggySystem - System and cluster operations
  • IIggyPersonalAccessToken - Personal access token management

Additionally, builder-based APIs are available:

  • IggyPublisherBuilder / IggyPublisherBuilder<T> - Fluent publisher configuration
  • IggyConsumerBuilder / IggyConsumerBuilder<T> - Fluent consumer configuration

Running Examples

Examples are located in examples/csharp/ in the root of the iggy repository.

  1. Start the Iggy server:
     cargo run --bin iggy-server
  2. Run the producer example:
     dotnet run -c Release --project Iggy_SDK.Examples.GettingStarted.Producer
  3. Run the consumer example:
     dotnet run -c Release --project Iggy_SDK.Examples.GettingStarted.Consumer

Integration Tests

Integration tests are located in Iggy_SDK.Tests.Integration/. Tests can run against:

  • A dockerized Iggy server with TestContainers
  • A local Iggy server (set the IGGY_SERVER_HOST environment variable)

Requirements

  • .NET 8 SDK
  • Docker (for TestContainers tests)

Running Integration Tests Locally

1. Build the Docker Image

cargo build

docker build --no-cache -f core/server/Dockerfile --platform linux/amd64 --target runtime-prebuilt --build-arg PREBUILT_IGGY_SERVER=target/debug/iggy-server --build-arg PREBUILT_IGGY_CLI=target/debug/iggy -t local-iggy-server .

2. Build the Test Project

dotnet build foreign/csharp/Iggy_SDK.Tests.Integration

3. Run the Tests

export IGGY_SERVER_DOCKER_IMAGE=local-iggy-server
dotnet test foreign/csharp/Iggy_SDK.Tests.Integration --no-build --verbosity diagnostic

ROADMAP - TODO

  • [ ] Error handling with status codes and descriptions
  • [ ] Add support for ASP.NET Core Dependency Injection