[BEAM-9615] Recover from registration panics. (#15228)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index 2e3ea3f..d8ed074 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -106,7 +106,14 @@
}
// reconcileRegistrations actually finishes the registration process.
-func (r *Registry) reconcileRegistrations() error {
+func (r *Registry) reconcileRegistrations() (deferedErr error) {
+ var ut reflect.Type
+ defer func() {
+ if r := recover(); r != nil {
+ deferedErr = errors.Errorf("panicked: %v", r)
+ deferedErr = errors.WithContextf(deferedErr, "reconciling schema registration for type %v", ut)
+ }
+ }()
for _, ut := range r.toReconcile {
check := func(ut reflect.Type) bool {
return coder.LookupCustomCoder(ut) != nil