blob: 9ea7d32af3b6ef1dae8fa6050c7985d691af80ef [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import assert from 'node:assert/strict';
import { Client } from './client/index.js';
import { uuidv7, uuidv4 } from 'uuidv7'
import { groupConsumerStream } from './stream/consumer-stream.js';
import { PollingStrategy, type PollMessagesResponse } from './wire/index.js';
const streamName = 'debug-stream';
const topicName = 'debug-topic';
const groupName = 'debug-group';
const stream = {
name: 'debug-send-message-stream'
};
const topic = {
streamId: streamName,
name: topicName,
partitionCount: 1,
compressionAlgorithm: 1
};
const opt = {
transport: 'TCP' as const,
options: { port: 8090, host: '127.0.0.1', allowHalfOpen: true, keepAlive: true },
credentials: { username: 'iggy', password: 'iggy' },
reconnect: {
enabled: true,
interval: 10 * 1000,
maxRetries: 10
},
heartbeatInterval: 5 * 1000
};
const c = new Client(opt);
const cleanup = async () => {
assert.ok(await c.stream.delete({ streamId: streamName }));
assert.ok(await c.session.logout());
c.destroy();
}
const msg = {
streamId: streamName,
topicId: topicName,
messages: [
{ payload: 'yolo msg 2' },
{ id: 0 as const, payload: 'nooooooooo' },
{ id: 0n as const, payload: 'aye' },
{ id: uuidv4(), payload: 'yolo msg v4' },
{ id: uuidv7(), payload: 'yolo msg v7' },
],
};
try {
await c.stream.create(stream);
console.log('server stream CREATED::', { streamName });
await c.topic.create(topic);
console.log('server topic CREATED::', { topicName });
// send
assert.ok(await c.message.send(msg));
console.log('message SEND::', { msg })
// POLL MESSAGE
const pollReq = {
groupName,
streamId: streamName,
topicId: topicName,
pollingStrategy: PollingStrategy.Next,
count: 5,
interval: 5000
};
console.log('poll req', pollReq);
const cs = await groupConsumerStream(opt)(pollReq);
const dataCb = (d: PollMessagesResponse) => {
console.log('cli/DATA POLLED:', d);
};
cs.on('data', dataCb);
cs.on('error', (err) => {
console.error('cli/=>>Stream ERROR:: // DESTROY ', err)
cs.destroy(err);
});
cs.on('end', async () => {
console.log('cli/=>>Stream END::')
cs.destroy();
await cleanup();
});
cs.on('close', async () => {
console.log('cli/=>>Stream CLOSE::')
// await cleanup();
});
} catch (err) {
console.log('catch cleanup', err);
await cleanup();
}