parse(schema, [opts])

- schema {Object|String} An Avro schema, represented by one of:
  - A string containing a JSON-stringified schema (e.g. '["null", "int"]').
  - A path to a file containing a JSON-stringified schema (e.g. './Schema.avsc').
  - A decoded schema object (e.g. {type: 'array', items: 'int'}).
- opts {Object} Parsing options. The following keys are currently supported:
  - namespace {String} Optional parent namespace.
  - registry {Object} Registry of predefined type names. This can for example be used to override the types used for primitives or to split a schema declaration over multiple files.
  - logicalTypes {Object} Optional dictionary of LogicalTypes. This can be used to support serialization and deserialization of arbitrary native objects.
  - assertLogicalTypes {Boolean} The Avro specification mandates that we fall through to the underlying type if a logical type is invalid. When set, this option will override this behavior and throw an error when a logical type can't be applied.
  - typeHook(attrs, opts) {Function} Function called before each new type is instantiated. The relevant decoded schema is available as the first argument and the parsing options as the second. This function can optionally return a type, which will then be used in place of the result of parsing schema. See below for more details.

Parse a schema and return an instance of the corresponding Type.
Using the typeHook
option, it is possible to customize the parsing process by intercepting the creation of any type. As a sample use-case, we show below how to replace the default EnumType
(which decodes enum
values into strings) with a LongType
(which will decode the enum
's values into integers). This can be useful when the enum
already exists as a JavaScript object (e.g. if it was generated by TypeScript).
```javascript
var longType = new avro.types.LongType();

function typeHook(schema) {
  if (schema.type === 'enum') {
    // For simplicity, we don't do any bound checking here, but we could by
    // implementing a "bounded long" logical type and returning that instead.
    return longType;
  }
  // Falling through will cause the default type to be used.
}
```
To use it:
```javascript
// Assume we already have an "enum" with each symbol.
var PETS = {CAT: 0, DOG: 1};

// We can provide our hook when parsing a schema.
var type = avro.parse({
  name: 'Pet',
  type: 'enum',
  symbols: ['CAT', 'DOG']
}, {typeHook: typeHook});

// And encode integer enum values directly.
var buf = type.toBuffer(PETS.CAT);
```
Finally, type hooks work well with logical types (for example to dynamically add logicalType
attributes to a schema).
All the classes below are available in the avro.types namespace:

- Type
- BooleanType
- BytesType
- DoubleType
- FloatType
- IntType
- LongType
- NullType
- StringType
- LogicalType
Type
“Abstract” base Avro type class; all implementations inherit from it.
type.decode(buf, [pos,] [resolver])

- buf {Buffer} Buffer to read from.
- pos {Number} Offset to start reading from.
- resolver {Resolver} Optional resolver to decode values serialized from another schema. See createResolver for how to create one.

Returns {value: value, offset: offset} if buf contains a valid encoding of type (value being the decoded value, and offset the new offset in the buffer). Returns {value: undefined, offset: -1} when the buffer is too short.
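Since decode reports both the value and the new offset, several values packed back to back in one buffer can be drained with a small loop. A sketch of this pattern follows; the stub byteType below only mimics the contract described above (a real type would come from avro.parse) and is not part of the library:

```javascript
// Decode every value packed back to back in `buf`, using the contract above:
// an offset of -1 signals that the remaining bytes are too short.
function decodeAll(type, buf) {
  var vals = [];
  var pos = 0;
  for (;;) {
    var res = type.decode(buf, pos);
    if (res.offset < 0) {
      break; // Buffer exhausted.
    }
    vals.push(res.value);
    pos = res.offset;
  }
  return vals;
}

// Stub standing in for a real type; it "decodes" single bytes.
var byteType = {
  decode: function (buf, pos) {
    if (pos >= buf.length) {
      return {value: undefined, offset: -1};
    }
    return {value: buf[pos], offset: pos + 1};
  }
};

var vals = decodeAll(byteType, Buffer.from([1, 2, 3])); // == [1, 2, 3]
```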
type.encode(val, buf, [pos])

- val {...} The value to encode. An error will be raised if this isn't a valid type value.
- buf {Buffer} Buffer to write to.
- pos {Number} Offset to start writing at.

Encode a value into an existing buffer. If enough space was available in buf, returns the new (non-negative) offset; otherwise returns -N, where N is the (positive) number of bytes by which the buffer was short.
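Since a negative return value reports exactly how many bytes were missing, a caller can grow the buffer and retry. A sketch of this pattern follows; the stub stringType below only follows the contract described above and is not part of the library:

```javascript
// Encode `val`, growing the buffer by the reported shortfall until it fits.
function encodeWithRetry(type, val, size) {
  size = size || 64;
  for (;;) {
    var buf = Buffer.alloc(size);
    var pos = type.encode(val, buf, 0);
    if (pos >= 0) {
      return buf.slice(0, pos);
    }
    size -= pos; // `pos` is -N where N bytes were missing; grow by N.
  }
}

// Stub following the same contract, for illustration only.
var stringType = {
  encode: function (val, buf, pos) {
    var data = Buffer.from(val, 'utf8');
    var end = pos + data.length;
    if (end > buf.length) {
      return buf.length - end; // -N, N bytes short.
    }
    data.copy(buf, pos);
    return end;
  }
};

var buf = encodeWithRetry(stringType, 'hello avro', 4); // buf.toString() === 'hello avro'
```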
type.fromBuffer(buf, [resolver,] [noCheck])

- buf {Buffer} Bytes containing a serialized value of type.
- resolver {Resolver} To decode values serialized from another schema. See createResolver for how to create a resolver.
- noCheck {Boolean} Do not check that the entire buffer has been read. This can be useful when using a resolver which only decodes fields at the start of the buffer, allowing decoding to bail early and yield significant performance speedups.

Deserialize a buffer into its corresponding value.
type.toBuffer(val)

- val {...} The value to encode. It must be a valid type value.

Returns a Buffer containing the Avro serialization of val.
type.fromString(str)

- str {String} String representing a JSON-serialized object.

Deserialize a JSON-encoded object of type.
type.toString([val])

- val {...} The value to serialize. If not specified, this method will return a human-friendly description of type.

Serialize an object into a JSON-encoded string.
type.isValid(val, [opts])

- val {...} The value to validate.
- opts {Object} Options:
  - errorHook(path, any, type) {Function} Function called when an invalid value is encountered. When an invalid value causes its parent values to also be invalid, the latter do not trigger a callback. path will be an array of strings identifying where the mismatch occurred. See below for a few examples.

Check whether val is a valid type value.
For complex schemas, it can be difficult to figure out which part(s) of val
are invalid. The errorHook
option provides access to more information about these mismatches. We illustrate a few use-cases below:
```javascript
// A sample schema.
var personType = avro.parse({
  type: 'record',
  name: 'Person',
  fields: [
    {name: 'age', type: 'int'},
    {name: 'names', type: {type: 'array', items: 'string'}}
  ]
});

// A corresponding invalid record.
var invalidPerson = {age: null, names: ['ann', 3, 'bob']};
```
As a first use-case, we use the errorHook to implement a function which gathers all of a value's invalid paths (if any):
```javascript
function getInvalidPaths(type, val) {
  var paths = [];
  type.isValid(val, {errorHook: function (path) { paths.push(path.join()); }});
  return paths;
}

var paths = getInvalidPaths(personType, invalidPerson); // == ['age', 'names,1']
```
We can also implement an assertValid
function which throws a helpful error on the first mismatch encountered (if any):
```javascript
var util = require('util');

function assertValid(type, val) {
  return type.isValid(val, {errorHook: hook});

  function hook(path, any) {
    throw new Error(util.format('invalid %s: %j', path.join(), any));
  }
}

try {
  assertValid(personType, invalidPerson); // Will throw.
} catch (err) {
  // err.message === 'invalid age: null'
}
```
type.clone(val, [opts])

- val {...} The object to copy.
- opts {Object} Options:
  - coerceBuffers {Boolean} Allow coercion of JSON buffer representations into actual Buffer objects.
  - fieldHook(field, any, type) {Function} Function called when each record field is populated. The value returned by this function will be used instead of any. field is the current Field instance and type the parent type.
  - wrapUnions {Boolean} Avro's JSON representation expects all union values to be wrapped inside objects. Setting this parameter to true will try to wrap unwrapped union values into their first matching type.

Deep copy a value of type.
type.compare(val1, val2)

- val1 {...} Value of type.
- val2 {...} Value of type.

Returns 0 if both values are equal according to their sort order, -1 if the first is smaller than the second, and 1 otherwise. Comparing invalid values is undefined behavior.
type.compareBuffers(buf1, buf2)

- buf1 {Buffer} type value bytes.
- buf2 {Buffer} type value bytes.

Similar to compare, but doesn't require decoding values.
type.createResolver(writerType)

- writerType {Type} Writer type.

Create a resolver that can be passed to the type's decode and fromBuffer methods. This will enable decoding values which had been serialized using writerType, according to the Avro resolution rules. If the schemas are incompatible, this method will throw an error.
For example, assume we have the following two versions of a type:
```javascript
// A schema's first version.
var v1 = avro.parse({
  name: 'Person',
  type: 'record',
  fields: [
    {name: 'name', type: 'string'},
    {name: 'age', type: 'int'}
  ]
});

// The updated version.
var v2 = avro.parse({
  type: 'record',
  name: 'Person',
  fields: [
    {
      name: 'name',
      type: [
        'string',
        {
          name: 'Name',
          type: 'record',
          fields: [
            {name: 'first', type: 'string'},
            {name: 'last', type: 'string'}
          ]
        }
      ]
    },
    {name: 'phone', type: ['null', 'string'], default: null}
  ]
});
```
The two types are compatible since the name
field is present in both (the string
can be promoted to the new union
) and the new phone
field has a default value.
```javascript
// We can therefore create a resolver.
var resolver = v2.createResolver(v1);

// And pass it whenever we want to decode from the old type to the new.
var buf = v1.toBuffer({name: 'Ann', age: 25});
var obj = v2.fromBuffer(buf, resolver); // === {name: {string: 'Ann'}, phone: null}
```
See the advanced usage page for more details on how schema evolution can be used to significantly speed up decoding.
type.random()

Returns a random value of type.
type.getName([noRef])

- noRef {Boolean} Return built-in names (e.g. 'record', 'map', 'boolean') rather than user-specified references.

Returns type's fully qualified name if it exists, undefined otherwise.
type.getSchema([noDeref])

- noDeref {Boolean} Do not dereference any type names.

Returns type's canonical schema (as a string). This can be used to compare schemas for equality.
type.getFingerprint([algorithm])

- algorithm {String} Algorithm used to generate the schema's fingerprint. Defaults to md5. Not supported in the browser.

Type.__reset(size)

- size {Number} New buffer size in bytes.

This method resizes the internal buffer used to encode all types. You should only ever need to call this if you are encoding very large values and need to reclaim memory.
LongType(attrs, [opts])

- attrs {Object} Decoded type attributes.
- opts {Object} Parsing options.

LongType.using(methods, [noUnpack])

- methods {Object} Dictionary of method implementations, keyed by method name; see below for details on each of the functions to implement.
- noUnpack {Boolean} Do not automatically unpack bytes before passing them to the above methods' fromBuffer function and pack bytes returned by its toBuffer function.

This function provides a way to support arbitrary long representations. Doing so requires implementing the following methods (a few examples are available here):
fromBuffer(buf)

- buf {Buffer} Encoded long. If noUnpack is off (the default), buf will be an 8-byte buffer containing the long's unpacked representation. Otherwise, buf will contain a variable-length buffer with the long's packed representation.

This method should return the corresponding decoded long.
toBuffer(val)

- val {...} Decoded long.

If noUnpack is off (the default), this method should return an 8-byte buffer with the long's unpacked representation. Otherwise, toBuffer should return an already packed buffer (of variable length).
fromJSON(any)

- any {Number|...} Parsed value. To ensure that the fromString method works correctly on data JSON-serialized according to the Avro spec, this method should at least support numbers as input.

This method should return the corresponding decoded long.

It might also be useful to support other kinds of input (typically the output of the long implementation's toJSON method) to enable serializing large numbers without loss of precision (at the cost of violating the Avro spec).
toJSON(val)

- val {...} Decoded long.

This method should return the long's JSON representation.
isValid(val, [opts])

See Type.isValid.

compare(val1, val2)

See Type.compare.
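As an illustration, in recent versions of node.js the methods above can be implemented with native BigInt values. This is only a sketch: it assumes noUnpack is off, and that the unpacked 8-byte representation is little-endian two's complement.

```javascript
// A BigInt-backed long implementation (assumes the default unpacked layout).
var longMethods = {
  fromBuffer: function (buf) { return buf.readBigInt64LE(0); },
  toBuffer: function (val) {
    var buf = Buffer.alloc(8);
    buf.writeBigInt64LE(val, 0);
    return buf;
  },
  fromJSON: function (any) { return BigInt(any); },
  toJSON: function (val) { return Number(val); },
  isValid: function (val) { return typeof val === 'bigint'; },
  compare: function (a, b) { return a === b ? 0 : (a < b ? -1 : 1); }
};
```

The dictionary would then be registered with avro.types.LongType.using(longMethods).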
ArrayType(attrs, [opts])

- attrs {Object} Decoded type attributes.
- opts {Object} Parsing options.

type.getItemsType()

The type of the array's items.
EnumType(attrs, [opts])

- attrs {Object} Decoded type attributes.
- opts {Object} Parsing options.

type.getAliases()

Optional type aliases. These are used when adapting a schema from another type.

type.getSymbols()

Returns a copy of the type's symbols (an array of strings representing the enum's valid values).
FixedType(attrs, [opts])

- attrs {Object} Decoded type attributes.
- opts {Object} Parsing options.

type.getAliases()

Optional type aliases. These are used when adapting a schema from another type.

type.getSize()

The size in bytes of instances of this type.
MapType(attrs, [opts])

- attrs {Object} Decoded type attributes.
- opts {Object} Parsing options.

type.getValuesType()

The type of the map's values (keys are always strings).
RecordType(attrs, [opts])

- attrs {Object} Decoded type attributes.
- opts {Object} Parsing options.

type.getAliases()

Optional type aliases. These are used when adapting a schema from another type.

type.getFields()

Returns a copy of the array of fields contained in this record. Each field is an object with the following methods:

- getAliases()
- getDefault()
- getName()
- getOrder()
- getType()

type.getRecordConstructor()

The Record constructor for instances of this type.
UnionType(attrs, [opts])

- attrs {Object} Decoded type attributes.
- opts {Object} Parsing options.

type.getTypes()

The possible types that this union can take.
LogicalType(attrs, [opts,] [Types])

- attrs {Object} Decoded type attributes.
- opts {Object} Parsing options.
- Types {Array} Optional array of type classes. If specified, only these will be accepted as the underlying type.

“Abstract class” used to implement custom native types.

type.getUnderlyingType()

Get the underlying Avro type. This can be useful when a logical type can support different underlying types.

To implement a custom logical type, the steps are:

- Extend LogicalType in your own subclass (typically using util.inherits).
- Call LogicalType's constructor inside your own subclass' constructor to make sure the underlying type is properly set up. Throwing an error anywhere inside your constructor will prevent the logical type from being used (the underlying type will be used instead).
- Override the methods below (they should only be called internally by LogicalType methods).

See here for a couple of sample implementations.
type._fromValue(val)

- val {...} A value deserialized by the underlying type.

This function should return the final, wrapped, value.

type._toValue(any)

- any {...} A wrapped value.

This function should return a value which can be serialized by the underlying type.
type._resolve(type)

- type {Type} The writer's type.

This function should return undefined if the writer's values cannot be converted.

Each RecordType generates a corresponding Record constructor when its schema is parsed. It is available using the RecordType's getRecordConstructor method. This helps make decoding and encoding records more efficient.

All prototype methods below are prefixed with $ to avoid clashing with an existing record field ($ is a valid identifier in JavaScript, but not in Avro).
Record(...)

Calling the constructor directly can sometimes be a convenient shortcut to instantiate new records of a given type. In particular, it will correctly initialize all of the record's missing fields with their default values.

record.$clone([opts])

- opts {Object} See type.clone.

Deep copy the record.

record.$compare(val)

- val {Record} See type.compare.

Compare the record to another.

record.$getType()

Get the record's type.

record.$isValid([opts])

- opts {Object} See type.isValid.

Check whether the record is valid.

record.$toBuffer()

Return the binary encoding of the record.

record.$toString()

Return the JSON-stringified record.

Record.getType()

Convenience class method to get the record's type.
Not available in the browser.

The following convenience functions are available for common operations on container files:

createFileDecoder(path, [opts])

- path {String} Path to an Avro container file.
- opts {Object} Decoding options, passed to BlockDecoder.

Returns a readable stream of decoded objects from an Avro container file.

createFileEncoder(path, schema, [opts])

- path {String} Destination path.
- schema {Object|String|Type} Type used to serialize.
- opts {Object} Encoding options, passed to BlockEncoder.

Returns a writable stream of objects. These will end up serialized into an Avro container file.
extractFileHeader(path, [opts])

- path {String} Path to an Avro container file.
- opts {Object} Options:
  - decode {Boolean} Decode schema and codec metadata (otherwise they will be returned as bytes). Defaults to true.

Extract the header from an Avro container file synchronously. If no header is present (i.e. the path doesn't point to a valid Avro container file), null is returned.

For more specific use-cases, the following stream classes are available in the avro.streams namespace:
BlockDecoder([opts])

- opts {Object} Decoding options. Available keys:
  - codecs {Object} Dictionary of decompression functions, keyed by codec name. A decompression function has the signature fn(compressedData, cb), where compressedData is a buffer of compressed data, and must call cb(err, uncompressedData) on completion. The default contains handlers for the 'null' and 'deflate' codecs.
  - decode {Boolean} Whether to decode records before returning them. Defaults to true.
  - parseOpts {Object} Options passed when parsing the writer's schema.

A duplex stream which decodes bytes coming from an Avro object container file.

Events:

- 'metadata'
  - type {Type} The type used to write the file.
  - codec {String} The codec's name.
  - header {Object} The file's header, containing in particular the raw schema and codec.
- 'data'
  - data {...} Decoded element or raw bytes.

BlockDecoder.getDefaultCodecs()

Get built-in decompression functions (currently null and deflate).
RawDecoder(schema, [opts])

- schema {Object|String|Type} Writer schema. Required since the input doesn't contain a header. Argument parsing logic is the same as for parse.
- opts {Object} Decoding options. Available keys:
  - decode {Boolean} Whether to decode records before returning them. Defaults to true.

A duplex stream which can be used to decode a stream of serialized Avro objects with no headers or blocks.

Events:

- 'data'
  - data {...} Decoded element or raw bytes.

BlockEncoder(schema, [opts])
- schema {Object|String|Type} Schema used for encoding. Argument parsing logic is the same as for parse.
- opts {Object} Encoding options. Available keys:
  - blockSize {Number} Maximum uncompressed size of each block's data. A new block will be started when this number is exceeded. If it is too small to fit a single element, it will be increased appropriately. Defaults to 64kB.
  - codec {String} Name of the codec to use for encoding. See the codecs option to support arbitrary compression functions.
  - codecs {Object} Dictionary of compression functions, keyed by codec name. A compression function has the signature fn(uncompressedData, cb), where uncompressedData is a buffer of uncompressed data, and must call cb(err, compressedData) on completion. The default contains handlers for the 'null' and 'deflate' codecs.
  - omitHeader {Boolean} Don't emit the header. This can be useful when appending to an existing container file. Defaults to false.
  - syncMarker {Buffer} 16-byte buffer to use as the synchronization marker inside the file. If unspecified, a random value will be generated.

A duplex stream used to create Avro container object files.

Events:

- 'data'
  - data {Buffer} Serialized bytes.

BlockEncoder.getDefaultCodecs()

Get built-in compression functions (currently null and deflate).
RawEncoder(schema, [opts])

- schema {Object|String|Type} Schema used for encoding. Argument parsing logic is the same as for parse.
- opts {Object} Encoding options. Available keys:
  - batchSize {Number} To increase performance, records are serialized in batches. Use this option to control how often batches are emitted. If it is too small to fit a single record, it will be increased automatically. Defaults to 64kB.

The encoding equivalent of RawDecoder.

Events:

- 'data'
  - data {Buffer} Serialized bytes.

Avro also defines a way of executing remote procedure calls. We expose this via an API modeled after node.js' core EventEmitter.
Protocol
Protocol
instances are obtained by parse
-ing a protocol declaration and provide a way of sending remote messages (for example to another machine, or another process on the same machine). For this reason, instances of this class are very similar to EventEmitter
s, exposing both emit
and on
methods.
Being able to send remote messages (and to do so efficiently) introduces a few differences however:
protocol.emit(name, req, emitter, cb)

- name {String} Name of the message to emit. If this message is sent to a Protocol instance with no handler defined for this name, an “unsupported message” error will be returned.
- req {Object} Request value; it must correspond to the message's declared request type.
- emitter {MessageEmitter} Emitter used to send the message. See createEmitter for how to obtain one.
- cb(err, res) {Function} Function called with the remote call's response (and eventual error) when available. This can be omitted when the message is one-way.

Send a message. This is always done asynchronously.
protocol.on(name, handler)

- name {String} Message name to add the handler for. An error will be thrown if this name isn't defined in the protocol. At most one handler can exist for a given name (any previously defined handler will be overwritten).
- handler(req, listener, cb) {Function} Handler called each time a message with a matching name is received. The listener argument will be the corresponding MessageListener instance. The final callback argument cb(err, res) should be called to send the response back to the emitter (except when the message is one-way, in which case cb will be undefined).

Add a handler for a given message.
protocol.createEmitter(transport, [opts,] [cb])

- transport {Duplex|Object|Function} The transport used to communicate with the remote listener. Multiple argument types are supported; see below.
- opts {Object} Options:
  - IdType {LogicalType} Metadata logical type.
  - bufferSize {Number} Internal serialization buffer size (in bytes). Defaults to 2048.
  - frameSize {Number} Size used when framing messages. Defaults to 2048.
- cb(pending) {Function} End of transmission callback.

Generate a MessageEmitter for this protocol. This emitter can then be used to communicate with a remote server of a compatible protocol.

There are two major types of transports:

- Stateful: a pair of binary streams {readable, writable}. As a convenience, passing a single duplex stream is also supported and equivalent to passing {readable: duplex, writable: duplex}.
- Stateless: a stream factory fn(cb) which should return a writable stream and call its callback argument with a readable stream (when available).
protocol.createListener(transport, [opts,] [cb])

- transport {Duplex|Object|Function} Similar to createEmitter's corresponding argument, except that the readable and writable roles are reversed for stateless transports.
- opts {Object} Identical to createEmitter's options:
  - IdType {LogicalType} Metadata logical type.
  - bufferSize {Number} Internal serialization buffer size (in bytes). Defaults to 2048.
  - frameSize {Number} Size used when framing messages. Defaults to 2048.
- cb(pending) {Function} End of transmission callback.

Generate a MessageListener for this protocol. This listener can be used to respond to messages emitted from compatible protocols.
protocol.subprotocol()
Returns a copy of the original protocol, which inherits all its handlers.
protocol.getMessages()

Retrieve all the messages defined in the protocol. Each message is an object with the following (read-only) properties:

- name {String}
- requestType {Type}
- responseType {Type}
- errorType {Type}
- oneWay {Boolean}

protocol.getName()

Returns the protocol's fully qualified name.

protocol.getType(name)

- name {String} A type's fully qualified name.

Convenience function to retrieve a type defined inside this protocol. Returns undefined if no type exists for the given name.
MessageEmitter

Instances of this class are EventEmitters, with the following events:

'handshake'

- request {Object} Handshake request.
- response {Object} Handshake response.

Emitted when the server's handshake response is received.

'eot'

- pending {Number} Number of interrupted requests. This will always be zero, unless the emitter was destroyed with noWait set.

End of transmission event, emitted after the client is destroyed and there are no more pending requests.

emitter.destroy([noWait])

- noWait {Boolean} Cancel any pending requests. By default, pending requests will still be honored.

Disable the emitter.
MessageListener

Listeners are the receiving-side equivalent of MessageEmitters and are also EventEmitters, with the following events:

'handshake'

- request {Object} Handshake request.
- response {Object} Handshake response.

Emitted right before the server sends a handshake response.

'eot'

- pending {Number} Number of cancelled pending responses. This will always be zero, unless the listener was destroyed with noWait set.

End of transmission event, emitted after the listener is destroyed and there are no more responses to send.

listener.destroy([noWait])

- noWait {Boolean} Don't wait for all pending responses to have been sent.

Disable this listener and release the underlying streams. In general you shouldn't need to call this: stateless listeners will be destroyed automatically when a response is sent, and stateful listeners are best destroyed from the client's side.