/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

const lodash = require('lodash');
const Pulsar = require('../index.js');

(() => {
  describe('End To End', () => {
    test('Produce/Consume', async () => {
      const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
        operationTimeoutSeconds: 30,
      });

      const topic = 'persistent://public/default/produce-consume';
      const producer = await client.createProducer({
        topic,
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      });
      expect(producer).not.toBeNull();

      const consumer = await client.subscribe({
        topic,
        subscription: 'sub1',
        ackTimeoutMs: 10000,
      });

      expect(consumer).not.toBeNull();

      const messages = [];
      for (let i = 0; i < 10; i += 1) {
        const msg = `my-message-${i}`;
        producer.send({
          data: Buffer.from(msg),
        });
        messages.push(msg);
      }
      await producer.flush();

      const results = [];
      for (let i = 0; i < 10; i += 1) {
        const msg = await consumer.receive();
        consumer.acknowledge(msg);
        results.push(msg.getData().toString());
      }
      expect(lodash.difference(messages, results)).toEqual([]);

      await producer.close();
      await consumer.close();
      await client.close();
    });

    test('negativeAcknowledge', async () => {
      const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
        operationTimeoutSeconds: 30,
      });

      const topic = 'persistent://public/default/produce-consume';
      const producer = await client.createProducer({
        topic,
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      });
      expect(producer).not.toBeNull();

      const consumer = await client.subscribe({
        topic,
        subscription: 'sub1',
        ackTimeoutMs: 10000,
        nAckRedeliverTimeoutMs: 1000,
      });

      expect(consumer).not.toBeNull();

      const message = 'my-message';
      producer.send({
        data: Buffer.from(message),
      });
      await producer.flush();

      const results = [];
      const msg = await consumer.receive();
      results.push(msg.getData().toString());
      consumer.negativeAcknowledge(msg);

      const msg2 = await consumer.receive();
      results.push(msg2.getData().toString());
      consumer.acknowledge(msg2);

      await expect(consumer.receive(1000)).rejects.toThrow(
        'Failed to received message TimeOut',
      );

      expect(results).toEqual([message, message]);

      await producer.close();
      await consumer.close();
      await client.close();
    });

    test('getRedeliveryCount', async () => {
      const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
        operationTimeoutSeconds: 30,
      });

      const topic = 'persistent://public/default/produce-consume';
      const producer = await client.createProducer({
        topic,
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      });
      expect(producer).not.toBeNull();

      const consumer = await client.subscribe({
        topic,
        subscriptionType: 'Shared',
        subscription: 'sub1',
        ackTimeoutMs: 10000,
        nAckRedeliverTimeoutMs: 100,
      });

      expect(consumer).not.toBeNull();

      const message = 'my-message';
      producer.send({
        data: Buffer.from(message),
      });
      await producer.flush();

      let redeliveryCount;
      let msg;
      for (let index = 0; index < 3; index += 1) {
        msg = await consumer.receive();
        redeliveryCount = msg.getRedeliveryCount();
        consumer.negativeAcknowledge(msg);
      }
      expect(redeliveryCount).toBe(2);
      consumer.acknowledge(msg);

      await producer.close();
      await consumer.close();
      await client.close();
    });

    test('Produce/Consume Listener', async () => {
      const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
        operationTimeoutSeconds: 30,
      });

      const topic = 'persistent://public/default/produce-consume-listener';
      const producer = await client.createProducer({
        topic,
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      });
      expect(producer).not.toBeNull();

      let finish;
      const results = [];
      const finishPromise = new Promise((resolve) => {
        finish = resolve;
      });

      const consumer = await client.subscribe({
        topic,
        subscription: 'sub1',
        ackTimeoutMs: 10000,
        listener: (message, messageConsumer) => {
          const data = message.getData().toString();
          results.push(data);
          messageConsumer.acknowledge(message);
          if (results.length === 10) finish();
        },
      });

      expect(consumer).not.toBeNull();

      const messages = [];
      for (let i = 0; i < 10; i += 1) {
        const msg = `my-message-${i}`;
        producer.send({
          data: Buffer.from(msg),
        });
        messages.push(msg);
      }
      await producer.flush();

      await finishPromise;
      expect(lodash.difference(messages, results)).toEqual([]);

      await producer.close();
      await consumer.close();
      await client.close();
    });

    test('acknowledgeCumulative', async () => {
      const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
        operationTimeoutSeconds: 30,
      });

      const topic = 'persistent://public/default/acknowledgeCumulative';
      const producer = await client.createProducer({
        topic,
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      });
      expect(producer).not.toBeNull();

      const consumer = await client.subscribe({
        topic,
        subscription: 'sub1',
        ackTimeoutMs: 10000,
      });
      expect(consumer).not.toBeNull();

      const messages = [];
      for (let i = 0; i < 10; i += 1) {
        const msg = `my-message-${i}`;
        producer.send({
          data: Buffer.from(msg),
        });
        messages.push(msg);
      }
      await producer.flush();

      for (let i = 0; i < 10; i += 1) {
        const msg = await consumer.receive();
        if (i === 9) {
          consumer.acknowledgeCumulative(msg);
        }
      }

      await expect(consumer.receive(1000)).rejects.toThrow(
        'Failed to received message TimeOut',
      );

      await producer.close();
      await consumer.close();
      await client.close();
    });

    test('subscriptionInitialPosition', async () => {
      const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
        operationTimeoutSeconds: 30,
      });

      const topic = 'persistent://public/default/subscriptionInitialPosition';
      const producer = await client.createProducer({
        topic,
        sendTimeoutMs: 30000,
        batchingEnabled: false,
      });
      expect(producer).not.toBeNull();

      const messages = [];
      for (let i = 0; i < 2; i += 1) {
        const msg = `my-message-${i}`;
        producer.send({
          data: Buffer.from(msg),
        });
        messages.push(msg);
      }
      await producer.flush();

      const latestConsumer = await client.subscribe({
        topic,
        subscription: 'latestSub',
        subscriptionInitialPosition: 'Latest',
      });
      expect(latestConsumer).not.toBeNull();

      const earliestConsumer = await client.subscribe({
        topic,
        subscription: 'earliestSub',
        subscriptionInitialPosition: 'Earliest',
      });
      expect(earliestConsumer).not.toBeNull();

      for (let i = 2; i < 4; i += 1) {
        const msg = `my-message-${i}`;
        producer.send({
          data: Buffer.from(msg),
        });
        messages.push(msg);
      }
      await producer.flush();

      const latestResults = [];
      const earliestResults = [];
      for (let i = 0; i < 4; i += 1) {
        if (i < 2) {
          const latestMsg = await latestConsumer.receive(5000);
          latestConsumer.acknowledge(latestMsg);
          latestResults.push(latestMsg.getData().toString());
        }

        const earliestMsg = await earliestConsumer.receive(5000);
        earliestConsumer.acknowledge(earliestMsg);
        earliestResults.push(earliestMsg.getData().toString());
      }
      expect(lodash.difference(messages, latestResults)).toEqual(['my-message-0', 'my-message-1']);
      expect(lodash.difference(messages, earliestResults)).toEqual([]);

      await producer.close();
      await latestConsumer.close();
      await earliestConsumer.close();
      await client.close();
    });

    test('Produce/Read', async () => {
      const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
        operationTimeoutSeconds: 30,
      });
      expect(client).not.toBeNull();

      const topic = 'persistent://public/default/produce-read';
      const producer = await client.createProducer({
        topic,
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      });
      expect(producer).not.toBeNull();

      const reader = await client.createReader({
        topic,
        startMessageId: Pulsar.MessageId.latest(),
      });
      expect(reader).not.toBeNull();

      const messages = [];
      for (let i = 0; i < 10; i += 1) {
        const msg = `my-message-${i}`;
        producer.send({
          data: Buffer.from(msg),
        });
        messages.push(msg);
      }
      await producer.flush();

      expect(reader.hasNext()).toBe(true);

      const results = [];
      for (let i = 0; i < 10; i += 1) {
        const msg = await reader.readNext();
        results.push(msg.getData().toString());
      }
      expect(lodash.difference(messages, results)).toEqual([]);

      expect(reader.hasNext()).toBe(false);

      await producer.close();
      await reader.close();
      await client.close();
    });

    test('Produce-Delayed/Consume', async () => {
      const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
        operationTimeoutSeconds: 30,
      });
      expect(client).not.toBeNull();

      const topic = 'persistent://public/default/produce-read-delayed';
      const producer = await client.createProducer({
        topic,
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      });
      expect(producer).not.toBeNull();

      const consumer = await client.subscribe({
        topic,
        subscription: 'sub',
        subscriptionType: 'Shared',
      });
      expect(consumer).not.toBeNull();

      const messages = [];
      const time = (new Date()).getTime();
      for (let i = 0; i < 5; i += 1) {
        const msg = `my-message-${i}`;
        producer.send({
          data: Buffer.from(msg),
          deliverAfter: 3000,
        });
        messages.push(msg);
      }
      for (let i = 5; i < 10; i += 1) {
        const msg = `my-message-${i}`;
        producer.send({
          data: Buffer.from(msg),
          deliverAt: (new Date()).getTime() + 3000,
        });
        messages.push(msg);
      }
      await producer.flush();

      const results = [];
      for (let i = 0; i < 10; i += 1) {
        const msg = await consumer.receive();
        results.push(msg.getData().toString());
        consumer.acknowledge(msg);
      }
      expect(lodash.difference(messages, results)).toEqual([]);
      expect((new Date()).getTime() - time).toBeGreaterThan(3000);

      await producer.close();
      await consumer.close();
      await client.close();
    });

    test('Produce/Consume/Unsubscribe', async () => {
      const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
        operationTimeoutSeconds: 30,
      });

      const topic = 'persistent://public/default/produce-consume-unsubscribe';
      const producer = await client.createProducer({
        topic,
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      });
      expect(producer).not.toBeNull();

      const consumer = await client.subscribe({
        topic,
        subscription: 'sub1',
        ackTimeoutMs: 10000,
      });

      expect(consumer).not.toBeNull();

      const messages = [];
      for (let i = 0; i < 10; i += 1) {
        const msg = `my-message-${i}`;
        producer.send({
          data: Buffer.from(msg),
        });
        messages.push(msg);
      }
      await producer.flush();

      const results = [];
      for (let i = 0; i < 10; i += 1) {
        const msg = await consumer.receive();
        consumer.acknowledge(msg);
        results.push(msg.getData().toString());
      }
      expect(lodash.difference(messages, results)).toEqual([]);

      await consumer.unsubscribe();
      producer.send({ data: Buffer.from('drop') });
      await producer.flush();

      const consumer2 = await client.subscribe({
        topic,
        subscription: 'sub1',
        ackTimeoutMs: 10000,
      });

      const testData = 'success';
      producer.send({ data: Buffer.from(testData) });
      await producer.flush();

      const msg = await consumer2.receive();
      consumer2.acknowledge(msg);
      expect(msg.getData().toString()).toBe(testData);

      await consumer2.close();
      await producer.close();
      await client.close();
    });

    test('Produce/Read (Compression)', async () => {
      const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
        operationTimeoutSeconds: 30,
      });
      expect(client).not.toBeNull();

      const topic = 'persistent://public/default/produce-read-compression';
      const producer = await client.createProducer({
        topic,
        sendTimeoutMs: 30000,
        batchingEnabled: true,
        compressionType: 'ZSTD',
      });
      expect(producer).not.toBeNull();

      const reader = await client.createReader({
        topic,
        startMessageId: Pulsar.MessageId.latest(),
      });
      expect(reader).not.toBeNull();

      const messages = [];
      for (let i = 0; i < 10; i += 1) {
        const msg = `my-message-${i}`;
        producer.send({
          data: Buffer.from(msg),
        });
        messages.push(msg);
      }
      await producer.flush();

      expect(reader.hasNext()).toBe(true);

      const results = [];
      for (let i = 0; i < 10; i += 1) {
        const msg = await reader.readNext();
        results.push(msg.getData().toString());
      }
      expect(lodash.difference(messages, results)).toEqual([]);

      expect(reader.hasNext()).toBe(false);

      await producer.close();
      await reader.close();
      await client.close();
    });

    test('Produce/Consume-Pattern', async () => {
      const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
        operationTimeoutSeconds: 30,
      });
      expect(client).not.toBeNull();

      const topic1 = 'persistent://public/default/produce-abcdef';
      const topic2 = 'persistent://public/default/produce-abczef';
      const topicsPattern = 'persistent://public/default/produce-abc[a-z]ef';
      const producer1 = await client.createProducer({
        topic: topic1,
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      });
      expect(producer1).not.toBeNull();
      const producer2 = await client.createProducer({
        topic: topic2,
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      });
      expect(producer2).not.toBeNull();

      const consumer = await client.subscribe({
        topicsPattern,
        subscription: 'sub',
        subscriptionType: 'Shared',
      });
      expect(consumer).not.toBeNull();

      const messages = [];
      for (let i = 0; i < 5; i += 1) {
        const msg = `my-message-${i}`;
        producer1.send({
          data: Buffer.from(msg),
        });
        messages.push(msg);
      }
      await producer1.flush();
      for (let i = 5; i < 10; i += 1) {
        const msg = `my-message-${i}`;
        producer2.send({
          data: Buffer.from(msg),
        });
        messages.push(msg);
      }
      await producer2.flush();

      const results = [];
      for (let i = 0; i < 10; i += 1) {
        const msg = await consumer.receive();
        results.push(msg.getData().toString());
        consumer.acknowledge(msg);
      }
      expect(lodash.difference(messages, results)).toEqual([]);

      await producer1.close();
      await producer2.close();
      await consumer.close();
      await client.close();
    });

    test('Produce/Consume-Multi-Topic', async () => {
      const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
        operationTimeoutSeconds: 30,
      });
      expect(client).not.toBeNull();

      const topic1 = 'persistent://public/default/produce-mtopic-1';
      const topic2 = 'persistent://public/default/produce-mtopic-2';
      const producer1 = await client.createProducer({
        topic: topic1,
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      });
      expect(producer1).not.toBeNull();
      const producer2 = await client.createProducer({
        topic: topic2,
        sendTimeoutMs: 30000,
        batchingEnabled: true,
      });
      expect(producer2).not.toBeNull();

      const consumer = await client.subscribe({
        topics: [topic1, topic2],
        subscription: 'sub',
        subscriptionType: 'Shared',
      });
      expect(consumer).not.toBeNull();

      const messages = [];
      for (let i = 0; i < 5; i += 1) {
        const msg = `my-message-${i}`;
        producer1.send({
          data: Buffer.from(msg),
        });
        messages.push(msg);
      }
      await producer1.flush();
      for (let i = 5; i < 10; i += 1) {
        const msg = `my-message-${i}`;
        producer2.send({
          data: Buffer.from(msg),
        });
        messages.push(msg);
      }
      await producer2.flush();

      const results = [];
      for (let i = 0; i < 10; i += 1) {
        const msg = await consumer.receive();
        results.push(msg.getData().toString());
        consumer.acknowledge(msg);
      }
      expect(lodash.difference(messages, results)).toEqual([]);

      await producer1.close();
      await producer2.close();
      await consumer.close();
      await client.close();
    });
  });
})();
