blob: b9c6ec7f7c82985dae18179ee55737bcddf6a62a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with this
// work for additional information regarding copyright ownership. The ASF
// licenses this file to You under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
package beam
import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
)
// UnnamedInput wraps a single PCollection in the map form expected by
// beam.CrossLanguage. The collection is stored under graph.UnnamedInputTag.
//
// Example:
//
//	beam.CrossLanguage(s, urn, payload, addr, UnnamedInput(input), outputs)
func UnnamedInput(col PCollection) map[string]PCollection {
	inputs := make(map[string]PCollection, 1)
	inputs[graph.UnnamedInputTag] = col
	return inputs
}
// UnnamedOutput wraps a single output type in the map form expected by
// beam.CrossLanguage. The type is stored under graph.UnnamedOutputTag, and the
// matching output PCollection can be looked up with beam.UnnamedOutputTag.
//
// Example:
//
//	resultMap := beam.CrossLanguage(s, urn, payload, addr, inputs, UnnamedOutput(output))
//	result := resultMap[beam.UnnamedOutputTag()]
func UnnamedOutput(t FullType) map[string]FullType {
	outputs := make(map[string]FullType, 1)
	outputs[graph.UnnamedOutputTag] = t
	return outputs
}
// UnnamedOutputTag returns the tag under which beam.UnnamedOutput registers its
// output type. Use it as the key to fetch the unnamed output PCollection from
// the map returned by beam.CrossLanguage.
func UnnamedOutputTag() string {
	tag := graph.UnnamedOutputTag
	return tag
}
// CrossLanguagePayload encodes a native Go struct into a payload for cross-language transforms.
// Payloads are []byte encoded ExternalConfigurationPayload protobufs. In order to fill the
// contents of the protobuf, the provided struct will be converted to a row encoded
// representation with an accompanying schema, so the input struct must be compatible with schemas.
//
// CrossLanguagePayload panics if the struct cannot be encoded.
//
// See https://beam.apache.org/documentation/programming-guide/#schemas for basic information on
// schemas, and pkg/beam/core/runtime/graphx/schema for details on schemas in the Go SDK.
//
// Example:
//
//	type stringPayload struct {
//		Data string
//	}
//	encodedPl := beam.CrossLanguagePayload(stringPayload{Data: "foo"})
func CrossLanguagePayload(pl interface{}) []byte {
	encoded, encErr := xlangx.EncodeStructPayload(pl)
	if encErr != nil {
		// There is no error return on this helper, so an encoding failure
		// is surfaced as a panic carrying the original error.
		panic(encErr)
	}
	return encoded
}
// CrossLanguage is a low-level transform for executing cross-language transforms written in other
// SDKs. Because this is low-level, it is recommended to use one of the higher-level IO-specific
// wrappers where available. These can be found in the pkg/beam/io/xlang subdirectory.
// CrossLanguage is useful for executing cross-language transforms which do not have any existing
// IO wrappers.
//
// Usage requires an address for an expansion service accessible during pipeline construction, a
// URN identifying the desired transform, an optional payload with configuration information, and
// input and output names. It outputs a map of named output PCollections.
//
// For more information on expansion services and other aspects of cross-language transforms in
// general, refer to the Beam programming guide: https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines
//
// Payload
//
// Payloads are configuration data that some cross-language transforms require for expansion.
// Consult the documentation of the transform in the source SDK to find out what payload data it
// requires. If no payload is required, pass in nil.
//
// CrossLanguage accepts payloads as a []byte containing an encoded ExternalConfigurationPayload
// protobuf. The helper function beam.CrossLanguagePayload is the recommended way to easily encode
// a standard Go struct for use as a payload.
//
// Inputs and Outputs
//
// Like most transforms, any input PCollections must be provided. Unlike most transforms, output
// types must be provided because Go cannot infer output types from external transforms.
//
// Inputs and outputs to a cross-language transform may be either named or unnamed. Named
// inputs/outputs are used when there are more than one input/output, and are provided as maps with
// names as keys. Unnamed inputs/outputs are used when there is only one, and a map can be quickly
// constructed with the UnnamedInput and UnnamedOutput methods.
//
// An example of defining named inputs and outputs:
//
// namedInputs := map[string]beam.PCollection{"pcol1": pcol1, "pcol2": pcol2}
// namedOutputTypes := map[string]typex.FullType{
// "main": typex.New(reflectx.String),
// "side": typex.New(reflectx.Int64),
// }
//
// CrossLanguage outputs a map of PCollections with associated names. These names will match those
// from provided named outputs. If the beam.UnnamedOutput method was used, the PCollection can be
// retrieved with beam.UnnamedOutputTag().
//
// An example of retrieving named outputs from a call to CrossLanguage:
//
// outputs := beam.CrossLanguage(...)
// mainPcol := outputs["main"]
// sidePcol := outputs["side"]
//
// Example
//
// This example shows using CrossLanguage to execute the Prefix cross-language transform using an
// expansion service running on localhost:8099. Prefix requires a payload containing a prefix to
// prepend to every input string.
//
// type prefixPayload struct {
// Data string
// }
// encodedPl := beam.CrossLanguagePayload(prefixPayload{Data: "foo"})
// urn := "beam:transforms:xlang:test:prefix"
// expansionAddr := "localhost:8099"
// outputType := beam.UnnamedOutput(typex.New(reflectx.String))
// input := beam.UnnamedInput(inputPcol)
// outs := beam.CrossLanguage(s, urn, encodedPl, expansionAddr, input, outputType)
// outPcol := outs[beam.UnnamedOutputTag()]
func CrossLanguage(
	s Scope,
	urn string,
	payload []byte,
	expansionAddr string,
	namedInputs map[string]PCollection,
	namedOutputTypes map[string]FullType,
) map[string]PCollection {
	// This entry point reports failures by panicking, so reject an unusable
	// scope before anything else is done.
	if !s.IsValid() {
		panic(errors.New("invalid scope"))
	}

	// Convert the beam-level inputs to graph nodes and build the inbound
	// links first, then build the outbound links from the declared types.
	inputNodes := mapPCollectionToNode(namedInputs)
	inTags, inLinks := graph.NamedInboundLinks(inputNodes)
	outTags, outLinks := graph.NamedOutboundLinks(s.real, namedOutputTypes)

	// Describe the external transform and attach the named inputs/outputs.
	transform := graph.ExternalTransform{
		Urn:           urn,
		Payload:       payload,
		ExpansionAddr: expansionAddr,
	}
	transform = transform.WithNamedInputs(inTags).WithNamedOutputs(outTags)

	expanded, err := TryCrossLanguage(s, &transform, inLinks, outLinks)
	if err != nil {
		panic(errors.WithContextf(err, "tried cross-language and failed"))
	}

	// Convert the resulting graph nodes back into PCollections for the caller.
	return mapNodeToPCollection(expanded)
}
// TryCrossLanguage coordinates the core functions required to execute the cross-language transform.
// This is mainly intended for internal use. For the general-use entry point, see
// beam.CrossLanguage.
//
// It adds an edge for ext to the graph of scope s using the given inbound and
// outbound links, assigns ext a fresh namespace, expands the transform, and
// returns the edge's external outputs keyed by name.
//
// An error is returned if s is invalid, if ext is nil, or if expansion fails.
func TryCrossLanguage(s Scope, ext *graph.ExternalTransform, ins []*graph.Inbound, outs []*graph.Outbound) (map[string]*graph.Node, error) {
	// Validate arguments up front so that misuse of this exported function
	// is reported as an error instead of reaching the graph calls below.
	if !s.IsValid() {
		return nil, errors.New("invalid scope")
	}
	if ext == nil {
		return nil, errors.New("nil external transform")
	}

	// Adding an edge in the graph corresponding to the ExternalTransform
	edge, isBoundedUpdater := graph.NewCrossLanguage(s.real, s.scope, ext, ins, outs)

	// Once the appropriate input and output nodes are added to the edge, a
	// unique namespace can be requested.
	ext.Namespace = graph.NewNamespace()

	// Expand the transform into ext.Expanded.
	if err := xlangx.Expand(edge, ext); err != nil {
		return nil, errors.WithContext(err, "expanding external transform")
	}

	// Ensures the expected named outputs are present
	graphx.VerifyNamedOutputs(ext)
	// Using the expanded outputs, the graph's counterpart outputs are updated with bounded values
	graphx.ResolveOutputIsBounded(edge, isBoundedUpdater)

	return graphx.ExternalOutputs(edge), nil
}
// Wrapper functions to handle beam <-> graph boundaries

// pCollectionToNode returns the graph node held by p. It panics if p is not
// valid.
func pCollectionToNode(p PCollection) *graph.Node {
	if valid := p.IsValid(); !valid {
		panic("tried converting invalid PCollection")
	}
	node := p.n
	return node
}
// nodeToPCollection wraps a graph node in a PCollection and sets its coder to
// the one NewCoder produces for the collection's type. It panics if n is nil.
func nodeToPCollection(n *graph.Node) PCollection {
	if n == nil {
		panic("tried converting invalid Node")
	}
	pcol := PCollection{n}
	coder := NewCoder(pcol.Type())
	pcol.SetCoder(coder)
	return pcol
}
// mapPCollectionToNode converts a map of named PCollections into a map of
// graph nodes with the same keys. A nil map is returned as nil. Each entry is
// converted with pCollectionToNode, which panics on an invalid PCollection.
func mapPCollectionToNode(pMap map[string]PCollection) map[string]*graph.Node {
	if pMap == nil {
		return nil
	}
	nodes := make(map[string]*graph.Node, len(pMap))
	for name, pcol := range pMap {
		nodes[name] = pCollectionToNode(pcol)
	}
	return nodes
}
// mapNodeToPCollection converts a map of named graph nodes into a map of
// PCollections with the same keys. A nil map is returned as nil. Each entry is
// converted with nodeToPCollection, which panics on a nil node.
func mapNodeToPCollection(nMap map[string]*graph.Node) map[string]PCollection {
	if nMap == nil {
		return nil
	}
	pcols := make(map[string]PCollection, len(nMap))
	for name, node := range nMap {
		pcols[name] = nodeToPCollection(node)
	}
	return pcols
}