// blob: 273346cae0866abbb0a33515af15035c89a84501
import Pulsar = require('./index');
// Maximum parameters: every option and every method used below is spelled out
// with an explicit type annotation so the compiler checks it against the typings.
(async () => {
  // Authentication providers. Only the token-based one is passed to the client below.
  const tlsAuth: Pulsar.AuthenticationTls = new Pulsar.AuthenticationTls({
    certificatePath: '/path/to/cert.pem',
    privateKeyPath: '/path/to/key.pem',
  });
  const athenzAuth: Pulsar.AuthenticationAthenz = new Pulsar.AuthenticationAthenz('{}');
  const tokenAuth: Pulsar.AuthenticationToken = new Pulsar.AuthenticationToken({
    token: 'foobar',
  });

  // Client with all options set, including a custom log callback.
  const pulsarClient: Pulsar.Client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
    authentication: tokenAuth,
    operationTimeoutSeconds: 30,
    ioThreads: 4,
    messageListenerThreads: 4,
    concurrentLookupRequest: 100,
    useTls: false,
    tlsTrustCertsFilePath: '/path/to/ca-cert.pem',
    tlsValidateHostname: false,
    tlsAllowInsecureConnection: false,
    statsIntervalInSeconds: 60,
    log: (severity: Pulsar.LogLevel, file: string, line: number, text: string) => {
      // Map the level to its tag first; levels outside these four print nothing.
      let tag: string | undefined;
      switch (severity) {
        case Pulsar.LogLevel.DEBUG:
          tag = 'DEBUG';
          break;
        case Pulsar.LogLevel.INFO:
          tag = 'INFO';
          break;
        case Pulsar.LogLevel.WARN:
          tag = 'WARN';
          break;
        case Pulsar.LogLevel.ERROR:
          tag = 'ERROR';
          break;
      }
      if (tag !== undefined) {
        console.log(`[${tag}] ${text}`);
      }
    },
  });

  // Producers: the first sets every option, the others cover the remaining
  // routing-mode, hashing-scheme and compression-type literals.
  const fullProducer: Pulsar.Producer = await pulsarClient.createProducer({
    topic: 'persistent://public/default/my-topic',
    producerName: 'producer1',
    sendTimeoutMs: 10000,
    initialSequenceId: 1,
    maxPendingMessages: 100,
    maxPendingMessagesAcrossPartitions: 1000,
    blockIfQueueFull: false,
    messageRoutingMode: 'UseSinglePartition',
    hashingScheme: 'Murmur3_32Hash',
    compressionType: 'Zlib',
    batchingEnabled: false,
    batchingMaxPublishDelayMs: 50,
    batchingMaxMessages: 100,
    properties: {
      key1: 'value1',
      key2: 'value2',
    },
  });
  const roundRobinProducer: Pulsar.Producer = await pulsarClient.createProducer({
    topic: 'persistent://public/default/my-topic',
    messageRoutingMode: 'RoundRobinDistribution',
    hashingScheme: 'BoostHash',
    compressionType: 'LZ4',
  });
  const customPartitionProducer: Pulsar.Producer = await pulsarClient.createProducer({
    topic: 'persistent://public/default/my-topic',
    messageRoutingMode: 'CustomPartition',
    hashingScheme: 'JavaStringHash',
    compressionType: 'ZSTD',
  });
  const snappyProducer: Pulsar.Producer = await pulsarClient.createProducer({
    topic: 'persistent://public/default/my-topic',
    compressionType: 'SNAPPY',
  });

  // Consumers: one per subscription type; the KeyShared one also registers a listener.
  const exclusiveConsumer: Pulsar.Consumer = await pulsarClient.subscribe({
    topic: 'persistent://public/default/my-topic',
    subscription: 'my-sub1',
    subscriptionType: 'Exclusive',
    subscriptionInitialPosition: 'Latest',
    ackTimeoutMs: 3000,
    nAckRedeliverTimeoutMs: 5000,
    receiverQueueSize: 1000,
    receiverQueueSizeAcrossPartitions: 5000,
    consumerName: 'consumer1',
    properties: {
      key1: 'value1',
      key2: 'value2',
    },
    readCompacted: false,
  });
  const sharedConsumer: Pulsar.Consumer = await pulsarClient.subscribe({
    topic: 'persistent://public/default/my-topic',
    subscription: 'my-sub2',
    subscriptionType: 'Shared',
    subscriptionInitialPosition: 'Earliest',
  });
  const keySharedConsumer: Pulsar.Consumer = await pulsarClient.subscribe({
    topic: 'persistent://public/default/my-topic',
    subscription: 'my-sub3',
    subscriptionType: 'KeyShared',
    listener: (msg: Pulsar.Message, source: Pulsar.Consumer) => {
    },
  });
  const failoverConsumer: Pulsar.Consumer = await pulsarClient.subscribe({
    topic: 'persistent://public/default/my-topic',
    subscription: 'my-sub4',
    subscriptionType: 'Failover',
  });

  // Readers: one starting at the latest message id, one at the earliest.
  const latestReader: Pulsar.Reader = await pulsarClient.createReader({
    topic: 'persistent://public/default/my-topic',
    startMessageId: Pulsar.MessageId.latest(),
    receiverQueueSize: 1000,
    readerName: 'reader1',
    subscriptionRolePrefix: 'reader-',
    readCompacted: false,
  });
  const earliestReader: Pulsar.Reader = await pulsarClient.createReader({
    topic: 'persistent://public/default/my-topic',
    startMessageId: Pulsar.MessageId.earliest(),
  });

  // Producer accessors and a send() call with every message field populated.
  const nameOfProducer: string = fullProducer.getProducerName();
  const producerTopic: string = fullProducer.getTopic();
  const sentId: Pulsar.MessageId = await fullProducer.send({
    data: Buffer.from('my-message'),
    properties: {
      key1: 'value1',
      key2: 'value2',
    },
    eventTimestamp: Date.now(),
    sequenceId: 10,
    partitionKey: 'key1',
    replicationClusters: [
      'cluster1',
      'cluster2',
    ],
    deliverAfter: 30000,
    deliverAt: Date.now() + 30000,
  });

  // MessageId conversions: to string, to Buffer, and back from the Buffer.
  const sentIdText: string = sentId.toString();
  const sentIdBytes: Buffer = sentId.serialize();
  const restoredId: Pulsar.MessageId = Pulsar.MessageId.deserialize(sentIdBytes);

  // receive() without and with a numeric argument.
  const received: Pulsar.Message = await exclusiveConsumer.receive();
  const receivedWithTimeout: Pulsar.Message = await sharedConsumer.receive(1000);

  // Each acknowledgement method, in both its Message and MessageId form.
  exclusiveConsumer.negativeAcknowledge(received);
  exclusiveConsumer.negativeAcknowledgeId(sentId);
  exclusiveConsumer.acknowledge(received);
  exclusiveConsumer.acknowledgeId(sentId);
  exclusiveConsumer.acknowledgeCumulative(received);
  exclusiveConsumer.acknowledgeCumulativeId(sentId);

  // Message accessors and their annotated return types.
  const messageTopic: string = received.getTopicName();
  const messageProps: { [key: string]: string } = received.getProperties();
  const payload: Buffer = received.getData();
  const receivedId: Pulsar.MessageId = received.getMessageId();
  const publishedAt: number = received.getPublishTimestamp();
  const eventAt: number = received.getEventTimestamp();
  const redeliveries: number = received.getRedeliveryCount();
  const routingKey: string = received.getPartitionKey();

  // Reader methods: readNext() without and with a numeric argument, then hasNext().
  const nextFromLatest: Pulsar.Message = await latestReader.readNext();
  const nextFromEarliest: Pulsar.Message = await earliestReader.readNext(1000);
  const moreAvailable: boolean = latestReader.hasNext();

  // Flush, then shut everything down one at a time; the first consumer
  // uses unsubscribe() where the others use close().
  await fullProducer.flush();
  await fullProducer.close();
  await roundRobinProducer.close();
  await customPartitionProducer.close();
  await snappyProducer.close();
  await exclusiveConsumer.unsubscribe();
  await sharedConsumer.close();
  await keySharedConsumer.close();
  await failoverConsumer.close();
  await latestReader.close();
  await earliestReader.close();
  await pulsarClient.close();
})();
// Minimal parameters: each call passes only a small set of options,
// with no optional tuning fields.
(async () => {
  const pulsarClient: Pulsar.Client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  const basicProducer: Pulsar.Producer = await pulsarClient.createProducer({
    topic: 'persistent://public/default/my-topic',
  });

  // The three ways of naming what to subscribe to: `topic`, `topics`, `topicsPattern`.
  const singleTopicConsumer: Pulsar.Consumer = await pulsarClient.subscribe({
    topic: 'persistent://public/default/my-topic',
    subscription: 'my-sub1',
  });
  const topicListConsumer: Pulsar.Consumer = await pulsarClient.subscribe({
    topics: ['persistent://public/default/my-topic'],
    subscription: 'my-sub2',
  });
  const patternConsumer: Pulsar.Consumer = await pulsarClient.subscribe({
    topicsPattern: 'persistent://public/default/my-topi[a-z]',
    subscription: 'my-sub3',
  });

  const basicReader: Pulsar.Reader = await pulsarClient.createReader({
    topic: 'persistent://public/default/my-topic',
    startMessageId: Pulsar.MessageId.latest(),
  });

  // A message carrying nothing but its payload.
  const sentId: Pulsar.MessageId = await basicProducer.send({
    data: Buffer.from('my-message'),
  });

  // Close everything in creation order, the client last.
  await basicProducer.close();
  await singleTopicConsumer.close();
  await topicListConsumer.close();
  await patternConsumer.close();
  await basicReader.close();
  await pulsarClient.close();
})();
// Missing required parameters
// Every statement in this block is preceded by a `// $ExpectError` marker
// (presumably consumed by a dtslint-style checker), i.e. it is expected to be
// rejected by the type checker. Keep each marker directly above its statement.
(async () => {
// Client constructed from an empty options object.
// $ExpectError
const client: Pulsar.Client = new Pulsar.Client({
});
// Producer created from an empty options object.
// $ExpectError
const producer: Pulsar.Producer = await client.createProducer({
});
// Subscription requested with an empty options object.
// $ExpectError
const consumer: Pulsar.Consumer = await client.subscribe({
});
// Reader given a `topic` but no `startMessageId`.
// $ExpectError
const reader1: Pulsar.Reader = await client.createReader({
topic: 'persistent://public/default/my-topic',
});
// Reader given a `startMessageId` but no `topic`.
// $ExpectError
const reader2: Pulsar.Reader = await client.createReader({
startMessageId: Pulsar.MessageId.latest(),
});
// Message sent from an empty object (no `data`).
// $ExpectError
const messageId: Pulsar.MessageId = await producer.send({
});
})();