[BEAM-9615] Make UUIDs deterministic on Go types. (#14773)
Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/logicaltypes.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/logicaltypes.go
index a011a78..0bd33bd 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/logicaltypes.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/logicaltypes.go
@@ -51,7 +51,6 @@
// Registry retains mappings from go types to Schemas and LogicalTypes.
type Registry struct {
- lastShortID int64
typeToSchema map[reflect.Type]*pipepb.Schema
idToType map[string]reflect.Type
syntheticToUser map[reflect.Type]reflect.Type
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index abb6f10..087d8c1 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -26,7 +26,9 @@
package schema
import (
+ "bytes"
"fmt"
+ "hash/fnv"
"reflect"
"strings"
@@ -72,8 +74,19 @@
defaultRegistry.RegisterType(ut)
}
-func getUUID() string {
- return uuid.New().String()
+// getUUID generates a UUID using the string form of the type name.
+func getUUID(ut reflect.Type) string {
+ // String produces non-empty output for pointer and slice types.
+ typename := ut.String()
+ hasher := fnv.New128a()
+ if n, err := hasher.Write([]byte(typename)); err != nil || n != len(typename) {
+ panic(fmt.Sprintf("unable to generate schema uuid for %s, wrote out %d bytes, want %d: err %v", typename, n, len(typename), err))
+ }
+ id, err := uuid.NewRandomFromReader(bytes.NewBuffer(hasher.Sum(nil)))
+ if err != nil {
+ panic(fmt.Sprintf("unable to genereate schema uuid for type %s: %v", typename, err))
+ }
+ return id.String()
}
// Registered returns whether the given type has been registered with
@@ -350,7 +363,7 @@
if lID != "" {
schm.Options = append(schm.Options, logicalOption(lID))
}
- schm.Id = getUUID()
+ schm.Id = getUUID(ot)
r.typeToSchema[ot] = schm
r.idToType[schm.GetId()] = ot
return schm, nil
@@ -365,7 +378,7 @@
// Cache the pointer type here with it's own id.
pt := reflect.PtrTo(t)
schm = proto.Clone(schm).(*pipepb.Schema)
- schm.Id = getUUID()
+ schm.Id = getUUID(pt)
schm.Options = append(schm.Options, &pipepb.Option{
Name: optGoNillable,
})
@@ -454,7 +467,7 @@
schm := ftype.GetRowType().GetSchema()
schm = proto.Clone(schm).(*pipepb.Schema)
schm.Options = append(schm.Options, logicalOption(lID))
- schm.Id = getUUID()
+ schm.Id = getUUID(t)
r.typeToSchema[t] = schm
r.idToType[schm.GetId()] = t
return schm, nil
@@ -483,7 +496,7 @@
schm := &pipepb.Schema{
Fields: fields,
- Id: getUUID(),
+ Id: getUUID(t),
}
r.idToType[schm.GetId()] = t
r.typeToSchema[t] = schm