// blob: e7ed526475f46cadcc262caa44c27b0e88b2ee04 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Client, Partitioning } from "apache-iggy";
import debug from "debug";
import {
PARTITION_COUNT,
BATCHES_LIMIT,
MESSAGES_PER_BATCH,
initSystem,
} from "../utils";
// Namespaced logger from the `debug` package, used for this example's status
// and error messages.
// NOTE(review): per the debug package's documentation, output appears only when
// the DEBUG environment variable enables this namespace — confirm how the
// example is expected to be launched.
const log = debug("iggy:examples:basic-producer");
/**
 * Reads the command-line arguments of the producer example.
 *
 * The first positional argument, when present and non-empty, is used as the
 * connection string; otherwise a local default is returned. Passing `-h` or
 * `--help` as the first argument prints a usage summary to stdout and exits
 * the process with status 0.
 *
 * Side effect: replaces the global `console.log` with a function that writes
 * its arguments, joined by single spaces, to `process.stdout`.
 *
 * @returns An object holding the connection string to use.
 */
function parseArgs() {
  console.log = (...args) => process.stdout.write(args.join(" ") + "\n");

  const args = process.argv.slice(2);
  const firstArg = args[0];

  if (firstArg === "-h" || firstArg === "--help") {
    // Usage text is printed with console.log rather than the debug logger so
    // that asking for help always produces visible output.
    console.log("Usage: node producer.js [connection_string]");
    console.log("Example: node producer.js iggy+tcp://iggy:iggy@127.0.0.1:8090");
    process.exit(0);
  }

  // `||` (not `??`) on purpose: an empty-string argument also falls back to
  // the default instead of being handed to the URL parser later on.
  const connectionString = firstArg || "iggy+tcp://iggy:iggy@127.0.0.1:8090";
  return { connectionString };
}
/**
 * Sends up to `BATCHES_LIMIT` batches of `MESSAGES_PER_BATCH` messages to the
 * given topic, pausing 500 ms after every attempt.
 *
 * Each batch goes to one partition picked at random from `topic.partitions`.
 * A failed send is logged and does not abort the run; the loop is bounded by
 * the number of attempts, so a persistently failing server cannot make it spin
 * forever. Only batches that were actually sent are counted in the final
 * summary.
 *
 * @param client Client used to issue the send commands.
 * @param stream Stream returned by `initSystem`; its `id` addresses the stream.
 * @param topic  Topic returned by `initSystem`; its `id` and `partitions` are used.
 */
async function produceMessages(
  client: Client,
  stream: Awaited<ReturnType<typeof initSystem>>["stream"],
  topic: Awaited<ReturnType<typeof initSystem>>["topic"],
) {
  const interval = 500; // pause between batches, in milliseconds
  log(
    "Messages will be sent to stream: %s, topic: %s, to a random partition per batch, with interval %d ms.",
    stream.id,
    topic.id,
    interval,
  );

  // Message ids keep increasing across batches, including failed ones.
  let currentId = 0;
  // Number of batches the server accepted (not the number of attempts).
  let sentBatches = 0;

  for (let attempt = 0; attempt < BATCHES_LIMIT; attempt++) {
    const messages = Array.from({ length: MESSAGES_PER_BATCH }, () => {
      currentId++;
      return {
        //optional message id can be used for deduplication
        id: currentId,
        //optional headers can be used to store metadata
        headers: [],
        payload: `message-${currentId}`,
      };
    });

    try {
      await client.message.send({
        streamId: stream.id,
        topicId: topic.id,
        messages,
        partition: Partitioning.PartitionId(
          topic.partitions[Math.floor(Math.random() * topic.partitions.length)]
            .id,
        ),
      });
      // Count and report the batch only once the send has resolved.
      sentBatches++;
      log("Sent messages: %o", messages);
    } catch (error) {
      log("Error sending messages: %o", error);
      log(
        "This might be due to server version compatibility. The stream and topic creation worked successfully.",
      );
      log(
        "Please check the Iggy server version and ensure it supports the SendMessages command.",
      );
    }

    // Throttle regardless of the outcome of this attempt.
    await new Promise((resolve) => setTimeout(resolve, interval));
  }

  log("Sent %d batches of messages, exiting.", sentBatches);
}
/**
 * Removes the resources created for the example: first the topic, then the
 * stream that contains it.
 *
 * Both deletions are best-effort. A failure is logged and swallowed, so a
 * failed topic deletion does not prevent the stream deletion from being
 * attempted, and this function does not reject because of either call.
 *
 * @param client   Client used to issue the delete commands.
 * @param streamId Identifier of the stream to delete.
 * @param topicId  Identifier of the topic to delete.
 */
async function cleanup(
  client: Client,
  streamId: number | string,
  topicId: number | string,
) {
  // `%s` rather than `%d`: the identifiers may be strings, which `%d` would
  // render as NaN.
  log(
    "Cleaning up: deleting topic ID %s and stream ID %s...",
    topicId,
    streamId,
  );

  try {
    await client.topic.delete({
      streamId,
      topicId,
      partitionsCount: PARTITION_COUNT,
    });
    log("Topic deleted successfully.");
  } catch (error) {
    log("Error deleting topic: %o", error);
  }

  try {
    await client.stream.delete({ streamId });
    log("Stream deleted successfully.");
  } catch (error) {
    log("Error deleting stream: %o", error);
  }
}
/**
 * Entry point of the basic producer example.
 *
 * Builds a TCP client from the connection string, initialises the stream and
 * topic through `initSystem`, pings the server, logs its stats, and produces
 * the message batches. Whatever happens, resources created by `initSystem`
 * are cleaned up and the client is destroyed exactly once, in that order.
 * Any error raised inside the run is logged and reported through
 * `process.exitCode = 1`.
 */
async function main() {
  const args = parseArgs();
  log("Using connection string: %s", args.connectionString);

  // Parse connection string (simplified parsing for this example): the custom
  // scheme is replaced with http:// before the string is handed to URL.
  const url = new URL(args.connectionString.replace("iggy+tcp://", "http://"));
  const host = url.hostname;
  const port = parseInt(url.port, 10) || 8090;
  const username = url.username || "iggy";
  const password = url.password || "iggy";

  const client = new Client({
    transport: "TCP",
    options: {
      port,
      host,
      keepAlive: true,
    },
    reconnect: {
      enabled: true,
      interval: 5000,
      maxRetries: 5,
    },
    heartbeatInterval: 5000,
    credentials: { username, password },
  });

  // Stays null until initSystem has returned, i.e. until there is something
  // to clean up.
  let streamId: number | string | null = null;
  // iggy will create topic with id 0 by default
  let topicId = 0;

  try {
    log("Basic producer has started, selected transport: TCP");
    log("Connecting to Iggy server...");
    // Client connects automatically when first command is called
    // Login will be handled automatically by the client on first command
    const { stream, topic } = await initSystem(client);
    // Reported only after a command has actually completed.
    log("Connected successfully.");
    streamId = stream.id;
    topicId = topic.id;

    //ping before producing messages
    const pong = await client.system.ping();
    log("Ping successful.", pong);
    const stats = await client.system.getStats();
    log("System stats: %o", stats);
    log("Stream ID: %s, Topic ID: %s", streamId, topicId);

    await produceMessages(client, stream, topic);
  } catch (error) {
    log("Error in main: %o", error);
    // The client is NOT destroyed here: the finally block still needs it for
    // cleanup and performs the single teardown.
    process.exitCode = 1;
  } finally {
    // Explicit null check so that a stream id of 0 is still cleaned up.
    if (streamId !== null) {
      await cleanup(client, streamId, topicId);
    }
    await client.destroy();
    log("Disconnected from server.");
  }
}
// Run the example. The top-level `await` requires this file to be loaded as an
// ES module.
await main();