blob: 0ee82f085c03b1e996cbd2e96018f699b03bbb97 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Foundation
/// This extension contains all of the decoding implementation. File separation is for clarity.
public extension Coder {
/// Decodes a raw data block into a BeamValue for further processing
func decode(_ data: inout Data) throws -> BeamValue {
switch self {
// Scalar values check for size 0 input data and return null if that's a problem
case .bytes:
return try .bytes(data.count == 0 ? Data() : data.subdata())
case .string:
return try .string(data.count == 0 ? "" : String(data: data.subdata(), encoding: .utf8))
case .varint:
return try .integer(data.count == 0 ? nil : data.varint())
case .fixedint:
return try .integer(data.count == 0 ? nil : data.next(Int.self))
case .byte:
return try .integer(data.count == 0 ? nil : Int(data.next(UInt8.self)))
case .boolean:
return try .boolean(data.count == 0 ? nil : data.next(UInt8.self) != 0)
case .double:
return try .double(data.count == 0 ? nil : data.next(Double.self))
case .globalwindow:
return .window(.global)
case let .lengthprefix(coder): // Length prefix basically serves to make values nullable
var subdata = try data.subdata()
return try coder.decode(&subdata)
case let .keyvalue(keyCoder, valueCoder):
return try .kv(keyCoder.decode(&data), valueCoder.decode(&data))
case let .iterable(coder):
let length = try data.next(Int32.self)
return try .array((0 ..< length).map { _ in try coder.decode(&data) })
case let .windowedvalue(valueCoder, windowCoder):
// This will be big endian to match java
let timestamp = try data.instant()
let windowCount = try data.next(Int32.self)
if windowCount > 1 {
throw ApacheBeamError.runtimeError("Windowed values with > 1 window not yet supported")
}
let window = try windowCoder.decode(&data)
// TODO: Actually handle pane info
let pane = try data.next(UInt8.self)
switch (pane >> 4) & 0x0F {
case 0x0:
break
case 0x1:
_ = try data.varint()
case 0x2:
_ = try data.varint()
_ = try data.varint()
default:
throw ApacheBeamError.runtimeError("Invalid pane encoding \(String(pane, radix: 2))")
}
return try .windowed(valueCoder.decode(&data), timestamp, pane, window)
case let .row(schema):
return try .row(.from(data: &data, as: .row(schema))!)
default:
throw ApacheBeamError.runtimeError("Decoding of \(urn) coders not supported.")
}
}
}