blob: bb28c5297396d62003aab8d4cfbd7530974d9d86 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import assert from 'node:assert/strict';
import { type ClientConfig, SingleClient } from './client/index.js';
import { groupConsumerStream } from './stream/consumer-stream.js';
import { PollingStrategy, type PollMessagesResponse } from './wire/index.js';
const streamName = 'debug-snd-stream';
const topicName = 'debug-snd-topic';
const groupName = 'debug-snd-group';
const stream = {
name: streamName
};
const topic = {
streamId: streamName,
name: topicName,
partitionCount: 1,
compressionAlgorithm: 1
};
const opt: ClientConfig = {
transport: 'TCP' as const,
options: {
port: 8090,
host: '127.0.0.1' ,
// allowHalfOpen: true,
// keepAlive: true
},
credentials: { username: 'iggy', password: 'iggy' },
reconnect: {
enabled: true,
interval: 10 * 1000,
maxRetries: 10
}
};
const c = new SingleClient(opt);
const cleanup = async () => {
assert.ok(await c.stream.delete({ streamId: streamName }));
assert.ok(await c.session.logout());
c.destroy();
}
const msg = {
streamId: streamName,
topicId: topicName,
messages: [
{ payload: 'yolo msg 2' },
{ payload: 'yolo msg v4' },
{ payload: 'yolo msg v7' },
],
};
try {
await c.stream.create(stream);
console.log('server stream CREATED::', { streamName });
await c.topic.create(topic);
console.log('server topic CREATED::', { topicName });
// send
setInterval(async () => {
assert.ok(await c.message.send(msg));
console.log('message SEND::', { msg })
}, 3000);
// POLL MESSAGE
const pollReq = {
groupName,
streamId: streamName,
topicId: topicName,
pollingStrategy: PollingStrategy.Next,
count: 1,
interval: 5000
};
const cs = await groupConsumerStream(opt)(pollReq);
const dataCb = (d: PollMessagesResponse) => {
console.log('cli/DATA POLLED:', d);
// recv += d.count;
// if (recv === ct)
// str.destroy();
};
cs.on('data', dataCb);
cs.on('error', (err) => {
console.error('cli/=>>Stream ERROR:: // DESTROY ', err)
cs.destroy(err);
});
cs.on('end', async () => {
console.log('cli/=>>Stream END::')
cs.destroy();
await cleanup();
});
cs.on('close', async () => {
console.log('cli/=>>Stream CLOSE::')
// await cleanup();
});
} catch (err) {
console.error('client catchALL END', err)
await cleanup();
}
// process.on('SIGINT', async () => {
// await cleanup();
// console.log('cleanup OK, exiting...');
// process.exit()
// });