/* jshint node: true, mocha: true */
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
'use strict';
var files = require('../lib/files'),
protocols = require('../lib/protocols'),
schemas = require('../lib/schemas'),
assert = require('assert'),
fs = require('fs'),
path = require('path'),
tmp = require('tmp');
var DPATH = path.join(__dirname, 'dat');
var Header = files.HEADER_TYPE.getRecordConstructor();
var MAGIC_BYTES = files.MAGIC_BYTES;
var SYNC = new Buffer('atokensyncheader');
var createType = schemas.createType;
var streams = files.streams;
var types = schemas.types;
suite('files', function () {
suite('parse', function () {
  var parse = files.parse;

  test('type object', function () {
    // A record schema given as a plain object yields a RecordType.
    var schema = {
      type: 'record',
      name: 'Person',
      fields: [{name: 'so', type: 'Person'}]
    };
    assert(parse(schema) instanceof types.RecordType);
  });

  test('protocol object', function () {
    assert(parse({protocol: 'Foo'}) instanceof protocols.Protocol);
  });

  test('schema instance', function () {
    // Parsing an already-parsed type is the identity.
    var personType = parse({
      type: 'record',
      name: 'Person',
      fields: [{name: 'so', type: 'Person'}]
    });
    assert.strictEqual(parse(personType), personType);
  });

  test('stringified schema', function () {
    assert(parse('"int"') instanceof types.IntType);
  });

  test('type name', function () {
    assert(parse('double') instanceof types.DoubleType);
  });

  test('file', function () {
    // Parsing a path to an `.avsc` file matches parsing its contents.
    var inline = parse({type: 'fixed', name: 'id.Id', size: 64});
    var fromFile = parse(path.join(__dirname, 'dat', 'Id.avsc'));
    assert.deepEqual(JSON.stringify(inline), JSON.stringify(fromFile));
  });
});
suite('RawEncoder', function () {
  var RawEncoder = streams.RawEncoder;

  test('flush once', function (cb) {
    var intType = createType('int');
    var received;
    var encoder = new RawEncoder(intType)
      .on('data', function (chunk) {
        // All three values should arrive in a single chunk.
        assert.strictEqual(received, undefined);
        received = chunk;
      })
      .on('end', function () {
        assert.deepEqual(received, new Buffer([2, 0, 3]));
        cb();
      });
    encoder.write(1);
    encoder.write(0);
    encoder.end(-2);
  });

  test('write multiple', function (cb) {
    var intType = createType('int');
    var received = [];
    // A tiny batch size forces one chunk per value.
    var encoder = new RawEncoder(intType, {batchSize: 1})
      .on('data', function (chunk) { received.push(chunk); })
      .on('end', function () {
        assert.deepEqual(received, [new Buffer([1]), new Buffer([2])]);
        cb();
      });
    encoder.write(-1);
    encoder.end(1);
  });

  test('resize', function (cb) {
    var fixedType = createType({type: 'fixed', name: 'A', size: 2});
    var data = new Buffer([48, 18]);
    var received;
    // The value is larger than the batch size, so the buffer must grow.
    var encoder = new RawEncoder(fixedType, {batchSize: 1})
      .on('data', function (chunk) {
        assert.strictEqual(received, undefined);
        received = chunk;
      })
      .on('end', function () {
        assert.deepEqual(received, data);
        cb();
      });
    encoder.write(data);
    encoder.end();
  });

  test('flush when full', function (cb) {
    var fixedType = createType({type: 'fixed', name: 'A', size: 2});
    var data = new Buffer([48, 18]);
    var received = [];
    var encoder = new RawEncoder(fixedType, {batchSize: 2})
      .on('data', function (chunk) { received.push(chunk); })
      .on('end', function () {
        // Each write exactly fills a batch, yielding one chunk per write.
        assert.deepEqual(received, [data, data]);
        cb();
      });
    encoder.write(data);
    encoder.write(data);
    encoder.end();
  });

  test('empty', function (cb) {
    var received = [];
    var encoder = new RawEncoder(createType('int'), {batchSize: 2})
      .on('data', function (chunk) { received.push(chunk); })
      .on('end', function () {
        // Nothing written, nothing emitted.
        assert.deepEqual(received, []);
        cb();
      });
    encoder.end();
  });

  test('missing writer type', function () {
    assert.throws(function () { new RawEncoder(); });
  });

  test('writer type from schema', function () {
    // A schema string is parsed into a type automatically.
    var encoder = new RawEncoder('int');
    assert(encoder._type instanceof types.IntType);
  });

  test('invalid object', function (cb) {
    var encoder = new RawEncoder(createType('int'))
      .on('error', function () { cb(); });
    encoder.write('hi');
  });
});
suite('RawDecoder', function () {
  var RawDecoder = streams.RawDecoder;

  test('single item', function (cb) {
    var t = createType('int');
    var objs = [];
    var decoder = new RawDecoder(t)
      .on('data', function (obj) { objs.push(obj); })
      .on('end', function () {
        assert.deepEqual(objs, [0]);
        cb();
      });
    // Zig-zag byte 0x00 decodes to the int 0.
    decoder.end(new Buffer([0]));
  });

  test('no writer type', function () {
    // The writer's type is mandatory.
    assert.throws(function () { new RawDecoder(); });
  });

  test('decoding', function (cb) {
    var t = createType('int');
    var objs = [];
    var decoder = new RawDecoder(t)
      .on('data', function (obj) { objs.push(obj); })
      .on('end', function () {
        // Zig-zag bytes 2 and 4 decode to ints 1 and 2.
        assert.deepEqual(objs, [1, 2]);
        cb();
      });
    decoder.write(new Buffer([2]));
    decoder.end(new Buffer([4]));
  });

  test('no decoding', function (cb) {
    var t = createType('int');
    var bufs = [new Buffer([3]), new Buffer([124])];
    var objs = [];
    // With `decode` disabled, the raw buffers are emitted unchanged.
    var decoder = new RawDecoder(t, {decode: false})
      .on('data', function (obj) { objs.push(obj); })
      .on('end', function () {
        assert.deepEqual(objs, bufs);
        cb();
      });
    decoder.write(bufs[0]);
    decoder.end(bufs[1]);
  });

  test('write partial', function (cb) {
    var t = createType('bytes');
    var objs = [];
    var decoder = new RawDecoder(t)
      .on('data', function (obj) { objs.push(obj); })
      .on('end', function () {
        // The two chunks combine into a single length-1 `bytes` value.
        assert.deepEqual(objs, [new Buffer([6])]);
        cb();
      });
    // This first chunk only holds the length prefix (2 zig-zag decodes to 1).
    decoder.write(new Buffer([2]));
    // Let the first read go through (and return null).
    process.nextTick(function () { decoder.end(new Buffer([6])); });
  });
});
suite('BlockEncoder', function () {
  var BlockEncoder = streams.BlockEncoder;

  test('invalid type', function () {
    // A writer type is mandatory.
    assert.throws(function () { new BlockEncoder(); });
  });

  test('invalid codec', function (cb) {
    var t = createType('int');
    // An unknown codec name surfaces as an 'error' event, not a throw.
    var encoder = new BlockEncoder(t, {codec: 'foo'})
      .on('error', function () { cb(); });
    encoder.write(2);
  });

  test('invalid object', function (cb) {
    var t = createType('int');
    // Writing a value that doesn't match the type emits 'error'.
    var encoder = new BlockEncoder(t)
      .on('error', function () { cb(); });
    encoder.write('hi');
  });

  test('empty', function (cb) {
    var t = createType('int');
    var chunks = [];
    var encoder = new BlockEncoder(t)
      .on('data', function (chunk) { chunks.push(chunk); })
      .on('end', function () {
        // No values written: nothing at all is emitted.
        assert.equal(chunks.length, 0);
        cb();
      });
    encoder.end();
  });

  test('flush on finish', function (cb) {
    var t = createType('int');
    var chunks = [];
    var encoder = new BlockEncoder(t, {
      omitHeader: true,
      syncMarker: SYNC
    }).on('data', function (chunk) { chunks.push(chunk); })
      .on('end', function () {
        // One block: object count (3) and byte length (3), both zig-zag
        // encoded as 6, then the encoded ints 12, 0, 4, then the marker.
        assert.deepEqual(chunks, [
          new Buffer([6]),
          new Buffer([6]),
          new Buffer([24, 0, 8]),
          SYNC
        ]);
        cb();
      });
    encoder.write(12);
    encoder.write(0);
    encoder.end(4);
  });

  test('flush when full', function (cb) {
    var chunks = [];
    var encoder = new BlockEncoder(createType('int'), {
      omitHeader: true,
      syncMarker: SYNC,
      blockSize: 2
    }).on('data', function (chunk) { chunks.push(chunk); })
      .on('end', function () {
        // The tiny block size forces one block per value; each block is
        // object count, byte length, data, then the sync marker.
        assert.deepEqual(
          chunks,
          [
            new Buffer([2]), new Buffer([2]), new Buffer([2]), SYNC,
            new Buffer([2]), new Buffer([4]), new Buffer([128, 1]), SYNC
          ]
        );
        cb();
      });
    encoder.write(1);
    encoder.end(64);
  });

  test('resize', function (cb) {
    var t = createType({type: 'fixed', size: 8, name: 'Eight'});
    var buf = new Buffer('abcdefgh');
    var chunks = [];
    var encoder = new BlockEncoder(t, {
      omitHeader: true,
      syncMarker: SYNC,
      blockSize: 4
    }).on('data', function (chunk) { chunks.push(chunk); })
      .on('end', function () {
        // Both 8-byte values land in a single block even though they exceed
        // the configured block size (the internal buffer has to grow).
        var b1 = new Buffer([4]); // Object count: 2.
        var b2 = new Buffer([32]); // Byte length: 16.
        assert.deepEqual(chunks, [b1, b2, Buffer.concat([buf, buf]), SYNC]);
        cb();
      });
    encoder.write(buf);
    encoder.end(buf);
  });

  test('compression error', function (cb) {
    var t = createType('int');
    // A codec whose callback yields an error should fail the stream.
    var codecs = {
      invalid: function (data, cb) { cb(new Error('ouch')); }
    };
    var encoder = new BlockEncoder(t, {codec: 'invalid', codecs: codecs})
      .on('error', function () { cb(); });
    encoder.end(12);
  });

  test('write non-canonical schema', function (cb) {
    var obj = {type: 'fixed', size: 2, name: 'Id', doc: 'An id.'};
    var id = new Buffer([1, 2]);
    var ids = [];
    var encoder = new BlockEncoder(obj);
    var decoder = new streams.BlockDecoder()
      .on('metadata', function (type, codec, header) {
        // The schema embedded in the file header should round-trip verbatim.
        var schema = JSON.parse(header.meta['avro.schema'].toString());
        assert.deepEqual(schema, obj); // Check that doc field not stripped.
      })
      .on('data', function (id) { ids.push(id); })
      .on('end', function () {
        assert.deepEqual(ids, [id]);
        cb();
      });
    encoder.pipe(decoder);
    encoder.end(id);
  });
});
suite('BlockDecoder', function () {
  var BlockDecoder = streams.BlockDecoder;

  // Builds a container file header with the given metadata map.
  function makeHeader(meta) {
    return new Header(MAGIC_BYTES, meta, SYNC);
  }

  test('invalid magic bytes', function (cb) {
    var decoder = new BlockDecoder()
      .on('data', function () {})
      .on('error', function () { cb(); });
    decoder.write(new Buffer([0, 3, 2, 1])); // Anything !== MAGIC_BYTES.
    decoder.write(new Buffer([0]));
    decoder.end(SYNC);
  });

  test('invalid sync marker', function (cb) {
    var decoder = new BlockDecoder()
      .on('data', function () {})
      .on('error', function () { cb(); });
    var header = makeHeader({
      'avro.schema': new Buffer('"int"'),
      'avro.codec': new Buffer('null')
    });
    decoder.write(header.$toBuffer());
    decoder.write(new Buffer([0, 0])); // Empty block.
    decoder.end(new Buffer('alongerstringthansixteenbytes'));
  });

  test('missing codec', function (cb) {
    // A header without `avro.codec` is fine (null codec implied).
    var decoder = new BlockDecoder()
      .on('data', function () {})
      .on('end', function () { cb(); });
    decoder.end(makeHeader({'avro.schema': new Buffer('"int"')}).$toBuffer());
  });

  test('unknown codec', function (cb) {
    var decoder = new BlockDecoder()
      .on('data', function () {})
      .on('error', function () { cb(); });
    var header = makeHeader({
      'avro.schema': new Buffer('"int"'),
      'avro.codec': new Buffer('"foo"')
    });
    decoder.end(header.$toBuffer());
  });

  test('invalid schema', function (cb) {
    var decoder = new BlockDecoder()
      .on('data', function () {})
      .on('error', function () { cb(); });
    var header = makeHeader({
      'avro.schema': new Buffer('"int2"'),
      'avro.codec': new Buffer('null')
    });
    decoder.end(header.$toBuffer());
  });
});
suite('encode & decode', function () {
  test('uncompressed int', function (cb) {
    var t = createType('int');
    var objs = [];
    var encoder = new streams.BlockEncoder(t);
    var decoder = new streams.BlockDecoder()
      .on('data', function (obj) { objs.push(obj); })
      .on('end', function () {
        // Values round-trip unchanged through the pipe.
        assert.deepEqual(objs, [12, 23, 48]);
        cb();
      });
    encoder.pipe(decoder);
    encoder.write(12);
    encoder.write(23);
    encoder.end(48);
  });

  test('uncompressed int non decoded', function (cb) {
    var t = createType('int');
    var objs = [];
    var encoder = new streams.BlockEncoder(t);
    var decoder = new streams.BlockDecoder({decode: false})
      .on('data', function (obj) { objs.push(obj); })
      .on('end', function () {
        // With decoding off we get the raw bytes (48 zig-zag encodes to 96).
        assert.deepEqual(objs, [new Buffer([96])]);
        cb();
      });
    encoder.pipe(decoder);
    encoder.end(48);
  });

  test('deflated records', function (cb) {
    var t = createType({
      type: 'record',
      name: 'Person',
      fields: [
        {name: 'name', type: 'string'},
        {name: 'age', type: 'int'}
      ]
    });
    var Person = t.getRecordConstructor();
    var p1 = [
      new Person('Ann', 23),
      new Person('Bob', 25)
    ];
    var p2 = [];
    // Round-trip through the 'deflate' codec.
    var encoder = new streams.BlockEncoder(t, {codec: 'deflate'});
    var decoder = new streams.BlockDecoder()
      .on('data', function (obj) { p2.push(obj); })
      .on('end', function () {
        assert.deepEqual(p2, p1);
        cb();
      });
    encoder.pipe(decoder);
    var i, l;
    for (i = 0, l = p1.length; i < l; i++) {
      encoder.write(p1[i]);
    }
    encoder.end();
  });

  test('decompression error', function (cb) {
    var t = createType('int');
    // Override the 'null' codec on the read side so decompression fails.
    var codecs = {
      'null': function (data, cb) { cb(new Error('ouch')); }
    };
    var encoder = new streams.BlockEncoder(t, {codec: 'null'});
    var decoder = new streams.BlockDecoder({codecs: codecs})
      .on('error', function () { cb(); });
    encoder.pipe(decoder);
    encoder.end(1);
  });

  test('decompression late read', function (cb) {
    var chunks = [];
    var encoder = new streams.BlockEncoder(createType('int'));
    var decoder = new streams.BlockDecoder();
    encoder.pipe(decoder);
    encoder.end(1);
    // Listeners are attached only after everything has been written; the
    // decoder should buffer and still deliver the value.
    decoder.on('data', function (chunk) { chunks.push(chunk); })
      .on('end', function () {
        assert.deepEqual(chunks, [1]);
        cb();
      });
  });
});
test('createFileDecoder', function (cb) {
  var count = 0;
  var personType = loadSchema(path.join(DPATH, 'Person.avsc'));
  files.createFileDecoder(path.join(DPATH, 'person-10.avro'))
    .on('metadata', function (writerType) {
      // The writer schema embedded in the file should match the standalone one.
      assert.equal(writerType.toString(), personType.toString());
    })
    .on('data', function (obj) {
      count++;
      assert(personType.isValid(obj));
    })
    .on('end', function () {
      // The fixture contains exactly ten records.
      assert.equal(count, 10);
      cb();
    });
});
test('createFileEncoder', function (cb) {
  var type = createType({
    type: 'record',
    name: 'Person',
    fields: [
      {name: 'name', type: 'string'},
      {name: 'age', type: 'int'}
    ]
  });
  // Named `fpath` (not `path`) so we don't shadow the `path` module
  // required at the top of this file.
  var fpath = tmp.fileSync().name;
  var encoder = files.createFileEncoder(fpath, type);
  encoder.write({name: 'Ann', age: 32});
  encoder.end({name: 'Bob', age: 33});
  var n = 0;
  encoder.on('finish', function () {
    // Read the file back and check that both records survived the round-trip.
    files.createFileDecoder(fpath)
      .on('data', function (obj) {
        n++;
        assert(type.isValid(obj));
      })
      .on('end', function () {
        assert.equal(n, 2);
        cb();
      });
  });
});
test('extractFileHeader', function () {
  var fpath = path.join(DPATH, 'person-10.avro');
  var header = files.extractFileHeader(fpath);
  assert(header !== null);
  assert.equal(typeof header.meta['avro.schema'], 'object');
  // With decoding disabled the metadata values remain raw buffers.
  header = files.extractFileHeader(fpath, {decode: false});
  assert(Buffer.isBuffer(header.meta['avro.schema']));
  // A very small initial read size should still yield the full header.
  header = files.extractFileHeader(fpath, {size: 2});
  assert.equal(typeof header.meta['avro.schema'], 'object');
  // A raw (container-less) file has no header to extract.
  header = files.extractFileHeader(path.join(DPATH, 'person-10.avro.raw'));
  assert(header === null);
  header = files.extractFileHeader(
    path.join(DPATH, 'person-10.no-codec.avro')
  );
  assert(header !== null);
});
});
// Helpers.
/**
 * Loads an Avro type from a JSON schema file.
 *
 * @param fpath {String} Path to a JSON Avro schema file (e.g. `.avsc`).
 * @return The type obtained by parsing the file's schema.
 *
 * The parameter is named `fpath` (not `path`) to avoid shadowing the `path`
 * module required at the top of this file.
 */
function loadSchema(fpath) {
  return createType(JSON.parse(fs.readFileSync(fpath)));
}