Parsing schemas

parse(schema, [opts])

  • schema {Object|String} An Avro schema, represented by one of:
    • A string containing a JSON-stringified schema (e.g. '["null", "int"]').
    • A path to a file containing a JSON-stringified schema (e.g. './Schema.avsc').
    • A decoded schema object (e.g. {type: 'array', items: 'int'}).
  • opts {Object} Parsing options. The following keys are currently supported:
    • namespace {String} Optional parent namespace.
    • registry {Object} Registry of predefined type names. This can for example be used to override the types used for primitives or to split a schema declaration over multiple files.
    • logicalTypes {Object} Optional dictionary of LogicalType subclasses, keyed by logical type name. This can be used to support serialization and deserialization of arbitrary native objects.
    • assertLogicalTypes {Boolean} The Avro specification mandates that we fall through to the underlying type if a logical type is invalid. When set, this option overrides that behavior and an error is thrown when a logical type can't be applied.
    • typeHook(attrs, opts) {Function} Function called before each new type is instantiated. The relevant decoded schema is available as first argument and the parsing options as second. This function can optionally return a type which will then be used in place of the result of parsing schema. See below for more details.

Parse a schema and return an instance of the corresponding Type.
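
For example, a couple of minimal calls (assuming the package was installed as avsc):

var avro = require('avsc');

// From a decoded schema object.
var mapType = avro.parse({type: 'map', values: 'double'});

// From a JSON-stringified schema.
var unionType = avro.parse('["null", "int"]');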

Using the typeHook option, it is possible to customize the parsing process by intercepting the creation of any type. As a sample use-case, we show below how to replace the default EnumType (which decodes enum values into strings) with a LongType (which will decode the enum's values into integers). This can be useful when the enum already exists as a JavaScript object (e.g. if it was generated by TypeScript).

var longType = new avro.types.LongType();
function typeHook(schema) {
  if (schema.type === 'enum') {
    // For simplicity, we don't do any bound checking here but we could by
    // implementing a "bounded long" logical type and returning that instead.
    return longType;
  }
  // Falling through will cause the default type to be used.
}

To use it:

// Assume we already have an "enum" with each symbol.
var PETS = {CAT: 0, DOG: 1};

// We can provide our hook when parsing a schema.
var type = avro.parse({
  name: 'Pet',
  type: 'enum',
  symbols: ['CAT', 'DOG']
}, {typeHook: typeHook});

// And encode integer enum values directly.
var buf = type.toBuffer(PETS.CAT);

Finally, type hooks work well with logical types (for example to dynamically add logicalType attributes to a schema).

Avro types

All the classes below are available in the avro.types namespace:

Class Type

“Abstract” base Avro type class; all implementations inherit from it.

type.decode(buf, [pos,] [resolver])
  • buf {Buffer} Buffer to read from.
  • pos {Number} Offset to start reading from.
  • resolver {Resolver} Optional resolver to decode values serialized from another schema. See createResolver for how to create one.

Returns {value: value, offset: offset} if buf contains a valid encoding of type (value being the decoded value, and offset the new offset in the buffer). Returns {value: undefined, offset: -1} when the buffer is too short.
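
For example, a short sketch with an int type (variable names are illustrative):

var intType = avro.parse('"int"');
var buf = intType.toBuffer(48); // Complete encoding of 48.
intType.decode(buf); // == {value: 48, offset: 1}
intType.decode(new Buffer(0)); // == {value: undefined, offset: -1}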

type.encode(val, buf, [pos])
  • val {...} The value to encode. An error will be raised if this isn't a valid type value.
  • buf {Buffer} Buffer to write to.
  • pos {Number} Offset to start writing at.

Encode a value into an existing buffer. If enough space was available in buf, returns the new (non-negative) offset, otherwise returns -N where N is the (positive) number of bytes by which the buffer was short.
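
A short sketch, again with illustrative names:

var stringType = avro.parse('"string"');
var buf = new Buffer(4);
stringType.encode('hi', buf); // == 3 (one length byte plus two characters).
stringType.encode('hello!', buf); // == -3, the buffer is 3 bytes too short.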

type.fromBuffer(buf, [resolver,] [noCheck])
  • buf {Buffer} Bytes containing a serialized value of type.
  • resolver {Resolver} To decode values serialized from another schema. See createResolver for how to create a resolver.
  • noCheck {Boolean} Do not check that the entire buffer has been read. This can be useful when using a resolver which only decodes fields at the start of the buffer, allowing decoding to bail early and yield significant performance speedups.

Deserialize a buffer into its corresponding value.

type.toBuffer(val)
  • val {...} The value to encode. It must be a valid type value.

Returns a Buffer containing the Avro serialization of val.
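
These two methods are the most common way of converting values to and from their binary representation. For example:

var arrayType = avro.parse({type: 'array', items: 'int'});
var buf = arrayType.toBuffer([1, 2]); // Avro encoding of [1, 2].
var val = arrayType.fromBuffer(buf); // == [1, 2]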

type.fromString(str)
  • str {String} String representing a JSON-serialized object.

Deserialize a JSON-encoded object of type.

type.toString([val])
  • val {...} The value to serialize. If not specified, this method will return a human-friendly description of type.

Serialize an object into a JSON-encoded string.
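
Continuing the array example above:

var str = arrayType.toString([1, 2]); // == '[1,2]'
var val = arrayType.fromString(str); // == [1, 2]
arrayType.toString(); // Human-friendly description of the type.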

type.isValid(val, [opts])
  • val {...} The value to validate.
  • opts {Object} Options:
    • errorHook(path, any, type) {Function} Function called when an invalid value is encountered. When an invalid value causes its parent values to also be invalid, the latter do not trigger a callback. path will be an array of strings identifying where the mismatch occurred. See below for a few examples.

Check whether val is a valid type value.

For complex schemas, it can be difficult to figure out which part(s) of val are invalid. The errorHook option provides access to more information about these mismatches. We illustrate a few use-cases below:

// A sample schema.
var personType = avro.parse({
  type: 'record',
  name: 'Person',
  fields: [
    {name: 'age', type: 'int'},
    {name: 'names', type: {type: 'array', items: 'string'}}
  ]
});

// A corresponding invalid record.
var invalidPerson = {age: null, names: ['ann', 3, 'bob']};

As a first use-case, we use the errorHook to implement a function which gathers all of a value's invalid paths (if any):

function getInvalidPaths(type, val) {
  var paths = [];
  type.isValid(val, {errorHook: function (path) { paths.push(path.join()); }});
  return paths;
}

var paths = getInvalidPaths(personType, invalidPerson); // == ['age', 'names,1']

We can also implement an assertValid function which throws a helpful error on the first mismatch encountered (if any):

var util = require('util');

function assertValid(type, val) {
  return type.isValid(val, {errorHook: hook});

  function hook(path, any) {
    throw new Error(util.format('invalid %s: %j', path.join(), any));
  }
}

try {
  assertValid(personType, invalidPerson); // Will throw.
} catch (err) {
  // err.message === 'invalid age: null'
}

type.clone(val, [opts])
  • val {...} The object to copy.
  • opts {Object} Options:
    • coerceBuffers {Boolean} Allow coercion of JSON buffer representations into actual Buffer objects.
    • fieldHook(field, any, type) {Function} Function called when each record field is populated. The value returned by this function will be used instead of any. field is the current Field instance and type the parent type.
    • wrapUnions {Boolean} Avro's JSON representation expects all union values to be wrapped inside objects. Setting this parameter to true will try to wrap unwrapped union values into their first matching type.

Deep copy a value of type.
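
For example, a sketch of union coercion via the wrapUnions option:

var unionType = avro.parse(['null', 'int']);
unionType.clone(23, {wrapUnions: true}); // == {'int': 23}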

type.compare(val1, val2)
  • val1 {...} Value of type.
  • val2 {...} Value of type.

Returns 0 if both values are equal according to their sort order, -1 if the first is smaller than the second, and 1 otherwise. Comparing invalid values is undefined behavior.

type.compareBuffers(buf1, buf2)
  • buf1 {Buffer} type value bytes.
  • buf2 {Buffer} type value bytes.

Similar to compare, but doesn't require decoding values.
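
For example, with an int type:

var intType = avro.parse('"int"');
intType.compare(2, 5); // == -1
intType.compareBuffers(intType.toBuffer(2), intType.toBuffer(5)); // == -1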

type.createResolver(writerType)
  • writerType {Type} Writer type.

Create a resolver that can be passed to the type's decode and fromBuffer methods. This will enable decoding values which had been serialized using writerType, according to the Avro resolution rules. If the schemas are incompatible, this method will throw an error.

For example, assume we have the following two versions of a type:

// A schema's first version.
var v1 = avro.parse({
  name: 'Person',
  type: 'record',
  fields: [
    {name: 'name', type: 'string'},
    {name: 'age', type: 'int'}
  ]
});

// The updated version.
var v2 = avro.parse({
  type: 'record',
  name: 'Person',
  fields: [
    {
      name: 'name', type: [
        'string',
        {
          name: 'Name',
          type: 'record',
          fields: [
            {name: 'first', type: 'string'},
            {name: 'last', type: 'string'}
          ]
        }
      ]
    },
    {name: 'phone', type: ['null', 'string'], default: null}
  ]
});

The two types are compatible since the name field is present in both (the string can be promoted to the new union) and the new phone field has a default value.

// We can therefore create a resolver.
var resolver = v2.createResolver(v1);

// And pass it whenever we want to decode from the old type to the new.
var buf = v1.toBuffer({name: 'Ann', age: 25});
var obj = v2.fromBuffer(buf, resolver); // === {name: {string: 'Ann'}, phone: null}

See the advanced usage page for more details on how schema evolution can be used to significantly speed up decoding.

type.random()

Returns a random value of type.

type.getName([noRef])
  • noRef {Boolean} Return built-in names (e.g. 'record', 'map', 'boolean') rather than user-specified references.

Returns type's fully qualified name if it exists, undefined otherwise.

type.getSchema([noDeref])
  • noDeref {Boolean} Do not dereference any type names.

Returns type's canonical schema (as a string). This can be used to compare schemas for equality.

type.getFingerprint([algorithm])
  • algorithm {String} Algorithm used to generate the schema's fingerprint. Defaults to md5. Not supported in the browser.

Returns type's schema fingerprint (a Buffer).

Type.__reset(size)
  • size {Number} New buffer size in bytes.

This method resizes the internal buffer used to encode all types. You should only ever need to call this if you are encoding very large values and need to reclaim memory.

Class LongType(attrs, [opts])

  • attrs {Object} Decoded type attributes.
  • opts {Object} Parsing options.
LongType.using(methods, [noUnpack])
  • methods {Object} Method implementations dictionary keyed by method name, see below for details on each of the functions to implement.
  • noUnpack {Boolean} Do not automatically unpack bytes before passing them to the above methods' fromBuffer function and pack bytes returned by its toBuffer function.

This function provides a way to support arbitrary long representations. Doing so requires implementing the following methods (a sketch follows the list below, and a few examples are available here):

  • fromBuffer(buf)

    • buf {Buffer} Encoded long. If noUnpack is off (the default), buf will be an 8-byte buffer containing the long's unpacked representation. Otherwise, buf will contain a variable length buffer with the long's packed representation.

    This method should return the corresponding decoded long.

  • toBuffer(val)

    • val {...} Decoded long.

    If noUnpack is off (the default), this method should return an 8-byte buffer with the long's unpacked representation. Otherwise, toBuffer should return an already packed buffer (of variable length).

  • fromJSON(any)

    • any {Number|...} Parsed value. To ensure that the fromString method works correctly on data JSON-serialized according to the Avro spec, this method should at least support numbers as input.

    This method should return the corresponding decoded long.

    It might also be useful to support other kinds of input (typically the output of the long implementation's toJSON method) to enable serializing large numbers without loss of precision (at the cost of violating the Avro spec).

  • toJSON(val)

    • val {...} Decoded long.

    This method should return the long's JSON representation.

  • isValid(val, [opts])

    See Type.isValid.

  • compare(val1, val2)

    See Type.compare.
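
To make the expected signatures concrete, below is a sketch of an implementation backed by native numbers. It is lossy above 2^53 and assumes the default unpacked representation is an 8-byte little-endian two's complement buffer; it is only meant as an illustration.

var slowLongType = avro.types.LongType.using({
  fromBuffer: function (buf) {
    return buf.readUInt32LE(0) + Math.pow(2, 32) * buf.readInt32LE(4);
  },
  toBuffer: function (n) {
    var buf = new Buffer(8);
    buf.writeInt32LE(n | 0, 0); // Low 32 bits.
    buf.writeInt32LE(Math.floor(n / Math.pow(2, 32)), 4); // High 32 bits.
    return buf;
  },
  fromJSON: function (n) { return n; },
  toJSON: function (n) { return n; },
  isValid: function (n) { return typeof n == 'number' && n % 1 === 0; },
  compare: function (n1, n2) { return n1 === n2 ? 0 : (n1 < n2 ? -1 : 1); }
});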

Class ArrayType(attrs, [opts])

  • attrs {Object} Decoded type attributes.
  • opts {Object} Parsing options.
type.getItemsType()

The type of the array's items.

Class EnumType(attrs, [opts])

  • attrs {Object} Decoded type attributes.
  • opts {Object} Parsing options.
type.getAliases()

Optional type aliases. These are used when adapting a schema from another type.

type.getSymbols()

Returns a copy of the type's symbols (an array of strings representing the enum's valid values).

Class FixedType(attrs, [opts])

  • attrs {Object} Decoded type attributes.
  • opts {Object} Parsing options.
type.getAliases()

Optional type aliases. These are used when adapting a schema from another type.

type.getSize()

The size in bytes of instances of this type.

Class MapType(attrs, [opts])

  • attrs {Object} Decoded type attributes.
  • opts {Object} Parsing options.
type.getValuesType()

The type of the map's values (keys are always strings).

Class RecordType(attrs, [opts])

  • attrs {Object} Decoded type attributes.
  • opts {Object} Parsing options.
type.getAliases()

Optional type aliases. These are used when adapting a schema from another type.

type.getFields()

Returns a copy of the array of fields contained in this record. Each field is an object with the following methods:

  • getAliases()
  • getDefault()
  • getName()
  • getOrder()
  • getType()
type.getRecordConstructor()

The Record constructor for instances of this type.

Class UnionType(attrs, [opts])

  • attrs {Object} Decoded type attributes.
  • opts {Object} Parsing options.
type.getTypes()

The possible types that this union can take.

Class LogicalType(attrs, [opts,] [Types])

  • attrs {Object} Decoded type attributes.
  • opts {Object} Parsing options.
  • Types {Array} Optional array of type classes. If specified, only these will be accepted as underlying type.

“Abstract class” used to implement custom native types.

type.getUnderlyingType()

Get the underlying Avro type. This can be useful when a logical type can support different underlying types.

To implement a custom logical type, the steps are:

  • Extend LogicalType in your own subclass (typically using util.inherits).
  • Call LogicalType's constructor inside your own subclass' constructor to make sure the underlying type is properly set up. Throwing an error anywhere inside your constructor will prevent the logical type from being used (the underlying type will be used instead).
  • Override the methods below (prefixed with an underscore because they are internal to the class that defines them and should only be called by the internal LogicalType methods).

See here for a couple of sample implementations; a sketch is also included after the method descriptions below.

type._fromValue(val)
  • val {...} A value deserialized by the underlying type.

This function should return the final, wrapped, value.

type._toValue(any)
  • any {...} A wrapped value.

This function should return a value which can be serialized by the underlying type.

type._resolve(type)
  • type {Type} The writer's type.

This function should return:

  • undefined if the writer's values cannot be converted.
  • Otherwise, a function which converts a value deserialized by the writer's type into a wrapped value for the current type.
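
Putting these methods together, here is a sketch of a logical type which maps longs carrying a timestamp-millis logicalType attribute to native Date objects (the class name is illustrative):

var util = require('util');

function DateType(attrs, opts) {
  // Only accept longs as underlying type.
  avro.types.LogicalType.call(this, attrs, opts, [avro.types.LongType]);
}
util.inherits(DateType, avro.types.LogicalType);

DateType.prototype._fromValue = function (val) { return new Date(val); };

DateType.prototype._toValue = function (date) { return +date; };

DateType.prototype._resolve = function (type) {
  if (type instanceof avro.types.LongType) {
    return this._fromValue;
  }
};

// It can then be activated via the logicalTypes parsing option:
var type = avro.parse(
  {type: 'long', logicalType: 'timestamp-millis'},
  {logicalTypes: {'timestamp-millis': DateType}}
);
type.toBuffer(new Date()); // Date instances can now be encoded directly.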

Records

Each RecordType generates a corresponding Record constructor when its schema is parsed. It is available via the RecordType's getRecordConstructor method. This helps make decoding and encoding records more efficient.

All prototype methods below are prefixed with $ to avoid clashing with an existing record field ($ is a valid identifier in JavaScript, but not in Avro).

Class Record(...)

Calling the constructor directly can sometimes be a convenient shortcut to instantiate new records of a given type. In particular, it will correctly initialize all of the record's missing fields with their default values.
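
For example, reusing the personType parsed earlier (field values are passed in declaration order):

var Person = personType.getRecordConstructor();
var ann = new Person(25, ['Ann']); // == {age: 25, names: ['Ann']}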

record.$clone([opts])

Deep copy the record.

record.$compare(val)

Compare the record to another.

record.$getType()

Get the record's type.

record.$isValid([opts])

Check whether the record is valid.

record.$toBuffer()

Return binary encoding of record.

record.$toString()

Return JSON-stringified record.

Record.getType()

Convenience class method to get the record's type.

Files and streams

Not available in the browser.

The following convenience functions are available for common operations on container files:

createFileDecoder(path, [opts])

  • path {String} Path to Avro container file.
  • opts {Object} Decoding options, passed to BlockDecoder.

Returns a readable stream of decoded objects from an Avro container file.
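
For example (the path below is hypothetical):

avro.createFileDecoder('./values.avro')
  .on('metadata', function (type) { /* The writer's type. */ })
  .on('data', function (val) { /* A decoded value from the file. */ });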

createFileEncoder(path, schema, [opts])

  • path {String} Destination path.
  • schema {Object|String|Type} Type used to serialize.
  • opts {Object} Encoding options, passed to BlockEncoder.

Returns a writable stream of objects. These will end up serialized into an Avro container file.
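
For example, writing a couple of values to a (hypothetical) file:

var encoder = avro.createFileEncoder('./ints.avro', avro.parse('"int"'));
encoder.write(1);
encoder.end(2); // Writes the final value and flushes the remaining block.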

extractFileHeader(path, [opts])

  • path {String} Path to Avro container file.
  • opts {Object} Options:
    • decode {Boolean} Decode schema and codec metadata (otherwise they will be returned as bytes). Defaults to true.

Extract header from an Avro container file synchronously. If no header is present (i.e. the path doesn't point to a valid Avro container file), null is returned.

For more specific use-cases, the following stream classes are available in the avro.streams namespace:

Class BlockDecoder([opts])

  • opts {Object} Decoding options. Available keys:
    • codecs {Object} Dictionary of decompression functions, keyed by codec name. A decompression function has the signature fn(compressedData, cb) where compressedData is a buffer of compressed data, and must call cb(err, uncompressedData) on completion. The default contains handlers for the 'null' and 'deflate' codecs.
    • decode {Boolean} Whether to decode records before returning them. Defaults to true.
    • parseOpts {Object} Options passed when parsing the writer's schema.

A duplex stream which decodes bytes coming from an Avro object container file.

Event 'metadata'
  • type {Type} The type used to write the file.
  • codec {String} The codec's name.
  • header {Object} The file's header, containing in particular the raw schema and codec.
Event 'data'
  • data {...} Decoded element or raw bytes.
BlockDecoder.getDefaultCodecs()

Get built-in decompression functions (currently null and deflate).

Class RawDecoder(schema, [opts])

  • schema {Object|String|Type} Writer schema. Required since the input doesn't contain a header. Argument parsing logic is the same as for parse.
  • opts {Object} Decoding options. Available keys:
    • decode {Boolean} Whether to decode records before returning them. Defaults to true.

A duplex stream which can be used to decode a stream of serialized Avro objects with no headers or blocks.

Event 'data'
  • data {...} Decoded element or raw bytes.

Class BlockEncoder(schema, [opts])

  • schema {Object|String|Type} Schema used for encoding. Argument parsing logic is the same as for parse.
  • opts {Object} Encoding options. Available keys:
    • blockSize {Number} Maximum uncompressed size of each block of data. A new block will be started when this number is exceeded. If it is too small to fit a single element, it will be increased appropriately. Defaults to 64kB.
    • codec {String} Name of codec to use for encoding. See codecs option below to support arbitrary compression functions.
    • codecs {Object} Dictionary of compression functions, keyed by codec name. A compression function has the signature fn(uncompressedData, cb) where uncompressedData is a buffer of uncompressed data, and must call cb(err, compressedData) on completion. The default contains handlers for the 'null' and 'deflate' codecs.
    • omitHeader {Boolean} Don't emit the header. This can be useful when appending to an existing container file. Defaults to false.
    • syncMarker {Buffer} 16 byte buffer to use as synchronization marker inside the file. If unspecified, a random value will be generated.

A duplex stream to create Avro container object files.

Event 'data'
  • data {Buffer} Serialized bytes.
BlockEncoder.getDefaultCodecs()

Get built-in compression functions (currently null and deflate).
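
For example, a sketch registering a non-standard 'gzip' codec using node's zlib module (type is assumed to be a previously parsed Type; readers of the resulting file must register a matching decompression function with their BlockDecoder):

var zlib = require('zlib');

var encoder = new avro.streams.BlockEncoder(type, {
  codec: 'gzip',
  codecs: {gzip: zlib.gzip} // fn(uncompressedData, cb), as required.
});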

Class RawEncoder(schema, [opts])

  • schema {Object|String|Type} Schema used for encoding. Argument parsing logic is the same as for parse.
  • opts {Object} Encoding options. Available keys:
    • batchSize {Number} To increase performance, records are serialized in batches. Use this option to control how often batches are emitted. If it is too small to fit a single record, it will be increased automatically. Defaults to 64kB.

The encoding equivalent of RawDecoder.

Event 'data'
  • data {Buffer} Serialized bytes.

IPC & RPC

Avro also defines a way of executing remote procedure calls. We expose this via an API modeled after node.js' core EventEmitter.

Class Protocol

Protocol instances are obtained by parse-ing a protocol declaration and provide a way of sending remote messages (for example to another machine, or another process on the same machine). For this reason, instances of this class are very similar to EventEmitters, exposing both emit and on methods.

Being able to send remote messages (and to do so efficiently) introduces a few differences, however:

  • The types used in each event (for both the emitted message and its response) must be defined upfront in the protocol's declaration.
  • The arguments emitted with each event must match the ones defined in the protocol. Similarly, handlers are guaranteed to be called only with these matching arguments.
  • Events are one-to-one: they have exactly one response (unless they are declared as one-way, in which case they have none).
protocol.emit(name, req, emitter, cb)
  • name {String} Name of the message to emit. If this message is sent to a Protocol instance with no handler defined for this name, an “unsupported message” error will be returned.
  • req {Object} Request value, must correspond to the message's declared request type.
  • emitter {MessageEmitter} Emitter used to send the message. See createEmitter for how to obtain one.
  • cb(err, res) {Function} Function called with the remote call's response (and eventual error) when available. This can be omitted when the message is one way.

Send a message. This is always done asynchronously.

protocol.on(name, handler)
  • name {String} Message name to add the handler for. An error will be thrown if this name isn't defined in the protocol. At most one handler can exist for a given name (any previously defined handler will be overwritten).
  • handler(req, listener, cb) {Function} Handler, called each time a message with matching name is received. The listener argument will be the corresponding MessageListener instance. The final callback argument cb(err, res) should be called to send the response back to the emitter (except when the message is one way, in which case cb will be undefined).

Add a handler for a given message.

protocol.createEmitter(transport, [opts,] [cb])
  • transport {Duplex|Object|Function} The transport used to communicate with the remote listener. Multiple argument types are supported, see below.
  • opts {Object} Options.
    • IdType {LogicalType} Metadata logical type.
    • bufferSize {Number} Internal serialization buffer size (in bytes). Defaults to 2048.
    • frameSize {Number} Size used when framing messages. Defaults to 2048.
  • cb(pending) {Function} End of transmission callback.

Generate a MessageEmitter for this protocol. This emitter can then be used to communicate with a remote server exposing a compatible protocol.

There are two major types of transports:

  • Stateful

    A pair of binary streams {readable, writable}.

    As a convenience, passing a single duplex stream is also supported and equivalent to passing {readable: duplex, writable: duplex}.

  • Stateless

    Stream factory fn(cb) which should return a writable stream and call its callback argument with a readable stream (when available).

protocol.createListener(transport, [opts,] [cb])
  • transport {Duplex|Object|Function} Similar to createEmitter's corresponding argument, except that readable and writable roles are reversed for stateless transports.
  • opts {Object} Identical to createEmitter's options.
    • IdType {LogicalType} Metadata logical type.
    • bufferSize {Number} Internal serialization buffer size (in bytes). Defaults to 2048.
    • frameSize {Number} Size used when framing messages. Defaults to 2048.
  • cb(pending) {Function} End of transmission callback.

Generate a MessageListener for this protocol. This listener can be used to respond to messages emitted from compatible protocols.
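
Putting the above together, a minimal in-process sketch (the Math protocol below is illustrative):

var stream = require('stream');

var protocol = avro.parse({
  protocol: 'Math',
  messages: {
    add: {
      request: [{name: 'a', type: 'int'}, {name: 'b', type: 'int'}],
      response: 'int'
    }
  }
});

// Handler, invoked by the listener below.
protocol.on('add', function (req, listener, cb) {
  cb(null, req.a + req.b);
});

// Wire an emitter and a listener together with two in-process streams.
var pt1 = new stream.PassThrough();
var pt2 = new stream.PassThrough();
var emitter = protocol.createEmitter({readable: pt1, writable: pt2});
protocol.createListener({readable: pt2, writable: pt1});

protocol.emit('add', {a: 2, b: 3}, emitter, function (err, res) {
  // res === 5
});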

protocol.subprotocol()

Returns a copy of the original protocol, which inherits all its handlers.

protocol.getMessages()

Retrieve all the messages defined in the protocol. Each message is an object with the following (read-only) properties:

  • name {String}
  • requestType {Type}
  • responseType {Type}
  • errorType {Type}
  • oneWay {Boolean}
protocol.getName()

Returns the protocol's fully qualified name.

protocol.getType(name)
  • name {String} A type's fully qualified name.

Convenience function to retrieve a type defined inside this protocol. Returns undefined if no type exists for the given name.

Class MessageEmitter

Instances of this class are EventEmitters, with the following events:

Event 'handshake'
  • request {Object} Handshake request.
  • response {Object} Handshake response.

Emitted when the server's handshake response is received.

Event 'eot'
  • pending {Number} Number of interrupted requests. This will always be zero, unless the emitter was destroyed with noWait set.

End of transmission event, emitted after the client is destroyed and there are no more pending requests.

emitter.destroy([noWait])
  • noWait {Boolean} Cancel any pending requests. By default pending requests will still be honored.

Disable the emitter.

Class MessageListener

Listeners are the receiving-side equivalent of MessageEmitters and are also EventEmitters, with the following events:

Event 'handshake'
  • request {Object} Handshake request.
  • response {Object} Handshake response.

Emitted right before the server sends a handshake response.

Event 'eot'
  • pending {Number} Number of cancelled pending responses. This will always be zero, unless the listener was destroyed with noWait set.

End of transmission event, emitted after the listener is destroyed and there are no more responses to send.

listener.destroy([noWait])
  • noWait {Boolean} Don't wait for all pending responses to have been sent.

Disable this listener and release underlying streams. In general you shouldn't need to call this: stateless listeners will be destroyed automatically when a response is sent, and stateful listeners are best destroyed from the client's side.