blob: 3da202f8c3134a268629599d9f3c0fff0b9c7cee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Client, Consumer, PollingStrategy } from 'apache-iggy';
import { log, initSystem, cleanup, BATCHES_LIMIT, MESSAGES_PER_BATCH } from '../utils';
/**
 * Shape that `handleMessage` assigns to the parsed `data` of an envelope whose
 * `messageType` is 'OrderCreated'. The annotation is compile-time only: the
 * parsed JSON is not validated against this shape at runtime.
 */
interface OrderCreated {
/** Identifier of the order. */
orderId: string;
/** Identifier of the customer the order belongs to. */
customerId: string;
/** Order amount; the unit/currency is not specified in this file. */
amount: number;
}
/**
 * Shape that `handleMessage` assigns to the parsed `data` of an envelope whose
 * `messageType` is 'OrderConfirmed'. The annotation is compile-time only: the
 * parsed JSON is not validated against this shape at runtime.
 */
interface OrderConfirmed {
/** Identifier of the order. */
orderId: string;
/** Numeric timestamp; its unit and epoch are not shown in this file (TODO confirm against the producer). */
timestamp: number;
}
/**
 * Shape that `handleMessage` assigns to the parsed `data` of an envelope whose
 * `messageType` is 'OrderRejected'. The annotation is compile-time only: the
 * parsed JSON is not validated against this shape at runtime.
 */
interface OrderRejected {
/** Identifier of the order. */
orderId: string;
/** Textual rejection reason. */
reason: string;
}
// Values of the envelope's `messageType` field that `handleMessage` recognises;
// each one selects the matching case of its switch statement.
const ORDER_CREATED_TYPE = 'OrderCreated';
const ORDER_CONFIRMED_TYPE = 'OrderConfirmed';
const ORDER_REJECTED_TYPE = 'OrderRejected';
/**
 * Reads the command-line arguments that follow the script name.
 *
 * When the first argument is `-h` or `--help`, the usage text is logged and the
 * process is terminated with exit code 0. Otherwise the first argument is used
 * as the connection string, falling back to the local default when it is
 * missing or empty.
 *
 * @returns An object holding the connection string to use.
 */
function parseArgs() {
  const [firstArg] = process.argv.slice(2);
  const wantsHelp = firstArg === '-h' || firstArg === '--help';
  if (wantsHelp) {
    log('Usage: node consumer.ts [connection_string]');
    log('Example: node consumer.ts iggy+tcp://iggy:iggy@127.0.0.1:8090');
    process.exit(0);
  }
  // A missing or empty first argument selects the default local server.
  const connectionString = firstArg ? firstArg : 'iggy+tcp://iggy:iggy@127.0.0.1:8090';
  return { connectionString };
}
/**
 * Decodes one polled message and logs its content according to the envelope type.
 *
 * The payload is read as a container of byte values holding a UTF-8 JSON
 * envelope of the form `{ messageType, data }`, where `data` is itself a JSON
 * string that is parsed a second time. Every failure (missing or undecodable
 * payload, invalid JSON at either level) is logged and swallowed, so a single
 * bad message never throws out of this function.
 *
 * @param message - Polled message; only its `payload` property is read.
 */
function handleMessage(message: any): void {
  try {
    // The payload can be of any type as it is a raw byte array. In this case it's a JSON string.
    // Decoding happens inside the try block so that a missing or malformed payload is
    // reported like any other per-message error instead of escaping to the caller.
    const payload = new TextDecoder().decode(new Uint8Array(Object.values(message.payload)));
    const envelope = JSON.parse(payload);
    const messageType = envelope.messageType;
    const data = envelope.data;
    log('Handling message type: %s...', messageType);
    switch (messageType) {
      case ORDER_CREATED_TYPE: {
        const orderCreated: OrderCreated = JSON.parse(data);
        log('Order Created: %o', orderCreated);
        break;
      }
      case ORDER_CONFIRMED_TYPE: {
        const orderConfirmed: OrderConfirmed = JSON.parse(data);
        log('Order Confirmed: %o', orderConfirmed);
        break;
      }
      case ORDER_REJECTED_TYPE: {
        const orderRejected: OrderRejected = JSON.parse(data);
        log('Order Rejected: %o', orderRejected);
        break;
      }
      default: {
        log('Received unknown message type: %s', messageType);
      }
    }
  } catch (error) {
    log('Error handling message: %o', error);
  }
}
/**
 * Polls the first partition of the given topic in batches and hands every
 * received message to `handleMessage`.
 *
 * The read position is tracked locally in `offset` (polling is done with
 * `autocommit: false`) and advanced by the number of messages received. The
 * loop performs exactly `BATCHES_LIMIT` poll attempts: every attempt, whether
 * it returned messages, returned nothing, or failed, is counted once and is
 * followed by a single pause of `interval` milliseconds. Poll errors are logged
 * and do not stop the loop.
 *
 * @param client - Client used to poll messages.
 * @param stream - Stream returned by `initSystem`; its `id` is used for polling.
 * @param topic - Topic returned by `initSystem`; its `id` and first partition are used.
 */
async function consumeMessages(
  client: Client,
  stream: Awaited<ReturnType<typeof initSystem>>['stream'],
  topic: Awaited<ReturnType<typeof initSystem>>['topic']
) {
  const interval = 500; // 500 milliseconds
  const partitionId = topic.partitions[0].id;
  log(
    'Messages will be consumed from stream: %d, topic: %d, partition: %d with interval %d ms.',
    stream.id,
    topic.id,
    partitionId,
    interval
  );
  let offset = 0;
  let consumedBatches = 0;
  while (consumedBatches < BATCHES_LIMIT) {
    try {
      log('Polling for messages...');
      const polledMessages = await client.message.poll({
        streamId: stream.id,
        topicId: topic.id,
        consumer: Consumer.Single,
        partitionId,
        pollingStrategy: PollingStrategy.Offset(BigInt(offset)),
        count: MESSAGES_PER_BATCH,
        autocommit: false,
      });
      if (!polledMessages || polledMessages.messages.length === 0) {
        // Counting and pausing are done once, in the finally block below; doing
        // them here as well would count an empty poll twice and sleep twice.
        log('No messages available.');
      } else {
        offset += polledMessages.messages.length;
        for (const message of polledMessages.messages) {
          handleMessage(message);
        }
        log('Consumed %d message(s).', polledMessages.messages.length);
      }
    } catch (error) {
      log('Error consuming messages: %o', error);
    } finally {
      // Single place where an attempt is counted and the inter-poll pause happens.
      consumedBatches++;
      log('Completed poll attempt %d.', consumedBatches);
      await new Promise(resolve => setTimeout(resolve, interval));
    }
  }
  log('Consumed %d batches of messages, exiting.', consumedBatches);
}
/**
 * Entry point of the example: builds a TCP client from the connection string,
 * initialises the stream/topic through `initSystem`, consumes messages and
 * finally releases everything.
 *
 * Errors raised while initialising or consuming are logged and reported through
 * `process.exitCode = 1`. Cleanup of the stream/topic is attempted only when
 * `initSystem` succeeded, and the client is always destroyed, even when that
 * cleanup itself fails.
 */
async function main(): Promise<void> {
  const args = parseArgs();
  log('Using connection string: %s', args.connectionString);
  // Parse connection string (simplified parsing for this example)
  const url = new URL(args.connectionString.replace('iggy+tcp://', 'http://'));
  // URL exposes user info percent-encoded; decode it so credentials containing
  // reserved characters (e.g. '@' or '#') reach the server unmodified. A value
  // that is not valid percent-encoding is used as-is.
  const decodeUserInfo = (value: string): string => {
    try {
      return decodeURIComponent(value);
    } catch {
      return value;
    }
  };
  const host = url.hostname;
  const port = parseInt(url.port, 10) || 8090;
  const username = decodeUserInfo(url.username) || 'iggy';
  const password = decodeUserInfo(url.password) || 'iggy';
  const client = new Client({
    transport: 'TCP',
    options: {
      port,
      host,
      keepAlive: true,
    },
    reconnect: {
      enabled: true,
      interval: 5000,
      maxRetries: 5,
    },
    heartbeatInterval: 5000,
    credentials: { username, password },
  });
  // Stays null until initSystem has succeeded, so cleanup only runs for
  // resources that were actually set up.
  let system: Awaited<ReturnType<typeof initSystem>> | null = null;
  try {
    log('Message headers consumer has started, selected transport: TCP');
    log('Connecting to Iggy server...');
    // Client connects automatically when first command is called
    // Login will be handled automatically by the client on first command
    system = await initSystem(client);
    // Reported only after the first command has completed against the server.
    log('Connected successfully.');
    await consumeMessages(client, system.stream, system.topic);
  } catch (error) {
    log('Error in main: %o', error);
    process.exitCode = 1;
  } finally {
    try {
      if (system !== null) {
        await cleanup(client, system.stream.id, system.topic.id);
      }
    } catch (error) {
      log('Error during cleanup: %o', error);
      process.exitCode = 1;
    } finally {
      // Always release the connection, even when cleanup failed.
      await client.destroy();
      log('Disconnected from server.');
    }
  }
}
/**
 * Last-resort reporter for promise rejections that nothing handled: logs the
 * promise together with its rejection reason and marks the process as failed.
 */
function reportUnhandledRejection(reason: unknown, promise: Promise<unknown>): void {
  log('Unhandled Rejection at: %o, reason: %o', promise, reason);
  process.exitCode = 1;
}
process.on('unhandledRejection', reportUnhandledRejection);
// Run the example and wait for it to finish.
await main();