// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
	"context"
	"fmt"
	"log"
	"testing"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/go-connections/nat"
	"github.com/stretchr/testify/require"
	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"

	"github.com/apache/pulsar-client-go/pulsar/internal"
	"github.com/apache/pulsar-client-go/pulsaradmin"
	"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
	"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
	"github.com/stretchr/testify/assert"
)

func TestRetryEnableZeroQueueConsumer(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})

	assert.Nil(t, err)
	defer client.Close()

	topic := newTopicName()

	// create consumer
	_, err = client.Subscribe(ConsumerOptions{
		Topic:                   topic,
		SubscriptionName:        "my-sub",
		RetryEnable:             true,
		EnableZeroQueueConsumer: true,
	})
	assert.ErrorContains(t, err, "ZeroQueueConsumer is not supported with RetryEnable")
}

func TestNormalZeroQueueConsumer(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})

	assert.Nil(t, err)
	defer client.Close()

	topic := newTopicName()
	ctx := context.Background()

	// create consumer
	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                   topic,
		SubscriptionName:        "my-sub",
		EnableZeroQueueConsumer: true,
	})
	assert.Nil(t, err)
	_, ok := consumer.(*zeroQueueConsumer)
	assert.True(t, ok)
	defer consumer.Close()

	// create producer
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:           topic,
		DisableBatching: false,
	})
	assert.Nil(t, err)
	defer producer.Close()

	// send 10 messages
	for i := 0; i < 10; i++ {
		msg, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
			Key:     "pulsar",
			Properties: map[string]string{
				"key-1": "pulsar-1",
			},
		})
		assert.Nil(t, err)
		log.Printf("send message: %s", msg.String())
	}

	// receive 10 messages
	for i := 0; i < 10; i++ {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}

		expectMsg := fmt.Sprintf("hello-%d", i)
		expectProperties := map[string]string{
			"key-1": "pulsar-1",
		}
		assert.Equal(t, []byte(expectMsg), msg.Payload())
		assert.Equal(t, "pulsar", msg.Key())
		assert.Equal(t, expectProperties, msg.Properties())
		// ack message
		err = consumer.Ack(msg)
		assert.Nil(t, err)
		log.Printf("receive message: %s", msg.ID().String())
	}
	err = consumer.Unsubscribe()
	assert.Nil(t, err)
}
func TestReconnectConsumer(t *testing.T) {

	req := testcontainers.ContainerRequest{
		Name:         "pulsar-test",
		Image:        getPulsarTestImage(),
		ExposedPorts: []string{"6650/tcp", "8080/tcp"},
		WaitingFor:   wait.ForExposedPort(),
		HostConfigModifier: func(config *container.HostConfig) {
			config.PortBindings = map[nat.Port][]nat.PortBinding{
				"6650/tcp": {{HostIP: "0.0.0.0", HostPort: "6659"}},
				"8080/tcp": {{HostIP: "0.0.0.0", HostPort: "8089"}},
			}
		},
		Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
	}
	c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
		Reuse:            true,
	})
	require.NoError(t, err, "Failed to start the pulsar container")
	endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
	require.NoError(t, err, "Failed to get the pulsar endpoint")

	client, err := NewClient(ClientOptions{
		URL: endpoint,
	})
	assert.Nil(t, err)
	adminEndpoint, err := c.PortEndpoint(context.Background(), "8080", "http")
	assert.Nil(t, err)
	admin, err := pulsaradmin.NewClient(&config.Config{
		WebServiceURL: adminEndpoint,
	})
	assert.Nil(t, err)

	assert.Nil(t, err)
	defer client.Close()

	topic := newTopicName()
	ctx := context.Background()
	var consumer Consumer
	require.Eventually(t, func() bool {
		consumer, err = client.Subscribe(ConsumerOptions{
			Topic:                   topic,
			SubscriptionName:        "my-sub",
			EnableZeroQueueConsumer: true,
		})
		return err == nil
	}, 30*time.Second, 1*time.Second)

	assert.Nil(t, err)
	_, ok := consumer.(*zeroQueueConsumer)
	assert.True(t, ok)
	defer consumer.Close()

	// create producer
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:           topic,
		DisableBatching: false,
	})
	assert.Nil(t, err)

	// send 10 messages
	for i := 0; i < 10; i++ {
		msg, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
			Key:     "pulsar",
			Properties: map[string]string{
				"key-1": "pulsar-1",
			},
		})
		assert.Nil(t, err)
		log.Printf("send message: %s", msg.String())
	}

	ch := make(chan struct{})

	go func() {
		time.Sleep(3 * time.Second)
		log.Println("unloading topic")
		// Simulate a broker restart by stopping the pulsar container
		topicName, err := utils.GetTopicName(topic)
		assert.Nil(t, err)
		err = admin.Topics().Unload(*topicName)
		assert.Nil(t, err)
		log.Println("unloaded topic")
		ch <- struct{}{}
	}()

	// receive 10 messages
	for i := 0; i < 10; i++ {
		if i == 3 {
			<-ch
		}
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}

		expectMsg := fmt.Sprintf("hello-%d", i)
		expectProperties := map[string]string{
			"key-1": "pulsar-1",
		}
		assert.Equal(t, []byte(expectMsg), msg.Payload())
		assert.Equal(t, "pulsar", msg.Key())
		assert.Equal(t, expectProperties, msg.Properties())
		// ack message
		err = consumer.Ack(msg)
		assert.Nil(t, err)
		log.Printf("receive message: %s", msg.ID().String())
	}
	err = consumer.Unsubscribe()
	assert.Nil(t, err)
	consumer.Close()
	producer.Close()
	defer c.Terminate(ctx)
}

func TestMultipleConsumer(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	topic := newTopicName()
	ctx := context.Background()

	// create consumer1
	consumer1, err := client.Subscribe(ConsumerOptions{
		Topic:                   topic,
		SubscriptionName:        "my-sub",
		Type:                    Shared,
		EnableZeroQueueConsumer: true,
	})
	assert.Nil(t, err)
	_, ok := consumer1.(*zeroQueueConsumer)
	assert.True(t, ok)
	defer consumer1.Close()

	// create consumer2
	consumer2, err := client.Subscribe(ConsumerOptions{
		Topic:                   topic,
		SubscriptionName:        "my-sub",
		Type:                    Shared,
		EnableZeroQueueConsumer: true,
	})
	assert.Nil(t, err)
	_, ok = consumer2.(*zeroQueueConsumer)
	assert.True(t, ok)
	defer consumer2.Close()

	// create producer
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:           topic,
		DisableBatching: true,
	})
	assert.Nil(t, err)
	defer producer.Close()

	sendNum := 10
	// send 10 messages
	for i := 0; i < sendNum; i++ {
		msg, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
			Key:     "pulsar",
			Properties: map[string]string{
				"key-1": "pulsar-1",
			},
		})
		assert.Nil(t, err)
		log.Printf("send message: %s", msg.String())
	}

	// receive messages
	for i := 0; i < sendNum/2; i++ {
		msg, err := consumer1.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("consumer1 receive message: %s %s", msg.ID().String(), msg.Payload())
		// ack message
		consumer1.Ack(msg)
	}

	// receive messages
	for i := 0; i < sendNum/2; i++ {
		msg, err := consumer2.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("consumer2 receive message: %s %s", msg.ID().String(), msg.Payload())
		// ack message
		consumer2.Ack(msg)
	}

}

func TestPartitionZeroQueueConsumer(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})

	assert.Nil(t, err)
	defer client.Close()

	topic := newTopicName()
	err = createPartitionedTopic(topic, 2)
	assert.Nil(t, err)

	// create consumer
	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                   topic,
		SubscriptionName:        "my-sub",
		EnableZeroQueueConsumer: true,
	})
	assert.Nil(t, consumer)
	assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics")
}
func TestSpecifiedPartitionZeroQueueConsumer(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})

	assert.Nil(t, err)
	defer client.Close()

	topic := newTopicName()
	ctx := context.Background()
	err = createPartitionedTopic(topic, 2)
	assert.Nil(t, err)
	topics, err := client.TopicPartitions(topic)
	assert.Nil(t, err)

	// create consumer
	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                   topics[1],
		SubscriptionName:        "my-sub",
		EnableZeroQueueConsumer: true,
	})
	assert.Nil(t, err)
	_, ok := consumer.(*zeroQueueConsumer)
	assert.True(t, ok)
	defer consumer.Close()

	// create producer
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:           topics[1],
		DisableBatching: false,
	})
	assert.Nil(t, err)
	defer producer.Close()

	// send 10 messages
	for i := 0; i < 10; i++ {
		msg, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
			Key:     "pulsar",
			Properties: map[string]string{
				"key-1": "pulsar-1",
			},
		})
		assert.Nil(t, err)
		log.Printf("send message: %s", msg.String())
	}

	// receive 10 messages
	for i := 0; i < 10; i++ {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}

		expectMsg := fmt.Sprintf("hello-%d", i)
		expectProperties := map[string]string{
			"key-1": "pulsar-1",
		}
		assert.Equal(t, []byte(expectMsg), msg.Payload())
		assert.Equal(t, "pulsar", msg.Key())
		assert.Equal(t, expectProperties, msg.Properties())
		// ack message
		err = consumer.Ack(msg)
		assert.Nil(t, err)
		log.Printf("receive message: %s", msg.ID().String())
	}
	err = consumer.Unsubscribe()
	assert.Nil(t, err)
}

func TestZeroQueueConsumerGetLastMessageIDs(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})

	assert.Nil(t, err)
	defer client.Close()

	partition := 1
	topic := newTopicName()
	// create consumer
	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                   topic,
		SubscriptionName:        "my-sub",
		Type:                    Shared,
		EnableZeroQueueConsumer: true,
	})
	_, ok := consumer.(*zeroQueueConsumer)
	assert.True(t, ok)
	assert.Nil(t, err)
	defer consumer.Close()

	// create producer
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:           topic,
		DisableBatching: true,
	})
	assert.Nil(t, err)
	defer producer.Close()

	ctx := context.Background()
	// send messages
	totalMessage := 10
	for i := 0; i < totalMessage; i++ {
		if _, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		}); err != nil {
			assert.Nil(t, err)
		}
	}

	// create admin
	admin, err := pulsaradmin.NewClient(&config.Config{})
	assert.Nil(t, err)

	messageIDs, err := consumer.GetLastMessageIDs()
	assert.Nil(t, err)
	assert.Equal(t, partition, len(messageIDs))

	id := messageIDs[0]
	topicName, err := utils.GetTopicName(id.Topic())
	assert.Nil(t, err)
	messages, err := admin.Subscriptions().GetMessagesByID(*topicName, id.LedgerID(), id.EntryID())
	assert.Nil(t, err)
	assert.Equal(t, 1, len(messages))
	err = consumer.UnsubscribeForce()
	assert.Nil(t, err)

}

func TestZeroQueueConsumer_Chan(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})

	assert.Nil(t, err)
	defer client.Close()

	topic := newTopicName()
	ctx := context.Background()

	// create consumer
	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                   topic,
		SubscriptionName:        "my-sub",
		EnableZeroQueueConsumer: true,
	})
	assert.Nil(t, err)
	_, ok := consumer.(*zeroQueueConsumer)
	assert.True(t, ok)
	defer consumer.Close()

	// create producer
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:           topic,
		DisableBatching: false,
	})
	assert.Nil(t, err)
	defer producer.Close()

	// send 10 messages
	for i := 0; i < 10; i++ {
		msg, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		})
		assert.Nil(t, err)
		log.Printf("send message: %s", msg.String())
	}
	assertPanic(t, "zeroQueueConsumer cannot support Chan method", func() {
		consumer.Chan()
	})
}

func assertPanic(t *testing.T, panicValue interface{}, f func()) {
	defer func() {
		if r := recover(); r != panicValue {
			t.Errorf("Expected panic %v, but got %v", panicValue, r)
		}
	}()
	f()
}

func TestZeroQueueConsumer_AckCumulativeConsumer(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})

	assert.Nil(t, err)
	defer client.Close()

	topic := newTopicName()
	ctx := context.Background()

	// create consumer
	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                   topic,
		SubscriptionName:        "my-sub",
		EnableZeroQueueConsumer: true,
	})
	assert.Nil(t, err)
	_, ok := consumer.(*zeroQueueConsumer)
	assert.True(t, ok)
	defer consumer.Close()

	// create producer
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:           topic,
		DisableBatching: false,
	})
	assert.Nil(t, err)
	defer producer.Close()

	// send 10 messages
	for i := 0; i < 10; i++ {
		msg, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		})
		assert.Nil(t, err)
		log.Printf("send message: %s", msg.String())
	}

	var lastID MessageID
	// receive 10 messages
	for i := 0; i < 10; i++ {
		msg, err := consumer.Receive(context.Background())
		lastID = msg.ID()
		if err != nil {
			log.Fatal(err)
		}
		expectMsg := fmt.Sprintf("hello-%d", i)
		assert.Equal(t, []byte(expectMsg), msg.Payload())
	}
	err = consumer.AckIDCumulative(lastID)
	assert.Nil(t, err)

}

func TestZeroQueueConsumer_Nack(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})

	assert.Nil(t, err)
	defer client.Close()

	topicName := newTopicName()
	ctx := context.Background()

	producer, err := client.CreateProducer(ProducerOptions{
		Topic: topicName,
	})
	assert.Nil(t, err)
	defer producer.Close()

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                   topicName,
		SubscriptionName:        "sub-1",
		Type:                    Shared,
		NackRedeliveryDelay:     1 * time.Second,
		EnableZeroQueueConsumer: true,
	})
	assert.Nil(t, err)
	_, ok := consumer.(*zeroQueueConsumer)
	assert.True(t, ok)
	defer consumer.Close()

	const N = 100

	for i := 0; i < N; i++ {
		if _, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
		}); err != nil {
			t.Fatal(err)
		}
	}

	for i := 0; i < N; i++ {
		msg, err := consumer.Receive(ctx)
		assert.Nil(t, err)
		assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))

		if i%2 == 0 {
			// Only acks even messages
			err = consumer.Ack(msg)
			assert.Nil(t, err)
		} else {
			// Fails to process odd messages
			consumer.Nack(msg)
		}
	}

	// Failed messages should be resent

	// We should only receive the odd messages
	for i := 1; i < N; i += 2 {
		msg, err := consumer.Receive(ctx)
		assert.Nil(t, err)
		assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))

		err = consumer.Ack(msg)
		assert.Nil(t, err)
	}
}

func TestZeroQueueConsumer_Seek(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})
	assert.Nil(t, err)
	defer client.Close()

	topicName := newTopicName()
	ctx := context.Background()

	producer, err := client.CreateProducer(ProducerOptions{
		Topic:           topicName,
		DisableBatching: false,
	})
	assert.Nil(t, err)
	defer producer.Close()

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                   topicName,
		EnableZeroQueueConsumer: true,
		SubscriptionName:        "sub-1",
		StartMessageIDInclusive: true,
	})
	assert.Nil(t, err)
	_, ok := consumer.(*zeroQueueConsumer)
	assert.True(t, ok)
	defer consumer.Close()

	const N = 10
	var seekID MessageID
	for i := 0; i < N; i++ {
		id, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		})
		assert.Nil(t, err)

		if i == N-5 {
			seekID = id
		}
	}

	// Don't consume all messages so some stay in queues
	for i := 0; i < N; i++ {
		msg, err := consumer.Receive(ctx)
		assert.Nil(t, err)
		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
		err = consumer.Ack(msg)
		assert.Nil(t, err)
	}

	err = consumer.Seek(seekID)
	assert.Nil(t, err)

	time.Sleep(time.Second * 3)
	msg, err := consumer.Receive(ctx)
	assert.Nil(t, err)
	log.Printf("msg: %s", string(msg.Payload()))
	assert.Equal(t, fmt.Sprintf("hello-%d", N-5), string(msg.Payload()))
}

func TestZeroQueueConsumer_SeekByTime(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})
	assert.Nil(t, err)
	defer client.Close()

	topicName := newTopicName()
	ctx := context.Background()

	producer, err := client.CreateProducer(ProducerOptions{
		Topic:           topicName,
		DisableBatching: false,
	})
	assert.Nil(t, err)
	defer producer.Close()

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                   topicName,
		EnableZeroQueueConsumer: true,
		SubscriptionName:        "my-sub",
	})
	assert.Nil(t, err)
	_, ok := consumer.(*zeroQueueConsumer)
	assert.True(t, ok)
	defer consumer.Close()

	const N = 10
	resetTimeStr := "100s"
	retentionTimeInSecond, err := internal.ParseRelativeTimeInSeconds(resetTimeStr)
	assert.Nil(t, err)

	for i := 0; i < N; i++ {
		_, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		})
		assert.Nil(t, err)
	}

	// Don't consume all messages so some stay in queues
	for i := 0; i < N-2; i++ {
		msg, err := consumer.Receive(ctx)
		assert.Nil(t, err)
		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
		err = consumer.Ack(msg)
		assert.Nil(t, err)
	}

	currentTimestamp := time.Now()
	err = consumer.SeekByTime(currentTimestamp.Add(-retentionTimeInSecond))
	assert.Nil(t, err)

	time.Sleep(time.Second * 3)

	for i := 0; i < N; i++ {
		msg, err := consumer.Receive(ctx)
		assert.Nil(t, err)
		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
		err = consumer.Ack(msg)
		assert.Nil(t, err)
	}
}
