| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package graphx |
| |
| import ( |
| "fmt" |
| |
| "github.com/apache/beam/sdks/go/pkg/beam/core/graph" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window" |
| v1 "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox" |
| "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" |
| pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" |
| "github.com/golang/protobuf/proto" |
| "github.com/golang/protobuf/ptypes" |
| ) |
| |
| // Model constants for interfacing with a Beam runner. |
| // TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto |
| const ( |
| URNImpulse = "beam:transform:impulse:v1" |
| URNParDo = "urn:beam:transform:pardo:v1" |
| URNFlatten = "beam:transform:flatten:v1" |
| URNGBK = "beam:transform:group_by_key:v1" |
| URNCombinePerKey = "beam:transform:combine_per_key:v1" |
| URNWindow = "beam:transform:window:v1" |
| |
| // URNIterableSideInput = "beam:side_input:iterable:v1" |
| URNMultimapSideInput = "beam:side_input:multimap:v1" |
| |
| URNGlobalWindowsWindowFn = "beam:windowfn:global_windows:v0.1" |
| URNFixedWindowsWindowFn = "beam:windowfn:fixed_windows:v0.1" |
| URNSlidingWindowsWindowFn = "beam:windowfn:sliding_windows:v0.1" |
| URNSessionsWindowFn = "beam:windowfn:session_windows:v0.1" |
| |
| // SDK constants |
| |
| // URNJavaDoFn is the legacy constant for marking a DoFn. |
| // TODO: remove URNJavaDoFN when the Dataflow runner |
| // uses the model pipeline and no longer falls back to Java. |
| URNJavaDoFn = "urn:beam:dofn:javasdk:0.1" |
| URNDoFn = "beam:go:transform:dofn:v1" |
| |
| URNIterableSideInputKey = "beam:go:transform:iterablesideinputkey:v1" |
| ) |
| |
| // TODO(herohde) 11/6/2017: move some of the configuration into the graph during construction. |
| |
| // Options for marshalling a graph into a model pipeline. |
| type Options struct { |
| // Environment used to run the user code. |
| Environment pb.Environment |
| } |
| |
| // Marshal converts a graph to a model pipeline. |
| func Marshal(edges []*graph.MultiEdge, opt *Options) (*pb.Pipeline, error) { |
| tree := NewScopeTree(edges) |
| |
| m := newMarshaller(opt) |
| for _, edge := range tree.Edges { |
| m.addMultiEdge(edge) |
| } |
| for _, t := range tree.Children { |
| m.addScopeTree(t) |
| } |
| |
| p := &pb.Pipeline{ |
| Components: m.build(), |
| } |
| return pipelinex.Normalize(p) |
| } |
| |
| type marshaller struct { |
| opt *Options |
| |
| transforms map[string]*pb.PTransform |
| pcollections map[string]*pb.PCollection |
| windowing map[string]*pb.WindowingStrategy |
| environments map[string]*pb.Environment |
| |
| coders *CoderMarshaller |
| |
| windowing2id map[string]string |
| } |
| |
| func newMarshaller(opt *Options) *marshaller { |
| return &marshaller{ |
| opt: opt, |
| transforms: make(map[string]*pb.PTransform), |
| pcollections: make(map[string]*pb.PCollection), |
| windowing: make(map[string]*pb.WindowingStrategy), |
| environments: make(map[string]*pb.Environment), |
| coders: NewCoderMarshaller(), |
| windowing2id: make(map[string]string), |
| } |
| } |
| |
| func (m *marshaller) build() *pb.Components { |
| return &pb.Components{ |
| Transforms: m.transforms, |
| Pcollections: m.pcollections, |
| WindowingStrategies: m.windowing, |
| Environments: m.environments, |
| Coders: m.coders.Build(), |
| } |
| } |
| |
| func (m *marshaller) addScopeTree(s *ScopeTree) string { |
| id := scopeID(s.Scope.Scope) |
| if _, exists := m.transforms[id]; exists { |
| return id |
| } |
| |
| var subtransforms []string |
| for _, edge := range s.Edges { |
| subtransforms = append(subtransforms, m.addMultiEdge(edge)) |
| } |
| for _, tree := range s.Children { |
| subtransforms = append(subtransforms, m.addScopeTree(tree)) |
| } |
| |
| transform := &pb.PTransform{ |
| UniqueName: s.Scope.Name, |
| Subtransforms: subtransforms, |
| } |
| |
| m.updateIfCombineComposite(s, transform) |
| |
| m.transforms[id] = transform |
| return id |
| } |
| |
| // updateIfCombineComposite examines the scope tree and sets the PTransform Spec |
| // to be a CombinePerKey with a CombinePayload if it's a liftable composite. |
| // Beam Portability requires that composites contain an implementation for runners |
| // that don't understand the URN and Payload, which this lightly checks for. |
| func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pb.PTransform) { |
| if s.Scope.Name != graph.CombinePerKeyScope || |
| len(s.Edges) != 2 || |
| len(s.Edges[0].Edge.Input) != 1 || |
| len(s.Edges[1].Edge.Output) != 1 || |
| s.Edges[1].Edge.Op != graph.Combine { |
| return |
| } |
| |
| edge := s.Edges[1].Edge |
| acID := m.coders.Add(edge.AccumCoder) |
| payload := &pb.CombinePayload{ |
| CombineFn: &pb.SdkFunctionSpec{ |
| Spec: &pb.FunctionSpec{ |
| Urn: URNJavaDoFn, |
| Payload: []byte(mustEncodeMultiEdgeBase64(edge)), |
| }, |
| EnvironmentId: m.addDefaultEnv(), |
| }, |
| AccumulatorCoderId: acID, |
| } |
| transform.Spec = &pb.FunctionSpec{Urn: URNCombinePerKey, Payload: protox.MustEncode(payload)} |
| } |
| |
| func (m *marshaller) addMultiEdge(edge NamedEdge) string { |
| id := edgeID(edge.Edge) |
| if _, exists := m.transforms[id]; exists { |
| return id |
| } |
| |
| if edge.Edge.Op == graph.CoGBK && len(edge.Edge.Input) > 1 { |
| return m.expandCoGBK(edge) |
| } |
| |
| inputs := make(map[string]string) |
| for i, in := range edge.Edge.Input { |
| m.addNode(in.From) |
| inputs[fmt.Sprintf("i%v", i)] = nodeID(in.From) |
| } |
| outputs := make(map[string]string) |
| for i, out := range edge.Edge.Output { |
| m.addNode(out.To) |
| outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To) |
| } |
| |
| var spec *pb.FunctionSpec |
| switch edge.Edge.Op { |
| case graph.Impulse: |
| // TODO(herohde) 7/18/2018: Encode data? |
| spec = &pb.FunctionSpec{Urn: URNImpulse} |
| |
| case graph.ParDo: |
| si := make(map[string]*pb.SideInput) |
| for i, in := range edge.Edge.Input { |
| switch in.Kind { |
| case graph.Main: |
| // ignore: not a side input |
| |
| case graph.Singleton, graph.Slice, graph.Iter, graph.ReIter: |
| // The only supported form of side input is MultiMap, but we |
| // want just iteration. So we must manually add a fixed key, |
| // "", even if the input is already KV. |
| |
| out := fmt.Sprintf("%v_keyed%v_%v", nodeID(in.From), edgeID(edge.Edge), i) |
| m.makeNode(out, m.coders.Add(makeBytesKeyedCoder(in.From.Coder)), in.From) |
| |
| payload := &pb.ParDoPayload{ |
| DoFn: &pb.SdkFunctionSpec{ |
| Spec: &pb.FunctionSpec{ |
| Urn: URNIterableSideInputKey, |
| Payload: []byte(protox.MustEncodeBase64(&v1.TransformPayload{ |
| Urn: URNIterableSideInputKey, |
| })), |
| }, |
| EnvironmentId: m.addDefaultEnv(), |
| }, |
| } |
| |
| keyedID := fmt.Sprintf("%v_keyed%v", edgeID(edge.Edge), i) |
| keyed := &pb.PTransform{ |
| UniqueName: keyedID, |
| Spec: &pb.FunctionSpec{ |
| Urn: URNParDo, |
| Payload: protox.MustEncode(payload), |
| }, |
| Inputs: map[string]string{"i0": nodeID(in.From)}, |
| Outputs: map[string]string{"i0": out}, |
| } |
| m.transforms[keyedID] = keyed |
| |
| // Fixup input map |
| inputs[fmt.Sprintf("i%v", i)] = out |
| |
| si[fmt.Sprintf("i%v", i)] = &pb.SideInput{ |
| AccessPattern: &pb.FunctionSpec{ |
| Urn: URNMultimapSideInput, |
| }, |
| ViewFn: &pb.SdkFunctionSpec{ |
| Spec: &pb.FunctionSpec{ |
| Urn: "foo", |
| }, |
| EnvironmentId: m.addDefaultEnv(), |
| }, |
| WindowMappingFn: &pb.SdkFunctionSpec{ |
| Spec: &pb.FunctionSpec{ |
| Urn: "bar", |
| }, |
| EnvironmentId: m.addDefaultEnv(), |
| }, |
| } |
| |
| case graph.Map, graph.MultiMap: |
| panic("NYI") |
| |
| default: |
| panic(fmt.Sprintf("unexpected input kind: %v", edge)) |
| } |
| } |
| |
| payload := &pb.ParDoPayload{ |
| DoFn: &pb.SdkFunctionSpec{ |
| Spec: &pb.FunctionSpec{ |
| Urn: URNJavaDoFn, |
| Payload: []byte(mustEncodeMultiEdgeBase64(edge.Edge)), |
| }, |
| EnvironmentId: m.addDefaultEnv(), |
| }, |
| SideInputs: si, |
| } |
| spec = &pb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)} |
| |
| case graph.Combine: |
| payload := &pb.ParDoPayload{ |
| DoFn: &pb.SdkFunctionSpec{ |
| Spec: &pb.FunctionSpec{ |
| Urn: URNJavaDoFn, |
| Payload: []byte(mustEncodeMultiEdgeBase64(edge.Edge)), |
| }, |
| EnvironmentId: m.addDefaultEnv(), |
| }, |
| } |
| spec = &pb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)} |
| |
| case graph.Flatten: |
| spec = &pb.FunctionSpec{Urn: URNFlatten} |
| |
| case graph.CoGBK: |
| spec = &pb.FunctionSpec{Urn: URNGBK} |
| |
| case graph.WindowInto: |
| payload := &pb.WindowIntoPayload{ |
| WindowFn: &pb.SdkFunctionSpec{ |
| Spec: makeWindowFn(edge.Edge.WindowFn), |
| }, |
| } |
| spec = &pb.FunctionSpec{Urn: URNWindow, Payload: protox.MustEncode(payload)} |
| |
| case graph.External: |
| spec = &pb.FunctionSpec{Urn: edge.Edge.Payload.URN, Payload: edge.Edge.Payload.Data} |
| |
| default: |
| panic(fmt.Sprintf("Unexpected opcode: %v", edge.Edge.Op)) |
| } |
| |
| transform := &pb.PTransform{ |
| UniqueName: edge.Name, |
| Spec: spec, |
| Inputs: inputs, |
| Outputs: outputs, |
| } |
| m.transforms[id] = transform |
| return id |
| } |
| |
| func (m *marshaller) expandCoGBK(edge NamedEdge) string { |
| // TODO(BEAM-490): replace once CoGBK is a primitive. For now, we have to translate |
| // CoGBK with multiple PCollections as described in cogbk.go. |
| |
| id := edgeID(edge.Edge) |
| kvCoderID := m.coders.Add(MakeKVUnionCoder(edge.Edge)) |
| gbkCoderID := m.coders.Add(MakeGBKUnionCoder(edge.Edge)) |
| |
| var subtransforms []string |
| |
| inputs := make(map[string]string) |
| for i, in := range edge.Edge.Input { |
| m.addNode(in.From) |
| |
| out := fmt.Sprintf("%v_%v_inject%v", nodeID(in.From), id, i) |
| m.makeNode(out, kvCoderID, in.From) |
| |
| // Inject(i) |
| |
| injectID := fmt.Sprintf("%v_inject%v", id, i) |
| payload := &pb.ParDoPayload{ |
| DoFn: &pb.SdkFunctionSpec{ |
| Spec: &pb.FunctionSpec{ |
| Urn: URNInject, |
| Payload: []byte(protox.MustEncodeBase64(&v1.TransformPayload{ |
| Urn: URNInject, |
| Inject: &v1.InjectPayload{N: (int32)(i)}, |
| })), |
| }, |
| EnvironmentId: m.addDefaultEnv(), |
| }, |
| } |
| inject := &pb.PTransform{ |
| UniqueName: injectID, |
| Spec: &pb.FunctionSpec{ |
| Urn: URNParDo, |
| Payload: protox.MustEncode(payload), |
| }, |
| Inputs: map[string]string{"i0": nodeID(in.From)}, |
| Outputs: map[string]string{"i0": out}, |
| } |
| m.transforms[injectID] = inject |
| subtransforms = append(subtransforms, injectID) |
| |
| inputs[fmt.Sprintf("i%v", i)] = out |
| } |
| |
| outNode := edge.Edge.Output[0].To |
| |
| // Flatten |
| |
| out := fmt.Sprintf("%v_flatten", nodeID(outNode)) |
| m.makeNode(out, kvCoderID, outNode) |
| |
| flattenID := fmt.Sprintf("%v_flatten", id) |
| flatten := &pb.PTransform{ |
| UniqueName: flattenID, |
| Spec: &pb.FunctionSpec{Urn: URNFlatten}, |
| Inputs: inputs, |
| Outputs: map[string]string{"i0": out}, |
| } |
| m.transforms[flattenID] = flatten |
| subtransforms = append(subtransforms, flattenID) |
| |
| // CoGBK |
| |
| gbkOut := fmt.Sprintf("%v_out", nodeID(outNode)) |
| m.makeNode(gbkOut, gbkCoderID, outNode) |
| |
| gbkID := fmt.Sprintf("%v_gbk", id) |
| gbk := &pb.PTransform{ |
| UniqueName: gbkID, |
| Spec: &pb.FunctionSpec{Urn: URNGBK}, |
| Inputs: map[string]string{"i0": out}, |
| Outputs: map[string]string{"i0": gbkOut}, |
| } |
| m.transforms[gbkID] = gbk |
| subtransforms = append(subtransforms, gbkID) |
| |
| // Expand |
| |
| m.addNode(outNode) |
| |
| expandID := fmt.Sprintf("%v_expand", id) |
| payload := &pb.ParDoPayload{ |
| DoFn: &pb.SdkFunctionSpec{ |
| Spec: &pb.FunctionSpec{ |
| Urn: URNExpand, |
| Payload: []byte(protox.MustEncodeBase64(&v1.TransformPayload{ |
| Urn: URNExpand, |
| })), |
| }, |
| EnvironmentId: m.addDefaultEnv(), |
| }, |
| } |
| expand := &pb.PTransform{ |
| UniqueName: expandID, |
| Spec: &pb.FunctionSpec{ |
| Urn: URNParDo, |
| Payload: protox.MustEncode(payload), |
| }, |
| Inputs: map[string]string{"i0": gbkOut}, |
| Outputs: map[string]string{"i0": nodeID(outNode)}, |
| } |
| m.transforms[id] = expand |
| subtransforms = append(subtransforms, id) |
| |
| // Add composite for visualization |
| |
| cogbkID := fmt.Sprintf("%v_cogbk", id) |
| m.transforms[cogbkID] = &pb.PTransform{ |
| UniqueName: edge.Name, |
| Subtransforms: subtransforms, |
| } |
| return cogbkID |
| } |
| |
| func (m *marshaller) addNode(n *graph.Node) string { |
| id := nodeID(n) |
| if _, exists := m.pcollections[id]; exists { |
| return id |
| } |
| // TODO(herohde) 11/15/2017: expose UniqueName to user. |
| return m.makeNode(id, m.coders.Add(n.Coder), n) |
| } |
| |
| func (m *marshaller) makeNode(id, cid string, n *graph.Node) string { |
| col := &pb.PCollection{ |
| UniqueName: id, |
| CoderId: cid, |
| IsBounded: boolToBounded(n.Bounded()), |
| WindowingStrategyId: m.addWindowingStrategy(n.WindowingStrategy()), |
| } |
| m.pcollections[id] = col |
| return id |
| } |
| |
| func boolToBounded(bounded bool) pb.IsBounded_Enum { |
| if bounded { |
| return pb.IsBounded_BOUNDED |
| } |
| return pb.IsBounded_UNBOUNDED |
| } |
| |
| func (m *marshaller) addDefaultEnv() string { |
| const id = "go" |
| if _, exists := m.environments[id]; !exists { |
| m.environments[id] = &m.opt.Environment |
| } |
| return id |
| } |
| |
| func (m *marshaller) addWindowingStrategy(w *window.WindowingStrategy) string { |
| ws := marshalWindowingStrategy(m.coders, w) |
| return m.internWindowingStrategy(ws) |
| } |
| |
| func (m *marshaller) internWindowingStrategy(w *pb.WindowingStrategy) string { |
| key := proto.MarshalTextString(w) |
| if id, exists := m.windowing2id[key]; exists { |
| return id |
| } |
| |
| id := fmt.Sprintf("w%v", len(m.windowing2id)) |
| m.windowing2id[key] = id |
| m.windowing[id] = w |
| return id |
| } |
| |
| // marshalWindowingStrategy marshals the given windowing strategy in |
| // the given coder context. |
| func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) *pb.WindowingStrategy { |
| ws := &pb.WindowingStrategy{ |
| WindowFn: &pb.SdkFunctionSpec{ |
| Spec: makeWindowFn(w.Fn), |
| }, |
| MergeStatus: pb.MergeStatus_NON_MERGING, |
| AccumulationMode: pb.AccumulationMode_DISCARDING, |
| WindowCoderId: c.AddWindowCoder(makeWindowCoder(w.Fn)), |
| Trigger: &pb.Trigger{ |
| Trigger: &pb.Trigger_Default_{ |
| Default: &pb.Trigger_Default{}, |
| }, |
| }, |
| OutputTime: pb.OutputTime_END_OF_WINDOW, |
| ClosingBehavior: pb.ClosingBehavior_EMIT_IF_NONEMPTY, |
| AllowedLateness: 0, |
| OnTimeBehavior: pb.OnTimeBehavior_FIRE_ALWAYS, |
| } |
| return ws |
| } |
| |
| func makeWindowFn(w *window.Fn) *pb.FunctionSpec { |
| switch w.Kind { |
| case window.GlobalWindows: |
| return &pb.FunctionSpec{ |
| Urn: URNGlobalWindowsWindowFn, |
| } |
| case window.FixedWindows: |
| return &pb.FunctionSpec{ |
| Urn: URNFixedWindowsWindowFn, |
| Payload: protox.MustEncode( |
| &pb.FixedWindowsPayload{ |
| Size: ptypes.DurationProto(w.Size), |
| }, |
| ), |
| } |
| case window.SlidingWindows: |
| return &pb.FunctionSpec{ |
| Urn: URNSlidingWindowsWindowFn, |
| Payload: protox.MustEncode( |
| &pb.SlidingWindowsPayload{ |
| Size: ptypes.DurationProto(w.Size), |
| Period: ptypes.DurationProto(w.Period), |
| }, |
| ), |
| } |
| case window.Sessions: |
| return &pb.FunctionSpec{ |
| Urn: URNSessionsWindowFn, |
| Payload: protox.MustEncode( |
| &pb.SessionsPayload{ |
| GapSize: ptypes.DurationProto(w.Gap), |
| }, |
| ), |
| } |
| default: |
| panic(fmt.Sprintf("Unexpected windowing strategy: %v", w)) |
| } |
| } |
| |
| func makeWindowCoder(w *window.Fn) *coder.WindowCoder { |
| switch w.Kind { |
| case window.GlobalWindows: |
| return coder.NewGlobalWindow() |
| case window.FixedWindows, window.SlidingWindows, URNSlidingWindowsWindowFn: |
| return coder.NewIntervalWindow() |
| default: |
| panic(fmt.Sprintf("Unexpected windowing strategy: %v", w)) |
| } |
| } |
| |
| func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) string { |
| ref, err := EncodeMultiEdge(edge) |
| if err != nil { |
| panic(errors.Wrapf(err, "Failed to serialize %v", edge)) |
| } |
| return protox.MustEncodeBase64(&v1.TransformPayload{ |
| Urn: URNDoFn, |
| Edge: ref, |
| }) |
| } |
| |
| // makeBytesKeyedCoder returns KV<[]byte,A,> for any coder, |
| // even if the coder is already a KV coder. |
| func makeBytesKeyedCoder(c *coder.Coder) *coder.Coder { |
| return coder.NewKV([]*coder.Coder{coder.NewBytes(), c}) |
| } |
| |
| func edgeID(edge *graph.MultiEdge) string { |
| return fmt.Sprintf("e%v", edge.ID()) |
| } |
| |
| func nodeID(n *graph.Node) string { |
| return fmt.Sprintf("n%v", n.ID()) |
| } |
| |
| func scopeID(s *graph.Scope) string { |
| return fmt.Sprintf("s%v", s.ID()) |
| } |