blob: ced13b744d454786a0664acd42d1d32b57172688 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"context"
"errors"
"fmt"
"log"
"os"
"sync"
"time"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"google.golang.org/protobuf/types/known/timestamppb"
)
// Logger is a wrapper around the FnAPI Logging Client, intended for
// container boot loader use. Not intended for Beam end users.
type Logger struct {
Endpoint string
client logSender
closeFn func()
mu sync.Mutex // To protect Send in the rare case multiple goroutines are calling this logger.
}
type logSender interface {
Send(*fnpb.LogEntry_List) error
CloseSend() error
}
// Close closes the grpc logging client.
func (l *Logger) Close() {
if l.closeFn != nil {
l.client.CloseSend()
l.closeFn()
l.closeFn = nil
l.client = nil
}
}
// Log a message with the given severity.
func (l *Logger) Log(ctx context.Context, sev fnpb.LogEntry_Severity_Enum, message string) {
l.mu.Lock()
defer l.mu.Unlock()
var exitErr error
defer func() {
if exitErr != nil {
log.Println("boot.go: error logging message over FnAPI. endpoint", l.Endpoint, "error:", exitErr, "message follows")
log.Println(sev.String(), message)
}
}()
if l.client == nil {
if l.Endpoint == "" {
exitErr = errors.New("no logging endpoint set")
return
}
cc, err := grpcx.Dial(ctx, l.Endpoint, 2*time.Minute)
if err != nil {
exitErr = err
return
}
l.closeFn = func() { cc.Close() }
l.client, err = fnpb.NewBeamFnLoggingClient(cc).Logging(ctx)
if err != nil {
exitErr = err
l.Close()
return
}
}
err := l.client.Send(&fnpb.LogEntry_List{
LogEntries: []*fnpb.LogEntry{
{
Severity: sev,
Timestamp: timestamppb.Now(),
Message: message,
},
},
})
if err != nil {
exitErr = err
return
}
}
// Printf logs the message with Debug severity.
func (l *Logger) Printf(ctx context.Context, format string, args ...any) {
l.Log(ctx, fnpb.LogEntry_Severity_DEBUG, fmt.Sprintf(format, args...))
}
// Warnf logs the message with Warning severity.
func (l *Logger) Warnf(ctx context.Context, format string, args ...any) {
l.Log(ctx, fnpb.LogEntry_Severity_WARN, fmt.Sprintf(format, args...))
}
// Errorf logs the message with Error severity.
func (l *Logger) Errorf(ctx context.Context, format string, args ...any) {
l.Log(ctx, fnpb.LogEntry_Severity_ERROR, fmt.Sprintf(format, args...))
}
// Fatalf logs the message with Critical severity, and then calls os.Exit(1).
func (l *Logger) Fatalf(ctx context.Context, format string, args ...any) {
l.Log(ctx, fnpb.LogEntry_Severity_CRITICAL, fmt.Sprintf(format, args...))
os.Exit(1)
}