blob: bdd00a930409a04e491d91be6de161f2dcf6fffe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import crypto from 'crypto';
import { Client } from 'apache-iggy';
import debug from 'debug';
export const log = debug('iggy:examples');
export const PARTITION_COUNT = 5;
export const STREAM_ID = 0;
export const TOPIC_ID = 0;
export const PARTITION_ID = 0;
export const BATCHES_LIMIT = 5;
export const MESSAGES_PER_BATCH = 10;
export async function initSystem(client: Client) {
log('Creating stream with random name...');
console.table(await client.stream.list());
const stream = await client.stream.create({
name: `sample-stream-${crypto.randomBytes(8).toString('hex')}`,
});
log('Stream was created successfully. Stream ID: %s', stream?.id);
log('Creating topic in stream ID %s...', stream.id);
const topic = await client.topic.create({
streamId: stream.id,
name: `sample-topic-${crypto.randomBytes(4).toString('hex')}`,
partitionCount: PARTITION_COUNT,
compressionAlgorithm: 1, // None
replicationFactor: 1,
});
log('Topic was created successfully.', 'Topic ID: %s', topic?.id);
return {
stream,
topic,
};
}
export async function cleanup(client: Client, streamId: number | string, topicId: number | string) {
log('Cleaning up: deleting topic ID %d and stream ID %d...', topicId, streamId);
try {
await client.topic.delete({
streamId: streamId,
topicId: topicId,
partitionsCount: PARTITION_COUNT,
});
log('Topic deleted successfully.');
} catch (error) {
log('Error deleting topic: %o', error);
}
try {
await client.stream.delete({ streamId });
log('Stream deleted successfully.');
} catch (error) {
log('Error deleting stream: %o', error);
}
}
export const sleep = (interval: number) => new Promise(resolve => setTimeout(resolve, interval));
export function parseArgs() {
console.log = (...args) => process.stdout.write(args.join(' ') + '\n');
const args = process.argv.slice(2);
const connectionString = args[0] || 'iggy+tcp://iggy:iggy@127.0.0.1:8090';
if (args.length > 0 && (args[0] === '-h' || args[0] === '--help')) {
log('Usage: node producer.ts [connection_string]');
log('Example: node producer.ts iggy+tcp://iggy:iggy@127.0.0.1:8090');
process.exit(0);
}
return { connectionString };
}