blob: 779869d237e027918435dab4c2a7eec12c9b8406 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.bson;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.ZoneOffset;
import org.apache.drill.exec.memory.RootAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.BufferManager;
import org.apache.drill.exec.ops.BufferManagerImpl;
import org.apache.drill.exec.store.TestOutputMutator;
import org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.drill.test.BaseTest;
import org.bson.BsonBinary;
import org.bson.BsonBinarySubType;
import org.bson.BsonBoolean;
import org.bson.BsonDateTime;
import org.bson.BsonDecimal128;
import org.bson.BsonDocument;
import org.bson.BsonDocumentReader;
import org.bson.BsonDocumentWriter;
import org.bson.BsonDouble;
import org.bson.BsonInt64;
import org.bson.BsonNull;
import org.bson.BsonObjectId;
import org.bson.BsonString;
import org.bson.BsonSymbol;
import org.bson.BsonTimestamp;
import org.bson.BsonWriter;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Unit tests for {@link BsonRecordReader}: each test builds a {@link BsonDocument},
 * feeds it through the reader into a {@link VectorContainerWriter} and then checks
 * the value that can be read back from the resulting map vector.
 */
public class TestBsonRecordReader extends BaseTest {
  private BufferAllocator allocator;
  private VectorContainerWriter writer;
  private BufferManager bufferManager;
  private BsonRecordReader bsonReader;

  /** Creates a fresh allocator, writer, buffer manager and reader for every test. */
  @Before
  public void setUp() {
    allocator = new RootAllocator(Long.MAX_VALUE);
    TestOutputMutator mutator = new TestOutputMutator(allocator);
    writer = new VectorContainerWriter(mutator);
    bufferManager = new BufferManagerImpl(allocator);
    bsonReader = new BsonRecordReader(bufferManager.getManagedBuffer(1024), false, false);
  }

  /**
   * Resets the writer, writes {@code document} through the reader under test and
   * returns the reader of the top-level map vector.
   *
   * @param document the BSON document to convert
   * @return reader positioned on the map produced from {@code document}
   * @throws IOException if the reader under test fails while writing
   */
  private SingleMapReaderImpl readDocument(BsonDocument document) throws IOException {
    // Always reset first so every test starts from the same writer state.
    writer.reset();
    bsonReader.write(writer, new BsonDocumentReader(document));
    return (SingleMapReaderImpl) writer.getMapVector().getReader();
  }

  /**
   * Reads a date/time value from {@code reader} and converts it to epoch milliseconds
   * using the JVM's default time zone.
   *
   * @param reader field reader holding a date/time value
   * @return the value as milliseconds since the epoch
   */
  private static long readEpochMillis(FieldReader reader) {
    // systemDefault() is the static ZoneId method, reached here through the ZoneOffset subclass.
    // NOTE(review): presumably mirrors the zone BsonRecordReader applies when storing
    // the value - confirm against the reader implementation.
    return reader.readLocalDateTime()
        .atZone(ZoneOffset.systemDefault())
        .toInstant()
        .toEpochMilli();
  }

  /** A 64-bit integer field is readable as a long. */
  @Test
  public void testIntType() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    bsonDoc.append("seqNo", new BsonInt64(10));
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    assertEquals(10L, mapReader.reader("seqNo").readLong().longValue());
  }

  /** BSON timestamps (seconds + increment) are read back as seconds * 1000 epoch millis. */
  @Test
  public void testTimeStampType() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    bsonDoc.append("ts_small", new BsonTimestamp(1000, 10));
    bsonDoc.append("ts_large", new BsonTimestamp(1000000000, 10));
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    assertEquals(1000000L, readEpochMillis(mapReader.reader("ts_small")));
    assertEquals(1000000000000L, readEpochMillis(mapReader.reader("ts_large")));
  }

  /** A symbol field is readable as text. */
  @Test
  public void testSymbolType() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    bsonDoc.append("symbolKey", new BsonSymbol("test_symbol"));
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    assertEquals("test_symbol", mapReader.reader("symbolKey").readText().toString());
  }

  /** A plain ASCII string field round-trips unchanged. */
  @Test
  public void testStringType() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    bsonDoc.append("stringKey", new BsonString("test_string"));
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    assertEquals("test_string", mapReader.reader("stringKey").readText().toString());
  }

  /** A string containing non-ASCII characters round-trips unchanged. */
  @Test
  public void testSpecialCharStringType() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    bsonDoc.append("stringKey", new BsonString("§§§§§§§§§1"));
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    assertEquals("§§§§§§§§§1",
        mapReader.reader("stringKey").readText().toString());
  }

  /** An ObjectId field is readable as the same bytes the ObjectId serializes to. */
  @Test
  public void testObjectIdType() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    BsonObjectId value = new BsonObjectId(new ObjectId());
    bsonDoc.append("_idKey", value);
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    byte[] readByteArray = mapReader.reader("_idKey").readByteArray();
    assertArrayEquals(value.getValue().toByteArray(), readByteArray);
  }

  /** A BSON null field reads back as a {@code null} object. */
  @Test
  public void testNullType() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    bsonDoc.append("nullKey", new BsonNull());
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    assertNull(mapReader.reader("nullKey").readObject());
  }

  /** A double field reads back within a small tolerance. */
  @Test
  public void testDoubleType() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    bsonDoc.append("doubleKey", new BsonDouble(12.35));
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    assertEquals(12.35d, mapReader.reader("doubleKey").readDouble(), 0.00001);
  }

  /** A document holding a string and an array of sub-documents can be converted. */
  @Test
  public void testArrayOfDocumentType() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    BsonWriter bw = new BsonDocumentWriter(bsonDoc);
    bw.writeStartDocument();
    bw.writeName("a");
    bw.writeString("MongoDB");
    bw.writeName("b");
    bw.writeStartArray();
    bw.writeStartDocument();
    bw.writeName("c");
    bw.writeInt32(1);
    bw.writeEndDocument();
    bw.writeEndArray();
    bw.writeEndDocument();
    bw.flush();
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    // NOTE(review): only the scalar sibling "a" is verified; the contents of array "b"
    // are not asserted here - consider adding a check once the reader API for
    // repeated maps is confirmed.
    assertEquals("MongoDB", mapReader.reader("a").readText().toString());
  }

  /** Nested documents are exposed as nested map readers with their own string fields. */
  @Test
  public void testRecursiveDocuments() throws IOException {
    BsonDocument topDoc = new BsonDocument();
    final int count = 3;
    for (int i = 0; i < count; ++i) {
      BsonDocument bsonDoc = new BsonDocument();
      BsonWriter bw = new BsonDocumentWriter(bsonDoc);
      bw.writeStartDocument();
      bw.writeName("k1" + i);
      bw.writeString("drillMongo1" + i);
      bw.writeName("k2" + i);
      bw.writeString("drillMongo2" + i);
      bw.writeEndDocument();
      bw.flush();
      topDoc.append("doc" + i, bsonDoc);
    }
    SingleMapReaderImpl mapReader = readDocument(topDoc);
    for (int i = 0; i < count; ++i) {
      SingleMapReaderImpl reader = (SingleMapReaderImpl) mapReader.reader("doc" + i);
      assertEquals("drillMongo1" + i, reader.reader("k1" + i).readText().toString());
      assertEquals("drillMongo2" + i, reader.reader("k2" + i).readText().toString());
    }
  }

  /** A BSON date-time field reads back as the same epoch milliseconds. */
  @Test
  public void testDateTimeType() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    bsonDoc.append("dateTimeKey", new BsonDateTime(5262729712L));
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    assertEquals(5262729712L, readEpochMillis(mapReader.reader("dateTimeKey")));
  }

  /** A boolean field reads back as {@code true}. */
  @Test
  public void testBooleanType() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    bsonDoc.append("booleanKey", new BsonBoolean(true));
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    assertTrue(mapReader.reader("booleanKey").readBoolean());
  }

  /** Binary fields with different subtype bytes are read back as bytes, text, double and boolean. */
  @Test
  public void testBinaryTypes() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    // Generic binary subtype: expected back as raw bytes.
    byte[] bytes = "binaryValue".getBytes(StandardCharsets.UTF_8);
    bsonDoc.append("binaryKey", new BsonBinary(BsonBinarySubType.BINARY, bytes));
    // Subtype byte 2: expected back as text.
    byte[] bytesString = "binaryStringValue".getBytes(StandardCharsets.UTF_8);
    bsonDoc.append("binaryStringKey", new BsonBinary((byte) 2, bytesString));
    // Subtype byte 1: expected back as a double.
    byte[] bytesDouble = new byte[8];
    java.nio.ByteBuffer.wrap(bytesDouble).putDouble(23.0123);
    BsonBinary bsonDouble = new BsonBinary((byte) 1, bytesDouble);
    bsonDoc.append("binaryDouble", bsonDouble);
    // Subtype byte 8: expected back as a boolean (first byte set to 1).
    byte[] booleanBytes = new byte[8];
    java.nio.ByteBuffer.wrap(booleanBytes).put((byte) 1);
    BsonBinary bsonBoolean = new BsonBinary((byte) 8, booleanBytes);
    bsonDoc.append("bsonBoolean", bsonBoolean);
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    assertArrayEquals(bytes, mapReader.reader("binaryKey").readByteArray());
    assertEquals("binaryStringValue", mapReader.reader("binaryStringKey").readText().toString());
    assertEquals(23.0123, mapReader.reader("binaryDouble").readDouble(), 0);
    assertTrue(mapReader.reader("bsonBoolean").readBoolean());
  }

  /** An array of three int32 values is exposed as a reader of size 3. */
  @Test
  public void testArrayType() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    BsonWriter bw = new BsonDocumentWriter(bsonDoc);
    bw.writeStartDocument();
    bw.writeName("arrayKey");
    bw.writeStartArray();
    bw.writeInt32(1);
    bw.writeInt32(2);
    bw.writeInt32(3);
    bw.writeEndArray();
    bw.writeEndDocument();
    bw.flush();
    // readDocument() resets the writer first, matching every other test in this class.
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    FieldReader reader = mapReader.reader("arrayKey");
    assertEquals(3, reader.size());
  }

  /** A Decimal128 field reads back as the equivalent {@link BigDecimal}. */
  @Test
  public void testDecimal128Type() throws IOException {
    BsonDocument bsonDoc = new BsonDocument();
    bsonDoc.append("decimal128Key", new BsonDecimal128(Decimal128.parse("12.12345624")));
    SingleMapReaderImpl mapReader = readDocument(bsonDoc);
    assertEquals(new BigDecimal("12.12345624"), mapReader.reader("decimal128Key").readBigDecimal());
  }

  /**
   * Releases the per-test resources in reverse order of creation. Each step runs even
   * if an earlier one fails, and fields left {@code null} by a failed {@link #setUp()}
   * are skipped.
   */
  @After
  public void cleanUp() {
    try {
      if (writer != null) {
        writer.close();
      }
    } catch (Exception ignored) {
      // Best-effort: a failure closing the writer must not prevent releasing the buffers below.
    } finally {
      try {
        if (bufferManager != null) {
          bufferManager.close();
        }
      } finally {
        if (allocator != null) {
          allocator.close();
        }
      }
    }
  }
}