blob: ab57079c55b1965c9d81f26bdd702f4f3519dbfe [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
from iceberg.api import PartitionSpec
class PartitionSpecParser(object):
    """Convert partition specs to and from their JSON representation.

    All members are static; the class only serves as a namespace for the
    JSON key names and the (de)serialization helpers.
    """

    # Keys of the top-level JSON object.
    SPEC_ID = "spec-id"
    FIELDS = "fields"
    # Keys of each entry in the "fields" array.
    SOURCE_ID = "source-id"
    FIELD_ID = "field-id"
    TRANSFORM = "transform"
    NAME = "name"

    @staticmethod
    def to_json(spec, indent=None):
        """Serialize ``spec`` to a JSON string.

        :param spec: object exposing ``spec_id`` and ``fields``.
        :param indent: forwarded to ``json.dumps`` (``None`` gives compact output).
        :return: the JSON text of :meth:`to_dict`.
        """
        return json.dumps(PartitionSpecParser.to_dict(spec), indent=indent)

    @staticmethod
    def to_dict(spec):
        """Return ``spec`` as a dict with the spec id and the list of field dicts."""
        return {
            PartitionSpecParser.SPEC_ID: spec.spec_id,
            PartitionSpecParser.FIELDS: PartitionSpecParser.to_json_fields(spec),
        }

    @staticmethod
    def to_json_fields(spec):
        """Return one dict per entry of ``spec.fields``.

        Each dict carries the field's name, the string form of its transform,
        its source id and its field id.
        """
        return [
            {
                PartitionSpecParser.NAME: field.name,
                PartitionSpecParser.TRANSFORM: str(field.transform),
                PartitionSpecParser.SOURCE_ID: field.source_id,
                PartitionSpecParser.FIELD_ID: field.field_id,
            }
            for field in spec.fields
        ]

    @staticmethod
    def from_json(schema, json_obj):
        """Build a partition spec for ``schema`` from a JSON document.

        :param schema: schema handed to ``PartitionSpec.builder_for``.
        :param json_obj: a JSON string, or an already-decoded dict, holding the
            ``"spec-id"`` and ``"fields"`` entries.
        :return: the result of the builder's ``build()``.
        :raises RuntimeError: if the (decoded) document is not a dict, or if its
            fields cannot be parsed.
        """
        if isinstance(json_obj, str):
            json_obj = json.loads(json_obj)

        if not isinstance(json_obj, dict):
            # Wrap the operand in a 1-tuple: with a bare ``% json_obj`` a tuple
            # value would be unpacked as format arguments and raise TypeError
            # instead of the intended RuntimeError.
            raise RuntimeError("Cannot parse partition spec, not an object: %s" % (json_obj,))

        spec_id = json_obj.get(PartitionSpecParser.SPEC_ID)
        builder = PartitionSpec.builder_for(schema).with_spec_id(spec_id)
        fields = json_obj.get(PartitionSpecParser.FIELDS)

        return PartitionSpecParser.__build_from_json_fields(builder, fields)

    @staticmethod
    def from_json_fields(schema, spec_id, json_obj):
        """Build a partition spec for ``schema`` from a JSON array of fields.

        :param schema: schema handed to ``PartitionSpec.builder_for``.
        :param spec_id: id applied to the builder via ``with_spec_id``.
        :param json_obj: a JSON string, or an already-decoded list/tuple, of
            field objects.
        :return: the result of the builder's ``build()``.
        :raises RuntimeError: if the fields cannot be parsed.
        """
        builder = PartitionSpec.builder_for(schema).with_spec_id(spec_id)

        if isinstance(json_obj, str):
            json_obj = json.loads(json_obj)

        return PartitionSpecParser.__build_from_json_fields(builder, json_obj)

    @staticmethod
    def __build_from_json_fields(builder, json_fields):
        """Add every field in ``json_fields`` to ``builder`` and build the spec.

        Fields with a non-null ``"field-id"`` are added through ``builder.add``;
        the others through ``builder.add_without_field_id``. Either all fields
        or none of them may carry a field id.

        :raises RuntimeError: if ``json_fields`` is not a list/tuple, if an
            element is not a dict, or if only some fields have a field id.
        """
        if not isinstance(json_fields, (list, tuple)):
            raise RuntimeError("Cannot parse partition spec fields, not an array: %s" % (json_fields,))

        field_id_count = 0
        for element in json_fields:
            if not isinstance(element, dict):
                # 1-tuple operand so that a tuple element is reported verbatim
                # rather than being unpacked as format arguments (TypeError).
                raise RuntimeError("Cannot parse partition field, not an object: %s" % (element,))

            if element.get(PartitionSpecParser.FIELD_ID) is not None:
                builder.add(element.get(PartitionSpecParser.SOURCE_ID),
                            element.get(PartitionSpecParser.FIELD_ID),
                            element.get(PartitionSpecParser.NAME),
                            element.get(PartitionSpecParser.TRANSFORM))
                field_id_count += 1
            else:
                builder.add_without_field_id(element.get(PartitionSpecParser.SOURCE_ID),
                                             element.get(PartitionSpecParser.NAME),
                                             element.get(PartitionSpecParser.TRANSFORM))

        # A spec must be consistent: field ids on every field, or on none.
        total = len(json_fields)
        if field_id_count > 0 and field_id_count != total:
            raise RuntimeError("Cannot parse spec with missing field IDs: %s missing of %s fields." %
                               (total - field_id_count, total))

        return builder.build()