/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package main

import (
	"htrace/common"
	"htrace/conf"
	"math"
	"sync"
	"time"
)

//
// The Metrics Sink for HTraced.
//
// The Metrics sink keeps track of metrics for the htraced daemon.
// It is important to have good metrics so that we can properly manager htraced.  In particular, we
// need to know what rate we are receiving spans at, the main places spans came from.  If spans
// were dropped because of a high sampling rates, we need to know which part of the system dropped
// them so that we can adjust the sampling rate there.
//

const LATENCY_CIRC_BUF_SIZE = 4096

type MetricsSink struct {
	// The metrics sink logger.
	lg *common.Logger

	// The maximum number of entries we shuld allow in the HostSpanMetrics map.
	maxMtx int

	// The total number of spans ingested by the server (counting dropped spans)
	IngestedSpans uint64

	// The total number of spans written to leveldb since the server started.
	WrittenSpans uint64

	// The total number of spans dropped by the server.
	ServerDropped uint64

	// Per-host Span Metrics
	HostSpanMetrics common.SpanMetricsMap

	// The last few writeSpan latencies
	wsLatencyCircBuf *CircBufU32

	// Lock protecting all metrics
	lock sync.Mutex
}

func NewMetricsSink(cnf *conf.Config) *MetricsSink {
	return &MetricsSink{
		lg:               common.NewLogger("metrics", cnf),
		maxMtx:           cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
		HostSpanMetrics:  make(common.SpanMetricsMap),
		wsLatencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE),
	}
}

// Update the total number of spans which were ingested, as well as other
// metrics that get updated during span ingest.
func (msink *MetricsSink) UpdateIngested(addr string, totalIngested int,
	serverDropped int, wsLatency time.Duration) {
	msink.lock.Lock()
	defer msink.lock.Unlock()
	msink.IngestedSpans += uint64(totalIngested)
	msink.ServerDropped += uint64(serverDropped)
	msink.updateSpanMetrics(addr, 0, serverDropped)
	wsLatencyMs := wsLatency.Nanoseconds() / 1000000
	var wsLatency32 uint32
	if wsLatencyMs > math.MaxUint32 {
		wsLatency32 = math.MaxUint32
	} else {
		wsLatency32 = uint32(wsLatencyMs)
	}
	msink.wsLatencyCircBuf.Append(wsLatency32)
}

// Update the per-host span metrics.  Must be called with the lock held.
func (msink *MetricsSink) updateSpanMetrics(addr string, numWritten int,
	serverDropped int) {
	mtx, found := msink.HostSpanMetrics[addr]
	if !found {
		// Ensure that the per-host span metrics map doesn't grow too large.
		if len(msink.HostSpanMetrics) >= msink.maxMtx {
			// Delete a random entry
			for k := range msink.HostSpanMetrics {
				msink.lg.Warnf("Evicting metrics entry for addr %s "+
					"because there are more than %d addrs.\n", k, msink.maxMtx)
				delete(msink.HostSpanMetrics, k)
				break
			}
		}
		mtx = &common.SpanMetrics{}
		msink.HostSpanMetrics[addr] = mtx
	}
	mtx.Written += uint64(numWritten)
	mtx.ServerDropped += uint64(serverDropped)
}

// Update the total number of spans which were persisted to disk.
func (msink *MetricsSink) UpdatePersisted(addr string, totalWritten int,
	serverDropped int) {
	msink.lock.Lock()
	defer msink.lock.Unlock()
	msink.WrittenSpans += uint64(totalWritten)
	msink.ServerDropped += uint64(serverDropped)
	msink.updateSpanMetrics(addr, totalWritten, serverDropped)
}

// Read the server stats.
func (msink *MetricsSink) PopulateServerStats(stats *common.ServerStats) {
	msink.lock.Lock()
	defer msink.lock.Unlock()
	stats.IngestedSpans = msink.IngestedSpans
	stats.WrittenSpans = msink.WrittenSpans
	stats.ServerDroppedSpans = msink.ServerDropped
	stats.MaxWriteSpansLatencyMs = msink.wsLatencyCircBuf.Max()
	stats.AverageWriteSpansLatencyMs = msink.wsLatencyCircBuf.Average()
	stats.HostSpanMetrics = make(common.SpanMetricsMap)
	for k, v := range msink.HostSpanMetrics {
		stats.HostSpanMetrics[k] = &common.SpanMetrics{
			Written:       v.Written,
			ServerDropped: v.ServerDropped,
		}
	}
}

// A circular buffer of uint32s which supports appending and taking the
// average, and some other things.
type CircBufU32 struct {
	// The next slot to fill
	slot int

	// The number of slots which are in use.  This number only ever
	// increases until the buffer is full.
	slotsUsed int

	// The buffer
	buf []uint32
}

func NewCircBufU32(size int) *CircBufU32 {
	return &CircBufU32{
		slotsUsed: -1,
		buf:       make([]uint32, size),
	}
}

func (cbuf *CircBufU32) Max() uint32 {
	var max uint32
	for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ {
		if cbuf.buf[bufIdx] > max {
			max = cbuf.buf[bufIdx]
		}
	}
	return max
}

func (cbuf *CircBufU32) Average() uint32 {
	var total uint64
	for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ {
		total += uint64(cbuf.buf[bufIdx])
	}
	return uint32(total / uint64(cbuf.slotsUsed))
}

func (cbuf *CircBufU32) Append(val uint32) {
	cbuf.buf[cbuf.slot] = val
	cbuf.slot++
	if cbuf.slotsUsed < cbuf.slot {
		cbuf.slotsUsed = cbuf.slot
	}
	if cbuf.slot >= len(cbuf.buf) {
		cbuf.slot = 0
	}
}
