// blob: 274d72d99348e442eb0bff5a77ea08b9fd4f6e21 [file] [log] [blame]
package org.apache.hawq.pxf.service;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hawq.pxf.api.BadRecordException;
import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.io.DataType;
import org.apache.hawq.pxf.service.io.BufferWritable;
import org.apache.hawq.pxf.service.io.GPDBWritable;
import org.apache.hawq.pxf.service.io.Writable;
import org.apache.hawq.pxf.service.utilities.ProtocolData;
import org.junit.Test;
/**
 * Unit tests for {@link BridgeOutputBuilder}.
 * <p>
 * Covers three areas: filling a {@link GPDBWritable} from a list of
 * {@link OneField} values, rejecting records whose size or field types do not
 * match the declared schema, and turning BYTEA text data into per-line
 * {@link BufferWritable} objects (including partial trailing lines).
 */
public class BridgeOutputBuilderTest {

    /**
     * Minimal {@link DataOutput} used to inspect what a {@link Writable}
     * writes. Only {@link #write(byte[])} is supported; every other method
     * throws an {@link IOException} with the message "not implemented".
     */
    private static final class DataOutputToBytes implements DataOutput {

        /** Copy of the last array passed to {@link #write(byte[])}, or null. */
        private byte[] output;

        /**
         * @return the bytes captured by the most recent
         *         {@link #write(byte[])} call, or {@code null} if it was
         *         never called
         */
        public byte[] getOutput() {
            return output;
        }

        /** Builds the exception thrown by all unsupported operations. */
        private static IOException notImplemented() {
            return new IOException("not implemented");
        }

        @Override
        public void write(int b) throws IOException {
            throw notImplemented();
        }

        @Override
        public void write(byte[] b) throws IOException {
            // Keep a private copy so later changes to the caller's buffer
            // cannot alter what the test observes.
            output = b.clone();
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            throw notImplemented();
        }

        @Override
        public void writeBoolean(boolean v) throws IOException {
            throw notImplemented();
        }

        @Override
        public void writeByte(int v) throws IOException {
            throw notImplemented();
        }

        @Override
        public void writeShort(int v) throws IOException {
            throw notImplemented();
        }

        @Override
        public void writeChar(int v) throws IOException {
            throw notImplemented();
        }

        @Override
        public void writeInt(int v) throws IOException {
            throw notImplemented();
        }

        @Override
        public void writeLong(long v) throws IOException {
            throw notImplemented();
        }

        @Override
        public void writeFloat(float v) throws IOException {
            throw notImplemented();
        }

        @Override
        public void writeDouble(double v) throws IOException {
            throw notImplemented();
        }

        @Override
        public void writeBytes(String s) throws IOException {
            throw notImplemented();
        }

        @Override
        public void writeChars(String s) throws IOException {
            throw notImplemented();
        }

        @Override
        public void writeUTF(String s) throws IOException {
            throw notImplemented();
        }
    }

    /** Type code used to exercise the unsupported-type error path. */
    private static final int UN_SUPPORTED_TYPE = -1;

    /**
     * Fills a 13-column record, one column per supported type, and checks
     * that each value can be read back from the {@link GPDBWritable}.
     */
    @Test
    public void testFillGPDBWritable() throws Exception {
        Map<String, String> parameters = new HashMap<String, String>();
        parameters.put("X-GP-ATTRS", "13");
        addColumn(parameters, 0, DataType.INTEGER, "col0");
        addColumn(parameters, 1, DataType.FLOAT8, "col1");
        addColumn(parameters, 2, DataType.REAL, "col2");
        addColumn(parameters, 3, DataType.BIGINT, "col3");
        addColumn(parameters, 4, DataType.SMALLINT, "col4");
        addColumn(parameters, 5, DataType.BOOLEAN, "col5");
        addColumn(parameters, 6, DataType.BYTEA, "col6");
        addColumn(parameters, 7, DataType.VARCHAR, "col7");
        addColumn(parameters, 8, DataType.BPCHAR, "col8");
        addColumn(parameters, 9, DataType.TEXT, "col9");
        addColumn(parameters, 10, DataType.NUMERIC, "col10");
        addColumn(parameters, 11, DataType.TIMESTAMP, "col11");
        addColumn(parameters, 12, DataType.DATE, "col12");

        BridgeOutputBuilder builder = makeBuilder(parameters);
        GPDBWritable output = builder.makeGPDBWritableOutput();

        List<OneField> recFields = Arrays.asList(
                new OneField(DataType.INTEGER.getOID(), 0),
                new OneField(DataType.FLOAT8.getOID(), (double) 0),
                new OneField(DataType.REAL.getOID(), (float) 0),
                new OneField(DataType.BIGINT.getOID(), (long) 0),
                new OneField(DataType.SMALLINT.getOID(), (short) 0),
                new OneField(DataType.BOOLEAN.getOID(), true),
                new OneField(DataType.BYTEA.getOID(), new byte[] { 0 }),
                new OneField(DataType.VARCHAR.getOID(), "value"),
                new OneField(DataType.BPCHAR.getOID(), "value"),
                new OneField(DataType.TEXT.getOID(), "value"),
                new OneField(DataType.NUMERIC.getOID(), "0"),
                new OneField(DataType.TIMESTAMP.getOID(), new Timestamp(0)),
                new OneField(DataType.DATE.getOID(), new Date(1)));
        builder.fillGPDBWritable(recFields);

        // JUnit convention: expected value first, actual value second.
        assertEquals(Integer.valueOf(0), output.getInt(0));
        assertEquals(Double.valueOf(0), output.getDouble(1));
        assertEquals(Float.valueOf(0), output.getFloat(2));
        assertEquals(Long.valueOf(0), output.getLong(3));
        assertEquals(Short.valueOf((short) 0), output.getShort(4));
        assertEquals(Boolean.TRUE, output.getBoolean(5));
        assertArrayEquals(new byte[] { 0 }, output.getBytes(6));
        // String-typed columns are read back with a trailing NUL character.
        assertEquals("value\0", output.getString(7));
        assertEquals("value\0", output.getString(8));
        assertEquals("value\0", output.getString(9));
        assertEquals("0\0", output.getString(10));
        assertEquals(new Timestamp(0),
                Timestamp.valueOf(output.getString(11)));
        assertEquals(new Date(1).toString(),
                Date.valueOf(output.getString(12).trim()).toString());
    }

    /**
     * A field with an unknown type code must be rejected with an
     * {@link UnsupportedOperationException} naming the value's Java type.
     */
    @Test
    public void testFillOneGPDBWritableField() throws Exception {
        Map<String, String> parameters = new HashMap<String, String>();
        parameters.put("X-GP-ATTRS", "1");
        addColumn(parameters, 0, DataType.INTEGER, "col0");

        BridgeOutputBuilder builder = makeBuilder(parameters);
        // Prepare the builder's output record before filling a field; the
        // returned object itself is not inspected in this test.
        builder.makeGPDBWritableOutput();

        OneField unSupportedField = new OneField(UN_SUPPORTED_TYPE,
                Byte.valueOf((byte) 0));
        try {
            builder.fillOneGPDBWritableField(unSupportedField, 0);
            fail("Unsupported data type should throw exception");
        } catch (UnsupportedOperationException e) {
            assertEquals("Byte is not supported for HAWQ conversion",
                    e.getMessage());
        }
    }

    /**
     * A record with exactly as many fields as the schema is accepted; a
     * record with fewer fields raises {@link BadRecordException}.
     */
    @Test
    public void testRecordSmallerThanSchema() throws Exception {
        BridgeOutputBuilder builder = makeBuilder(fourIntegerColumns());
        GPDBWritable output = builder.makeGPDBWritableOutput();

        /* all four fields */
        List<OneField> complete = Arrays.asList(
                new OneField(DataType.INTEGER.getOID(), 10),
                new OneField(DataType.INTEGER.getOID(), 20),
                new OneField(DataType.INTEGER.getOID(), 30),
                new OneField(DataType.INTEGER.getOID(), 40));
        builder.fillGPDBWritable(complete);
        assertEquals(4, output.getColType().length);
        assertEquals(Integer.valueOf(10), output.getInt(0));
        assertEquals(Integer.valueOf(20), output.getInt(1));
        assertEquals(Integer.valueOf(30), output.getInt(2));
        assertEquals(Integer.valueOf(40), output.getInt(3));

        /* two fields instead of four */
        List<OneField> incomplete = Arrays.asList(
                new OneField(DataType.INTEGER.getOID(), 10),
                new OneField(DataType.INTEGER.getOID(), 20));
        try {
            builder.fillGPDBWritable(incomplete);
            fail("testRecordSmallerThanSchema should have failed on - Record has 2 fields but the schema size is 4");
        } catch (BadRecordException e) {
            assertEquals("Record has 2 fields but the schema size is 4",
                    e.getMessage());
        }
    }

    /**
     * A record with more fields than the schema declares raises
     * {@link BadRecordException}.
     */
    @Test
    public void testRecordBiggerThanSchema() throws Exception {
        BridgeOutputBuilder builder = makeBuilder(fourIntegerColumns());
        builder.makeGPDBWritableOutput();

        /* five fields instead of four */
        List<OneField> oversized = Arrays.asList(
                new OneField(DataType.INTEGER.getOID(), 10),
                new OneField(DataType.INTEGER.getOID(), 20),
                new OneField(DataType.INTEGER.getOID(), 30),
                new OneField(DataType.INTEGER.getOID(), 40),
                new OneField(DataType.INTEGER.getOID(), 50));
        try {
            builder.fillGPDBWritable(oversized);
            fail("testRecordBiggerThanSchema should have failed on - Record has 5 fields but the schema size is 4");
        } catch (BadRecordException e) {
            assertEquals("Record has 5 fields but the schema size is 4",
                    e.getMessage());
        }
    }

    /**
     * A field whose type differs from the schema column type raises
     * {@link BadRecordException} naming the column and both types.
     */
    @Test
    public void testFieldTypeMismatch() throws Exception {
        BridgeOutputBuilder builder = makeBuilder(fourIntegerColumns());
        builder.makeGPDBWritableOutput();

        /* last field is REAL while schema requires INT */
        List<OneField> mismatched = Arrays.asList(
                new OneField(DataType.INTEGER.getOID(), 10),
                new OneField(DataType.INTEGER.getOID(), 20),
                new OneField(DataType.INTEGER.getOID(), 30),
                new OneField(DataType.REAL.getOID(), 40.0));
        try {
            builder.fillGPDBWritable(mismatched);
            fail("testFieldTypeMismatch should have failed on - For field col3 schema requires type INTEGER but input record has type REAL");
        } catch (BadRecordException e) {
            assertEquals(
                    "For field col3 schema requires type INTEGER but input record has type REAL",
                    e.getMessage());
        }
    }

    /**
     * Text made only of complete lines is split into one writable per line
     * and leaves no partial line behind.
     */
    @Test
    public void convertTextDataToLines() throws Exception {
        String[] dataLines = new String[] {
                "Que sara sara\n",
                "Whatever will be will be\n",
                "We are going\n",
                "to Wembeley!\n" };
        StringBuilder data = new StringBuilder();
        for (String dataLine : dataLines) {
            data.append(dataLine);
        }

        BridgeOutputBuilder builder = makeTextSamplingBuilder();
        LinkedList<Writable> outputQueue = builder.makeOutput(
                byteaRecord(data.toString()));

        assertEquals(4, outputQueue.size());
        for (int i = 0; i < dataLines.length; ++i) {
            compareBufferWritable(outputQueue.get(i), dataLines[i]);
        }
        assertNull(builder.getPartialLine());
    }

    /**
     * Text ending without a newline leaves a partial line in the builder;
     * the partial line is prepended to the data of the following calls until
     * a newline completes it.
     */
    @Test
    public void convertTextDataToLinesPartial() throws Exception {
        BridgeOutputBuilder builder = makeTextSamplingBuilder();

        LinkedList<Writable> outputQueue = builder.makeOutput(
                byteaRecord("oh well\n" + "what the hell"));
        assertEquals(1, outputQueue.size());
        compareBufferWritable(outputQueue.get(0), "oh well\n");
        Writable partial = builder.getPartialLine();
        assertNotNull(partial);
        compareBufferWritable(partial, "what the hell");

        // check that append works
        outputQueue = builder.makeOutput(
                byteaRecord(" but the show must go on\n" + "!!!\n"));
        assertNull(builder.getPartialLine());
        assertEquals(2, outputQueue.size());
        compareBufferWritable(outputQueue.get(0),
                "what the hell but the show must go on\n");
        compareBufferWritable(outputQueue.get(1), "!!!\n");

        // check that several partial lines gets appended to each other
        outputQueue = builder.makeOutput(byteaRecord(
                "I want to ride my bicycle\n" + "I want to ride my bike"));
        assertEquals(1, outputQueue.size());
        compareBufferWritable(outputQueue.get(0),
                "I want to ride my bicycle\n");
        partial = builder.getPartialLine();
        assertNotNull(partial);
        compareBufferWritable(partial, "I want to ride my bike");

        // data consisting of one long line
        outputQueue = builder.makeOutput(
                byteaRecord(" I want to ride my bicycle"));
        assertEquals(0, outputQueue.size());
        partial = builder.getPartialLine();
        assertNotNull(partial);
        compareBufferWritable(partial,
                "I want to ride my bike I want to ride my bicycle");

        // data with lines
        outputQueue = builder.makeOutput(
                byteaRecord(" bicycle BICYCLE\n" + "bicycle BICYCLE\n"));
        assertEquals(2, outputQueue.size());
        compareBufferWritable(outputQueue.get(0),
                "I want to ride my bike I want to ride my bicycle bicycle BICYCLE\n");
        compareBufferWritable(outputQueue.get(1), "bicycle BICYCLE\n");
        assertNull(builder.getPartialLine());
    }

    /**
     * Asserts that {@code line} is a {@link BufferWritable} whose written
     * bytes equal the bytes of {@code expected}.
     *
     * @param line writable to inspect
     * @param expected expected textual content
     * @throws IOException if the writable uses an unsupported output method
     */
    private void compareBufferWritable(Writable line, String expected)
            throws IOException {
        assertTrue(line instanceof BufferWritable);
        // A fresh sink per comparison: if nothing is written, the captured
        // bytes are null and the assertion fails, instead of silently
        // comparing against bytes left over from an earlier call.
        DataOutputToBytes sink = new DataOutputToBytes();
        line.write(sink);
        assertArrayEquals(expected.getBytes(), sink.getOutput());
    }

    /**
     * Wraps {@code data} in a single-field record of type BYTEA.
     *
     * @param data text whose bytes become the field value
     * @return a new mutable list holding the one field
     */
    private List<OneField> byteaRecord(String data) {
        List<OneField> fields = new ArrayList<OneField>();
        fields.add(new OneField(DataType.BYTEA.getOID(), data.getBytes()));
        return fields;
    }

    /**
     * Builds request parameters describing a schema of four INTEGER columns
     * named col0..col3.
     */
    private Map<String, String> fourIntegerColumns() {
        Map<String, String> parameters = new HashMap<String, String>();
        parameters.put("X-GP-ATTRS", "4");
        for (int idx = 0; idx < 4; idx++) {
            addColumn(parameters, idx, DataType.INTEGER, "col" + idx);
        }
        return parameters;
    }

    /**
     * Creates a builder for a single TEXT column with the statistics
     * sampling options set.
     */
    private BridgeOutputBuilder makeTextSamplingBuilder() throws Exception {
        Map<String, String> parameters = new HashMap<String, String>();
        parameters.put("X-GP-ATTRS", "1");
        addColumn(parameters, 0, DataType.TEXT, "col0");
        // activate sampling code
        parameters.put("X-GP-OPTIONS-STATS-MAX-FRAGMENTS", "100");
        parameters.put("X-GP-OPTIONS-STATS-SAMPLE-RATIO", "1.00");
        return makeBuilder(parameters);
    }

    /**
     * Adds the name, type code and type name entries for one column.
     *
     * @param parameters map to add the entries to
     * @param idx column index, appended to each key
     * @param dataType column type
     * @param name column name
     */
    private void addColumn(Map<String, String> parameters, int idx,
            DataType dataType, String name) {
        parameters.put("X-GP-ATTR-NAME" + idx, name);
        parameters.put("X-GP-ATTR-TYPECODE" + idx,
                Integer.toString(dataType.getOID()));
        parameters.put("X-GP-ATTR-TYPENAME" + idx, dataType.toString());
    }

    /**
     * Adds the remaining request headers to {@code parameters} (the map is
     * modified in place) and creates a {@link BridgeOutputBuilder} from the
     * resulting {@link ProtocolData}. The header values appear to be
     * placeholders chosen only for these tests.
     *
     * @param parameters column/schema parameters prepared by the caller
     * @return a builder configured with the combined parameters
     * @throws Exception if the protocol data cannot be constructed
     */
    private BridgeOutputBuilder makeBuilder(Map<String, String> parameters)
            throws Exception {
        parameters.put("X-GP-ALIGNMENT", "8");
        parameters.put("X-GP-SEGMENT-ID", "-44");
        parameters.put("X-GP-SEGMENT-COUNT", "2");
        parameters.put("X-GP-HAS-FILTER", "0");
        parameters.put("X-GP-FORMAT", "TEXT");
        parameters.put("X-GP-URL-HOST", "my://bags");
        parameters.put("X-GP-URL-PORT", "-8020");
        parameters.put("X-GP-OPTIONS-ACCESSOR", "are");
        parameters.put("X-GP-OPTIONS-RESOLVER", "packed");
        parameters.put("X-GP-DATA-DIR", "i'm/ready/to/go");
        parameters.put("X-GP-FRAGMENT-METADATA", "U29tZXRoaW5nIGluIHRoZSB3YXk=");
        parameters.put("X-GP-OPTIONS-I'M-STANDING-HERE", "outside-your-door");
        parameters.put("X-GP-USER", "alex");
        ProtocolData protocolData = new ProtocolData(parameters);
        return new BridgeOutputBuilder(protocolData);
    }
}