/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hive.streaming;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.Validator;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.Worker;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.impl.OrcAcidUtils;
import org.apache.orc.tools.FileDump;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestStreaming {
private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
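/**
* A local file system exposed under the "raw" scheme (registered via fs.raw.impl in the constructor)
* so the tests can place database and table locations inside the JUnit TemporaryFolder; file
* permissions are only approximated from java.io.File capabilities.
*/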
public static class RawFileSystem extends RawLocalFileSystem {
private static final URI NAME;
static {
try {
NAME = new URI("raw:///");
} catch (URISyntaxException se) {
throw new IllegalArgumentException("bad uri", se);
}
}
@Override
public URI getUri() {
return NAME;
}
@Override
public String getScheme() {
return "raw";
}
@Override
public FileStatus getFileStatus(Path path) throws IOException {
File file = pathToFile(path);
if (!file.exists()) {
throw new FileNotFoundException("Can'table find " + path);
}
// approximate the POSIX permission bits from java.io.File capabilities; close enough for these tests
short mod = 0;
if (file.canRead()) {
mod |= 0444;
}
if (file.canWrite()) {
mod |= 0200;
}
if (file.canExecute()) {
mod |= 0111;
}
return new FileStatus(file.length(), file.isDirectory(),
1, 1024,
file.lastModified(), file.lastModified(),
FsPermission.createImmutable(mod), "owen", "users", path);
}
}
private static final String COL1 = "id";
private static final String COL2 = "msg";
private static HiveConf conf = null;
private IDriver driver;
private final IMetaStoreClient msClient;
// partitioned table
private final static String dbName = "testing";
private final static String tblName = "alerts";
private final static String[] fieldNames = new String[]{COL1, COL2};
static List<String> partitionVals;
private static Path partLoc;
private static Path partLoc2;
// unpartitioned table
private final static String dbName2 = "testing2";
private final static String tblName2 = "alerts";
private final static String[] fieldNames2 = new String[]{COL1, COL2};
// for bucket join testing
private final static String dbName3 = "testing3";
private final static String tblName3 = "dimensionTable";
private final static String dbName4 = "testing4";
private final static String tblName4 = "factTable";
List<String> partitionVals2;
private final String PART1_CONTINENT = "Asia";
private final String PART1_COUNTRY = "India";
@Rule
public TemporaryFolder dbFolder = new TemporaryFolder();
public TestStreaming() throws Exception {
partitionVals = new ArrayList<String>(2);
partitionVals.add(PART1_CONTINENT);
partitionVals.add(PART1_COUNTRY);
partitionVals2 = new ArrayList<String>(1);
partitionVals2.add(PART1_COUNTRY);
conf = new HiveConf(this.getClass());
conf.set("fs.raw.impl", RawFileSystem.class.getName());
conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
TxnDbUtil.setConfValues(conf);
conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
dbFolder.create();
//1) Start from a clean slate (metastore)
TxnDbUtil.cleanDb(conf);
TxnDbUtil.prepDb(conf);
//2) obtain metastore clients
msClient = new HiveMetaStoreClient(conf);
}
@Before
public void setup() throws Exception {
SessionState.start(new CliSessionState(conf));
driver = DriverFactory.newDriver(conf);
driver.setMaxRows(200002);//make sure Driver returns all results
// drop and recreate the necessary databases and tables
dropDB(msClient, dbName);
String[] colNames = new String[]{COL1, COL2};
String[] colTypes = new String[]{serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME};
String[] bucketCols = new String[]{COL1};
String loc1 = dbFolder.newFolder(dbName + ".db").toString();
String[] partNames = new String[]{"Continent", "Country"};
partLoc = createDbAndTable(driver, dbName, tblName, partitionVals, colNames, colTypes, bucketCols, partNames, loc1,
1);
dropDB(msClient, dbName2);
String loc2 = dbFolder.newFolder(dbName2 + ".db").toString();
partLoc2 = createDbAndTable(driver, dbName2, tblName2, null, colNames, colTypes, bucketCols, null, loc2, 2);
String loc3 = dbFolder.newFolder("testing5.db").toString();
createStoreSales("testing5", loc3);
runDDL(driver, "drop table testBucketing3.streamedtable");
runDDL(driver, "drop table testBucketing3.finaltable");
runDDL(driver, "drop table testBucketing3.nobucket");
}
@After
public void cleanup() {
msClient.close();
driver.close();
}
private void createStoreSales(String dbName, String loc) throws Exception {
String dbUri = "raw://" + new Path(loc).toUri().toString();
String tableLoc = dbUri + Path.SEPARATOR + "store_sales";
boolean success = runDDL(driver, "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'");
Assert.assertTrue(success);
success = runDDL(driver, "use " + dbName);
Assert.assertTrue(success);
success = runDDL(driver, "drop table if exists store_sales");
Assert.assertTrue(success);
success = runDDL(driver, "create table store_sales\n" +
"(\n" +
" ss_sold_date_sk int,\n" +
" ss_sold_time_sk int,\n" +
" ss_item_sk int,\n" +
" ss_customer_sk int,\n" +
" ss_cdemo_sk int,\n" +
" ss_hdemo_sk int,\n" +
" ss_addr_sk int,\n" +
" ss_store_sk int,\n" +
" ss_promo_sk int,\n" +
" ss_ticket_number int,\n" +
" ss_quantity int,\n" +
" ss_wholesale_cost decimal(7,2),\n" +
" ss_list_price decimal(7,2),\n" +
" ss_sales_price decimal(7,2),\n" +
" ss_ext_discount_amt decimal(7,2),\n" +
" ss_ext_sales_price decimal(7,2),\n" +
" ss_ext_wholesale_cost decimal(7,2),\n" +
" ss_ext_list_price decimal(7,2),\n" +
" ss_ext_tax decimal(7,2),\n" +
" ss_coupon_amt decimal(7,2),\n" +
" ss_net_paid decimal(7,2),\n" +
" ss_net_paid_inc_tax decimal(7,2),\n" +
" ss_net_profit decimal(7,2)\n" +
")\n" +
" partitioned by (dt string)\n" +
"clustered by (ss_store_sk, ss_promo_sk)\n" +
"INTO 4 BUCKETS stored as orc " + " location '" + tableLoc + "'" +
" TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
Assert.assertTrue(success);
success = runDDL(driver, "alter table store_sales add partition(dt='2015')");
Assert.assertTrue(success);
}
/**
* Make sure streaming works when the bucket column is not the first column of the table.
*
* @throws Exception
*/
@Test
public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
List<String> partitionVals = new ArrayList<String>();
partitionVals.add("2015");
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase("testing5")
.withTable("store_sales")
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
StringBuilder row = new StringBuilder();
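// Each row is built as the 11 int columns of store_sales followed by its 12 decimal columns, so the
// bucketing columns (ss_store_sk, ss_promo_sk) are the 8th and 9th fields rather than the first.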
for (int i = 0; i < 10; i++) {
for (int ints = 0; ints < 11; ints++) {
row.append(ints).append(',');
}
for (int decs = 0; decs < 12; decs++) {
row.append(i + 0.1).append(',');
}
row.setLength(row.length() - 1);
connection.write(row.toString().getBytes());
}
connection.commitTransaction();
connection.close();
ArrayList<String> res = queryTable(driver, "select row__id.bucketid, * from testing5.store_sales");
for (String re : res) {
System.out.println(re);
}
}
/**
* Test that streaming can write to an unbucketed table.
*/
@Test
public void testNoBuckets() throws Exception {
queryTable(driver, "drop table if exists default.streamingnobuckets");
queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc " +
"TBLPROPERTIES('transactional'='true')");
queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')");
List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
Assert.assertEquals(1, rs.size());
Assert.assertEquals("foo\tbar", rs.get(0));
StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase("Default")
.withTable("streamingNoBuckets")
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withTransactionBatchSize(2)
.withRecordWriter(wr)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
connection.write("a1,b2".getBytes());
connection.write("a3,b4".getBytes());
TxnStore txnHandler = TxnUtils.getTxnStore(conf);
ShowLocksResponse resp = txnHandler.showLocks(new ShowLocksRequest());
Assert.assertEquals(1, resp.getLocksSize());
Assert.assertEquals("streamingnobuckets", resp.getLocks().get(0).getTablename());
Assert.assertEquals("default", resp.getLocks().get(0).getDbname());
connection.commitTransaction();
connection.beginTransaction();
connection.write("a5,b6".getBytes());
connection.write("a7,b8".getBytes());
connection.commitTransaction();
connection.close();
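// With a transaction batch size of 2 the two streamed transactions get consecutive write ids (2 and 3)
// and share a single delta directory, delta_0000002_0000003. 536870912 is the encoded bucket id for
// bucket 0 and shows up as "bucketid":536870912 in the ROW__IDs asserted below.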
Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
rs = queryTable(driver, "select a, b from default.streamingnobuckets order by a, b");
int row = 0;
Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
queryTable(driver, "alter table default.streamingnobuckets compact 'major'");
runWorker(conf);
rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
}
@Test
public void testGetDeltaPath() throws Exception {
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
Path path = connection.getDeltaFileLocation(partitionVals, 0,
5L, 5L, 9);
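// Delta file locations follow delta_<minWriteId>_<maxWriteId>_<statementId>/bucket_<bucketId>, so
// write id 5, statement id 9 and bucket 0 yield the path asserted below.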
Assert.assertTrue(path.toString().endsWith("testing.db/alerts/continent"
+ "=Asia/country=India/delta_0000005_0000005_0009/bucket_00000"));
}
@Test
public void testCommitWithKeyValue() throws Exception {
queryTable(driver, "drop table if exists default.keyvalue");
queryTable(driver, "create table default.keyvalue (a string, b string) stored as orc " +
"TBLPROPERTIES('transactional'='true')");
queryTable(driver, "insert into default.keyvalue values('foo','bar')");
queryTable(driver, "ALTER TABLE default.keyvalue SET TBLPROPERTIES('_metamykey' = 'myvalue')");
List<String> rs = queryTable(driver, "select * from default.keyvalue");
Assert.assertEquals(1, rs.size());
Assert.assertEquals("foo\tbar", rs.get(0));
StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase("Default")
.withTable("keyvalue")
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withTransactionBatchSize(2)
.withRecordWriter(wr)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
connection.write("a1,b2".getBytes());
connection.write("a3,b4".getBytes());
connection.commitTransaction(null, "_metamykey", "myvalue");
connection.close();
rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.keyvalue order by ROW__ID");
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("keyvalue/delta_0000002_0000003/bucket_00000"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("keyvalue/delta_0000002_0000003/bucket_00000"));
rs = queryTable(driver, "SHOW TBLPROPERTIES default.keyvalue('_metamykey')");
Assert.assertEquals(rs.get(0), "_metamykey\tmyvalue", rs.get(0));
}
@Test
public void testConnectionWithWriteId() throws Exception {
queryTable(driver, "drop table if exists default.writeidconnection");
queryTable(driver, "create table default.writeidconnection (a string, b string) stored as orc " +
"TBLPROPERTIES('transactional'='true')");
queryTable(driver, "insert into default.writeidconnection values('a0','bar')");
List<String> rs = queryTable(driver, "select * from default.writeidconnection");
Assert.assertEquals(1, rs.size());
Assert.assertEquals("a0\tbar", rs.get(0));
StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder()
.withDatabase("Default")
.withTable("writeidconnection")
.withRecordWriter(writerT)
.withHiveConf(conf)
.connect();
transactionConnection.beginTransaction();
Table tObject = transactionConnection.getTable();
Long writeId = transactionConnection.getCurrentWriteId();
Assert.assertNotNull(tObject);
Assert.assertNotNull(writeId);
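// connectionOne and connectionTwo below reuse this write id instead of opening transactions of their
// own, so their rows become visible only once transactionConnection commits (checked further down).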
StrictDelimitedInputWriter writerOne = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connectionOne = HiveStreamingConnection.newBuilder()
.withDatabase("Default")
.withTable("writeidconnection")
.withRecordWriter(writerOne)
.withHiveConf(conf)
.withWriteId(writeId)
.withStatementId(1)
.withTableObject(tObject)
.connect();
StrictDelimitedInputWriter writerTwo = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connectionTwo = HiveStreamingConnection.newBuilder()
.withDatabase("Default")
.withRecordWriter(writerTwo)
.withHiveConf(conf)
.withWriteId(writeId)
.withStatementId(2)
.withTableObject(tObject)
.connect();
Assert.assertNotNull(connectionOne);
Assert.assertNotNull(connectionTwo);
connectionOne.beginTransaction();
connectionTwo.beginTransaction();
connectionOne.write("a1,b2".getBytes());
connectionTwo.write("a5,b6".getBytes());
connectionOne.write("a3,b4".getBytes());
connectionOne.commitTransaction();
connectionTwo.commitTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT,
connectionOne.getCurrentTransactionState());
Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT,
connectionTwo.getCurrentTransactionState());
try {
connectionOne.beginTransaction();
Assert.fail("second beginTransaction should have thrown a "
+ "StreamingException");
} catch (StreamingException e) {
// expected
}
connectionOne.close();
connectionTwo.close();
rs = queryTable(driver, "select ROW__ID, a, b, "
+ "INPUT__FILE__NAME from default.writeidconnection order by ROW__ID");
// Only the pre-existing row is visible; the rows written against the shared write id are not committed yet
Assert.assertEquals(1, rs.size());
transactionConnection.commitTransaction();
rs = queryTable(driver, "select ROW__ID, a, b, "
+ "INPUT__FILE__NAME from default.writeidconnection order by a");
Assert.assertEquals(4, rs.size());
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ta0\tbar"));
Assert.assertTrue(rs.get(0), rs.get(0).endsWith("bucket_00000"));
Assert.assertTrue(rs.get(1), rs.get(1).contains("\"rowid\":0}\ta1\tb2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("bucket_00000"));
Assert.assertTrue(rs.get(2), rs.get(2).contains("\"rowid\":1}\ta3\tb4"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("bucket_00000"));
Assert.assertTrue(rs.get(3), rs.get(3).contains("\ta5\tb6"));
Assert.assertTrue(rs.get(3), rs.get(3).endsWith("bucket_00000"));
}
@Test
public void testAllTypesDelimitedWriter() throws Exception {
queryTable(driver, "drop table if exists default.alltypes");
queryTable(driver,
"create table if not exists default.alltypes ( bo boolean, ti tinyint, si smallint, i int, bi bigint, " +
"f float, d double, de decimal(10,3), ts timestamp, da date, s string, c char(5), vc varchar(5), " +
"m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) " +
"stored as orc TBLPROPERTIES('transactional'='true')");
StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter('|')
.withCollectionDelimiter(',')
.withMapKeyDelimiter(':')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase("default")
.withTable("alltypes")
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withTransactionBatchSize(2)
.withRecordWriter(wr)
.withHiveConf(conf)
.connect();
String row1 = "true|10|100|1000|10000|4.0|20.0|4.2222|1969-12-31 " +
"15:59:58.174|1970-01-01|string|hello|hello|k1:v1|100,200|10,foo";
String row2 = "false|20|200|2000|20000|8.0|40.0|2.2222|1970-12-31 15:59:58.174|1971-01-01|abcd|world|world|" +
"k4:v4|200,300|20,bar";
connection.beginTransaction();
connection.write(row1.getBytes());
connection.write(row2.getBytes());
connection.commitTransaction();
connection.close();
List<String> rs = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, f, d, de, ts, da, s, c, vc, m, l, st," +
" INPUT__FILE__NAME from default.alltypes order by ROW__ID");
Assert.assertEquals(2, rs.size());
String gotRow1 = rs.get(0);
String expectedPrefixRow1 = "{\"writeid\":1,\"bucketid\":536870912," +
"\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 15:59:58.174\t1970-01-01\tstring" +
"\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}";
String expectedSuffixRow1 = "alltypes/delta_0000001_0000002/bucket_00000";
String gotRow2 = rs.get(1);
String expectedPrefixRow2 = "{\"writeid\":1,\"bucketid\":536870912," +
"\"rowid\":1}\tfalse\t20\t200\t2000\t20000\t8.0\t40.0\t2.222\t1970-12-31 15:59:58.174\t1971-01-01\tabcd" +
"\tworld\tworld\t{\"k4\":\"v4\"}\t[200,300]\t{\"c1\":20,\"c2\":\"bar\"}";
String expectedSuffixRow2 = "alltypes/delta_0000001_0000002/bucket_00000";
Assert.assertTrue(gotRow1, gotRow1.startsWith(expectedPrefixRow1));
Assert.assertTrue(gotRow1, gotRow1.endsWith(expectedSuffixRow1));
Assert.assertTrue(gotRow2, gotRow2.startsWith(expectedPrefixRow2));
Assert.assertTrue(gotRow2, gotRow2.endsWith(expectedSuffixRow2));
}
@Test
public void testAllTypesDelimitedWriterInputStream() throws Exception {
queryTable(driver, "drop table if exists default.alltypes");
queryTable(driver,
"create table if not exists default.alltypes ( bo boolean, ti tinyint, si smallint, i int, bi bigint, " +
"f float, d double, de decimal(10,3), ts timestamp, da date, s string, c char(5), vc varchar(5), " +
"m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) " +
"stored as orc TBLPROPERTIES('transactional'='true')");
StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter('|')
.withCollectionDelimiter(',')
.withMapKeyDelimiter(':')
.withLineDelimiterPattern("\n")
.build();
StreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase("default")
.withTable("alltypes")
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withTransactionBatchSize(2)
.withRecordWriter(wr)
.withHiveConf(conf)
.connect();
String row1 = "true|10|100|1000|10000|4.0|20.0|4.2222|1969-12-31 " +
"15:59:58.174|1970-01-01|string|hello|hello|k1:v1|100,200|10,foo";
String row2 = "false|20|200|2000|20000|8.0|40.0|2.2222|1970-12-31 15:59:58.174|1971-01-01|abcd|world|world|" +
"k4:v4|200,300|20,bar";
String allRows = row1 + "\n" + row2 + "\n";
ByteArrayInputStream bais = new ByteArrayInputStream(allRows.getBytes());
connection.beginTransaction();
connection.write(bais);
connection.commitTransaction();
connection.close();
bais.close();
List<String> rs = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, f, d, de, ts, da, s, c, vc, m, l, st," +
" INPUT__FILE__NAME from default.alltypes order by ROW__ID");
Assert.assertEquals(2, rs.size());
String gotRow1 = rs.get(0);
String expectedPrefixRow1 = "{\"writeid\":1,\"bucketid\":536870912," +
"\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 15:59:58.174\t1970-01-01\tstring" +
"\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}";
String expectedSuffixRow1 = "alltypes/delta_0000001_0000002/bucket_00000";
String gotRow2 = rs.get(1);
String expectedPrefixRow2 = "{\"writeid\":1,\"bucketid\":536870912," +
"\"rowid\":1}\tfalse\t20\t200\t2000\t20000\t8.0\t40.0\t2.222\t1970-12-31 15:59:58.174\t1971-01-01\tabcd" +
"\tworld\tworld\t{\"k4\":\"v4\"}\t[200,300]\t{\"c1\":20,\"c2\":\"bar\"}";
String expectedSuffixRow2 = "alltypes/delta_0000001_0000002/bucket_00000";
Assert.assertTrue(gotRow1, gotRow1.startsWith(expectedPrefixRow1));
Assert.assertTrue(gotRow1, gotRow1.endsWith(expectedSuffixRow1));
Assert.assertTrue(gotRow2, gotRow2.startsWith(expectedPrefixRow2));
Assert.assertTrue(gotRow2, gotRow2.endsWith(expectedSuffixRow2));
}
@Test
public void testAutoRollTransactionBatch() throws Exception {
queryTable(driver, "drop table if exists default.streamingnobuckets");
queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc " +
"TBLPROPERTIES('transactional'='true')");
queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')");
List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
Assert.assertEquals(1, rs.size());
Assert.assertEquals("foo\tbar", rs.get(0));
StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase("default")
.withTable("streamingnobuckets")
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(wr)
.withHiveConf(conf)
.withTransactionBatchSize(2)
.connect();
connection.beginTransaction();
connection.write("a1,b2".getBytes());
connection.write("a3,b4".getBytes());
connection.commitTransaction();
connection.beginTransaction();
connection.write("a5,b6".getBytes());
connection.write("a7,b8".getBytes());
connection.commitTransaction();
// should have rolled over to next transaction batch
connection.beginTransaction();
connection.write("a9,b10".getBytes());
connection.write("a11,b12".getBytes());
connection.commitTransaction();
connection.beginTransaction();
connection.write("a13,b14".getBytes());
connection.write("a15,b16".getBytes());
connection.commitTransaction();
connection.close();
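// Two transaction batches of size 2 were consumed: write ids 2-3 land in delta_0000002_0000003 and
// write ids 4-5 in delta_0000004_0000005, as the ROW__ID/INPUT__FILE__NAME assertions below verify.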
Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
Assert.assertTrue(rs.get(5), rs.get(5).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\ta9\tb10"));
Assert.assertTrue(rs.get(5), rs.get(5).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
Assert.assertTrue(rs.get(6), rs.get(6).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12"));
Assert.assertTrue(rs.get(6), rs.get(6).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
Assert.assertTrue(rs.get(7), rs.get(7).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14"));
Assert.assertTrue(rs.get(7), rs.get(7).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
Assert.assertTrue(rs.get(8), rs.get(8).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\ta15\tb16"));
Assert.assertTrue(rs.get(8), rs.get(8).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a15'");
queryTable(driver, "delete from default.streamingnobuckets where a='a9'");
rs = queryTable(driver, "select a, b from default.streamingnobuckets order by a, b");
int row = 0;
Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
Assert.assertEquals("at row=" + row, "a11\tb12", rs.get(row++));
Assert.assertEquals("at row=" + row, "a13\tb14", rs.get(row++));
Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
queryTable(driver, "alter table default.streamingnobuckets compact 'major'");
runWorker(conf);
rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000"));
Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000"));
Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000"));
Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12"));
Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000"));
Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14"));
Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000"));
Assert.assertTrue(rs.get(5), rs.get(5).startsWith("{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
Assert.assertTrue(rs.get(5), rs.get(5).endsWith("streamingnobuckets/base_0000009_v0000029/bucket_00000"));
}
/**
* This is a clone of TestHiveStreamingConnection.TxnStatement2....
*/
public static void runWorker(HiveConf hiveConf) throws Exception {
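// 'stop' is pre-set to true so the compactor Worker performs a single compaction pass and returns
// instead of looping.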
AtomicBoolean stop = new AtomicBoolean(true);
Worker t = new Worker();
t.setThreadId((int) t.getId());
t.setConf(hiveConf);
AtomicBoolean looped = new AtomicBoolean();
t.init(stop, looped);
t.run();
}
// Stream data into a streaming table with N buckets, then copy the data into another bucketed table
// and check that the bucketing in both was done in the same way.
@Test
public void testStreamBucketingMatchesRegularBucketing() throws Exception {
int bucketCount = 100;
String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
String tableLoc = "'" + dbUri + Path.SEPARATOR + "streamedtable" + "'";
String tableLoc2 = "'" + dbUri + Path.SEPARATOR + "finaltable" + "'";
String tableLoc3 = "'" + dbUri + Path.SEPARATOR + "nobucket" + "'";
// disabling vectorization as this test yields incorrect results with vectorization
conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
try (IDriver driver = DriverFactory.newDriver(conf)) {
runDDL(driver, "create database testBucketing3");
runDDL(driver, "use testBucketing3");
runDDL(driver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+ bucketCount + " buckets stored as orc location " + tableLoc + " TBLPROPERTIES ('transactional'='true')");
// In the 'nobucket' table we capture the bucketid from streamedtable to work around a Hive bug that prevents joining two identically bucketed tables
runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + tableLoc3);
runDDL(driver,
"create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+ bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')");
String[] records = new String[]{
"PSFAHYLZVC,29,EPNMA",
"PPPRKWAYAU,96,VUTEE",
"MIAOFERCHI,3,WBDSI",
"CEGQAZOWVN,0,WCUZL",
"XWAKMNSVQF,28,YJVHU",
"XBWTSAJWME,2,KDQFO",
"FUVLQTAXAY,5,LDSDG",
"QTQMDJMGJH,6,QBOMA",
"EFLOTLWJWN,71,GHWPS",
"PEQNAOJHCM,82,CAAFI",
"MOEKQLGZCP,41,RUACR",
"QZXMCOPTID,37,LFLWE",
"EYALVWICRD,13,JEZLC",
"VYWLZAYTXX,16,DMVZX",
"OSALYSQIXR,47,HNZVE",
"JGKVHKCEGQ,25,KSCJB",
"WQFMMYDHET,12,DTRWA",
"AJOVAYZKZQ,15,YBKFO",
"YAQONWCUAU,31,QJNHZ",
"DJBXUEUOEB,35,IYCBL"
};
StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase("testBucketing3")
.withTable("streamedtable")
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(wr)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
for (String record : records) {
connection.write(record.getBytes());
}
connection.commitTransaction();
connection.close();
ArrayList<String> res1 = queryTable(driver, "select row__id.bucketid, * from streamedtable order by key2");
for (String re : res1) {
LOG.error(re);
}
driver.run("insert into nobucket select row__id.bucketid,* from streamedtable");
runDDL(driver, "insert into finaltable select * from nobucket");
ArrayList<String> res2 = queryTable(driver,
"select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid");
for (String s : res2) {
LOG.error(s);
}
Assert.assertTrue(res2.isEmpty());
} finally {
conf.unset(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname);
}
}
@Test
public void testTableValidation() throws Exception {
int bucketCount = 100;
String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
String tbl1 = "validation1";
String tbl2 = "validation2";
String tableLoc = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'";
runDDL(driver, "create database testBucketing3");
runDDL(driver, "use testBucketing3");
runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into "
+ bucketCount + " buckets stored as orc location " + tableLoc + " TBLPROPERTIES ('transactional'='false')");
runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into "
+ bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')");
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = null;
try {
connection = HiveStreamingConnection.newBuilder()
.withDatabase("testBucketing3")
.withTable("validation2")
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
Assert.assertTrue("InvalidTable exception was not thrown", false);
} catch (InvalidTable e) {
// expecting this exception
} finally {
if (connection != null) {
connection.close();
}
}
try {
connection = HiveStreamingConnection.newBuilder()
.withDatabase("testBucketing3")
.withTable("validation2")
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
Assert.assertTrue("InvalidTable exception was not thrown", false);
} catch (InvalidTable e) {
// expecting this exception
} finally {
if (connection != null) {
connection.close();
}
}
}
/**
* @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, boolean, String...)} -
* there is little value in using InputFormat directly
*/
@Deprecated
private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
String... records) throws Exception {
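// Verifies the delta directory layout and write id range via AcidUtils.getAcidState, then reads the
// rows back through OrcInputFormat with an ACID-aware JobConf and compares them to the expected records.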
ValidWriteIdList writeIds = getTransactionContext(conf);
AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false, null, false);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
System.out.println("Files found: ");
for (AcidUtils.ParsedDelta pd : current) {
System.out.println(pd.getPath().toString());
}
Assert.assertEquals(numExpectedFiles, current.size());
// find the overall min/max write ids across the delta directories
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
for (AcidUtils.ParsedDelta pd : current) {
if (pd.getMaxWriteId() > max) {
max = pd.getMaxWriteId();
}
if (pd.getMinWriteId() < min) {
min = pd.getMinWriteId();
}
}
Assert.assertEquals(minTxn, min);
Assert.assertEquals(maxTxn, max);
InputFormat inf = new OrcInputFormat();
JobConf job = new JobConf();
job.set("mapred.input.dir", partitionPath.toString());
job.set(BUCKET_COUNT, Integer.toString(buckets));
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
AcidUtils.setAcidOperationalProperties(job, true, null);
job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.writeToString());
job.set(ValidTxnList.VALID_TXNS_KEY, conf.get(ValidTxnList.VALID_TXNS_KEY));
InputSplit[] splits = inf.getSplits(job, buckets);
Assert.assertEquals(numExpectedFiles, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
inf.getRecordReader(splits[0], job, Reporter.NULL);
NullWritable key = rr.createKey();
OrcStruct value = rr.createValue();
for (String record : records) {
Assert.assertTrue(rr.next(key, value));
Assert.assertEquals(record, value.toString());
}
Assert.assertFalse(rr.next(key, value));
}
/**
* @param validationQuery query to read from table to compare data against {@code records}
* @param records expected data; each row is a CSV list of values
*/
private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
String validationQuery, boolean vectorize, String... records) throws Exception {
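// Like checkDataWritten, but the data itself is validated through a SQL query that is executed under
// every configured ORC split strategy (and optionally with vectorization enabled) to catch
// split-strategy-specific regressions.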
AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
System.out.println("Files found: ");
for (AcidUtils.ParsedDelta pd : current) {
System.out.println(pd.getPath().toString());
}
Assert.assertEquals(numExpectedFiles, current.size());
// find the overall min/max write ids across the delta directories
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
for (AcidUtils.ParsedDelta pd : current) {
if (pd.getMaxWriteId() > max) {
max = pd.getMaxWriteId();
}
if (pd.getMinWriteId() < min) {
min = pd.getMinWriteId();
}
}
Assert.assertEquals(minTxn, min);
Assert.assertEquals(maxTxn, max);
boolean isVectorizationEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
if (vectorize) {
conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
}
String currStrategy = conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
for (String strategy : ((Validator.StringSet) HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator())
.getExpected()) {
//run the validation query with each split strategy - the results must match the expected records in every case
conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, strategy.toUpperCase());
List<String> actualResult = queryTable(driver, validationQuery);
for (int i = 0; i < actualResult.size(); i++) {
Assert.assertEquals("diff at [" + i + "]. actual=" + actualResult + " expected=" +
Arrays.toString(records), records[i], actualResult.get(i));
}
}
conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, currStrategy);
conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorizationEnabled);
}
private ValidWriteIdList getTransactionContext(Configuration conf) throws Exception {
ValidTxnList validTxnList = msClient.getValidTxns();
conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
List<TableValidWriteIds> v = msClient.getValidWriteIds(Collections
.singletonList(TableName.getDbTable(dbName, tblName)), validTxnList.writeToString());
return TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
}
private void checkNothingWritten(Path partitionPath) throws Exception {
AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
Assert.assertEquals(0, current.size());
}
@Test
public void testEndpointConnection() throws Exception {
// For partitioned table, partitionVals are specified
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.close();
// For unpartitioned table, partitionVals are not specified
connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName2)
.withTable(tblName2)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.close();
// For unpartitioned table, partition values are specified
try {
connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName2)
.withTable(tblName2)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
Assert.assertTrue("ConnectionError was not thrown", false);
connection.close();
} catch (ConnectionError e) {
// expecting this exception
String errMsg = "specifies partitions for un-partitioned table";
Assert.assertTrue(e.toString().endsWith(errMsg));
}
}
@Test
public void testAddPartition() throws Exception {
List<String> newPartVals = new ArrayList<String>(2);
newPartVals.add(PART1_CONTINENT);
newPartVals.add("Nepal");
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(newPartVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
// Create partition
Assert.assertNotNull(connection);
connection.beginTransaction();
connection.write("3,Hello streaming - once again".getBytes());
connection.commitTransaction();
// Ensure partition is present
Partition p = msClient.getPartition(dbName, tblName, newPartVals);
Assert.assertNotNull("Did not find added partition", p);
}
@Test
public void testAddPartitionWithWriteId() throws Exception {
List<String> newPartVals = new ArrayList<String>(2);
newPartVals.add("WriteId_continent");
newPartVals.add("WriteId_country");
StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(newPartVals)
.withRecordWriter(writerT)
.withHiveConf(conf)
.connect();
transactionConnection.beginTransaction();
Table tObject = transactionConnection.getTable();
Long writeId = transactionConnection.getCurrentWriteId();
Assert.assertNotNull(tObject);
Assert.assertNotNull(writeId);
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(newPartVals)
.withRecordWriter(writer)
.withHiveConf(conf)
.withWriteId(writeId)
.withStatementId(1)
.withTableObject(tObject)
.connect();
Assert.assertNotNull(connection);
connection.beginTransaction();
connection.write("3,Hello streaming - once again".getBytes());
connection.commitTransaction();
Set<String> partitions = new HashSet<>(connection.getPartitions());
connection.close();
// Ensure partition is not present
try {
msClient.getPartition(dbName, tblName, newPartVals);
Assert.fail("Partition shouldn't exist so a NoSuchObjectException should have been raised");
} catch (NoSuchObjectException e) {}
transactionConnection.commitTransaction(partitions);
// Ensure partition is present
Partition p = msClient.getPartition(dbName, tblName, newPartVals);
Assert.assertNotNull("Did not find added partition", p);
}
@Test
public void testAddDynamicPartitionWithWriteId() throws Exception {
queryTable(driver, "drop table if exists default.writeiddynamic");
queryTable(driver, "create table default.writeiddynamic (a"
+ " string, b string) partitioned by (c string, d string)"
+ " stored as orc TBLPROPERTIES('transactional'='true')");
StrictDelimitedInputWriter writerT =
StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
HiveStreamingConnection transactionConnection =
HiveStreamingConnection.newBuilder().withDatabase("default")
.withTable("writeiddynamic").withRecordWriter(writerT)
.withHiveConf(conf).connect();
transactionConnection.beginTransaction();
Table tObject = transactionConnection.getTable();
Long writeId = transactionConnection.getCurrentWriteId();
Assert.assertNotNull(tObject);
Assert.assertNotNull(writeId);
StrictDelimitedInputWriter writerOne =
StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
HiveStreamingConnection connectionOne =
HiveStreamingConnection.newBuilder().withDatabase("default")
.withTable("writeiddynamic").withRecordWriter(writerOne)
.withHiveConf(conf).withWriteId(writeId).withStatementId(1)
.withTableObject(tObject).connect();
StrictDelimitedInputWriter writerTwo =
StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
HiveStreamingConnection connectionTwo =
HiveStreamingConnection.newBuilder().withDatabase("default")
.withTable("writeiddynamic")
.withRecordWriter(writerTwo)
.withHiveConf(conf).withWriteId(writeId).withStatementId(1)
.withTableObject(tObject)
.connect();
Assert.assertNotNull(connectionOne);
connectionTwo.beginTransaction();
connectionOne.beginTransaction();
connectionOne.write("1,2,3,4".getBytes());
connectionOne.write("1,2,5,6".getBytes());
connectionTwo.write("1,2,30,40".getBytes());
connectionOne.write("1,2,7,8".getBytes());
connectionTwo.write("1,2,50,60".getBytes());
connectionOne.write("1,2,9,10".getBytes());
connectionOne.commitTransaction();
connectionTwo.commitTransaction();
Set<String> partitionsOne = new HashSet<>(connectionOne.getPartitions());
Assert.assertEquals(4, partitionsOne.size());
Set<String> partitionsTwo = new HashSet<>(connectionTwo.getPartitions());
Assert.assertEquals(2, partitionsTwo.size());
connectionOne.close();
connectionTwo.close();
try {
String partitionName = partitionsOne.iterator().next();
msClient.getPartition("default", "writeiddynamic", partitionName);
Assert.fail(
"Partition shouldn't exist so a NoSuchObjectException should have been raised");
} catch (NoSuchObjectException e) {
}
partitionsOne.addAll(partitionsTwo);
Set<String> allPartitions = partitionsOne;
transactionConnection.commitTransaction(allPartitions);
// Ensure partition is present
for (String partition : allPartitions) {
Partition p =
msClient.getPartition("default", "writeiddynamic",
partition);
Assert.assertNotNull("Did not find added partition", p);
}
}
@Test
public void testTransactionBatchEmptyCommit() throws Exception {
// 1) to partitioned table
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
connection.commitTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
connection.getCurrentTransactionState());
connection.close();
// 2) To unpartitioned table
writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName2)
.withTable(tblName2)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
connection.commitTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
connection.getCurrentTransactionState());
connection.close();
}
@Test
public void testTransactionBatchSizeValidation() throws Exception {
final String schemes = conf.get(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname);
// the output stream of the raw FS doesn't support hflush, so a transaction batch size > 1 must be rejected below
conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, "raw");
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
try {
HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withTransactionBatchSize(2)
.withHiveConf(conf)
.connect();
Assert.fail();
} catch (ConnectionError e) {
Assert.assertTrue("Expected connection error due to batch sizes",
e.getMessage().contains("only supports transaction batch"));
} finally {
conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, schemes);
}
}
/**
* check that transactions that have not heartbeated and have timed out get properly aborted
*
* @throws Exception
*/
@Test
public void testTimeOutReaper() throws Exception {
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName2)
.withTable(tblName2)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS);
//ensure the txn times out almost immediately
conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 2, TimeUnit.MILLISECONDS);
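// With a zero reaper start delay and a 2 ms timeout, the AcidHouseKeeperService run below treats the
// open transaction as timed out and aborts it, so the commit that follows must fail.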
AcidHouseKeeperService houseKeeperService = new AcidHouseKeeperService();
houseKeeperService.setConf(conf);
houseKeeperService.run();
try {
//should fail because the TransactionBatch timed out
connection.commitTransaction();
} catch (TransactionError e) {
Assert.assertTrue("Expected aborted transaction", e.getCause() instanceof TxnAbortedException);
}
connection.close();
connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName2)
.withTable(tblName2)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
connection.commitTransaction();
connection.beginTransaction();
houseKeeperService.run();
try {
//should fail because the TransactionBatch timed out
connection.commitTransaction();
} catch (TransactionError e) {
Assert.assertTrue("Expected aborted transaction", e.getCause() instanceof TxnAbortedException);
}
connection.close();
}
@Test
public void testHeartbeat() throws Exception {
int transactionBatch = 20;
conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 200, TimeUnit.MILLISECONDS);
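// Shrink the txn timeout so the connection has to heartbeat to keep its locks and transactions alive
// while the batch of transactions below is exercised.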
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName2)
.withTable(tblName2)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withTransactionBatchSize(transactionBatch)
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
try {
connection.beginTransaction();
ShowLocksRequest request = new ShowLocksRequest();
request.setDbname(dbName2);
request.setTablename(tblName2);
ShowLocksResponse response = msClient.showLocks(request);
Assert.assertEquals("Wrong number of locks: " + response, 1, response.getLocks().size());
ShowLocksResponseElement lock = response.getLocks().get(0);
long acquiredAt = lock.getAcquiredat();
long heartbeatAt = lock.getLastheartbeat();
response = msClient.showLocks(request);
Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size());
lock = response.getLocks().get(0);
Assert.assertEquals("Acquired timestamp didn'table match", acquiredAt, lock.getAcquiredat());
Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
") == old heartbeat(" + heartbeatAt + ")", lock.getLastheartbeat() == heartbeatAt);
for (int i = 0; i < transactionBatch * 3; i++) {
connection.beginTransaction();
if (i % 10 == 0) {
connection.abortTransaction();
} else {
connection.commitTransaction();
}
Thread.sleep(10);
}
} finally {
conf.unset(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname);
connection.close();
}
}
@Test
public void testTransactionBatchEmptyAbort() throws Exception {
// 1) to partitioned table
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
connection.abortTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
connection.getCurrentTransactionState());
connection.close();
// 2) to unpartitioned table
writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName2)
.withTable(tblName2)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
connection.abortTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
connection.getCurrentTransactionState());
connection.close();
}
@Test
public void testTransactionBatchCommitDelimited() throws Exception {
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withHiveConf(conf)
.withRecordWriter(writer)
.withTransactionBatchSize(10)
.connect();
// 1st Txn
connection.beginTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
connection.getCurrentTransactionState());
connection.write("1,Hello streaming".getBytes());
connection.commitTransaction();
checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
connection.getCurrentTransactionState());
// 2nd Txn
connection.beginTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
connection.getCurrentTransactionState());
connection.write("2,Welcome to streaming".getBytes());
// data should not be visible
checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
connection.commitTransaction();
checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
connection.close();
Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
connection.getCurrentTransactionState());
// To Unpartitioned table
writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName2)
.withTable(tblName2)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withHiveConf(conf)
.withRecordWriter(writer)
.connect();
// 1st Txn
connection.beginTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
connection.getCurrentTransactionState());
connection.write("1,Hello streaming".getBytes());
connection.commitTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
connection.getCurrentTransactionState());
connection.close();
}
@Test
public void testTransactionBatchCommitRegex() throws Exception {
String regex = "([^,]*),(.*)";
StrictRegexWriter writer = StrictRegexWriter.newBuilder()
.withRegex(regex)
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withHiveConf(conf)
.withRecordWriter(writer)
.withTransactionBatchSize(10)
.connect();
// 1st Txn
connection.beginTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
connection.getCurrentTransactionState());
connection.write("1,Hello streaming".getBytes());
connection.commitTransaction();
checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
connection.getCurrentTransactionState());
// 2nd Txn
connection.beginTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
connection.getCurrentTransactionState());
connection.write("2,Welcome to streaming".getBytes());
// data should not be visible
checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
connection.commitTransaction();
checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
connection.close();
Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
connection.getCurrentTransactionState());
// To Unpartitioned table
regex = "([^:]*):(.*)";
writer = StrictRegexWriter.newBuilder()
.withRegex(regex)
.build();
connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName2)
.withTable(tblName2)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withHiveConf(conf)
.withRecordWriter(writer)
.connect();
// 1st Txn
connection.beginTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
connection.getCurrentTransactionState());
connection.write("1:Hello streaming".getBytes());
connection.commitTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
connection.getCurrentTransactionState());
connection.close();
}
@Test
public void testRegexInputStream() throws Exception {
String regex = "([^,]*),(.*)";
StrictRegexWriter writer = StrictRegexWriter.newBuilder()
// if unspecified, the default line-break pattern [\r\n] will be used
.withRegex(regex)
.build();
StreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withHiveConf(conf)
.withRecordWriter(writer)
.connect();
String rows = "1,foo\r2,bar\r3,baz";
ByteArrayInputStream bais = new ByteArrayInputStream(rows.getBytes());
connection.beginTransaction();
connection.write(bais);
connection.commitTransaction();
bais.close();
connection.close();
List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
Assert.assertEquals(3, rs.size());
Assert.assertEquals("1\tfoo\tAsia\tIndia", rs.get(0));
Assert.assertEquals("2\tbar\tAsia\tIndia", rs.get(1));
Assert.assertEquals("3\tbaz\tAsia\tIndia", rs.get(2));
}
@Test
public void testTransactionBatchCommitJson() throws Exception {
StrictJsonWriter writer = StrictJsonWriter.newBuilder()
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.withTransactionBatchSize(10)
.connect();
// 1st Txn
connection.beginTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
connection.getCurrentTransactionState());
String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
connection.write(rec1.getBytes());
connection.commitTransaction();
checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
connection.getCurrentTransactionState());
connection.close();
Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
connection.getCurrentTransactionState());
List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
Assert.assertEquals(1, rs.size());
}
@Test
public void testJsonInputStream() throws Exception {
StrictJsonWriter writer = StrictJsonWriter.newBuilder()
.withLineDelimiterPattern("\\|")
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
// 1st Txn
connection.beginTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN, connection.getCurrentTransactionState());
String records = "{\"id\" : 1, \"msg\": \"Hello streaming\"}|{\"id\" : 2, \"msg\": \"Hello world\"}|{\"id\" : 3, " +
"\"msg\": \"Hello world!!\"}";
ByteArrayInputStream bais = new ByteArrayInputStream(records.getBytes());
connection.write(bais);
connection.commitTransaction();
bais.close();
connection.close();
List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
Assert.assertEquals(3, rs.size());
Assert.assertEquals("1\tHello streaming\tAsia\tIndia", rs.get(0));
Assert.assertEquals("2\tHello world\tAsia\tIndia", rs.get(1));
Assert.assertEquals("3\tHello world!!\tAsia\tIndia", rs.get(2));
}
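/**
 * Drains a full transaction batch with commitTransaction() and then, on a fresh connection,
 * with abortTransaction(), checking that remainingTransactions() counts down to zero each time.
 */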
@Test
public void testRemainingTransactions() throws Exception {
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
// 1) test with commitTransaction()
int batch = 0;
int initialCount = connection.remainingTransactions();
while (connection.remainingTransactions() > 0) {
connection.beginTransaction();
Assert.assertEquals(--initialCount, connection.remainingTransactions());
for (int rec = 0; rec < 2; ++rec) {
Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
connection.getCurrentTransactionState());
connection.write((batch * rec + ",Hello streaming").getBytes());
}
connection.commitTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
connection.getCurrentTransactionState());
++batch;
}
Assert.assertEquals(0, connection.remainingTransactions());
connection.close();
Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
connection.getCurrentTransactionState());
connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
// 2) test with abortTransaction()
connection.beginTransaction();
batch = 0;
initialCount = connection.remainingTransactions();
while (connection.remainingTransactions() > 0) {
connection.beginTransaction();
Assert.assertEquals(--initialCount, connection.remainingTransactions());
for (int rec = 0; rec < 2; ++rec) {
Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
connection.getCurrentTransactionState());
connection.write((batch * rec + ",Hello streaming").getBytes());
}
connection.abortTransaction();
Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
connection.getCurrentTransactionState());
++batch;
}
Assert.assertEquals(0, connection.remainingTransactions());
connection.close();
Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
connection.getCurrentTransactionState());
}
@Test
public void testTransactionBatchAbort() throws Exception {
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
connection.write("1,Hello streaming".getBytes());
connection.write("2,Welcome to streaming".getBytes());
connection.abortTransaction();
checkNothingWritten(partLoc);
Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
connection.getCurrentTransactionState());
connection.close();
checkNothingWritten(partLoc);
}
@Test
public void testTransactionBatchAbortAndCommit() throws Exception {
String agentInfo = "UT_" + Thread.currentThread().getName();
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo(agentInfo)
.withRecordWriter(writer)
.withHiveConf(conf)
.withTransactionBatchSize(10)
.connect();
connection.beginTransaction();
connection.write("1,Hello streaming".getBytes());
connection.write("2,Welcome to streaming".getBytes());
ShowLocksResponse resp = msClient.showLocks(new ShowLocksRequest());
Assert.assertEquals("LockCount", 1, resp.getLocksSize());
Assert.assertEquals("LockType", LockType.SHARED_READ, resp.getLocks().get(0).getType());
Assert.assertEquals("LockState", LockState.ACQUIRED, resp.getLocks().get(0).getState());
Assert.assertEquals("AgentInfo", agentInfo, resp.getLocks().get(0).getAgentInfo());
connection.abortTransaction();
checkNothingWritten(partLoc);
Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
connection.getCurrentTransactionState());
connection.beginTransaction();
connection.write("1,Hello streaming".getBytes());
connection.write("2,Welcome to streaming".getBytes());
connection.commitTransaction();
checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
connection.close();
}
@Test
public void testMultipleTransactionBatchCommits() throws Exception {
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withTransactionBatchSize(10)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
connection.write("1,Hello streaming".getBytes());
connection.commitTransaction();
String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
checkDataWritten2(partLoc, 1, 10, 1, validationQuery, false, "1\tHello streaming");
connection.beginTransaction();
connection.write("2,Welcome to streaming".getBytes());
connection.commitTransaction();
checkDataWritten2(partLoc, 1, 10, 1, validationQuery, true, "1\tHello streaming",
"2\tWelcome to streaming");
connection.close();
connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withTransactionBatchSize(10)
.withHiveConf(conf)
.connect();
// 2nd Txn Batch
connection.beginTransaction();
connection.write("3,Hello streaming - once again".getBytes());
connection.commitTransaction();
checkDataWritten2(partLoc, 1, 20, 2, validationQuery, false, "1\tHello streaming",
"2\tWelcome to streaming", "3\tHello streaming - once again");
connection.beginTransaction();
connection.write("4,Welcome to streaming - once again".getBytes());
connection.commitTransaction();
checkDataWritten2(partLoc, 1, 20, 2, validationQuery, true, "1\tHello streaming",
"2\tWelcome to streaming", "3\tHello streaming - once again",
"4\tWelcome to streaming - once again");
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
connection.getCurrentTransactionState());
connection.close();
}
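/**
 * Interleaves writes and commits from two connections on the same partition and inspects the
 * ORC side (flush-length) files between commits to verify that logical and actual file lengths
 * line up as expected.
 */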
@Test
public void testInterleavedTransactionBatchCommits() throws Exception {
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.withTransactionBatchSize(10)
.connect();
// Acquire 1st Txn Batch
connection.beginTransaction();
// Acquire 2nd Txn Batch
StrictDelimitedInputWriter writer2 = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection2 = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer2)
.withHiveConf(conf)
.withTransactionBatchSize(10)
.connect();
connection2.beginTransaction();
// Interleaved writes to both batches
connection.write("1,Hello streaming".getBytes());
connection2.write("3,Hello streaming - once again".getBytes());
checkNothingWritten(partLoc);
connection2.commitTransaction();
String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
checkDataWritten2(partLoc, 11, 20, 1,
validationQuery, true, "3\tHello streaming - once again");
connection.commitTransaction();
/* now both batches have committed (but not closed), so for each primary file we expect a side
file to exist that indicates the true length of the primary file */
FileSystem fs = partLoc.getFileSystem(conf);
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
lengthFileSize, lengthFileSize > 0);
long logicalLength = AcidUtils.getLogicalLength(fs, stat);
long actualLength = stat.getLen();
Assert.assertTrue("", logicalLength == actualLength);
}
}
checkDataWritten2(partLoc, 1, 20, 2,
validationQuery, false, "1\tHello streaming", "3\tHello streaming - once again");
connection.beginTransaction();
connection.write("2,Welcome to streaming".getBytes());
connection2.beginTransaction();
connection2.write("4,Welcome to streaming - once again".getBytes());
// here each batch has written data and committed (to bucket0 since the table only has 1 bucket),
// so each of the 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0
// has now received more data (logically - it's buffered) but it is not yet committed.
// let's check that side files exist, etc.
dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
lengthFileSize, lengthFileSize > 0);
long logicalLength = AcidUtils.getLogicalLength(fs, stat);
long actualLength = stat.getLen();
Assert.assertTrue("", logicalLength <= actualLength);
}
}
checkDataWritten2(partLoc, 1, 20, 2,
validationQuery, true, "1\tHello streaming", "3\tHello streaming - once again");
connection.commitTransaction();
checkDataWritten2(partLoc, 1, 20, 2,
validationQuery, false, "1\tHello streaming",
"2\tWelcome to streaming",
"3\tHello streaming - once again");
connection2.commitTransaction();
checkDataWritten2(partLoc, 1, 20, 2,
validationQuery, true, "1\tHello streaming",
"2\tWelcome to streaming",
"3\tHello streaming - once again",
"4\tWelcome to streaming - once again");
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
connection.getCurrentTransactionState());
Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
connection2.getCurrentTransactionState());
connection.close();
connection2.close();
}
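/**
 * Helper thread that opens its own streaming connection to the partitioned test table and writes
 * its data payload twice in each of 10 committed transactions; any uncaught failure is recorded
 * in the error field for the calling test to inspect.
 */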
private static class WriterThd extends Thread {
private final StreamingConnection conn;
private final String data;
private Throwable error;
WriterThd(String data) throws Exception {
super("Writer_" + data);
RecordWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName)
.withTable(tblName)
.withStaticPartitionValues(partitionVals)
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
this.conn = connection;
this.data = data;
setUncaughtExceptionHandler((thread, throwable) -> {
error = throwable;
LOG.error(connection.toTransactionString());
LOG.error("Thread " + thread.getName() + " died: " + throwable.getMessage(), throwable);
});
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
conn.beginTransaction();
conn.write(data.getBytes());
conn.write(data.getBytes());
conn.commitTransaction();
} // for
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (conn != null) {
try {
conn.close();
} catch (Exception e) {
LOG.error("txnBatch.close() failed: " + e.getMessage(), e);
}
}
}
}
}
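/**
 * Runs three WriterThd threads concurrently against the same partition and fails if any of them
 * recorded an uncaught error.
 */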
@Test
public void testConcurrentTransactionBatchCommits() throws Exception {
List<WriterThd> writers = new ArrayList<WriterThd>(3);
writers.add(new WriterThd("1,Matrix"));
writers.add(new WriterThd("2,Gandhi"));
writers.add(new WriterThd("3,Silence"));
for (WriterThd w : writers) {
w.start();
}
for (WriterThd w : writers) {
w.join();
}
for (WriterThd w : writers) {
if (w.error != null) {
Assert.assertFalse("Writer thread" + w.getName() + " died: " + w.error.getMessage() +
" See log file for stack trace", true);
}
}
}
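// Reads all rows of the given ORC bucket file from the local file system and returns the user
// payload (the last struct field of each ACID row) as SampleRec objects.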
private ArrayList<SampleRec> dumpBucket(Path orcFile) throws IOException {
org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.getLocal(new Configuration());
Reader reader = OrcFile.createReader(orcFile,
OrcFile.readerOptions(conf).filesystem(fs));
RecordReader rows = reader.rows();
StructObjectInspector inspector = (StructObjectInspector) reader
.getObjectInspector();
System.out.format("Found Bucket File : %s \n", orcFile.getName());
ArrayList<SampleRec> result = new ArrayList<SampleRec>();
while (rows.hasNext()) {
Object row = rows.next(null);
SampleRec rec = (SampleRec) deserializeDeltaFileRow(row, inspector)[5];
result.add(rec);
}
return result;
}
// Assumes stored data schema = [acid fields],string,int,string
// return array of 6 fields, where the last field has the actual data
private static Object[] deserializeDeltaFileRow(Object row, StructObjectInspector inspector) {
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
WritableIntObjectInspector f0ins = (WritableIntObjectInspector) fields.get(0).getFieldObjectInspector();
WritableLongObjectInspector f1ins = (WritableLongObjectInspector) fields.get(1).getFieldObjectInspector();
WritableIntObjectInspector f2ins = (WritableIntObjectInspector) fields.get(2).getFieldObjectInspector();
WritableLongObjectInspector f3ins = (WritableLongObjectInspector) fields.get(3).getFieldObjectInspector();
WritableLongObjectInspector f4ins = (WritableLongObjectInspector) fields.get(4).getFieldObjectInspector();
StructObjectInspector f5ins = (StructObjectInspector) fields.get(5).getFieldObjectInspector();
int f0 = f0ins.get(inspector.getStructFieldData(row, fields.get(0)));
long f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
int f2 = f2ins.get(inspector.getStructFieldData(row, fields.get(2)));
long f3 = f3ins.get(inspector.getStructFieldData(row, fields.get(3)));
long f4 = f4ins.get(inspector.getStructFieldData(row, fields.get(4)));
SampleRec f5 = deserializeInner(inspector.getStructFieldData(row, fields.get(5)), f5ins);
return new Object[]{f0, f1, f2, f3, f4, f5};
}
// Assumes row schema => string,int,string
private static SampleRec deserializeInner(Object row, StructObjectInspector inspector) {
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
WritableStringObjectInspector f0ins = (WritableStringObjectInspector) fields.get(0).getFieldObjectInspector();
WritableIntObjectInspector f1ins = (WritableIntObjectInspector) fields.get(1).getFieldObjectInspector();
WritableStringObjectInspector f2ins = (WritableStringObjectInspector) fields.get(2).getFieldObjectInspector();
String f0 = f0ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(0)));
int f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
String f2 = f2ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(2)));
return new SampleRec(f0, f1, f2);
}
@Test
public void testBucketing() throws Exception {
String agentInfo = "UT_" + Thread.currentThread().getName();
dropDB(msClient, dbName3);
dropDB(msClient, dbName4);
// 1) Create two bucketed tables
String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
dbLocation = dbLocation.replaceAll("\\\\", "/"); // for windows paths
String[] colNames = "key1,key2,data".split(",");
String[] colTypes = "string,int,string".split(",");
String[] bucketNames = "key1,key2".split(",");
int bucketCount = 4;
createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
, null, dbLocation, bucketCount);
String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db";
dbLocation2 = dbLocation2.replaceAll("\\\\", "/"); // for windows paths
String[] colNames2 = "key3,key4,data2".split(",");
String[] colTypes2 = "string,int,string".split(",");
String[] bucketNames2 = "key3,key4".split(",");
createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2
, null, dbLocation2, bucketCount);
// 2) Insert data into both tables
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName3)
.withTable(tblName3)
.withAgentInfo(agentInfo)
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
connection.write("name0,1,Hello streaming".getBytes());
connection.write("name2,2,Welcome to streaming".getBytes());
connection.write("name4,2,more Streaming unlimited".getBytes());
connection.write("name5,2,even more Streaming unlimited".getBytes());
connection.commitTransaction();
connection.close();
StrictDelimitedInputWriter writer2 = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection2 = HiveStreamingConnection.newBuilder()
.withDatabase(dbName4)
.withTable(tblName4)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer2)
.withHiveConf(conf)
.connect();
connection2.beginTransaction();
connection2.write("name5,2,fact3".getBytes()); // bucket 0
connection2.write("name8,2,fact3".getBytes()); // bucket 1
connection2.write("name0,1,fact1".getBytes()); // bucket 2
connection2.commitTransaction();
connection2.close();
// 3) Check data distribution in buckets
HashMap<Integer, ArrayList<SampleRec>> actual1 = dumpAllBuckets(dbLocation, tblName3);
HashMap<Integer, ArrayList<SampleRec>> actual2 = dumpAllBuckets(dbLocation2, tblName4);
System.err.println("\n Table 1");
System.err.println(actual1);
System.err.println("\n Table 2");
System.err.println(actual2);
// assert bucket listing is as expected
Assert.assertEquals("number of buckets does not match expectation", actual1.values().size(), 3);
Assert.assertTrue("bucket 0 shouldn't have been created", actual1.get(0) == null);
Assert.assertEquals("records in bucket does not match expectation", actual1.get(1).size(), 1);
Assert.assertEquals("records in bucket does not match expectation", actual1.get(2).size(), 2);
Assert.assertEquals("records in bucket does not match expectation", actual1.get(3).size(), 1);
}
private void runCmdOnDriver(String cmd) {
boolean t = runDDL(driver, cmd);
Assert.assertTrue(cmd + " failed", t);
}
@Test
public void testFileDump() throws Exception {
String agentInfo = "UT_" + Thread.currentThread().getName();
dropDB(msClient, dbName3);
dropDB(msClient, dbName4);
// 1) Create two bucketed tables
String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
dbLocation = dbLocation.replaceAll("\\\\", "/"); // for windows paths
String[] colNames = "key1,key2,data".split(",");
String[] colTypes = "string,int,string".split(",");
String[] bucketNames = "key1,key2".split(",");
int bucketCount = 4;
createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
, null, dbLocation, bucketCount);
String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db";
dbLocation2 = dbLocation2.replaceAll("\\\\", "/"); // for windows paths
String[] colNames2 = "key3,key4,data2".split(",");
String[] colTypes2 = "string,int,string".split(",");
String[] bucketNames2 = "key3,key4".split(",");
createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2
, null, dbLocation2, bucketCount);
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName3)
.withTable(tblName3)
.withAgentInfo(agentInfo)
.withHiveConf(conf)
.withRecordWriter(writer)
.connect();
// 2) Insert data into both tables
connection.beginTransaction();
connection.write("name0,1,Hello streaming".getBytes());
connection.write("name2,2,Welcome to streaming".getBytes());
connection.write("name4,2,more Streaming unlimited".getBytes());
connection.write("name5,2,even more Streaming unlimited".getBytes());
connection.commitTransaction();
connection.close();
PrintStream origErr = System.err;
ByteArrayOutputStream myErr = new ByteArrayOutputStream();
// replace stderr and run command
System.setErr(new PrintStream(myErr));
FileDump.main(new String[]{dbLocation});
System.err.flush();
System.setErr(origErr);
String errDump = new String(myErr.toByteArray());
Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
// since this test runs on the local file system, which does not have an API to tell whether
// files are open or not, we test the negative case even though the bucket files are still
// open for writes (the transaction batch is not closed yet)
Assert.assertEquals(false, errDump.contains("is still open for writes."));
StrictDelimitedInputWriter writer2 = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection2 = HiveStreamingConnection.newBuilder()
.withDatabase(dbName4)
.withTable(tblName4)
.withAgentInfo(agentInfo)
.withRecordWriter(writer2)
.withHiveConf(conf)
.connect();
connection2.beginTransaction();
connection2.write("name5,2,fact3".getBytes()); // bucket 0
connection2.write("name8,2,fact3".getBytes()); // bucket 1
connection2.write("name0,1,fact1".getBytes()); // bucket 2
// no data for bucket 3 -- expect 0 length bucket file
connection2.commitTransaction();
connection2.close();
origErr = System.err;
myErr = new ByteArrayOutputStream();
// replace stderr and run command
System.setErr(new PrintStream(myErr));
FileDump.main(new String[]{dbLocation});
System.out.flush();
System.err.flush();
System.setErr(origErr);
errDump = new String(myErr.toByteArray());
Assert.assertEquals(false, errDump.contains("Exception"));
Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
Assert.assertEquals(false, errDump.contains("is still open for writes."));
}
@Test
public void testFileDumpDeltaFilesWithStreamingOptimizations() throws Exception {
String agentInfo = "UT_" + Thread.currentThread().getName();
dropDB(msClient, dbName3);
dropDB(msClient, dbName4);
// 1) Create a bucketed table
String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
dbLocation = dbLocation.replaceAll("\\\\", "/"); // for windows paths
String[] colNames = "key1,key2,data".split(",");
String[] colTypes = "string,int,string".split(",");
String[] bucketNames = "key1,key2".split(",");
int bucketCount = 4;
createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
, null, dbLocation, bucketCount);
// 2) Insert data into the table
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName3)
.withTable(tblName3)
.withAgentInfo(agentInfo)
.withHiveConf(conf)
.withRecordWriter(writer)
.withStreamingOptimizations(true)
.connect();
connection.beginTransaction();
connection.write("name0,1,streaming".getBytes());
connection.write("name2,2,streaming".getBytes());
connection.write("name4,2,unlimited".getBytes());
connection.write("name5,2,unlimited".getBytes());
for (int i = 0; i < 6000; i++) {
if (i % 2 == 0) {
connection.write(("name" + i + "," + i + "," + "streaming").getBytes());
} else {
connection.write(("name" + i + "," + i + "," + "unlimited").getBytes());
}
}
connection.commitTransaction();
connection.close();
PrintStream origOut = System.out;
ByteArrayOutputStream myOut = new ByteArrayOutputStream();
// replace stdout and run command
System.setOut(new PrintStream(myOut));
FileDump.main(new String[]{dbLocation});
System.out.flush();
System.setOut(origOut);
String outDump = new String(myOut.toByteArray());
// make sure delta files are written with no indexes and no dictionary
Assert.assertEquals(true, outDump.contains("Compression: ZLIB"));
// no stats/indexes
Assert.assertEquals(true, outDump.contains("Column 0: count: 0 hasNull: false"));
Assert.assertEquals(true, outDump.contains("Column 1: count: 0 hasNull: false bytesOnDisk: 15 sum: 0"));
Assert.assertEquals(true, outDump.contains("Column 2: count: 0 hasNull: false bytesOnDisk: 15 sum: 0"));
Assert.assertEquals(true, outDump.contains("Column 3: count: 0 hasNull: false bytesOnDisk: 19 sum: 0"));
Assert.assertEquals(true, outDump.contains("Column 4: count: 0 hasNull: false bytesOnDisk: 17 sum: 0"));
Assert.assertEquals(true, outDump.contains("Column 5: count: 0 hasNull: false bytesOnDisk: 15 sum: 0"));
Assert.assertEquals(true, outDump.contains("Column 6: count: 0 hasNull: false"));
Assert.assertEquals(true, outDump.contains("Column 7: count: 0 hasNull: false bytesOnDisk: 3929"));
Assert.assertEquals(true, outDump.contains("Column 8: count: 0 hasNull: false bytesOnDisk: 1484 sum: 0"));
Assert.assertEquals(true, outDump.contains("Column 9: count: 0 hasNull: false bytesOnDisk: 816"));
// no dictionary
Assert.assertEquals(true, outDump.contains("Encoding column 7: DIRECT_V2"));
Assert.assertEquals(true, outDump.contains("Encoding column 9: DIRECT_V2"));
}
@Test
public void testFileDumpDeltaFilesWithoutStreamingOptimizations() throws Exception {
String agentInfo = "UT_" + Thread.currentThread().getName();
dropDB(msClient, dbName3);
dropDB(msClient, dbName4);
// 1) Create a bucketed table
String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
dbLocation = dbLocation.replaceAll("\\\\", "/"); // for windows paths
String[] colNames = "key1,key2,data".split(",");
String[] colTypes = "string,int,string".split(",");
String[] bucketNames = "key1,key2".split(",");
int bucketCount = 4;
createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
, null, dbLocation, bucketCount);
// 2) Insert data into the table
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName3)
.withTable(tblName3)
.withAgentInfo(agentInfo)
.withHiveConf(conf)
.withRecordWriter(writer)
.withStreamingOptimizations(false)
.connect();
connection.beginTransaction();
connection.write("name0,1,streaming".getBytes());
connection.write("name2,2,streaming".getBytes());
connection.write("name4,2,unlimited".getBytes());
connection.write("name5,2,unlimited".getBytes());
for (int i = 0; i < 6000; i++) {
if (i % 2 == 0) {
connection.write(("name" + i + "," + i + "," + "streaming").getBytes());
} else {
connection.write(("name" + i + "," + i + "," + "unlimited").getBytes());
}
}
connection.commitTransaction();
connection.close();
PrintStream origOut = System.out;
ByteArrayOutputStream myOut = new ByteArrayOutputStream();
// replace stdout and run command
System.setOut(new PrintStream(myOut));
FileDump.main(new String[]{dbLocation});
System.out.flush();
System.setOut(origOut);
String outDump = new String(myOut.toByteArray());
Assert.assertEquals(true, outDump.contains("Compression: ZLIB"));
Assert.assertEquals(true, outDump.contains("Encoding column 9: DICTIONARY"));
}
@Test
public void testFileDumpCorruptDataFiles() throws Exception {
dropDB(msClient, dbName3);
// 1) Create a bucketed table
String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
dbLocation = dbLocation.replaceAll("\\\\", "/"); // for windows paths
String[] colNames = "key1,key2,data".split(",");
String[] colTypes = "string,int,string".split(",");
String[] bucketNames = "key1,key2".split(",");
int bucketCount = 4;
createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
, null, dbLocation, bucketCount);
// 2) Insert data into the table
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName3)
.withTable(tblName3)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.withTransactionBatchSize(10)
.connect();
// we need the side file for this test, so we open a txn batch and commit only one transaction, keeping the batch open
connection.beginTransaction();
connection.write("name0,1,Hello streaming".getBytes());
connection.write("name2,2,Welcome to streaming".getBytes());
connection.write("name4,2,more Streaming unlimited".getBytes());
connection.write("name5,2,even more Streaming unlimited".getBytes());
connection.commitTransaction();
// intentionally corrupt some files
Path path = new Path(dbLocation);
Collection<String> files = FileDump.getAllFilesInPath(path, conf);
for (String file : files) {
if (file.contains("bucket_00000")) {
// empty out the file
corruptDataFile(file, conf, Integer.MIN_VALUE);
} else if (file.contains("bucket_00001")) {
corruptDataFile(file, conf, -1);
} else if (file.contains("bucket_00002")) {
corruptDataFile(file, conf, 100);
} else if (file.contains("bucket_00003")) {
corruptDataFile(file, conf, 100);
}
}
PrintStream origErr = System.err;
ByteArrayOutputStream myErr = new ByteArrayOutputStream();
// replace stderr and run command
System.setErr(new PrintStream(myErr));
FileDump.main(new String[]{dbLocation});
System.err.flush();
System.setErr(origErr);
String errDump = new String(myErr.toByteArray());
Assert.assertEquals(false, errDump.contains("Exception"));
Assert.assertEquals(true, errDump.contains("3 file(s) are corrupted"));
Assert.assertEquals(false, errDump.contains("is still open for writes."));
origErr = System.err;
myErr = new ByteArrayOutputStream();
// replace stderr and run command
System.setErr(new PrintStream(myErr));
FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"});
System.err.flush();
System.setErr(origErr);
errDump = new String(myErr.toByteArray());
Assert.assertEquals(true, errDump.contains("bucket_00001 recovered successfully!"));
Assert.assertEquals(true, errDump.contains("No readable footers found. Creating empty orc file."));
Assert.assertEquals(true, errDump.contains("bucket_00002 recovered successfully!"));
Assert.assertEquals(true, errDump.contains("bucket_00003 recovered successfully!"));
Assert.assertEquals(false, errDump.contains("Exception"));
Assert.assertEquals(false, errDump.contains("is still open for writes."));
// test after recovery
origErr = System.err;
myErr = new ByteArrayOutputStream();
// replace stderr and run command
System.setErr(new PrintStream(myErr));
FileDump.main(new String[]{dbLocation});
System.err.flush();
System.setErr(origErr);
errDump = new String(myErr.toByteArray());
Assert.assertEquals(false, errDump.contains("Exception"));
Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
Assert.assertEquals(false, errDump.contains("is still open for writes."));
// after recovery there shouldn't be any *_flush_length files
files = FileDump.getAllFilesInPath(path, conf);
for (String file : files) {
Assert.assertEquals(false, file.contains("_flush_length"));
}
connection.close();
}
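// Rewrites the given data file with its length adjusted by addRemoveBytes: negative values
// truncate, positive values pad the tail with zero bytes, and Integer.MIN_VALUE empties the
// file completely.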
private void corruptDataFile(final String file, final Configuration conf, final int addRemoveBytes)
throws Exception {
Path bPath = new Path(file);
Path cPath = new Path(bPath.getParent(), bPath.getName() + ".corrupt");
FileSystem fs = bPath.getFileSystem(conf);
FileStatus fileStatus = fs.getFileStatus(bPath);
int len = addRemoveBytes == Integer.MIN_VALUE ? 0 : (int) fileStatus.getLen() + addRemoveBytes;
byte[] buffer = new byte[len];
FSDataInputStream fdis = fs.open(bPath);
fdis.readFully(0, buffer, 0, (int) Math.min(fileStatus.getLen(), buffer.length));
fdis.close();
FSDataOutputStream fdos = fs.create(cPath, true);
fdos.write(buffer, 0, buffer.length);
fdos.close();
fs.delete(bPath, false);
fs.rename(cPath, bPath);
}
@Test
public void testFileDumpCorruptSideFiles() throws Exception {
dropDB(msClient, dbName3);
// 1) Create a bucketed table
String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
dbLocation = dbLocation.replaceAll("\\\\", "/"); // for windows paths
String[] colNames = "key1,key2,data".split(",");
String[] colTypes = "string,int,string".split(",");
String[] bucketNames = "key1,key2".split(",");
int bucketCount = 4;
createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
, null, dbLocation, bucketCount);
// 2) Insert data into the table
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase(dbName3)
.withTable(tblName3)
.withAgentInfo("UT_" + Thread.currentThread().getName())
.withRecordWriter(writer)
.withHiveConf(conf)
.withTransactionBatchSize(10)
.connect();
connection.beginTransaction();
connection.write("name0,1,Hello streaming".getBytes());
connection.write("name2,2,Welcome to streaming".getBytes());
connection.write("name4,2,more Streaming unlimited".getBytes());
connection.write("name5,2,even more Streaming unlimited".getBytes());
connection.write("name6,3,aHello streaming".getBytes());
connection.commitTransaction();
Map<String, List<Long>> offsetMap = new HashMap<String, List<Long>>();
recordOffsets(conf, dbLocation, offsetMap);
connection.beginTransaction();
connection.write("name01,11,-Hello streaming".getBytes());
connection.write("name21,21,-Welcome to streaming".getBytes());
connection.write("name41,21,-more Streaming unlimited".getBytes());
connection.write("name51,21,-even more Streaming unlimited".getBytes());
connection.write("name02,12,--Hello streaming".getBytes());
connection.write("name22,22,--Welcome to streaming".getBytes());
connection.write("name42,22,--more Streaming unlimited".getBytes());
connection.write("name52,22,--even more Streaming unlimited".getBytes());
connection.write("name7,4,aWelcome to streaming".getBytes());
connection.write("name8,5,amore Streaming unlimited".getBytes());
connection.write("name9,6,aeven more Streaming unlimited".getBytes());
connection.write("name10,7,bHello streaming".getBytes());
connection.write("name11,8,bWelcome to streaming".getBytes());
connection.write("name12,9,bmore Streaming unlimited".getBytes());
connection.write("name13,10,beven more Streaming unlimited".getBytes());
connection.commitTransaction();
recordOffsets(conf, dbLocation, offsetMap);
// intentionally corrupt some files
Path path = new Path(dbLocation);
Collection<String> files = FileDump.getAllFilesInPath(path, conf);
for (String file : files) {
if (file.contains("bucket_00000")) {
corruptSideFile(file, conf, offsetMap, "bucket_00000", -1); // corrupt last entry
} else if (file.contains("bucket_00001")) {
corruptSideFile(file, conf, offsetMap, "bucket_00001", 0); // empty out side file
} else if (file.contains("bucket_00002")) {
corruptSideFile(file, conf, offsetMap, "bucket_00002", 3); // total 3 entries (2 valid + 1 fake)
} else if (file.contains("bucket_00003")) {
corruptSideFile(file, conf, offsetMap, "bucket_00003", 10); // total 10 entries (2 valid + 8 fake)
}
}
PrintStream origErr = System.err;
ByteArrayOutputStream myErr = new ByteArrayOutputStream();
// replace stderr and run command
System.setErr(new PrintStream(myErr));
FileDump.main(new String[]{dbLocation});
System.err.flush();
System.setErr(origErr);
String errDump = new String(myErr.toByteArray());
Assert.assertEquals(true, errDump.contains("bucket_00000_flush_length [length: 11"));
Assert.assertEquals(true, errDump.contains("bucket_00001_flush_length [length: 0"));
Assert.assertEquals(true, errDump.contains("bucket_00002_flush_length [length: 24"));
Assert.assertEquals(true, errDump.contains("bucket_00003_flush_length [length: 80"));
Assert.assertEquals(false, errDump.contains("Exception"));
Assert.assertEquals(true, errDump.contains("4 file(s) are corrupted"));
Assert.assertEquals(false, errDump.contains("is still open for writes."));
origErr = System.err;
myErr = new ByteArrayOutputStream();
// replace stderr and run command
System.setErr(new PrintStream(myErr));
FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"});
System.err.flush();
System.setErr(origErr);
errDump = new String(myErr.toByteArray());
Assert.assertEquals(true, errDump.contains("bucket_00000 recovered successfully!"));
Assert.assertEquals(true, errDump.contains("bucket_00001 recovered successfully!"));
Assert.assertEquals(true, errDump.contains("bucket_00002 recovered successfully!"));
Assert.assertEquals(true, errDump.contains("bucket_00003 recovered successfully!"));
List<Long> offsets = offsetMap.get("bucket_00000");
Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString()));
offsets = offsetMap.get("bucket_00001");
Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString()));
offsets = offsetMap.get("bucket_00002");
Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString()));
offsets = offsetMap.get("bucket_00003");
Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString()));
Assert.assertEquals(false, errDump.contains("Exception"));
Assert.assertEquals(false, errDump.contains("is still open for writes."));
// test after recovery
origErr = System.err;
myErr = new ByteArrayOutputStream();
// replace stderr and run command
System.setErr(new PrintStream(myErr));
FileDump.main(new String[]{dbLocation});
System.err.flush();
System.setErr(origErr);
errDump = new String(myErr.toByteArray());
Assert.assertEquals(false, errDump.contains("Exception"));
Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
Assert.assertEquals(false, errDump.contains("is still open for writes."));
// after recovery there shouldn't be any *_flush_length files
files = FileDump.getAllFilesInPath(path, conf);
for (String file : files) {
Assert.assertEquals(false, file.contains("_flush_length"));
}
connection.close();
}
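// Rewrites the _flush_length side file of the given bucket file: numEntries < 0 truncates the
// last recorded offset to 3 bytes, numEntries == 0 leaves the side file empty, and
// numEntries > 0 writes up to numEntries real offsets followed by fake offsets past the last
// valid one.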
private void corruptSideFile(final String file, final HiveConf conf,
final Map<String, List<Long>> offsetMap, final String key, final int numEntries)
throws IOException {
Path dataPath = new Path(file);
Path sideFilePath = OrcAcidUtils.getSideFile(dataPath);
Path cPath = new Path(sideFilePath.getParent(), sideFilePath.getName() + ".corrupt");
FileSystem fs = sideFilePath.getFileSystem(conf);
List<Long> offsets = offsetMap.get(key);
long lastOffset = offsets.get(offsets.size() - 1);
FSDataOutputStream fdos = fs.create(cPath, true);
// corrupt last entry
if (numEntries < 0) {
byte[] lastOffsetBytes = longToBytes(lastOffset);
for (int i = 0; i < offsets.size() - 1; i++) {
fdos.writeLong(offsets.get(i));
}
fdos.write(lastOffsetBytes, 0, 3);
} else if (numEntries > 0) {
int firstRun = Math.min(offsets.size(), numEntries);
// add original entries
for (int i = 0; i < firstRun; i++) {
fdos.writeLong(offsets.get(i));
}
// add fake entries
int remaining = numEntries - firstRun;
for (int i = 0; i < remaining; i++) {
fdos.writeLong(lastOffset + ((i + 1) * 100));
}
}
fdos.close();
fs.delete(sideFilePath, false);
fs.rename(cPath, sideFilePath);
}
private byte[] longToBytes(long x) {
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putLong(x);
return buffer.array();
}
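// Records the current length of every file under dbLocation that belongs to one of the four
// buckets, keyed by bucket name; the recovery tests later compare these lengths against the
// readable footer offsets reported by FileDump.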
private void recordOffsets(final HiveConf conf, final String dbLocation,
final Map<String, List<Long>> offsetMap) throws IOException {
Path path = new Path(dbLocation);
Collection<String> files = FileDump.getAllFilesInPath(path, conf);
String[] buckets = {"bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"};
for (String file : files) {
Path bPath = new Path(file);
FileSystem fs = bPath.getFileSystem(conf);
FileStatus fileStatus = fs.getFileStatus(bPath);
long len = fileStatus.getLen();
for (String bucket : buckets) {
if (file.contains(bucket)) {
offsetMap.computeIfAbsent(bucket, k -> new ArrayList<>()).add(len);
break;
}
}
}
}
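/**
 * Exercises error paths: operating on a closed connection, writing/committing without
 * beginTransaction(), and simulated writer faults via FaultyWriter, verifying that the affected
 * transactions end up aborted in the metastore.
 */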
@Test
public void testErrorHandling()
throws Exception {
String agentInfo = "UT_" + Thread.currentThread().getName();
runCmdOnDriver("create database testErrors");
runCmdOnDriver("use testErrors");
runCmdOnDriver(
"create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
StrictDelimitedInputWriter innerWriter = StrictDelimitedInputWriter.newBuilder()
.withFieldDelimiter(',')
.build();
HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
.withDatabase("testErrors")
.withTable("T")
.withAgentInfo(agentInfo)
.withTransactionBatchSize(2)
.withRecordWriter(innerWriter)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
FaultyWriter writer = new FaultyWriter(innerWriter);
connection.close();
Exception expectedEx = null;
GetOpenTxnsInfoResponse r = msClient.showTxns();
Assert.assertEquals("HWM didn'table match", 17, r.getTxn_high_water_mark());
List<TxnInfo> ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(0)",
org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
ti.get(0).getState());
Assert.assertEquals("wrong status ti(1)",
org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
ti.get(1).getState());
try {
connection.beginTransaction();
} catch (StreamingException ex) {
expectedEx = ex;
}
Assert.assertTrue("beginTransaction() should have failed",
expectedEx != null && expectedEx.getMessage().contains("Streaming connection is closed already."));
connection = HiveStreamingConnection.newBuilder()
.withDatabase("testErrors")
.withTable("T")
.withAgentInfo(agentInfo)
.withTransactionBatchSize(2)
.withRecordWriter(innerWriter)
.withHiveConf(conf)
.connect();
expectedEx = null;
try {
connection.write("name0,1,Hello streaming".getBytes());
} catch (StreamingException ex) {
expectedEx = ex;
}
Assert.assertTrue("write() should have failed",
expectedEx != null && expectedEx.getMessage().equals("Transaction batch is null. Missing beginTransaction?"));
expectedEx = null;
try {
connection.commitTransaction();
} catch (StreamingException ex) {
expectedEx = ex;
}
Assert.assertTrue("commitTransaction() should have failed",
expectedEx != null && expectedEx.getMessage().equals("Transaction batch is null. Missing beginTransaction?"));
connection = HiveStreamingConnection.newBuilder()
.withDatabase("testErrors")
.withTable("T")
.withAgentInfo(agentInfo)
.withTransactionBatchSize(2)
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
connection.write("name2,2,Welcome to streaming".getBytes());
connection.write("name4,2,more Streaming unlimited".getBytes());
connection.write("name5,2,even more Streaming unlimited".getBytes());
connection.commitTransaction();
// test toTransactionString()
String s = connection.toTransactionString();
Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(connection.getCurrentTxnId())));
Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CO]"));
expectedEx = null;
connection.beginTransaction();
writer.enableErrors();
try {
connection.write("name6,2,Doh!".getBytes());
} catch (StreamingIOFailure ex) {
expectedEx = ex;
}
Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"),
expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
expectedEx = null;
try {
connection.commitTransaction();
} catch (StreamingException ex) {
expectedEx = ex;
}
Assert.assertTrue("commitTransaction() should have failed",
expectedEx != null && expectedEx.getMessage().equals("Transaction state is not OPEN. Missing beginTransaction?"));
// test toTransactionString()
s = connection.toTransactionString();
Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(connection.getCurrentTxnId())));
Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CA]"));
r = msClient.showTxns();
Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(0)",
org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
ti.get(0).getState());
Assert.assertEquals("wrong status ti(1)",
org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
ti.get(1).getState());
//txnid 3 was committed and thus not open
Assert.assertEquals("wrong status ti(2)",
org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
ti.get(2).getState());
connection.close();
writer.disableErrors();
connection = HiveStreamingConnection.newBuilder()
.withDatabase("testErrors")
.withTable("T")
.withAgentInfo(agentInfo)
.withTransactionBatchSize(2)
.withRecordWriter(writer)
.withHiveConf(conf)
.connect();
connection.beginTransaction();
connection.write("name2,2,Welcome to streaming".getBytes());
writer.enableErrors();
expectedEx = null;
try {
connection.commitTransaction();
} catch (StreamingIOFailure ex) {
expectedEx = ex;
}
Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"),
expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
r = msClient.showTxns();
Assert.assertEquals("HWM didn'table match", 21, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(3)",
org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
ti.get(3).getState());
Assert.assertEquals("wrong status ti(4)",
org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
ti.get(4).getState());
}
// assumes an unpartitioned table
// returns a map<bucketNum, list<record> >
private HashMap<Integer, ArrayList<SampleRec>> dumpAllBuckets(String dbLocation, String tableName)
throws IOException {
HashMap<Integer, ArrayList<SampleRec>> result = new HashMap<Integer, ArrayList<SampleRec>>();
for (File deltaDir : new File(dbLocation + "/" + tableName).listFiles()) {
if (!deltaDir.getName().startsWith("delta")) {
continue;
}
File[] bucketFiles = deltaDir.listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
String name = pathname.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
});
for (File bucketFile : bucketFiles) {
if (bucketFile.toString().endsWith("length")) {
continue;
}
Integer bucketNum = getBucketNumber(bucketFile);
ArrayList<SampleRec> recs = dumpBucket(new Path(bucketFile.toString()));
result.put(bucketNum, recs);
}
}
return result;
}
//assumes bucket_NNNNN format of file name
private Integer getBucketNumber(File bucketFile) {
String fname = bucketFile.getName();
int start = fname.indexOf('_');
String number = fname.substring(start + 1, fname.length());
return Integer.parseInt(number);
}
// delete db and all tables in it
public static void dropDB(IMetaStoreClient client, String databaseName) {
try {
for (String table : client.listTableNamesByFilter(databaseName, "", (short) -1)) {
client.dropTable(databaseName, table, true, true);
}
client.dropDatabase(databaseName);
} catch (TException e) {
// best-effort cleanup; ignore failures such as a missing database
}
}
///////// -------- UTILS ------- /////////
// returns Path of the partition created (if any) else Path of table
private static Path createDbAndTable(IDriver driver, String databaseName,
String tableName, List<String> partVals,
String[] colNames, String[] colTypes,
String[] bucketCols,
String[] partNames, String dbLocation, int bucketCount)
throws Exception {
String dbUri = "raw://" + new Path(dbLocation).toUri().toString();
String tableLoc = dbUri + Path.SEPARATOR + tableName;
runDDL(driver, "create database IF NOT EXISTS " + databaseName + " location '" + dbUri + "'");
runDDL(driver, "use " + databaseName);
String crtTbl = "create table " + tableName +
" ( " + getTableColumnsStr(colNames, colTypes) + " )" +
getPartitionStmtStr(partNames) +
" clustered by ( " + join(bucketCols, ",") + " )" +
" into " + bucketCount + " buckets " +
" stored as orc " +
" location '" + tableLoc + "'" +
" TBLPROPERTIES ('transactional'='true') ";
runDDL(driver, crtTbl);
if (partNames != null && partNames.length != 0) {
return addPartition(driver, tableName, partVals, partNames);
}
return new Path(tableLoc);
}
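// adds the partition described by partNames/partVals and returns its location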
private static Path addPartition(IDriver driver, String tableName, List<String> partVals, String[] partNames)
throws Exception {
String partSpec = getPartsSpec(partNames, partVals);
String addPart = "alter table " + tableName + " add partition ( " + partSpec + " )";
runDDL(driver, addPart);
return getPartitionPath(driver, tableName, partSpec);
}
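// parses the partition location out of "describe extended <table> partition (<spec>)" output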
private static Path getPartitionPath(IDriver driver, String tableName, String partSpec) throws Exception {
ArrayList<String> res = queryTable(driver, "describe extended " + tableName + " PARTITION (" + partSpec + ")");
String partInfo = res.get(res.size() - 1);
int start = partInfo.indexOf("location:") + "location:".length();
int end = partInfo.indexOf(",", start);
return new Path(partInfo.substring(start, end));
}
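// converts colNames/colTypes into "col1 type1, col2 type2"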
private static String getTableColumnsStr(String[] colNames, String[] colTypes) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < colNames.length; ++i) {
sb.append(colNames[i]).append(" ").append(colTypes[i]);
if (i < colNames.length - 1) {
sb.append(",");
}
}
return sb.toString();
}
// converts partNames into "partName1 string, partName2 string"
private static String getTablePartsStr(String[] partNames) {
if (partNames == null || partNames.length == 0) {
return "";
}
StringBuilder sb = new StringBuilder();
for (int i = 0; i < partNames.length; ++i) {
sb.append(partNames[i]).append(" string");
if (i < partNames.length - 1) {
sb.append(",");
}
}
return sb.toString();
}
// converts partNames,partVals into "partName1=val1, partName2=val2"
private static String getPartsSpec(String[] partNames, List<String> partVals) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < partVals.size(); ++i) {
sb.append(partNames[i]).append(" = '").append(partVals.get(i)).append("'");
if (i < partVals.size() - 1) {
sb.append(",");
}
}
return sb.toString();
}
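// joins values with the given delimiter; returns null if values is null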
private static String join(String[] values, String delimiter) {
if (values == null) {
return null;
}
StringBuilder strbuf = new StringBuilder();
boolean first = true;
for (Object value : values) {
if (!first) {
strbuf.append(delimiter);
} else {
first = false;
}
strbuf.append(value.toString());
}
return strbuf.toString();
}
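// returns the " partitioned by (...)" clause, or "" for an unpartitioned table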
private static String getPartitionStmtStr(String[] partNames) {
if (partNames == null || partNames.length == 0) {
return "";
}
return " partitioned by (" + getTablePartsStr(partNames) + " )";
}
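// runs a DDL statement through the driver; returns false and logs the error if it fails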
private static boolean runDDL(IDriver driver, String sql) {
LOG.debug(sql);
System.out.println(sql);
try {
driver.run(sql);
return true;
} catch (CommandProcessorException e) {
LOG.error("Statement: " + sql + " failed: " + e);
return false;
}
}
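// runs a query through the driver and returns the fetched result rows as strings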
private static ArrayList<String> queryTable(IDriver driver, String query) throws IOException {
try {
driver.run(query);
} catch (CommandProcessorException e) {
throw new RuntimeException(query + " failed: " + e);
}
ArrayList<String> res = new ArrayList<String>();
driver.getResults(res);
return res;
}
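// simple value holder for a (string, int, string) row, as read back from bucket files by dumpBucket()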
private static class SampleRec {
public String field1;
public int field2;
public String field3;
public SampleRec(String field1, int field2, String field3) {
this.field1 = field1;
this.field2 = field2;
this.field3 = field3;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SampleRec that = (SampleRec) o;
if (field2 != that.field2) {
return false;
}
if (field1 != null ? !field1.equals(that.field1) : that.field1 != null) {
return false;
}
return !(field3 != null ? !field3.equals(that.field3) : that.field3 != null);
}
@Override
public int hashCode() {
int result = field1 != null ? field1.hashCode() : 0;
result = 31 * result + field2;
result = 31 * result + (field3 != null ? field3.hashCode() : 0);
return result;
}
@Override
public String toString() {
return " { " +
"'" + field1 + '\'' +
"," + field2 +
",'" + field3 + '\'' +
" }";
}
}
/**
* This is a test-only wrapper around the real RecordWriter.
* It can simulate faults from lower levels to test error handling logic.
*/
private static final class FaultyWriter implements RecordWriter {
private final RecordWriter delegate;
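// when true, produceFault() throws a StreamingIOFailure after each delegated write/flush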
private boolean shouldThrow = false;
private FaultyWriter(RecordWriter delegate) {
assert delegate != null;
this.delegate = delegate;
}
@Override
public void init(final StreamingConnection connection, final long minWriteId, final long maxWriteID)
throws StreamingException {
delegate.init(connection, minWriteId, maxWriteID);
}
@Override
public void write(long writeId, byte[] record) throws StreamingException {
delegate.write(writeId, record);
produceFault();
}
@Override
public void write(final long writeId, final InputStream inputStream) throws StreamingException {
delegate.write(writeId, inputStream);
produceFault();
}
@Override
public void flush() throws StreamingException {
delegate.flush();
produceFault();
}
@Override
public void close() throws StreamingException {
delegate.close();
}
@Override
public Set<String> getPartitions() {
return delegate.getPartitions();
}
/**
* allows testing of "unexpected" errors
*
* @throws StreamingIOFailure
*/
private void produceFault() throws StreamingIOFailure {
if (shouldThrow) {
throw new StreamingIOFailure("Simulated fault occurred");
}
}
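// toggle fault injection: while enabled, every write()/flush() throws after delegating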
void enableErrors() {
shouldThrow = true;
}
void disableErrors() {
shouldThrow = false;
}
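// not delegated; these tests do not rely on the delta file location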
@Override
public Path getDeltaFileLocation(List<String> partitionValues,
Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId,
Table table) throws StreamingException {
return null;
}
}
}