// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statecache
import (
"testing"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
)
// TestReusableInput is a test double for the ReusableInput interface. It
// records the transform and side input IDs it was built with and holds a
// single value.
type TestReusableInput struct {
	transformID, sideInputID string
	value                    interface{}
}
// makeTestReusableInput builds a TestReusableInput holding value for the given
// transform and side input IDs and returns it as an exec.ReusableInput.
func makeTestReusableInput(transformID, sideInputID string, value interface{}) exec.ReusableInput {
	in := new(TestReusableInput)
	in.transformID = transformID
	in.sideInputID = sideInputID
	in.value = value
	return in
}
// Init is part of the ReusableInput interface. The test input needs no setup,
// so it does nothing and always returns nil.
func (r *TestReusableInput) Init() error { return nil }
// Value reports whatever value the TestReusableInput currently holds; it is
// nil after Reset has been called.
func (r *TestReusableInput) Value() interface{} { return r.value }
// Reset drops the held value, leaving the transform and side input IDs
// untouched. It always returns nil.
func (r *TestReusableInput) Reset() error {
	// Only the value is cleared; the IDs identify the input and are kept.
	r.value = nil
	return nil
}
// TestInit verifies that SideInputCache.Init succeeds when called with 5.
func TestInit(t *testing.T) {
	var s SideInputCache
	if err := s.Init(5); err != nil {
		t.Errorf("SideInputCache failed but should have succeeded, got %v", err)
	}
}
// TestInit_Bad verifies that SideInputCache.Init reports an error when called
// with 0.
func TestInit_Bad(t *testing.T) {
	var s SideInputCache
	if err := s.Init(0); err == nil {
		t.Error("SideInputCache init succeeded but should have failed")
	}
}
// TestQueryCache_EmptyCase verifies that querying a freshly initialized cache,
// into which nothing has been stored, returns nil.
func TestQueryCache_EmptyCase(t *testing.T) {
	var s SideInputCache
	if err := s.Init(1); err != nil {
		t.Fatalf("cache init failed, got %v", err)
	}
	// Pass the transform ID first and the side input ID second, matching the
	// argument order of every other QueryCache call in this file.
	output := s.QueryCache("transform1", "side1")
	if output != nil {
		t.Errorf("Cache hit when it should have missed, got %v", output)
	}
}
// TestSetCache_UncacheableCase verifies that an input passed to SetCache when
// no token has been registered for its IDs cannot be read back through
// QueryCache.
func TestSetCache_UncacheableCase(t *testing.T) {
	var s SideInputCache
	if err := s.Init(1); err != nil {
		t.Fatalf("cache init failed, got %v", err)
	}
	// No token is registered for this ID pair before the store.
	const transID, sideID = "t1", "s1"
	s.SetCache(transID, sideID, makeTestReusableInput(transID, sideID, 10))
	if output := s.QueryCache(transID, sideID); output != nil {
		t.Errorf("Cache hit when should have missed, got %v", output)
	}
}
// TestSetCache_CacheableCase verifies that an input stored with SetCache after
// a token has been registered for its IDs is returned by QueryCache with its
// value intact.
func TestSetCache_CacheableCase(t *testing.T) {
	var s SideInputCache
	if err := s.Init(1); err != nil {
		t.Fatalf("cache init failed, got %v", err)
	}
	transID := "t1"
	sideID := "s1"
	tok := token("tok1")
	// Unlike the uncacheable case, a token is registered for the ID pair
	// before the input is stored.
	s.setValidToken(transID, sideID, tok)
	input := makeTestReusableInput(transID, sideID, 10)
	s.SetCache(transID, sideID, input)
	output := s.QueryCache(transID, sideID)
	if output == nil {
		t.Fatal("call to query cache missed when should have hit")
	}
	val, ok := output.Value().(int)
	if !ok {
		// Stop here: val would be the zero value, and comparing it below would
		// only add a misleading second failure.
		t.Fatalf("failed to convert value to integer, got %v", output.Value())
	}
	if val != 10 {
		t.Errorf("element mismatch, expected 10, got %v", val)
	}
}
// makeRequest builds a side input cache token message: the transform and side
// input IDs are placed in the SideInput variant of the Type field, and the
// bytes of t become the Token field.
func makeRequest(transformID, sideInputID string, t token) fnpb.ProcessBundleRequest_CacheToken {
	return fnpb.ProcessBundleRequest_CacheToken{
		Type: &fnpb.ProcessBundleRequest_CacheToken_SideInput_{
			SideInput: &fnpb.ProcessBundleRequest_CacheToken_SideInput{
				TransformId: transformID,
				SideInputId: sideInputID,
			},
		},
		Token: []byte(t),
	}
}
// TestSetValidTokens verifies that SetValidTokens, given several cache tokens
// in one call, marks every token as valid and records each one under the
// concatenation of its transform and side input IDs.
func TestSetValidTokens(t *testing.T) {
	inputs := []struct {
		transformID string
		sideInputID string
		tok         token
	}{
		{"t1", "s1", "tok1"},
		{"t2", "s2", "tok2"},
		{"t3", "s3", "tok3"},
	}
	var s SideInputCache
	if err := s.Init(3); err != nil {
		t.Fatalf("cache init failed, got %v", err)
	}
	// The number of requests is known up front, so size the slice once.
	tokens := make([]fnpb.ProcessBundleRequest_CacheToken, 0, len(inputs))
	for _, input := range inputs {
		// Named req rather than t so the *testing.T parameter is not shadowed.
		req := makeRequest(input.transformID, input.sideInputID, input.tok)
		tokens = append(tokens, req)
	}
	s.SetValidTokens(tokens...)
	if len(s.idsToTokens) != len(inputs) {
		t.Errorf("Missing tokens, expected %v, got %v", len(inputs), len(s.idsToTokens))
	}
	for i, input := range inputs {
		// Check that the token is in the valid list
		if !s.isValid(input.tok) {
			t.Errorf("error in input %v, token %v is not valid", i, input.tok)
		}
		// Check that the mapping of IDs to tokens is correct
		mapped := s.idsToTokens[input.transformID+input.sideInputID]
		if mapped != input.tok {
			t.Errorf("token mismatch for input %v, expected %v, got %v", i, input.tok, mapped)
		}
	}
}
// TestSetValidTokens_ClearingBetween verifies token bookkeeping across
// consecutive bundles: each token is valid and correctly mapped after its own
// SetValidTokens call, and once CompleteBundle has been called for every
// bundle no entry in validTokens has a non-zero count.
func TestSetValidTokens_ClearingBetween(t *testing.T) {
	inputs := []struct {
		transformID string
		sideInputID string
		tk          token
	}{
		{"t1", "s1", "tok1"},
		{"t2", "s2", "tok2"},
		{"t3", "s3", "tok3"},
	}
	var s SideInputCache
	if err := s.Init(1); err != nil {
		t.Fatalf("cache init failed, got %v", err)
	}
	for i, input := range inputs {
		tok := makeRequest(input.transformID, input.sideInputID, input.tk)
		s.SetValidTokens(tok)
		// Check that the token is in the valid list
		if !s.isValid(input.tk) {
			t.Errorf("error in input %v, token %v is not valid", i, input.tk)
		}
		// Check that the mapping of IDs to tokens is correct
		mapped := s.idsToTokens[input.transformID+input.sideInputID]
		if mapped != input.tk {
			t.Errorf("token mismatch for input %v, expected %v, got %v", i, input.tk, mapped)
		}
		// Finish this bundle before the next one registers its token.
		s.CompleteBundle(tok)
	}
	// Every bundle has completed, so no token should still be counted.
	for tok, count := range s.validTokens {
		if count != 0 {
			t.Errorf("token count mismatch for token %v, expected 0, got %v", tok, count)
		}
	}
}
// TestSetCache_Eviction verifies that, with a cache initialized with 1,
// storing a second input after the first input's bundle has completed leaves
// exactly one cached entry and records exactly one eviction.
func TestSetCache_Eviction(t *testing.T) {
	var s SideInputCache
	if err := s.Init(1); err != nil {
		t.Fatalf("cache init failed, got %v", err)
	}

	// First bundle: register its token and store its input.
	tokOne := makeRequest("t1", "s1", "tok1")
	s.SetValidTokens(tokOne)
	s.SetCache("t1", "s1", makeTestReusableInput("t1", "s1", 10))
	// Completing the bundle drops the count for tokOne to 0.
	s.CompleteBundle(tokOne)

	// Second bundle: storing its input is expected to push out the first.
	tokTwo := makeRequest("t2", "s2", "tok2")
	s.SetValidTokens(tokTwo)
	s.SetCache("t2", "s2", makeTestReusableInput("t2", "s2", 20))

	if got := len(s.cache); got != 1 {
		t.Errorf("cache size incorrect, expected 1, got %v", got)
	}
	if got := s.metrics.Evictions; got != 1 {
		t.Errorf("number evictions incorrect, expected 1, got %v", got)
	}
}
// TestSetCache_EvictionFailure verifies that, with a cache initialized with 1
// and two tokens valid at the same time, storing a second input keeps the
// cache at one entry and records exactly one in-use eviction.
func TestSetCache_EvictionFailure(t *testing.T) {
	var s SideInputCache
	if err := s.Init(1); err != nil {
		t.Fatalf("cache init failed, got %v", err)
	}
	tokOne := makeRequest("t1", "s1", "tok1")
	inOne := makeTestReusableInput("t1", "s1", 10)
	tokTwo := makeRequest("t2", "s2", "tok2")
	inTwo := makeTestReusableInput("t2", "s2", 20)
	// Both tokens are registered together and neither bundle is completed, so
	// both remain valid for the rest of the test.
	s.SetValidTokens(tokOne, tokTwo)
	s.SetCache("t1", "s1", inOne)
	// Should fail to evict because the first token is still valid
	s.SetCache("t2", "s2", inTwo)
	// Cache should not exceed size 1
	if len(s.cache) != 1 {
		t.Errorf("cache size incorrect, expected 1, got %v", len(s.cache))
	}
	if s.metrics.InUseEvictions != 1 {
		t.Errorf("number of failed eviction calls incorrect, expected 1, got %v", s.metrics.InUseEvictions)
	}
}