/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
//
// FileIOTests.swift
//
//
// Created by Byron Ellis on 8/22/23.
//

import ApacheBeam
import Logging
import XCTest

final class FileIOTests: XCTestCase {
    override func setUpWithError() throws {}

    override func tearDownWithError() throws {}
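
    // Lists files under a Google Cloud Storage bucket by sending a
    // KV(bucket, prefix) element through the listFiles transform and logging
    // whatever the transform emits. Skipped by default because it reads from
    // the public dataflow-samples bucket over the network.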
    func testGoogleStorageListFiles() async throws {
        throw XCTSkip()
        try await PCollectionTest(PCollection<KV<String, String>>().listFiles(in: GoogleStorage.self)) { log, inputs, outputs in
            log.info("Sending value")
            try inputs[0].emit(value: KV("dataflow-samples", "shakespeare"))
            log.info("Value sent")
            inputs[0].finish()
            for try await (output, _, _) in outputs[0] {
                log.info("Output: \(output)")
            }
        }.run()
    }
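
    // Reads a single object from Google Cloud Storage by sending a
    // KV(bucket, object path) element through the readFiles transform and
    // decoding the emitted Data as UTF-8. Also skipped by default.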
    func testGoogleStorageReadFiles() async throws {
        throw XCTSkip()
        try await PCollectionTest(PCollection<KV<String, String>>().readFiles(in: GoogleStorage.self)) { log, inputs, outputs in
            log.info("Sending value")
            try inputs[0].emit(value: KV("dataflow-samples", "shakespeare/asyoulikeit.txt"))
            log.info("Value sent")
            inputs[0].finish()
            for try await (output, _, _) in outputs[0] {
                log.info("Output: \(String(data: output as! Data, encoding: .utf8)!)")
            }
        }.run()
    }
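
    // End-to-end wordcount over the Shakespeare sample texts: list the files,
    // read their contents, split them into lines and words, and count words
    // with two groupBy/sum stages (raw tokens, then lowercased and trimmed
    // tokens). Runs against a portable runner in loopback mode; skipped by
    // default.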
    func testShakespeareWordcount() async throws {
        throw XCTSkip()
        try await Pipeline { pipeline in
            let contents = pipeline
                .create(["dataflow-samples/shakespeare"])
                .map { value in
                    let parts = value.split(separator: "/", maxSplits: 1)
                    print("Got filename \(parts) from \(value)")
                    return KV(parts[0].lowercased(), parts[1].lowercased())
                }
                .listFiles(in: GoogleStorage.self)
                .readFiles(in: GoogleStorage.self)

            // A simple ParDo that takes advantage of enumerateLines. Left unnamed
            // to exercise automatic name generation for ParDos.
            let lines = contents.pstream { contents, lines in
                for await (content, ts, w) in contents {
                    String(data: content, encoding: .utf8)!.enumerateLines { line, _ in
                        lines.emit(line, timestamp: ts, window: w)
                    }
                }
            }

            // Our first group-by operation: count occurrences of each raw token.
            let baseCount = lines
                .flatMap { (line: String) in line.components(separatedBy: .whitespaces) }
                .groupBy { ($0, 1) }
                .sum()

            // Normalize keys (lowercase, strip punctuation) and sum the counts again.
            let normalizedCounts = baseCount.groupBy {
                ($0.key.lowercased().trimmingCharacters(in: .punctuationCharacters),
                 $0.value ?? 1)
            }.sum()

            normalizedCounts.log(prefix: "COUNT OUTPUT")
        }.run(PortableRunner(loopback: true))
    }
}