blob: d77822a28f2d9536e4ad760b9eaf9fea8ccf6c1b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <pulsar/Schema.h>
#include <pulsar/defines.h>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <iostream>
#include <map>
#include <memory>
#include "SchemaUtils.h"
using boost::property_tree::ptree;
using boost::property_tree::read_json;
using boost::property_tree::write_json;
/// Writes the textual name of a schema type (as returned by strSchemaType) to the stream.
PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, pulsar::SchemaType schemaType) {
    s << pulsar::strSchemaType(schemaType);
    return s;
}
/// Writes the textual name of a key/value encoding type (as returned by strEncodingType) to the stream.
PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, pulsar::KeyValueEncodingType encodingType) {
    s << pulsar::strEncodingType(encodingType);
    return s;
}
namespace pulsar {
/**
 * Returns the upper-case name of a key/value encoding type.
 *
 * @param encodingType the encoding type to describe
 * @return "INLINE" or "SEPARATED"; "UnknownEncodingType" when the value matches no enumerator
 */
PULSAR_PUBLIC const char *strEncodingType(KeyValueEncodingType encodingType) {
    switch (encodingType) {
        case KeyValueEncodingType::INLINE:
            return "INLINE";
        case KeyValueEncodingType::SEPARATED:
            return "SEPARATED";
    }
    // NOTE: Do not add a default case to the switch above. If a new encoding type is introduced
    // and not handled there, we want the compiler to notify us. The return below only exists to
    // make the compiler happy.
    return "UnknownEncodingType";
}
/**
 * Parses the name of a key/value encoding type.
 *
 * @param encodingTypeStr either "INLINE" or "SEPARATED" (exact, case-sensitive match)
 * @return the matching KeyValueEncodingType
 * @throws std::invalid_argument if the string matches neither name
 */
PULSAR_PUBLIC KeyValueEncodingType enumEncodingType(std::string encodingTypeStr) {
    if (encodingTypeStr == "INLINE") {
        return KeyValueEncodingType::INLINE;
    }
    if (encodingTypeStr == "SEPARATED") {
        return KeyValueEncodingType::SEPARATED;
    }
    throw std::invalid_argument("No match encoding type: " + encodingTypeStr);
}
/**
 * Returns the upper-case name of a schema type.
 *
 * @param schemaType the schema type to describe
 * @return the enumerator's name, or "UnknownSchemaType" when the value matches no enumerator
 */
PULSAR_PUBLIC const char *strSchemaType(SchemaType schemaType) {
    // Fallback used only when no case below matches.
    const char *label = "UnknownSchemaType";

    // NOTE: Deliberately no default case. If a new schema type is added and not handled here,
    // we want the compiler to notify us about the missing case.
    switch (schemaType) {
        case NONE:
            label = "NONE";
            break;
        case STRING:
            label = "STRING";
            break;
        case INT8:
            label = "INT8";
            break;
        case INT16:
            label = "INT16";
            break;
        case INT32:
            label = "INT32";
            break;
        case INT64:
            label = "INT64";
            break;
        case FLOAT:
            label = "FLOAT";
            break;
        case DOUBLE:
            label = "DOUBLE";
            break;
        case BYTES:
            label = "BYTES";
            break;
        case JSON:
            label = "JSON";
            break;
        case PROTOBUF:
            label = "PROTOBUF";
            break;
        case AVRO:
            label = "AVRO";
            break;
        case AUTO_CONSUME:
            label = "AUTO_CONSUME";
            break;
        case AUTO_PUBLISH:
            label = "AUTO_PUBLISH";
            break;
        case KEY_VALUE:
            label = "KEY_VALUE";
            break;
        case PROTOBUF_NATIVE:
            label = "PROTOBUF_NATIVE";
            break;
    }
    return label;
}
/**
 * Parses the name of a schema type.
 *
 * @param schemaTypeStr the upper-case schema type name (exact, case-sensitive match)
 * @return the matching SchemaType
 * @throws std::invalid_argument if the string matches no known schema type name
 */
PULSAR_PUBLIC SchemaType enumSchemaType(std::string schemaTypeStr) {
    struct NamedSchemaType {
        const char *name;
        SchemaType type;
    };
    // Name-to-enumerator table, scanned in order.
    static const NamedSchemaType knownTypes[] = {
        {"NONE", NONE},
        {"STRING", STRING},
        {"INT8", INT8},
        {"INT16", INT16},
        {"INT32", INT32},
        {"INT64", INT64},
        {"FLOAT", FLOAT},
        {"DOUBLE", DOUBLE},
        {"BYTES", BYTES},
        {"JSON", JSON},
        {"PROTOBUF", PROTOBUF},
        {"AVRO", AVRO},
        {"AUTO_CONSUME", AUTO_CONSUME},
        {"AUTO_PUBLISH", AUTO_PUBLISH},
        {"KEY_VALUE", KEY_VALUE},
        {"PROTOBUF_NATIVE", PROTOBUF_NATIVE},
    };

    for (const auto &candidate : knownTypes) {
        if (schemaTypeStr == candidate.name) {
            return candidate.type;
        }
    }
    throw std::invalid_argument("No match schema type: " + schemaTypeStr);
}
class PULSAR_PUBLIC SchemaInfoImpl {
public:
const std::string name_;
const std::string schema_;
const SchemaType type_ = BYTES;
const std::map<std::string, std::string> properties_;
SchemaInfoImpl() : name_("BYTES") {}
SchemaInfoImpl(SchemaType schemaType, const std::string &name, const std::string &schema,
const StringMap &properties)
: name_(name), schema_(schema), type_(schemaType), properties_(properties) {}
};
SchemaInfo::SchemaInfo() : impl_(std::make_shared<SchemaInfoImpl>()) {}
SchemaInfo::SchemaInfo(SchemaType schemaType, const std::string &name, const std::string &schema,
const StringMap &properties)
: impl_(std::make_shared<SchemaInfoImpl>(schemaType, name, schema, properties)) {}
/**
 * Creates a KEY_VALUE SchemaInfo named "KeyValue" from a key schema and a value schema.
 *
 * The name, type and JSON-serialized properties of both schemas, plus the encoding type, are
 * stored as properties of the new schema. The schema definition is the result of
 * mergeKeyValueSchema() applied to the two schema definition strings.
 *
 * @param keySchema schema describing the key
 * @param valueSchema schema describing the value
 * @param keyValueEncodingType how key and value are encoded
 */
SchemaInfo::SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo &valueSchema,
                       const KeyValueEncodingType &keyValueEncodingType) {
    // Serializes a property map as a compact (non-pretty) JSON object without a trailing newline.
    auto writeJson = [](const StringMap &properties) {
        ptree pt;
        for (const auto &entry : properties) {
            // Add every property as a direct child. ptree::put() would treat '.' inside the key
            // as a path separator and emit nested objects instead of a flat key.
            pt.push_back(ptree::value_type(entry.first, ptree(entry.second)));
        }
        std::ostringstream buf;
        write_json(buf, pt, false);
        auto s = buf.str();
        // Strip the line terminator only if one was actually written, so that the closing
        // brace can never be removed by accident.
        if (!s.empty() && s.back() == '\n') {
            s.pop_back();
        }
        return s;
    };

    StringMap properties;
    properties.emplace(KEY_SCHEMA_NAME, keySchema.getName());
    properties.emplace(KEY_SCHEMA_TYPE, strSchemaType(keySchema.getSchemaType()));
    properties.emplace(KEY_SCHEMA_PROPS, writeJson(keySchema.getProperties()));
    properties.emplace(VALUE_SCHEMA_NAME, valueSchema.getName());
    properties.emplace(VALUE_SCHEMA_TYPE, strSchemaType(valueSchema.getSchemaType()));
    properties.emplace(VALUE_SCHEMA_PROPS, writeJson(valueSchema.getProperties()));
    properties.emplace(KV_ENCODING_TYPE, strEncodingType(keyValueEncodingType));

    std::string keySchemaStr = keySchema.getSchema();
    std::string valueSchemaStr = valueSchema.getSchema();
    impl_ = std::make_shared<SchemaInfoImpl>(KEY_VALUE, "KeyValue",
                                             mergeKeyValueSchema(keySchemaStr, valueSchemaStr), properties);
}
/// Returns the schema type held by the underlying implementation object.
SchemaType SchemaInfo::getSchemaType() const {
    return impl_->type_;
}
/// Returns a reference to the schema name held by the underlying implementation object.
const std::string &SchemaInfo::getName() const {
    return impl_->name_;
}
/// Returns a reference to the schema definition string held by the underlying implementation object.
const std::string &SchemaInfo::getSchema() const {
    return impl_->schema_;
}
/// Returns a reference to the property map held by the underlying implementation object.
const std::map<std::string, std::string> &SchemaInfo::getProperties() const {
    return impl_->properties_;
}
} // namespace pulsar