In the current version, pulsar-presto deserializes fields using SchemaHandler, but this causes the following restrictions:
RecordCursor.getObject() is not supported, which is needed for ROW, MAP, ARRAY, etc.
The motivations of this pull request:
PulsarDispatchingRowDecoderFactory creates a PulsarRowDecoderFactory by SchemaInfo.SchemaType.
PulsarRowDecoderFactory extracts ColumnMetadata and creates a RowDecoder by SchemaInfo.
PulsarRowDecoder decodes a Pulsar ByteBuf to Map<DecoderColumnHandle, FieldValueProvider>, depending on ColumnDecoder.
FieldValueProvider implementors provide the getXXX() methods for Presto runtime code generation.
PulsarDispatchingRowDecoderFactory is a factory of PulsarRowDecoderFactory based on SchemaInfo.SchemaType; all pulsar-presto core modules interact with the decoder through this interface.
public PulsarRowDecoder createRowDecoder(TopicName topicName, SchemaInfo schemaInfo, Set<DecoderColumnHandle> columns)
public List<ColumnMetadata> extractColumnMetadata(TopicName topicName, SchemaInfo schemaInfo, PulsarColumnHandle.HandleKeyValueType handleKeyValueType)
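The dispatch-by-schema-type idea can be illustrated with a minimal, self-contained sketch. It does not use the real Pulsar or Presto classes: `SketchSchemaType`, `SketchRowDecoderFactory`, and `SketchDispatchingFactory` are hypothetical stand-ins, and how the actual factory handles unregistered schema types is an assumption made here for illustration only.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-in for Pulsar's SchemaType; only a few values are modeled.
enum SketchSchemaType { AVRO, JSON, STRING }

// Hypothetical stand-in for PulsarRowDecoderFactory, reduced to a single method.
interface SketchRowDecoderFactory {
    String describe();
}

// Hypothetical stand-in for PulsarDispatchingRowDecoderFactory:
// callers only see this class, which picks a per-schema-type factory.
final class SketchDispatchingFactory {
    private final Map<SketchSchemaType, SketchRowDecoderFactory> factories =
            new EnumMap<>(SketchSchemaType.class);

    SketchDispatchingFactory() {
        factories.put(SketchSchemaType.AVRO, () -> "avro");
        factories.put(SketchSchemaType.JSON, () -> "json");
    }

    // Assumption for this sketch: unregistered types are rejected with an exception.
    SketchRowDecoderFactory factoryFor(SketchSchemaType type) {
        SketchRowDecoderFactory factory = factories.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("unsupported schema type: " + type);
        }
        return factory;
    }
}
```

With this shape, supporting a new schema type means registering one more factory, while callers keep going through the single dispatching entry point.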
PulsarRowDecoderFactory is a factory to extract ColumnMetadata and create RowDecoder by SchemaInfo.
// extract ColumnMetadata from pulsar SchemaInfo and HandleKeyValueType
public List<ColumnMetadata> extractColumnMetadata(TopicName topicName, SchemaInfo schemaInfo, PulsarColumnHandle.HandleKeyValueType handleKeyValueType);
// create RowDecoder by pulsar SchemaInfo and column DecoderColumnHandles
public PulsarRowDecoder createRowDecoder(TopicName topicName, SchemaInfo schemaInfo, Set<DecoderColumnHandle> columns);
PulsarRowDecoder is the interface that decodes a Pulsar ByteBuf to Map<DecoderColumnHandle, FieldValueProvider>, depending on ColumnDecoder.
// decode byteBuf to Map<DecoderColumnHandle, FieldValueProvider>
public Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodeRow(ByteBuf byteBuf);
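As a rough illustration of the decodeRow contract, the sketch below decodes a buffer into a map of per-column value providers. Everything in it is an assumption for illustration: `java.nio.ByteBuffer` stands in for Netty's ByteBuf, plain column names stand in for DecoderColumnHandle, `SketchFieldValueProvider` is a simplified stand-in for Presto's FieldValueProvider, and the wire format (an 8-byte id followed by a length-prefixed UTF-8 name) is invented.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for FieldValueProvider:
// each getter throws unless the provider supports that type.
interface SketchFieldValueProvider {
    default long getLong() { throw new UnsupportedOperationException("getLong"); }
    default String getString() { throw new UnsupportedOperationException("getString"); }
}

// Hypothetical stand-in for a PulsarRowDecoder implementation.
final class SketchRowDecoder {
    // Assumed wire format: 8-byte big-endian "id", 4-byte length, UTF-8 "name".
    Optional<Map<String, SketchFieldValueProvider>> decodeRow(ByteBuffer buf) {
        ByteBuffer view = buf.duplicate(); // leave the caller's position untouched
        if (view.remaining() < Long.BYTES + Integer.BYTES) {
            return Optional.empty(); // too short to hold a row
        }
        final long id = view.getLong();
        int length = view.getInt();
        if (length < 0 || length > view.remaining()) {
            return Optional.empty(); // malformed length prefix
        }
        byte[] bytes = new byte[length];
        view.get(bytes);
        final String name = new String(bytes, StandardCharsets.UTF_8);

        Map<String, SketchFieldValueProvider> row = new HashMap<>();
        row.put("id", new SketchFieldValueProvider() {
            @Override public long getLong() { return id; }
        });
        row.put("name", new SketchFieldValueProvider() {
            @Override public String getString() { return name; }
        });
        return Optional.of(row);
    }
}
```

Returning an empty Optional for undecodable input, rather than throwing, mirrors the Optional return type of decodeRow; whether the real decoders behave this way on malformed data is not stated in this description.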
ColumnDecoder is the factory that creates FieldValueProviders from row metadata. FieldValueProviders provide the methods getBoolean()/getLong()/getDouble()/getSlice()/getBlock() for Presto runtime code generation. We made some extensions to support more types and stay backward compatible, compared with the original Presto version:
Add separate decoder unit tests to work with our customized Pulsar interface.
https://github.com/apache/pulsar/pull/8422
ProtobufNativeSchema