blob: 9f542d2d5ab6b8357a5839a9a6426b62b34aa14b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"context"
"testing"
"time"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
)
func TestBufferedLogger(t *testing.T) {
ctx := context.Background()
t.Run("write", func(t *testing.T) {
catcher := &logCatcher{}
l := &Logger{client: catcher}
bl := NewBufferedLogger(l)
message := []byte("test message")
n, err := bl.Write(message)
if err != nil {
t.Errorf("got error %v", err)
}
if got, want := n, len(message); got != want {
t.Errorf("got %d bytes written, want %d", got, want)
}
if got, want := bl.logs[0], "test message"; got != want {
t.Errorf("got message %q, want %q", got, want)
}
})
t.Run("flush single message", func(t *testing.T) {
catcher := &logCatcher{}
l := &Logger{client: catcher}
bl := NewBufferedLogger(l)
message := []byte("test message")
n, err := bl.Write(message)
if err != nil {
t.Errorf("got error %v", err)
}
if got, want := n, len(message); got != want {
t.Errorf("got %d bytes written, want %d", got, want)
}
bl.FlushAtDebug(ctx)
received := catcher.msgs[0].GetLogEntries()[0]
if got, want := received.Message, "test message"; got != want {
t.Errorf("got message %q, want %q", got, want)
}
if got, want := received.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
})
t.Run("flush multiple messages", func(t *testing.T) {
catcher := &logCatcher{}
l := &Logger{client: catcher}
bl := NewBufferedLogger(l)
messages := []string{"foo", "bar", "baz"}
for _, message := range messages {
messBytes := []byte(message)
n, err := bl.Write(messBytes)
if err != nil {
t.Errorf("got error %v", err)
}
if got, want := n, len(messBytes); got != want {
t.Errorf("got %d bytes written, want %d", got, want)
}
}
bl.FlushAtDebug(ctx)
received := catcher.msgs[0].GetLogEntries()
for i, message := range received {
if got, want := message.Message, messages[i]; got != want {
t.Errorf("got message %q, want %q", got, want)
}
if got, want := message.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
}
})
t.Run("flush single message at error", func(t *testing.T) {
catcher := &logCatcher{}
l := &Logger{client: catcher}
bl := NewBufferedLogger(l)
message := []byte("test error")
n, err := bl.Write(message)
if err != nil {
t.Errorf("got error %v", err)
}
if got, want := n, len(message); got != want {
t.Errorf("got %d bytes written, want %d", got, want)
}
bl.FlushAtError(ctx)
received := catcher.msgs[0].GetLogEntries()[0]
if got, want := received.Message, "test error"; got != want {
t.Errorf("got message %q, want %q", got, want)
}
if got, want := received.Severity, fnpb.LogEntry_Severity_ERROR; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
})
t.Run("flush multiple messages at error", func(t *testing.T) {
catcher := &logCatcher{}
l := &Logger{client: catcher}
bl := NewBufferedLogger(l)
messages := []string{"foo", "bar", "baz"}
for _, message := range messages {
messBytes := []byte(message)
n, err := bl.Write(messBytes)
if err != nil {
t.Errorf("got error %v", err)
}
if got, want := n, len(messBytes); got != want {
t.Errorf("got %d bytes written, want %d", got, want)
}
}
bl.FlushAtError(ctx)
received := catcher.msgs[0].GetLogEntries()
for i, message := range received {
if got, want := message.Message, messages[i]; got != want {
t.Errorf("got message %q, want %q", got, want)
}
if got, want := message.Severity, fnpb.LogEntry_Severity_ERROR; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
}
})
t.Run("direct print", func(t *testing.T) {
catcher := &logCatcher{}
l := &Logger{client: catcher}
bl := NewBufferedLogger(l)
bl.Printf(ctx, "foo %v", "bar")
received := catcher.msgs[0].GetLogEntries()[0]
if got, want := received.Message, "foo bar"; got != want {
t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got message %q, want %q", got, want)
}
if got, want := received.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
t.Errorf("l.Printf(\"foo %%v\", \"bar\"): got severity %v, want %v", got, want)
}
})
t.Run("debug flush at interval", func(t *testing.T) {
catcher := &logCatcher{}
l := &Logger{client: catcher}
interval := 5 * time.Second
bl := NewBufferedLoggerWithFlushInterval(context.Background(), l, interval)
startTime := time.Now()
bl.now = func() time.Time { return startTime }
messages := []string{"foo", "bar"}
for i, message := range messages {
if i > 1 {
bl.now = func() time.Time { return startTime.Add(6 * time.Second) }
}
messBytes := []byte(message)
n, err := bl.Write(messBytes)
if err != nil {
t.Errorf("got error %v", err)
}
if got, want := n, len(messBytes); got != want {
t.Errorf("got %d bytes written, want %d", got, want)
}
}
lastMessage := "baz"
bl.now = func() time.Time { return startTime.Add(6 * time.Second) }
messBytes := []byte(lastMessage)
n, err := bl.Write(messBytes)
if err != nil {
t.Errorf("got error %v", err)
}
if got, want := n, len(messBytes); got != want {
t.Errorf("got %d bytes written, want %d", got, want)
}
// Type should have auto-flushed at debug after the third message
received := catcher.msgs[0].GetLogEntries()
messages = append(messages, lastMessage)
for i, message := range received {
if got, want := message.Message, messages[i]; got != want {
t.Errorf("got message %q, want %q", got, want)
}
if got, want := message.Severity, fnpb.LogEntry_Severity_DEBUG; got != want {
t.Errorf("got severity %v, want %v", got, want)
}
}
})
}