/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package golang

import (
	"context"
	"sync"
	"time"

	"github.com/apache/rocketmq-clients/golang/v5/pkg/ticker"
	"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
	v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// TODO: no global clientManager
type ClientManager interface {
	RegisterClient(client Client)
	UnRegisterClient(client Client)
	QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error)
	HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error)
	SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error)
	Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error)
	EndTransaction(ctx context.Context, endpoints *v2.Endpoints, request *v2.EndTransactionRequest, duration time.Duration) (*v2.EndTransactionResponse, error)
	NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, request *v2.NotifyClientTerminationRequest, duration time.Duration) (*v2.NotifyClientTerminationResponse, error)
	ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
	AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, duration time.Duration) (*v2.AckMessageResponse, error)
	ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, request *v2.ChangeInvisibleDurationRequest, duration time.Duration) (*v2.ChangeInvisibleDurationResponse, error)
}

type clientManagerOptions struct {
	RPC_CLIENT_MAX_IDLE_DURATION time.Duration

	RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY time.Duration
	RPC_CLIENT_IDLE_CHECK_PERIOD        time.Duration

	HEART_BEAT_INITIAL_DELAY time.Duration
	HEART_BEAT_PERIOD        time.Duration

	LOG_STATS_INITIAL_DELAY time.Duration
	LOG_STATS_PERIOD        time.Duration

	SYNC_SETTINGS_DELAY  time.Duration
	SYNC_SETTINGS_PERIOD time.Duration
}

var defaultClientManagerOptions = clientManagerOptions{
	RPC_CLIENT_MAX_IDLE_DURATION: time.Minute * 30,

	RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY: time.Second * 5,
	RPC_CLIENT_IDLE_CHECK_PERIOD:        time.Minute * 1,

	HEART_BEAT_INITIAL_DELAY: time.Second * 1,
	HEART_BEAT_PERIOD:        time.Second * 10,

	LOG_STATS_INITIAL_DELAY: time.Second * 60,
	LOG_STATS_PERIOD:        time.Second * 60,

	SYNC_SETTINGS_DELAY:  time.Second * 1,
	SYNC_SETTINGS_PERIOD: time.Minute * 5,
}

type defaultClientManager struct {
	rpcClientTable     map[string]RpcClient
	rpcClientTableLock sync.RWMutex
	clientTable        sync.Map
	done               chan struct{}
	opts               clientManagerOptions
}

var _ = ClientManager(&defaultClientManager{})

var NewDefaultClientManager = func() *defaultClientManager {
	return &defaultClientManager{
		rpcClientTable: make(map[string]RpcClient),
		done:           make(chan struct{}),
		opts:           defaultClientManagerOptions,
	}
}

func (cm *defaultClientManager) RegisterClient(client Client) {
	cm.clientTable.Store(client.GetClientID(), client)
}

func (cm *defaultClientManager) UnRegisterClient(client Client) {
	cm.clientTable.Delete(client.GetClientID())
}

func (cm *defaultClientManager) startUp() {
	sugarBaseLogger.Info("begin to start the client manager")

	go func() {
		time.Sleep(cm.opts.RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY)
		cm.clearIdleRpcClients()
		ticker.Tick(cm.clearIdleRpcClients, (cm.opts.RPC_CLIENT_IDLE_CHECK_PERIOD), cm.done)
	}()

	go func() {
		time.Sleep(cm.opts.HEART_BEAT_INITIAL_DELAY)
		cm.doHeartbeat()
		ticker.Tick(cm.doHeartbeat, (cm.opts.HEART_BEAT_PERIOD), cm.done)
	}()

	go func() {
		time.Sleep(cm.opts.LOG_STATS_INITIAL_DELAY)
		cm.doStats()
		ticker.Tick(cm.doStats, (cm.opts.LOG_STATS_PERIOD), cm.done)
	}()

	go func() {
		time.Sleep(cm.opts.SYNC_SETTINGS_DELAY)
		cm.syncSettings()
		ticker.Tick(cm.syncSettings, (cm.opts.SYNC_SETTINGS_PERIOD), cm.done)
	}()

	sugarBaseLogger.Info("the client manager starts successfully")
}
func (cm *defaultClientManager) deleteRpcClient(rpcClient RpcClient) {
	delete(cm.rpcClientTable, rpcClient.GetTarget())
	rpcClient.GracefulStop()
}

func (cm *defaultClientManager) clearIdleRpcClients() {
	cm.rpcClientTableLock.Lock()
	defer cm.rpcClientTableLock.Unlock()
	for target, rpcClient := range cm.rpcClientTable {
		idleDuration := rpcClient.idleDuration()
		if idleDuration > cm.opts.RPC_CLIENT_MAX_IDLE_DURATION {
			cm.deleteRpcClient(rpcClient)
			sugarBaseLogger.Warnf("rpc client has been idle for a long time, target=%s, idleDuration=%d, rpcClientMaxIdleDuration=%d\n", target, idleDuration, cm.opts.RPC_CLIENT_MAX_IDLE_DURATION)
		}
	}
}
func (cm *defaultClientManager) doHeartbeat() {
	sugarBaseLogger.Info("clientManager start doHeartbeat")
	cm.clientTable.Range(func(_, v interface{}) bool {
		client := v.(*defaultClient)
		client.Heartbeat()
		return true
	})
}
func (cm *defaultClientManager) doStats() {
	// TODO
}
func (cm *defaultClientManager) syncSettings() {
	sugarBaseLogger.Info("clientManager start syncSettings")
	cm.clientTable.Range(func(_, v interface{}) bool {
		client := v.(*defaultClient)
		client.trySyncSettings()
		return true
	})
}
func (cm *defaultClientManager) shutdown() {
	sugarBaseLogger.Info("begin to shutdown the client manager")
	cm.done <- struct{}{}
	close(cm.done)
	cm.cleanRpcClient()
	sugarBaseLogger.Info("shutdown the client manager successfully")
}
func (cm *defaultClientManager) cleanRpcClient() {
	sugarBaseLogger.Info("clientManager start cleanRpcClient")
	cm.rpcClientTableLock.Lock()
	defer cm.rpcClientTableLock.Unlock()
	for _, rpcClient := range cm.rpcClientTable {
		cm.deleteRpcClient(rpcClient)
	}
}
func (cm *defaultClientManager) getRpcClient(endpoints *v2.Endpoints) (RpcClient, error) {
	target := utils.ParseAddress(utils.SelectAnAddress(endpoints))

	cm.rpcClientTableLock.RLock()
	item, ok := cm.rpcClientTable[target]
	cm.rpcClientTableLock.RUnlock()
	if ok {
		if ret, ok := item.(*rpcClient); ok {
			return ret, nil
		}
	}

	cm.rpcClientTableLock.Lock()
	defer cm.rpcClientTableLock.Unlock()

	// double check
	item, ok = cm.rpcClientTable[target]
	if ok {
		if ret, ok := item.(*rpcClient); ok {
			return ret, nil
		}
	}
	rpcClient, err := NewRpcClient(target)
	if err != nil {
		return nil, err
	}
	cm.rpcClientTable[target] = rpcClient
	return rpcClient, nil
}
func (cm *defaultClientManager) handleGrpcError(rpcClient RpcClient, err error) {
	if err != nil {
		if e, ok := status.FromError(err); ok {
			if e.Code() == codes.Unavailable {
				sugarBaseLogger.Errorf("happened unavailable err=%w, close rpcClient=%s", err, rpcClient.GetTarget())
				cm.rpcClientTableLock.Lock()
				defer cm.rpcClientTableLock.Unlock()
				cm.deleteRpcClient(rpcClient)
			}
		}
	}
}
func (cm *defaultClientManager) QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error) {
	ctx, _ = context.WithTimeout(ctx, duration)
	rpcClient, err := cm.getRpcClient(endpoints)
	if err != nil {
		return nil, err
	}
	ret, err := rpcClient.QueryRoute(ctx, request)
	cm.handleGrpcError(rpcClient, err)
	return ret, err
}

func (cm *defaultClientManager) SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error) {
	ctx, _ = context.WithTimeout(ctx, duration)
	rpcClient, err := cm.getRpcClient(endpoints)
	if err != nil {
		return nil, err
	}
	ret, err := rpcClient.SendMessage(ctx, request)
	cm.handleGrpcError(rpcClient, err)
	return ret, err
}

func (cm *defaultClientManager) Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error) {
	ctx, _ = context.WithTimeout(ctx, duration)
	rpcClient, err := cm.getRpcClient(endpoints)
	if err != nil {
		return nil, err
	}
	ret, err := rpcClient.Telemetry(ctx)
	cm.handleGrpcError(rpcClient, err)
	return ret, err
}

func (cm *defaultClientManager) EndTransaction(ctx context.Context, endpoints *v2.Endpoints, request *v2.EndTransactionRequest, duration time.Duration) (*v2.EndTransactionResponse, error) {
	ctx, _ = context.WithTimeout(ctx, duration)
	rpcClient, err := cm.getRpcClient(endpoints)
	if err != nil {
		return nil, err
	}
	ret, err := rpcClient.EndTransaction(ctx, request)
	cm.handleGrpcError(rpcClient, err)
	return ret, err
}

func (cm *defaultClientManager) HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error) {
	ctx, _ = context.WithTimeout(ctx, duration)
	rpcClient, err := cm.getRpcClient(endpoints)
	if err != nil {
		return nil, err
	}

	ret, err := rpcClient.HeartBeat(ctx, request)
	cm.handleGrpcError(rpcClient, err)
	return ret, err
}

func (cm *defaultClientManager) NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, request *v2.NotifyClientTerminationRequest, duration time.Duration) (*v2.NotifyClientTerminationResponse, error) {
	ctx, _ = context.WithTimeout(ctx, duration)
	rpcClient, err := cm.getRpcClient(endpoints)
	if err != nil {
		return nil, err
	}
	ret, err := rpcClient.NotifyClientTermination(ctx, request)
	cm.handleGrpcError(rpcClient, err)
	return ret, err
}

func (cm *defaultClientManager) ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error) {
	rpcClient, err := cm.getRpcClient(endpoints)
	if err != nil {
		return nil, err
	}
	ret, err := rpcClient.ReceiveMessage(ctx, request)
	cm.handleGrpcError(rpcClient, err)
	return ret, err
}

func (cm *defaultClientManager) AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, duration time.Duration) (*v2.AckMessageResponse, error) {
	ctx, _ = context.WithTimeout(ctx, duration)
	rpcClient, err := cm.getRpcClient(endpoints)
	if err != nil {
		return nil, err
	}
	ret, err := rpcClient.AckMessage(ctx, request)
	cm.handleGrpcError(rpcClient, err)
	return ret, err
}

func (cm *defaultClientManager) ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, request *v2.ChangeInvisibleDurationRequest, duration time.Duration) (*v2.ChangeInvisibleDurationResponse, error) {
	ctx, _ = context.WithTimeout(ctx, duration)
	rpcClient, err := cm.getRpcClient(endpoints)
	if err != nil {
		return nil, err
	}
	ret, err := rpcClient.ChangeInvisibleDuration(ctx, request)
	cm.handleGrpcError(rpcClient, err)
	return ret, err
}
