/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.testutil;
import org.junit.After;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.ArrayList;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.io.CodecMap;
import org.apache.sqoop.lib.BlobRef;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Tests BLOB/CLOB import for Avro.
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public abstract class LobAvroImportTestCase extends ImportJobTestCase {
private Log log;
public LobAvroImportTestCase() {
this.log = LogFactory.getLog(LobAvroImportTestCase.class.getName());
}
/**
* @return the Log object to use for reporting during this test
*/
protected abstract Log getLogger();
/**
* @return a "friendly" name for the database. e.g "mysql" or "oracle".
*/
protected abstract String getDbFriendlyName();
@Override
protected String getTablePrefix() {
return "LOB_" + getDbFriendlyName().toUpperCase() + "_";
}
@Override
protected boolean useHsqldbTestServer() {
// Hsqldb does not support BLOB/CLOB
return false;
}
@After
public void tearDown() {
try {
// Clean up the database on our way out.
dropTableIfExists(getTableName());
} catch (SQLException e) {
log.warn("Error trying to drop table '" + getTableName()
+ "' on tearDown: " + e);
}
super.tearDown();
}
protected String [] getArgv(String ... additionalArgs) {
// Import every column of the table
String [] colNames = getColNames();
String splitByCol = colNames[0];
String columnsString = "";
for (String col : colNames) {
columnsString += col + ",";
}
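    // Note: this leaves a trailing comma on the --columns value; in these
    // tests Sqoop's option parsing appears to tolerate it, so the string is
    // kept as-is rather than joined more carefully.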
ArrayList<String> args = new ArrayList<String>();
CommonArgs.addHadoopFlags(args);
args.add("--table");
args.add(getTableName());
args.add("--columns");
args.add(columnsString);
args.add("--split-by");
args.add(splitByCol);
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(getConnectString());
args.add("--as-avrodatafile");
args.add("--num-mappers");
args.add("1");
for (String arg : additionalArgs) {
args.add(arg);
}
return args.toArray(new String[0]);
}
protected String getBlobType() {
return "BLOB";
}
protected String getBlobInsertStr(String blobData) {
return "'" + blobData + "'";
}
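  // A concrete subclass supplies the database-specific pieces. A minimal
  // sketch (hypothetical class name and literal syntax; real drivers differ
  // in how a BLOB literal must be written):
  //
  //   public class MySqlLobAvroImportTest extends LobAvroImportTestCase {
  //     @Override
  //     protected Log getLogger() {
  //       return LogFactory.getLog(MySqlLobAvroImportTest.class.getName());
  //     }
  //     @Override
  //     protected String getDbFriendlyName() {
  //       return "mysql";
  //     }
  //     // getBlobType() and getBlobInsertStr() keep their defaults here,
  //     // but a database such as Oracle would override them with its own
  //     // type name and literal conversion.
  //   }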
/**
   * Return the current table number as a string. In these tests, the table
   * number is used to name .lob files.
   * @return the current table number.
*/
private String getTableNum() {
return getTableName().substring(getTablePrefix().length());
}
/**
* Return an instance of DataFileReader for the given filename.
* @param filename path that we're opening a reader for.
* @return instance of DataFileReader.
* @throws IOException
*/
private DataFileReader<GenericRecord> read(Path filename)
throws IOException {
Configuration conf = getConf();
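    // When the tests are not running on a physical cluster, point the
    // default filesystem at the local FS so the Avro part file can be
    // opened directly from the warehouse directory.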
if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
}
FsInput fsInput = new FsInput(filename, conf);
DatumReader<GenericRecord> datumReader =
new GenericDatumReader<GenericRecord>();
return new DataFileReader<GenericRecord>(fsInput, datumReader);
}
  /**
   * Import blob data that is smaller than the inline lob limit. Blob data
   * should be saved as Avro bytes.
* @throws IOException
* @throws SQLException
*/
@Test
public void testBlobAvroImportInline() throws IOException, SQLException {
String [] types = { getBlobType() };
String expectedVal = "This is short BLOB data";
String [] vals = { getBlobInsertStr(expectedVal) };
createTableWithColTypes(types, vals);
runImport(getArgv());
Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
DataFileReader<GenericRecord> reader = read(outputFile);
GenericRecord record = reader.next();
// Verify that blob data is imported as Avro bytes.
ByteBuffer buf = (ByteBuffer) record.get(getColName(0));
String returnVal = new String(buf.array());
    assertEquals(getColName(0), expectedVal, returnVal);
    reader.close();
}
/**
   * Import blob data that is larger than the inline lob limit. The LOB
   * reference should be saved as Avro bytes. Blob data should be saved in
   * LobFile format.
* @throws IOException
* @throws SQLException
*/
@Test
public void testBlobAvroImportExternal() throws IOException, SQLException {
String [] types = { getBlobType() };
String data = "This is short BLOB data";
String [] vals = { getBlobInsertStr(data) };
createTableWithColTypes(types, vals);
// Set inline lob limit to a small value so that blob data will be
// written to an external file.
runImport(getArgv("--inline-lob-limit", "1"));
Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
DataFileReader<GenericRecord> reader = read(outputFile);
GenericRecord record = reader.next();
    // Verify that the LOB reference is written as Avro bytes.
ByteBuffer buf = (ByteBuffer) record.get(getColName(0));
String returnVal = new String(buf.array());
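    // An external LOB reference has the form
    // externalLob(lf,<lob file path>,<offset>,<length>); here 68 is the
    // expected byte offset of the record inside the .lob file, and "lf"
    // is apparently the marker for the LobFile storage format.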
String expectedStart = "externalLob(lf,_lob/large_obj";
String expectedEnd = getTableNum() + "_m_0000000.lob,68,"
+ data.length() + ")";
assertNotNull(returnVal);
assertTrue("ExpectedStart: " + expectedStart + ", value: " + returnVal, returnVal.startsWith(expectedStart));
assertTrue("ExpectedEnd: " + expectedEnd + ", value: " + returnVal, returnVal.endsWith(expectedEnd));
// Verify that blob data stored in the external lob file is correct.
BlobRef br = BlobRef.parse(returnVal);
Path lobFileDir = new Path(getWarehouseDir(), getTableName());
InputStream in = br.getDataStream(getConf(), lobFileDir);
byte [] bufArray = new byte[data.length()];
int chars = in.read(bufArray);
in.close();
    assertEquals(data.length(), chars);
    returnVal = new String(bufArray);
    String expectedVal = data;
    assertEquals(getColName(0), expectedVal, returnVal);
    reader.close();
}
/**
   * Import blob data that is smaller than the inline lob limit, compressed
   * with the deflate codec. Blob data should be encoded and saved as Avro
   * bytes.
* @throws IOException
* @throws SQLException
*/
@Test
public void testBlobCompressedAvroImportInline()
throws IOException, SQLException {
String [] types = { getBlobType() };
String expectedVal = "This is short BLOB data";
String [] vals = { getBlobInsertStr(expectedVal) };
createTableWithColTypes(types, vals);
runImport(getArgv("--compression-codec", CodecMap.DEFLATE));
Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
DataFileReader<GenericRecord> reader = read(outputFile);
GenericRecord record = reader.next();
// Verify that the data block of the Avro file is compressed with deflate
// codec.
assertEquals(CodecMap.DEFLATE,
reader.getMetaString(DataFileConstants.CODEC));
// Verify that all columns are imported correctly.
ByteBuffer buf = (ByteBuffer) record.get(getColName(0));
String returnVal = new String(buf.array());
    assertEquals(getColName(0), expectedVal, returnVal);
    reader.close();
}
/**
   * Import blob data that is larger than the inline lob limit, compressed
   * with the deflate codec. The LOB reference should be encoded and saved
   * as Avro bytes. Blob data should be saved in LobFile format without
   * compression.
* @throws IOException
* @throws SQLException
*/
@Test
public void testBlobCompressedAvroImportExternal()
throws IOException, SQLException {
String [] types = { getBlobType() };
String data = "This is short BLOB data";
String [] vals = { getBlobInsertStr(data) };
createTableWithColTypes(types, vals);
// Set inline lob limit to a small value so that blob data will be
// written to an external file.
runImport(getArgv(
"--inline-lob-limit", "1", "--compression-codec", CodecMap.DEFLATE));
Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
DataFileReader<GenericRecord> reader = read(outputFile);
GenericRecord record = reader.next();
// Verify that the data block of the Avro file is compressed with deflate
// codec.
assertEquals(CodecMap.DEFLATE,
reader.getMetaString(DataFileConstants.CODEC));
    // Verify that the LOB reference is written as Avro bytes.
ByteBuffer buf = (ByteBuffer) record.get(getColName(0));
String returnVal = new String(buf.array());
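    // Same externalLob(lf,<path>,<offset>,<length>) reference layout as in
    // testBlobAvroImportExternal; the reference itself is what gets
    // deflate-compressed inside the Avro container, not the .lob file.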
String expectedStart = "externalLob(lf,_lob/large_obj";
String expectedEnd = getTableNum() + "_m_0000000.lob,68,"
+ data.length() + ")";
assertNotNull(returnVal);
assertTrue("ExpectedStart: " + expectedStart + ", value: " + returnVal, returnVal.startsWith(expectedStart));
assertTrue("ExpectedEnd: " + expectedEnd + ", value: " + returnVal, returnVal.endsWith(expectedEnd));
// Verify that blob data stored in the external lob file is correct.
BlobRef br = BlobRef.parse(returnVal);
Path lobFileDir = new Path(getWarehouseDir(), getTableName());
InputStream in = br.getDataStream(getConf(), lobFileDir);
byte [] bufArray = new byte[data.length()];
int chars = in.read(bufArray);
in.close();
    assertEquals(data.length(), chars);
    returnVal = new String(bufArray);
    String expectedVal = data;
    assertEquals(getColName(0), expectedVal, returnVal);
    reader.close();
}
/**
* Import multiple columns of blob data. Blob data should be saved as Avro
* bytes.
* @throws IOException
* @throws SQLException
*/
@Test
public void testBlobAvroImportMultiCols() throws IOException, SQLException {
String [] types = { getBlobType(), getBlobType(), getBlobType(), };
String expectedVal1 = "This is short BLOB data1";
String expectedVal2 = "This is short BLOB data2";
String expectedVal3 = "This is short BLOB data3";
String [] vals = { getBlobInsertStr(expectedVal1),
getBlobInsertStr(expectedVal2),
getBlobInsertStr(expectedVal3), };
createTableWithColTypes(types, vals);
runImport(getArgv());
Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
DataFileReader<GenericRecord> reader = read(outputFile);
GenericRecord record = reader.next();
// Verify that all columns are imported correctly.
ByteBuffer buf = (ByteBuffer) record.get(getColName(0));
String returnVal = new String(buf.array());
assertEquals(getColName(0), expectedVal1, returnVal);
buf = (ByteBuffer) record.get(getColName(1));
returnVal = new String(buf.array());
assertEquals(getColName(1), expectedVal2, returnVal);
buf = (ByteBuffer) record.get(getColName(2));
returnVal = new String(buf.array());
    assertEquals(getColName(2), expectedVal3, returnVal);
    reader.close();
}
@Test
public void testClobAvroImportInline() throws IOException, SQLException {
// TODO: add tests for CLOB support for Avro import
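    // A possible shape for this test, mirroring testBlobAvroImportInline.
    // Hedged sketch only: getClobType() and getClobInsertStr() are
    // hypothetical helpers a subclass would have to provide, and it assumes
    // Avro import maps an inline CLOB to an Avro string, so the column
    // reads back as a CharSequence rather than a ByteBuffer:
    //
    //   String [] types = { getClobType() };
    //   String expectedVal = "This is short CLOB data";
    //   String [] vals = { getClobInsertStr(expectedVal) };
    //   createTableWithColTypes(types, vals);
    //   runImport(getArgv());
    //   DataFileReader<GenericRecord> reader =
    //       read(new Path(getTablePath(), "part-m-00000.avro"));
    //   GenericRecord record = reader.next();
    //   assertEquals(getColName(0), expectedVal,
    //       record.get(getColName(0)).toString());
    //   reader.close();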
}
@Test
public void testClobAvroImportExternal() throws IOException, SQLException {
// TODO: add tests for CLOB support for Avro import
}
@Test
public void testClobCompressedAvroImportInline()
throws IOException, SQLException {
// TODO: add tests for CLOB support for Avro import
}
@Test
public void testClobCompressedAvroImportExternal()
throws IOException, SQLException {
// TODO: add tests for CLOB support for Avro import
}
@Test
public void testClobAvroImportMultiCols() throws IOException, SQLException {
// TODO: add tests for CLOB support for Avro import
}
}