// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package query

import (
	"context"
	"os"
	"path"
	"testing"
	"time"

	googleUUID "github.com/google/uuid"
	"github.com/stretchr/testify/require"

	"github.com/apache/skywalking-banyandb/api/common"
	"github.com/apache/skywalking-banyandb/api/event"
	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
	"github.com/apache/skywalking-banyandb/banyand/discovery"
	"github.com/apache/skywalking-banyandb/banyand/series"
	"github.com/apache/skywalking-banyandb/banyand/series/trace"
	"github.com/apache/skywalking-banyandb/banyand/storage"
	"github.com/apache/skywalking-banyandb/pkg/bus"
	"github.com/apache/skywalking-banyandb/pkg/logger"
	"github.com/apache/skywalking-banyandb/pkg/pb"
)

var interval = time.Millisecond * 500

type entityValue struct {
	seriesID   string
	entityID   string
	dataBinary []byte
	ts         time.Time
	items      []interface{}
}

func setupServices(tester *require.Assertions) (discovery.ServiceRepo, series.Service, func()) {
	// Bootstrap logger system
	tester.NoError(logger.Init(logger.Logging{
		Env:   "dev",
		Level: "warn",
	}))

	// Init `Discovery` module
	repo, err := discovery.NewServiceRepo(context.Background())
	tester.NoError(err)
	tester.NotNil(repo)

	// Init `Database` module
	db, err := storage.NewDB(context.TODO(), repo)
	tester.NoError(err)
	uuid, err := googleUUID.NewUUID()
	tester.NoError(err)
	rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
	tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))

	// Init `Trace` module
	traceSvc, err := trace.NewService(context.TODO(), db, repo)
	tester.NoError(err)

	// Init `Query` module
	executor, err := NewExecutor(context.TODO(), repo, nil, traceSvc, traceSvc)
	tester.NoError(err)

	// :PreRun:
	// 1) TraceSeries,
	// 2) Database
	err = traceSvc.PreRun()
	tester.NoError(err)

	err = db.PreRun()
	tester.NoError(err)

	err = executor.PreRun()
	tester.NoError(err)

	return repo, traceSvc, func() {
		db.GracefulStop()
		_ = os.RemoveAll(rootPath)
	}
}

func setupData(tester *require.Assertions, baseTs time.Time, svc series.Service) {
	metadata := common.Metadata{
		Spec: &v1.Metadata{
			Name:  "sw",
			Group: "default",
		},
	}

	entityValues := []entityValue{
		{
			ts:         baseTs,
			seriesID:   "webapp_10.0.0.1",
			entityID:   "1",
			dataBinary: []byte{11},
			items: []interface{}{
				"trace_id-xxfff.111323",
				0,
				"webapp_id",
				"10.0.0.1_id",
				"/home_id",
				"webapp",
				"10.0.0.1",
				"/home",
				300,
				1622933202000000000,
			},
		},
		{
			ts:         baseTs.Add(interval),
			seriesID:   "gateway_10.0.0.2",
			entityID:   "2",
			dataBinary: []byte{12},
			items: []interface{}{
				"trace_id-xxfff.111323a",
				1,
			},
		},
		{
			ts:         baseTs.Add(interval * 2),
			seriesID:   "httpserver_10.0.0.3",
			entityID:   "3",
			dataBinary: []byte{13},
			items: []interface{}{
				"trace_id-xxfff.111323",
				1,
				"httpserver_id",
				"10.0.0.3_id",
				"/home_id",
				"httpserver",
				"10.0.0.3",
				"/home",
				300,
				1622933202000000000,
				"GET",
				"200",
			},
		},
		{
			ts:         baseTs.Add(interval * 3),
			seriesID:   "database_10.0.0.4",
			entityID:   "4",
			dataBinary: []byte{14},
			items: []interface{}{
				"trace_id-xxfff.111323",
				0,
				"database_id",
				"10.0.0.4_id",
				"/home_id",
				"database",
				"10.0.0.4",
				"/home",
				300,
				1622933202000000000,
				nil,
				nil,
				"MySQL",
				"10.1.1.4",
			},
		},
		{
			ts:         baseTs.Add(interval * 4),
			seriesID:   "mq_10.0.0.5",
			entityID:   "5",
			dataBinary: []byte{15},
			items: []interface{}{
				"trace_id-zzpp.111323",
				0,
				"mq_id",
				"10.0.0.5_id",
				"/home_id",
				"mq",
				"10.0.0.5",
				"/home",
				300,
				1622933202000000000,
				nil,
				nil,
				nil,
				nil,
				"test_topic",
				"10.0.0.5",
			},
		},
		{
			ts:         baseTs.Add(interval * 5),
			seriesID:   "database_10.0.0.6",
			entityID:   "6",
			dataBinary: []byte{16},
			items: []interface{}{
				"trace_id-zzpp.111323",
				1,
				"database_id",
				"10.0.0.6_id",
				"/home_id",
				"database",
				"10.0.0.6",
				"/home",
				300,
				1622933202000000000,
				nil,
				nil,
				"MySQL",
				"10.1.1.6",
			},
		},
		{
			ts:         baseTs.Add(interval * 6),
			seriesID:   "mq_10.0.0.7",
			entityID:   "7",
			dataBinary: []byte{17},
			items: []interface{}{
				"trace_id-zzpp.111323",
				0,
				"nq_id",
				"10.0.0.7_id",
				"/home_id",
				"mq",
				"10.0.0.7",
				"/home",
				300,
				1622933202000000000,
				nil,
				nil,
				nil,
				nil,
				"test_topic",
				"10.0.0.7",
			},
		},
	}

	for _, ev := range entityValues {
		ok, err := svc.Write(metadata, ev.ts, ev.seriesID, ev.entityID, ev.dataBinary, ev.items...)
		tester.True(ok)
		tester.NoError(err)
	}
}

func TestQueryProcessor(t *testing.T) {
	tester := require.New(t)

	// setup services
	repo, traceSvc, gracefulStop := setupServices(tester)
	defer gracefulStop()

	baseTs := time.Now()
	setupData(tester, baseTs, traceSvc)

	tests := []struct {
		// name of the test case
		name string
		// queryGenerator is used to generate a Query
		queryGenerator func(baseTs time.Time) *v1.QueryRequest
		// wantLen is the length of entities expected to return
		wantLen int
	}{
		{
			name: "query given timeRange is out of the time range of data",
			queryGenerator: func(baseTs time.Time) *v1.QueryRequest {
				return pb.NewQueryRequestBuilder().
					Limit(10).
					Offset(0).
					Metadata("default", "sw").
					TimeRange(time.Unix(0, 0), time.Unix(0, 1)).
					Projection("trace_id").
					Build()
			},
			wantLen: 0,
		},
		{
			name: "query TraceID given timeRange includes the time range of data",
			queryGenerator: func(baseTs time.Time) *v1.QueryRequest {
				return pb.NewQueryRequestBuilder().
					Limit(10).
					Offset(0).
					Metadata("default", "sw").
					Fields("trace_id", "=", "trace_id-zzpp.111323").
					TimeRange(baseTs.Add(-1*time.Minute), baseTs.Add(1*time.Minute)).
					Projection("trace_id").
					Build()
			},
			wantLen: 3,
		},
		{
			name: "query TraceID given timeRange includes the time range of data but limit to 1",
			queryGenerator: func(baseTs time.Time) *v1.QueryRequest {
				return pb.NewQueryRequestBuilder().
					Limit(1).
					Offset(0).
					Metadata("default", "sw").
					Fields("trace_id", "=", "trace_id-zzpp.111323").
					TimeRange(baseTs.Add(-1*time.Minute), baseTs.Add(1*time.Minute)).
					Projection("trace_id").
					Build()
			},
			wantLen: 1,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			singleTester := require.New(t)
			now := time.Now()
			m := bus.NewMessage(bus.MessageID(now.UnixNano()), tt.queryGenerator(baseTs))
			f, err := repo.Publish(event.TopicQueryEvent, m)
			singleTester.NoError(err)
			singleTester.NotNil(f)
			msg, err := f.Get()
			singleTester.NoError(err)
			singleTester.NotNil(msg)
			// TODO: better error response
			singleTester.NotNil(msg.Data())
			singleTester.Len(msg.Data(), tt.wantLen)
		})
	}
}
