blob: f8f615d818eb451f42941e07fa234d93d04647c3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Foundation
public extension Data {
mutating func next(_ schema: Schema) throws -> FieldValue {
try .from(data: &self, as: .row(schema))!
}
}
public extension FieldValue {
static func from(data: inout Data, as type: FieldType) throws -> FieldValue? {
switch type {
case .unspecified:
return .undefined
case .byte, .int16, .int32, .int64:
return try .int(data.varint(), type)
case .float:
return try .float(Double(data.next(Float.self)), type)
case .double:
return try .float(data.next(Double.self), type)
case .string:
return try .string(String(data: data.subdata(), encoding: .utf8)!)
case .datetime:
return try .datetime(data.instant())
case .boolean:
return try .boolean(data.next(UInt8.self) == 1)
case .bytes:
return try .bytes(data.subdata())
case .decimal:
throw ApacheBeamError.runtimeError("Decimal types not yet implemented")
case .logical:
throw ApacheBeamError.runtimeError("Logical Types not yet supported")
case let .row(schema):
let count = try data.varint()
// Consume the null byte pattern
let nullBytes = Int(ceil(Double(count) / 8.0))
let nullBits = try data.subdata()
data = data.advanced(by: nullBytes)
var fields: [FieldValue] = (0 ..< count).map { i in
let byte = Int(floor(Double(i) / 8.0))
let bit = i % 8
return (Int(nullBits[byte]) & bit) > 0 ? .null : .undefined
}
for field in schema.fields {
// TODO: Null check
try fields.append(.from(data: &data, as: field.type)!)
}
return .row(schema, fields)
case let .nullable(baseType):
return try .from(data: &data, as: baseType)
case .array:
let count = try Int(data.next(Int32.self))
if count < 1 {
throw ApacheBeamError.runtimeError("Indeterminate length not implement yet.")
}
return try .array((0 ..< count).map { _ in try .from(data: &data, as: type)! })
case let .repeated(type):
let count = try Int(data.next(Int32.self))
if count < 1 {
throw ApacheBeamError.runtimeError("Indeterminate length not implement yet.")
}
return try .repeated((0 ..< count).map { _ in try .from(data: &data, as: type)! })
case let .map(key, value):
let count = try Int(data.next(UInt32.self))
return try .map((0 ..< count).map { _ in
try (.from(data: &data, as: key)!, .from(data: &data, as: value)!)
})
}
}
}