import Dispatch
import Foundation
import OAuth2
struct ListFilesResponse: Codable {
struct Item: Codable {
let kind: String
let selfLink: String
let mediaLink: String
let name: String
let bucket: String
let size: String
let kind: String
let items: [Item]
public struct GoogleStorage: FileIOSource {
public static func readFiles(matching: PCollection<KV<String, String>>) -> PCollection<Data> {
matching.pstream(type: .bounded) { matching, output in
guard let tokenProvider = DefaultTokenProvider(scopes: ["storage.objects.get"]) else {
throw ApacheBeamError.runtimeError("Unable to get OAuth2 token.")
let connection = Connection(provider: tokenProvider)
for await (file, ts, w) in matching {
let bucket = file.key
for name in file.values {
let url = "\(bucket)/o/\(name.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)"
let response: Data? = try await withCheckedThrowingContinuation { continuation in
do {
try connection.performRequest(method: "GET",
urlString: url,
parameters: ["alt": "media"], body: nil)
data, _, error in
if let e = error {
continuation.resume(throwing: e)
} else {
continuation.resume(returning: data)
} catch {
continuation.resume(throwing: error)
if let d = response {
output.emit(d, timestamp: ts, window: w)
public static func listFiles(matching: PCollection<KV<String, String>>) -> PCollection<KV<String, String>> {
matching.pstream(type: .bounded) { matching, output in
guard let tokenProvider = DefaultTokenProvider(scopes: ["storage.objects.list"]) else {
throw ApacheBeamError.runtimeError("Unable to get OAuth2 token.")
let connection = Connection(provider: tokenProvider)
for await (match, ts, w) in matching {
let bucket = match.key
for prefix in match.values {
let response: Data? = try await withCheckedThrowingContinuation { continuation in
do {
try connection.performRequest(
method: "GET",
urlString: "\(bucket)/o",
parameters: ["prefix": prefix],
body: nil
) { data, _, error in
if let e = error {
continuation.resume(throwing: e)
} else {
continuation.resume(returning: data)
} catch {
continuation.resume(throwing: error)
if let data = response {
let listfiles = try JSONDecoder().decode(ListFilesResponse.self, from: data)
for item in listfiles.items {
if item.size != "0" {
output.emit(KV(item.bucket,, timestamp: ts, window: w)