blob: 140b50d229f091fa09b335fc6ab4caf9eaa51a8f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package main
import (
"org/apache/htrace/common"
"time"
)
type Heartbeater struct {
// The name of this heartbeater
name string
// How long to sleep between heartbeats, in milliseconds.
periodMs int64
// The logger to use.
lg *common.Logger
// The channels to send the heartbeat on.
targets []HeartbeatTarget
// Incoming requests to the heartbeater. When this is closed, the
// heartbeater will exit.
req chan *HeartbeatTarget
}
type HeartbeatTarget struct {
// The name of the heartbeat target.
name string
// The channel for the heartbeat target.
targetChan chan interface{}
}
func (tgt *HeartbeatTarget) String() string {
return tgt.name
}
func NewHeartbeater(name string, periodMs int64, lg *common.Logger) *Heartbeater {
hb := &Heartbeater{
name: name,
periodMs: periodMs,
lg: lg,
targets: make([]HeartbeatTarget, 0, 4),
req: make(chan *HeartbeatTarget),
}
go hb.run()
return hb
}
func (hb *Heartbeater) AddHeartbeatTarget(tgt *HeartbeatTarget) {
hb.req <- tgt
}
func (hb *Heartbeater) Shutdown() {
close(hb.req)
}
func (hb *Heartbeater) String() string {
return hb.name
}
func (hb *Heartbeater) run() {
period := time.Duration(hb.periodMs) * time.Millisecond
for {
periodEnd := time.Now().Add(period)
for {
timeToWait := periodEnd.Sub(time.Now())
if timeToWait <= 0 {
break
} else if timeToWait > period {
// Smooth over jitter or clock changes
timeToWait = period
periodEnd = time.Now().Add(period)
}
select {
case tgt, open := <-hb.req:
if !open {
hb.lg.Debugf("%s: exiting.\n", hb.String())
return
}
hb.targets = append(hb.targets, *tgt)
hb.lg.Debugf("%s: added %s.\n", hb.String(), tgt.String())
case <-time.After(timeToWait):
}
}
for targetIdx := range hb.targets {
select {
case hb.targets[targetIdx].targetChan <- nil:
default:
// We failed to send a heartbeat because the other goroutine was busy and
// hasn't cleared the previous one from its channel. This could indicate a
// stuck goroutine.
hb.lg.Infof("%s: could not send heartbeat to %s.\n",
hb.String(), hb.targets[targetIdx])
}
}
}
}