Support string FQN as a way to add lineage information (#32613)
* Support string FQN as a way to add lineage information
* clarify the two use case of lineage.add
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index 065a497..f402c0a 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -370,7 +370,31 @@
def add(
self, system: str, *segments: str, subtype: Optional[str] = None) -> None:
- self.metric.add(self.get_fq_name(system, *segments, subtype=subtype))
+ """
+ Adds the given details as Lineage.
+
+ For asset level lineage the resource location should be specified as
+ Dataplex FQN, see
+ https://cloud.google.com/data-catalog/docs/fully-qualified-names
+
+ Example of adding FQN components:
+
+ - `add("system", "segment1", "segment2")`
+ - `add("system", "segment1", "segment2", subtype="subtype")`
+
+ Example of adding a FQN:
+
+ - `add("system:segment1.segment2")`
+ - `add("system:subtype:segment1.segment2")`
+
+ The first positional argument serves as system, if full segments are
+ provided, or the full FQN if it is provided as a single argument.
+ """
+ system_or_details = system
+ if len(segments) == 0 and subtype is None:
+ self.metric.add(system_or_details)
+ else:
+ self.metric.add(self.get_fq_name(system, *segments, subtype=subtype))
@staticmethod
def query(results: MetricResults, label: str) -> Set[str]:
diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py
index 1c6a11e..524a214 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -269,6 +269,17 @@
"apache:beam:" + v + '.' + v,
Lineage.get_fq_name("apache", k, k, subtype="beam"))
+ def test_add(self):
+ lineage = Lineage(Lineage.SOURCE)
+ stringset = set()
+ # override
+ lineage.metric = stringset
+ lineage.add("s", "1", "2")
+ lineage.add("s:3.4")
+ lineage.add("s", "5", "6.7")
+ lineage.add("s", "1", "2", subtype="t")
+ self.assertSetEqual(stringset, {"s:1.2", "s:3.4", "s:t:1.2", "s:5.`6.7`"})
+
if __name__ == '__main__':
unittest.main()