HertzBeat-Collector-Go is the Go implementation of the collector for Apache HertzBeat. It supports multi-protocol and multi-type monitoring data collection, featuring high performance, easy extensibility, and seamless integration.
.
├── cmd/                # Main entry point
├── internal/           # Core collector implementation and common modules
│   ├── collector/      # Various collectors
│   ├── common/         # Common modules (scheduling, jobs, types, logging, etc.)
│   └── util/           # Utilities
├── api/                # Protocol definitions (protobuf)
├── examples/           # Example code
├── docs/               # Architecture and development docs
├── tools/              # Build, CI, scripts, and tools
├── Makefile            # Build entry
└── README.md           # Project description
The collector implements a unified configuration system with three main components:
Central configuration factory that provides:
// Create configuration with defaults
factory := config.NewConfigFactory()
cfg := factory.CreateDefaultConfig()

// Create from environment variables
envCfg := factory.CreateFromEnv()

// Merge file config with environment overrides
mergedCfg := factory.MergeWithEnv(fileCfg)

// Validate configuration
if err := factory.ValidateConfig(cfg); err != nil {
	log.Fatal("Invalid configuration:", err)
}
Three distinct entry points for different use cases:
- config.LoadFromFile(path): File-only configuration loading
- config.LoadFromEnv(): Environment-only configuration loading
- config.LoadUnified(path): Combined file + environment loading (recommended)

type CollectorConfig struct {
	Collector CollectorSection `yaml:"collector"`
}

type CollectorSection struct {
	Info     CollectorInfo      `yaml:"info"`
	Log      CollectorLogConfig `yaml:"log"`
	Manager  ManagerConfig      `yaml:"manager"`
	Identity string             `yaml:"identity"`
	Mode     string             `yaml:"mode"`
}

type ManagerConfig struct {
	Host     string `yaml:"host"`
	Port     string `yaml:"port"`
	Protocol string `yaml:"protocol"`
}
The system includes comprehensive validation:
Field | Default Value | Description |
---|---|---|
Identity | hertzbeat-collector-go | Collector identifier |
Mode | public | Collector mode |
Collector.Name | hertzbeat-collector-go | Collector service name |
Collector.IP | 127.0.0.1 | Collector bind address |
Collector.Port | 8080 | Collector service port |
Manager.Host | 127.0.0.1 | Manager server host |
Manager.Port | 1158 | Manager server port |
Manager.Protocol | netty | Communication protocol |
Log.Level | info | Logging level |
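As a rough illustration of how defaults and validation can work together, the sketch below fills empty fields with the default values from the table and then checks them. The `Config` type, `applyDefaults`, and `validate` are hypothetical names used only for this example, not the project's API, and the port range check is an assumption rather than a documented rule.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// Config is a trimmed-down, hypothetical stand-in for the collector
// configuration; it is not the project's CollectorConfig type.
type Config struct {
	Identity        string
	Mode            string
	ManagerHost     string
	ManagerPort     string
	ManagerProtocol string
}

// applyDefaults fills empty fields with the default values from the table.
func applyDefaults(c Config) Config {
	if c.Identity == "" {
		c.Identity = "hertzbeat-collector-go"
	}
	if c.Mode == "" {
		c.Mode = "public"
	}
	if c.ManagerHost == "" {
		c.ManagerHost = "127.0.0.1"
	}
	if c.ManagerPort == "" {
		c.ManagerPort = "1158"
	}
	if c.ManagerProtocol == "" {
		c.ManagerProtocol = "netty"
	}
	return c
}

// validate collects every problem it finds instead of stopping at the first.
func validate(c Config) error {
	var errs []error
	if c.Mode != "public" && c.Mode != "private" {
		errs = append(errs, fmt.Errorf("mode %q must be public or private", c.Mode))
	}
	if c.ManagerProtocol != "netty" && c.ManagerProtocol != "grpc" {
		errs = append(errs, fmt.Errorf("protocol %q must be netty or grpc", c.ManagerProtocol))
	}
	// Assumed rule: the port must be a valid TCP port number.
	if p, err := strconv.Atoi(c.ManagerPort); err != nil || p < 1 || p > 65535 {
		errs = append(errs, fmt.Errorf("manager port %q must be a number in 1-65535", c.ManagerPort))
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	good := applyDefaults(Config{ManagerProtocol: "grpc"})
	fmt.Println("valid:", validate(good) == nil)

	bad := applyDefaults(Config{Mode: "hidden", ManagerPort: "99999"})
	fmt.Println(validate(bad))
}
```

Applying defaults before validation means a partially filled file or environment still produces a usable configuration, while invalid explicit values are reported together.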
If you have existing configurations, here's how to migrate:
# Old format
server:
  host: "0.0.0.0"
  port: 1158
transport:
  protocol: "netty"
  server_addr: "127.0.0.1:1158"

# New format
collector:
  info:
    name: hertzbeat-collector-go
    ip: 0.0.0.0
    port: 8080
  manager:
    host: 127.0.0.1
    port: 1158
    protocol: netty
  identity: hertzbeat-collector-go
  mode: public
# Install dependencies
go mod tidy

# Build
make build

# Run
./bin/collector server --config etc/hertzbeat-collector.yaml
The collector supports multiple configuration methods with a unified configuration system:
# Run with configuration file
./bin/collector server --config etc/hertzbeat-collector.yaml

Example configuration file (etc/hertzbeat-collector.yaml):

collector:
  info:
    name: hertzbeat-collector-go
    ip: 127.0.0.1
    port: 8080
  log:
    level: debug
  # Manager/Transport configuration
  manager:
    host: 127.0.0.1
    port: 1158
    protocol: netty
  # Collector identity and mode
  identity: hertzbeat-collector-go
  mode: public
The Go version is fully compatible with the Java version's environment variable configuration:
# Set environment variables
export IDENTITY=local
export COLLECTOR_NAME=hertzbeat-collector-go
export COLLECTOR_IP=127.0.0.1
export COLLECTOR_PORT=8080
export MANAGER_HOST=192.168.97.0
export MANAGER_PORT=1158
export MANAGER_PROTOCOL=grpc
export MODE=public
export LOG_LEVEL=info

# Run with environment variables
./bin/collector server

# Or use Docker
docker run -d \
  -e IDENTITY=local \
  -e MANAGER_HOST=192.168.97.0 \
  -e MANAGER_PORT=1158 \
  -e MANAGER_PROTOCOL=grpc \
  -e MODE=public \
  --name hertzbeat-collector-go \
  hertzbeat-collector-go
The collector uses a unified configuration system that supports both file and environment variable configurations:
Configuration precedence (highest to lowest):
1. Environment variables
2. Configuration file values
3. Built-in default values
Environment Variable | Description | Default Value |
---|---|---|
IDENTITY | Collector identity | hertzbeat-collector-go |
MODE | Collector mode (public /private ) | public |
COLLECTOR_NAME | Collector name | hertzbeat-collector-go |
COLLECTOR_IP | Collector bind IP | 127.0.0.1 |
COLLECTOR_PORT | Collector bind port | 8080 |
MANAGER_HOST | Manager server host | 127.0.0.1 |
MANAGER_PORT | Manager server port | 1158 |
MANAGER_PROTOCOL | Protocol (netty /grpc ) | netty |
LOG_LEVEL | Log level | info |
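The layering of defaults, file values, and environment variables can be sketched as below. This is a simplified illustration and not the project's `MergeWithEnv` implementation: the `Settings` type and the `defaults`, `overlay`, and `mergeWithEnv` helpers are hypothetical, and only three of the variables from the table are handled.

```go
package main

import (
	"fmt"
	"os"
)

// Settings is a hypothetical, flattened subset of the collector configuration.
type Settings struct {
	ManagerHost     string
	ManagerProtocol string
	LogLevel        string
}

// defaults returns the documented default values.
func defaults() Settings {
	return Settings{ManagerHost: "127.0.0.1", ManagerProtocol: "netty", LogLevel: "info"}
}

// overlay copies every non-empty field of src onto dst.
func overlay(dst, src Settings) Settings {
	if src.ManagerHost != "" {
		dst.ManagerHost = src.ManagerHost
	}
	if src.ManagerProtocol != "" {
		dst.ManagerProtocol = src.ManagerProtocol
	}
	if src.LogLevel != "" {
		dst.LogLevel = src.LogLevel
	}
	return dst
}

// mergeWithEnv applies environment overrides on top of base. The lookup
// function is injected (os.LookupEnv in production) so the merge can be
// exercised without touching the real environment.
func mergeWithEnv(base Settings, lookup func(string) (string, bool)) Settings {
	var env Settings
	env.ManagerHost, _ = lookup("MANAGER_HOST")
	env.ManagerProtocol, _ = lookup("MANAGER_PROTOCOL")
	env.LogLevel, _ = lookup("LOG_LEVEL")
	return overlay(base, env)
}

func main() {
	// Values as they might have been parsed from a YAML file.
	file := Settings{ManagerHost: "10.0.0.5", LogLevel: "debug"}

	os.Setenv("MANAGER_PROTOCOL", "grpc")
	merged := mergeWithEnv(overlay(defaults(), file), os.LookupEnv)
	fmt.Printf("%+v\n", merged)
}
```

Treating an empty or unset variable as "no override" keeps a blank `export MANAGER_HOST=` from wiping out a value that came from the file.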
See the examples/ directory for various usage examples:
- examples/main.go - Main example with environment variables
- examples/README.md - Complete usage guide
- examples/Dockerfile - Docker build example

This Go collector is designed to be compatible with the Java version of the HertzBeat manager server. The transport layer supports both gRPC and Netty protocols for seamless integration.
The Go collector supports two communication protocols:
Netty Protocol (Recommended for Java server compatibility)
gRPC Protocol
The collector supports flexible configuration through multiple entry points:
# etc/hertzbeat-collector.yaml
collector:
  info:
    name: hertzbeat-collector-go
    ip: 127.0.0.1
    port: 8080
  log:
    level: debug
  # Manager/Transport configuration
  manager:
    host: 127.0.0.1
    port: 1158
    protocol: netty
  # Collector identity and mode
  identity: hertzbeat-collector-go
  mode: public
The collector provides three configuration loading methods:
File-only Configuration:
import "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
cfg, err := config.LoadFromFile("etc/hertzbeat-collector.yaml")
if err != nil {
log.Fatal("Failed to load config:", err)
}
Environment-only Configuration:
import "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
cfg := config.LoadFromEnv()
Unified Configuration (Recommended):
import "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"

// Environment variables override file values
cfg, err := config.LoadUnified("etc/hertzbeat-collector.yaml")
if err != nil {
	log.Fatal("Failed to load config:", err)
}
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport"
	"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
)

func main() {
	// Load configuration using unified loader (file + env)
	cfg, err := config.LoadUnified("etc/hertzbeat-collector.yaml")
	if err != nil {
		log.Fatal("Failed to load configuration:", err)
	}

	// Create transport runner with unified config
	runner := transport.New(cfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start transport in background
	go func() {
		if err := runner.Start(ctx); err != nil {
			log.Printf("Failed to start transport: %v", err)
			cancel()
		}
	}()

	// Wait for shutdown signal
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	log.Println("Shutting down...")
	if err := runner.Close(); err != nil {
		log.Printf("Failed to close transport: %v", err)
	}
}
For more granular control, you can use the transport client directly:
package main

import (
	"log"

	pb "hertzbeat.apache.org/hertzbeat-collector-go/api/cluster_msg"
	"hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
)

func main() {
	// Create Netty client for Java server
	factory := &transport.TransportClientFactory{}
	client, err := factory.CreateClient("netty", "127.0.0.1:1158")
	if err != nil {
		log.Fatal("Failed to create client:", err)
	}

	// Start client
	if err := client.Start(); err != nil {
		log.Fatal("Failed to start client:", err)
	}
	defer client.Shutdown()

	// Register message processor
	client.RegisterProcessor(100, func(msg interface{}) (interface{}, error) {
		if pbMsg, ok := msg.(*pb.Message); ok {
			log.Printf("Received message: %s", string(pbMsg.Msg))
			return &pb.Message{
				Type:      pb.MessageType_HEARTBEAT,
				Direction: pb.Direction_RESPONSE,
				Identity:  pbMsg.Identity,
				Msg:       []byte("response"),
			}, nil
		}
		return nil, nil
	})

	// Send heartbeat message
	heartbeat := &pb.Message{
		Type:      pb.MessageType_HEARTBEAT,
		Direction: pb.Direction_REQUEST,
		Identity:  "go-collector",
		Msg:       []byte("heartbeat"),
	}

	// Async send
	if err := client.SendMsg(heartbeat); err != nil {
		log.Printf("Failed to send message: %v", err)
	}

	// Sync send with timeout (milliseconds)
	resp, err := client.SendMsgSync(heartbeat, 5000)
	if err != nil {
		log.Printf("Failed to send sync message: %v", err)
	} else if resp != nil {
		if pbResp, ok := resp.(*pb.Message); ok {
			log.Printf("Received response: %s", string(pbResp.Msg))
		}
	}
}
The Go collector supports all message types defined in the Java version:
Message Type | Value | Description |
---|---|---|
HEARTBEAT | 0 | Heartbeat/health check |
GO_ONLINE | 1 | Collector online notification |
GO_OFFLINE | 2 | Collector offline notification |
GO_CLOSE | 3 | Collector shutdown notification |
ISSUE_CYCLIC_TASK | 4 | Issue cyclic collection task |
DELETE_CYCLIC_TASK | 5 | Delete cyclic collection task |
ISSUE_ONE_TIME_TASK | 6 | Issue one-time collection task |
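For orientation, the table can be mirrored as a Go enumeration with a small dispatcher keyed by message type. This is a standalone sketch: the real constants come from the generated protobuf package (for example `pb.MessageType_HEARTBEAT`), while the `MessageType`, `Dispatcher`, `Register`, and `Dispatch` names here are hypothetical.

```go
package main

import "fmt"

// MessageType mirrors the values in the table above. It is a local stand-in
// for the generated protobuf enum, not the project's type.
type MessageType int32

const (
	Heartbeat        MessageType = iota // 0
	GoOnline                            // 1
	GoOffline                           // 2
	GoClose                             // 3
	IssueCyclicTask                     // 4
	DeleteCyclicTask                    // 5
	IssueOneTimeTask                    // 6
)

var typeNames = map[MessageType]string{
	Heartbeat:        "HEARTBEAT",
	GoOnline:         "GO_ONLINE",
	GoOffline:        "GO_OFFLINE",
	GoClose:          "GO_CLOSE",
	IssueCyclicTask:  "ISSUE_CYCLIC_TASK",
	DeleteCyclicTask: "DELETE_CYCLIC_TASK",
	IssueOneTimeTask: "ISSUE_ONE_TIME_TASK",
}

// String returns the wire name, or a placeholder for unknown values.
func (t MessageType) String() string {
	if name, ok := typeNames[t]; ok {
		return name
	}
	return fmt.Sprintf("UNKNOWN(%d)", int32(t))
}

// Handler processes the payload of one message.
type Handler func(payload []byte) error

// Dispatcher routes messages to the handler registered for their type.
type Dispatcher struct {
	handlers map[MessageType]Handler
}

func NewDispatcher() *Dispatcher {
	return &Dispatcher{handlers: make(map[MessageType]Handler)}
}

func (d *Dispatcher) Register(t MessageType, h Handler) {
	d.handlers[t] = h
}

// Dispatch returns an error when no handler is registered for t.
func (d *Dispatcher) Dispatch(t MessageType, payload []byte) error {
	h, ok := d.handlers[t]
	if !ok {
		return fmt.Errorf("no handler registered for %s", t)
	}
	return h(payload)
}

func main() {
	d := NewDispatcher()
	d.Register(IssueOneTimeTask, func(payload []byte) error {
		fmt.Printf("running one-time task: %s\n", payload)
		return nil
	})

	fmt.Println(d.Dispatch(IssueOneTimeTask, []byte("job-1")))
	fmt.Println(d.Dispatch(GoClose, nil))
}
```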
The transport layer provides robust connection management:
The implementation includes comprehensive error handling:
The Go collector implementation provides comprehensive compatibility with the Java version:
Transport Layer Compatibility
Connection Management
Message Processing
Protocol Compatibility
Task Processing Logic
Configuration Management
Monitoring and Metrics
Netty Protocol Implementation
// Length-prefixed message format for Java compatibility
func (c *NettyClient) writeMessage(msg *pb.Message) error {
	data, err := proto.Marshal(msg)
	if err != nil {
		return fmt.Errorf("failed to marshal message: %w", err)
	}

	// Write length prefix (varint32): a base-128 varint of at most five
	// bytes, not a fixed four-byte integer
	var prefix [binary.MaxVarintLen32]byte
	n := binary.PutUvarint(prefix[:], uint64(len(data)))
	if _, err := c.writer.Write(prefix[:n]); err != nil {
		return fmt.Errorf("failed to write length: %w", err)
	}

	// Write message data
	if _, err := c.writer.Write(data); err != nil {
		return fmt.Errorf("failed to write message: %w", err)
	}

	return c.writer.Flush()
}
Response Future Pattern
// Synchronous communication using ResponseFuture
func (c *NettyClient) SendMsgSync(msg interface{}, timeoutMillis int) (interface{}, error) {
	// Only protobuf messages can be sent over this transport
	pbMsg, ok := msg.(*pb.Message)
	if !ok {
		return nil, fmt.Errorf("unsupported message type: %T", msg)
	}

	// Create response future for this request
	future := NewResponseFuture()
	c.responseTable[pbMsg.Identity] = future
	defer delete(c.responseTable, pbMsg.Identity)

	// Send message
	if err := c.writeMessage(pbMsg); err != nil {
		future.PutError(err)
		return nil, err
	}

	// Wait for response with timeout
	return future.Wait(time.Duration(timeoutMillis) * time.Millisecond)
}
Event-Driven Architecture
// Connection event handling
func (c *NettyClient) triggerEvent(eventType EventType, err error) {
	if c.eventHandler != nil {
		c.eventHandler(Event{
			Type:    eventType,
			Address: c.addr,
			Error:   err,
		})
	}
}
The Go implementation achieves high compatibility with the Java version:
For Production Use:
For Development:
Testing Strategy:
The Go collector implementation successfully recreates the core communication capabilities of the Java version, providing a solid foundation for HertzBeat monitoring data collection in Go.
Contributions are welcome! Please see CONTRIBUTING.md for details, including code, documentation, tests, and discussions.
This project is licensed under the Apache 2.0 License.
For Chinese documentation, please see README-CN.md.