blob: 05411c332f9fb5d00fa5e8ba463c9d161f653715 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
pipelines:
- pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {a: "x", b: 1, c: 1.0}
- {a: "y", b: 2, c: 0.5}
- {a: "z", b: 3, c: 0.25}
- {a: "z", b: 10, c: -1.5}
- type: Filter
config:
language: java
keep:
expression: c > 0
- type: MapToFields
config:
language: java
append: true
drop: a
fields:
seq:
callable: |
import java.util.function.Function;
import org.apache.beam.sdk.values.Row;
import java.util.List;
import java.util.ArrayList;
public class Seq implements Function<Row, List<String>> {
public List<String> apply(Row row) {
String base = row.getString("a");
List<String> result = new ArrayList<>();
String s = "";
for (int i = 0; i < row.getInt64("b"); i++) {
s += base;
result.add(s);
}
return result;
}
}
- type: Explode
config:
fields: [seq]
- type: AssertEqual
config:
elements:
- {b: 1, c: 1.0, seq: x}
- {b: 2, c: 0.5, seq: y}
- {b: 2, c: 0.5, seq: yy}
- {b: 3, c: .25, seq: z}
- {b: 3, c: .25, seq: zz}
- {b: 3, c: .25, seq: zzz}