blob: 062fc67470eb7ddbd8a18f1951cc92ef30ebbba0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.client;
import static org.apache.kudu.test.ClientTestUtil.countRowsInScan;
import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
import static org.apache.kudu.test.ClientTestUtil.getBasicTableOptionsWithNonCoveredRange;
import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.ImmutableList;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.test.ClientTestUtil;
import org.apache.kudu.test.KuduTestHarness;
public class TestKuduSession {
private static final String tableName = "TestKuduSession";
private static final Schema basicSchema = ClientTestUtil.getBasicSchema();
private KuduClient client;
private AsyncKuduClient asyncClient;
@Rule
public KuduTestHarness harness = new KuduTestHarness();
@Before
public void setUp() {
client = harness.getClient();
asyncClient = harness.getAsyncClient();
}
@Test(timeout = 100000)
public void testBasicOps() throws Exception {
KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
KuduSession session = client.newSession();
for (int i = 0; i < 10; i++) {
session.apply(createInsert(table, i));
}
assertEquals(10, countRowsInScan(client.newScannerBuilder(table).build()));
OperationResponse resp = session.apply(createInsert(table, 0));
assertTrue(resp.hasRowError());
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
for (int i = 10; i < 20; i++) {
session.apply(createInsert(table, i));
}
session.flush();
assertEquals(20, countRowsInScan(client.newScannerBuilder(table).build()));
}
@Test(timeout = 100000)
public void testIgnoreAllDuplicateRows() throws Exception {
KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
KuduSession session = client.newSession();
session.setIgnoreAllDuplicateRows(true);
for (int i = 0; i < 10; i++) {
session.apply(createInsert(table, i));
}
// Test all of the various flush modes to be sure we correctly handle errors in
// individual operations and batches.
for (SessionConfiguration.FlushMode mode : SessionConfiguration.FlushMode.values()) {
session.setFlushMode(mode);
for (int i = 0; i < 10; i++) {
OperationResponse resp = session.apply(createInsert(table, i));
if (mode == SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC) {
assertFalse(resp.hasRowError());
}
}
if (mode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
List<OperationResponse> responses = session.flush();
for (OperationResponse resp : responses) {
assertFalse(resp.hasRowError());
}
} else if (mode == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
while (session.hasPendingOperations()) {
Thread.sleep(100);
}
assertEquals(0, session.countPendingErrors());
}
}
}
@Test(timeout = 100000)
public void testIgnoreAllNotFoundRows() throws Exception {
KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
KuduSession session = client.newSession();
session.setIgnoreAllNotFoundRows(true);
// Test all of the various flush modes to be sure we correctly handle errors in
// individual operations and batches.
for (SessionConfiguration.FlushMode mode : SessionConfiguration.FlushMode.values()) {
session.setFlushMode(mode);
for (int i = 0; i < 10; i++) {
session.apply(createDelete(table, i));
OperationResponse resp = session.apply(createInsert(table, i));
if (mode == SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC) {
assertFalse(resp.hasRowError());
}
}
if (mode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
List<OperationResponse> responses = session.flush();
for (OperationResponse resp : responses) {
assertFalse(resp.hasRowError());
}
} else if (mode == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
while (session.hasPendingOperations()) {
Thread.sleep(100);
}
assertEquals(0, session.countPendingErrors());
}
}
}
@Test(timeout = 100000)
public void testBatchWithSameRow() throws Exception {
KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
// Insert 25 rows, one per batch, along with 50 updates for each, and a delete at the end,
// while also clearing the cache between each batch half the time. The delete is added here
// so that a misplaced update would fail if it happens later than its delete.
for (int i = 0; i < 25; i++) {
session.apply(createInsert(table, i));
for (int j = 0; j < 50; j++) {
Update update = table.newUpdate();
PartialRow row = update.getRow();
row.addInt(basicSchema.getColumnByIndex(0).getName(), i);
row.addInt(basicSchema.getColumnByIndex(1).getName(), 1000);
session.apply(update);
}
Delete del = table.newDelete();
PartialRow row = del.getRow();
row.addInt(basicSchema.getColumnByIndex(0).getName(), i);
session.apply(del);
session.flush();
if (i % 2 == 0) {
asyncClient.emptyTabletsCacheForTable(table.getTableId());
}
}
assertEquals(0, countRowsInScan(client.newScannerBuilder(table).build()));
}
@Test(timeout = 100000)
public void testDeleteWithFullRow() throws Exception {
KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
List<PartialRow> rows = new ArrayList<>();
for (int i = 0; i < 25; i++) {
Insert insert = createInsert(table, i);
rows.add(insert.getRow());
session.apply(insert);
}
session.flush();
for (PartialRow row : rows) {
Operation del;
if (row.getInt(0) % 2 == 0) {
del = table.newDelete();
} else {
del = table.newDeleteIgnore();
}
del.setRow(row);
session.apply(del);
}
session.flush();
assertEquals(0, session.countPendingErrors());
assertEquals(0, countRowsInScan(client.newScannerBuilder(table).build()));
}
/** Regression test for KUDU-3198. Delete with full row from a 64-column table. */
@Test(timeout = 100000)
public void testDeleteWithFullRowFrom64ColumnTable() throws Exception {
ArrayList<ColumnSchema> columns = new ArrayList<>(64);
columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
for (int i = 1; i < 64; i++) {
columns.add(new ColumnSchema.ColumnSchemaBuilder("column_" + i, Type.STRING)
.nullable(true)
.build());
}
Schema schema = new Schema(columns);
KuduTable table = client.createTable(tableName, schema, getBasicCreateTableOptions());
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
// Insert 25 rows and then delete them.
List<PartialRow> rows = new ArrayList<>();
for (int i = 0; i < 25; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addInt(0, 1);
for (int j = 1; j < 64; j++) {
if (j % 2 == 0) {
row.setNull(j);
} else {
row.addString(j, "val_" + j);
}
}
rows.add(row);
session.apply(insert);
}
session.flush();
for (PartialRow row : rows) {
Operation del;
if (row.getInt(0) % 2 == 0) {
del = table.newDelete();
} else {
del = table.newDeleteIgnore();
}
del.setRow(row);
session.apply(del);
}
session.flush();
assertEquals(0, session.countPendingErrors());
assertEquals(0, countRowsInScan(client.newScannerBuilder(table).build()));
}
/**
* Regression test for KUDU-1402. Calls to session.flush() should return an empty list
* instead of null.
* @throws Exception
*/
@Test(timeout = 100000)
public void testEmptyFlush() throws Exception {
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
List<OperationResponse> result = session.flush();
assertNotNull(result);
assertTrue(result.isEmpty());
}
/**
* Regression test for KUDU-1226. Calls to session.flush() concurrent with AUTO_FLUSH_BACKGROUND
* can end up giving ConvertBatchToListOfResponsesCB a list with nulls if a tablet was already
* flushed. Only happens with multiple tablets.
*/
@Test(timeout = 100000)
public void testConcurrentFlushes() throws Exception {
CreateTableOptions builder = getBasicCreateTableOptions();
int numTablets = 4;
int numRowsPerTablet = 100;
// Create a 4 tablets table split on 1000, 2000, and 3000.
for (int i = 1; i < numTablets; i++) {
PartialRow split = basicSchema.newPartialRow();
split.addInt(0, i * numRowsPerTablet);
builder.addSplitRow(split);
}
KuduTable table = client.createTable(tableName, basicSchema, builder);
// Configure the session to background flush as often as it can (every 1ms).
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
session.setFlushInterval(1);
// Fill each tablet in parallel 1 by 1 then flush. Without the fix this would quickly get an
// NPE.
for (int i = 0; i < numRowsPerTablet; i++) {
for (int j = 0; j < numTablets; j++) {
session.apply(createInsert(table, i + (numRowsPerTablet * j)));
}
session.flush();
}
}
@Test(timeout = 10000)
public void testOverWritingValues() throws Exception {
final KuduTable table =
client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
final KuduSession session = client.newSession();
Insert insert = createInsert(table, 0);
PartialRow row = insert.getRow();
// Overwrite all the normal columns.
int magicNumber = 9999;
row.addInt(1, magicNumber);
row.addInt(2, magicNumber);
row.addBoolean(4, false);
// Spam the string column since it's backed by an array.
for (int i = 0; i <= magicNumber; i++) {
row.addString(3, i + "");
}
// We're supposed to keep a constant size.
assertEquals(5, row.getVarLengthData().size());
session.apply(insert);
KuduScanner scanner = client.newScannerBuilder(table).build();
RowResult rr = scanner.nextRows().next();
assertEquals(magicNumber, rr.getInt(1));
assertEquals(magicNumber, rr.getInt(2));
assertEquals(magicNumber + "", rr.getString(3));
assertEquals(false, rr.getBoolean(4));
// Test setting a value post-apply.
try {
row.addInt(1, 0);
fail("Row should be frozen and throw");
} catch (IllegalStateException ex) {
// Ok.
}
}
@Test(timeout = 10000)
public void testUpsert() throws Exception {
KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
KuduSession session = client.newSession();
// Test an Upsert that acts as an Insert.
assertFalse(session.apply(createUpsert(table, 1, 1, false)).hasRowError());
List<String> rowStrings = scanTableToStrings(table);
assertEquals(1, rowStrings.size());
assertEquals(
"INT32 key=1, INT32 column1_i=1, INT32 column2_i=3, " +
"STRING column3_s=a string, BOOL column4_b=true",
rowStrings.get(0));
// Test an Upsert that acts as an Update.
assertFalse(session.apply(createUpsert(table, 1, 2, false)).hasRowError());
rowStrings = scanTableToStrings(table);
assertEquals(
"INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " +
"STRING column3_s=a string, BOOL column4_b=true",
rowStrings.get(0));
}
@Test(timeout = 10000)
public void testInsertIgnoreAfterInsertHasNoRowError() throws Exception {
KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
session.apply(createInsert(table, 1));
session.apply(createUpsert(table, 1, 1, false));
session.apply(createInsertIgnore(table, 1));
List<OperationResponse> results = session.flush();
for (OperationResponse result : results) {
assertFalse(result.toString(), result.hasRowError());
}
List<String> rowStrings = scanTableToStrings(table);
assertEquals(1, rowStrings.size());
assertEquals(
"INT32 key=1, INT32 column1_i=1, INT32 column2_i=3, " +
"STRING column3_s=a string, BOOL column4_b=true",
rowStrings.get(0));
}
@Test(timeout = 10000)
public void testInsertAfterInsertIgnoreHasRowError() throws Exception {
KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
session.apply(createInsertIgnore(table, 1));
session.apply(createInsert(table, 1));
List<OperationResponse> results = session.flush();
assertFalse(results.get(0).toString(), results.get(0).hasRowError());
assertTrue(results.get(1).toString(), results.get(1).hasRowError());
assertTrue(results.get(1).getRowError().getErrorStatus().isAlreadyPresent());
List<String> rowStrings = scanTableToStrings(table);
assertEquals(1, rowStrings.size());
assertEquals(
"INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " +
"STRING column3_s=a string, BOOL column4_b=true",
rowStrings.get(0));
}
@Test(timeout = 10000)
public void testInsertIgnore() throws Exception {
KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
KuduSession session = client.newSession();
// Test insert ignore implements normal insert.
assertFalse(session.apply(createInsertIgnore(table, 1)).hasRowError());
List<String> rowStrings = scanTableToStrings(table);
assertEquals(
"INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " +
"STRING column3_s=a string, BOOL column4_b=true",
rowStrings.get(0));
// Test insert ignore does not return a row error.
assertFalse(session.apply(createInsertIgnore(table, 1)).hasRowError());
rowStrings = scanTableToStrings(table);
assertEquals(
"INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " +
"STRING column3_s=a string, BOOL column4_b=true",
rowStrings.get(0));
}
@Test(timeout = 10000)
public void testUpdateIgnore() throws Exception {
KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
KuduSession session = client.newSession();
// Test update ignore does not return a row error.
assertFalse(session.apply(createUpdateIgnore(table, 1, 1, false)).hasRowError());
assertEquals(0, scanTableToStrings(table).size());
assertFalse(session.apply(createInsert(table, 1)).hasRowError());
assertEquals(1, scanTableToStrings(table).size());
// Test update ignore implements normal update.
assertFalse(session.apply(createUpdateIgnore(table, 1, 2, false)).hasRowError());
List<String> rowStrings = scanTableToStrings(table);
assertEquals(1, rowStrings.size());
assertEquals(
"INT32 key=1, INT32 column1_i=2, INT32 column2_i=3, " +
"STRING column3_s=a string, BOOL column4_b=true",
rowStrings.get(0));
}
@Test(timeout = 10000)
public void testDeleteIgnore() throws Exception {
KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
KuduSession session = client.newSession();
// Test delete ignore does not return a row error.
assertFalse(session.apply(createDeleteIgnore(table, 1)).hasRowError());
assertFalse(session.apply(createInsert(table, 1)).hasRowError());
assertEquals(1, scanTableToStrings(table).size());
// Test delete ignore implements normal delete.
assertFalse(session.apply(createDeleteIgnore(table, 1)).hasRowError());
assertEquals(0, scanTableToStrings(table).size());
}
@Test(timeout = 10000)
public void testInsertManualFlushNonCoveredRange() throws Exception {
CreateTableOptions createOptions = getBasicTableOptionsWithNonCoveredRange();
createOptions.setNumReplicas(1);
client.createTable(tableName, basicSchema, createOptions);
KuduTable table = client.openTable(tableName);
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
// Insert in reverse sorted order so that more table location lookups occur
// (the extra results in table location lookups always occur past the inserted key).
List<Integer> nonCoveredKeys = ImmutableList.of(350, 300, 199, 150, 100, -1, -50);
for (int key : nonCoveredKeys) {
assertNull(session.apply(createBasicSchemaInsert(table, key)));
}
List<OperationResponse> results = session.flush();
assertEquals(nonCoveredKeys.size(), results.size());
for (OperationResponse result : results) {
assertTrue(result.hasRowError());
assertTrue(result.getRowError().getErrorStatus().isNotFound());
}
// Insert a batch of some valid and some invalid.
for (int key = 90; key < 110; key++) {
session.apply(createBasicSchemaInsert(table, key));
}
results = session.flush();
int failures = 0;
for (OperationResponse result : results) {
if (result.hasRowError()) {
failures++;
assertTrue(result.getRowError().getErrorStatus().isNotFound());
}
}
assertEquals(10, failures);
}
@Test(timeout = 10000)
public void testInsertManualFlushResponseOrder() throws Exception {
CreateTableOptions createOptions = getBasicTableOptionsWithNonCoveredRange();
createOptions.setNumReplicas(1);
client.createTable(tableName, basicSchema, createOptions);
KuduTable table = client.openTable(tableName);
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
// Insert a batch of some valid and some invalid.
for (int i = 0; i < 10; i++) {
assertNull(session.apply(createBasicSchemaInsert(table, 100 + i * 10)));
assertNull(session.apply(createBasicSchemaInsert(table, 200 + i * 10)));
}
List<OperationResponse> results = session.flush();
assertEquals(20, results.size());
for (int i = 0; i < 20; i++) {
OperationResponse result = results.get(i);
if (i % 2 == 0) {
assertTrue(result.hasRowError());
assertTrue(result.getRowError().getErrorStatus().isNotFound());
} else {
assertFalse(result.hasRowError());
}
}
}
@Test(timeout = 10000)
public void testNonCoveredRangeException() throws Exception {
CreateTableOptions createOptions = getBasicTableOptionsWithNonCoveredRange();
createOptions.setNumReplicas(1);
client.createTable(tableName, basicSchema, createOptions);
KuduTable table = client.openTable(tableName);
Insert insert = createInsert(table, 150);
//AUTO_FLUSH_SYNC case
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
OperationResponse apply = session.apply(insert);
assertTrue(apply.hasRowError());
System.err.println(apply.getRowError().getErrorStatus().getMessage());
assertTrue(apply.getRowError().getErrorStatus().getMessage().contains(
"does not exist in table: TestKuduSession"));
//AUTO_FLUSH_BACKGROUND case
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
assertEquals(null, session.apply(insert));
List<OperationResponse> autoFlushResult = session.flush();
assertEquals(1, autoFlushResult.size());
OperationResponse responseAuto = autoFlushResult.get(0);
assertTrue(responseAuto.hasRowError());
assertTrue(responseAuto.getRowError().getErrorStatus().getMessage().contains(
"does not exist in table: TestKuduSession"));
//MANUAL_FLUSH case
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
assertEquals(null, session.apply(insert));
List<OperationResponse> manualFlushResult = session.flush();
assertEquals(1, manualFlushResult.size());
OperationResponse responseManual = manualFlushResult.get(0);
assertTrue(responseManual.hasRowError());
assertTrue(responseManual.getRowError().getErrorStatus().getMessage().contains(
"does not exist in table: TestKuduSession"));
}
@Test(timeout = 10000)
public void testInsertAutoFlushSyncNonCoveredRange() throws Exception {
CreateTableOptions createOptions = getBasicTableOptionsWithNonCoveredRange();
createOptions.setNumReplicas(1);
client.createTable(tableName, basicSchema, createOptions);
KuduTable table = client.openTable(tableName);
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
List<Integer> nonCoveredKeys = ImmutableList.of(350, 300, 199, 150, 100, -1, -50);
for (int key : nonCoveredKeys) {
OperationResponse response = session.apply(createBasicSchemaInsert(table, key));
assertTrue(response.hasRowError());
assertTrue(response.getRowError().getErrorStatus().isNotFound());
}
}
@Test(timeout = 10000)
public void testInsertAutoFlushBackgroundNonCoveredRange() throws Exception {
CreateTableOptions createOptions = getBasicTableOptionsWithNonCoveredRange();
createOptions.setNumReplicas(1);
client.createTable(tableName, basicSchema, createOptions);
KuduTable table = client.openTable(tableName);
AsyncKuduSession session = asyncClient.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
List<Integer> nonCoveredKeys = ImmutableList.of(350, 300, 199, 150, 100, -1, -50);
for (int key : nonCoveredKeys) {
OperationResponse result = session.apply(createBasicSchemaInsert(table, key)).join(5000);
assertTrue(result.hasRowError());
assertTrue(result.getRowError().getErrorStatus().isNotFound());
}
RowErrorsAndOverflowStatus errors = session.getPendingErrors();
assertEquals(nonCoveredKeys.size(), errors.getRowErrors().length);
for (RowError error : errors.getRowErrors()) {
assertTrue(error.getErrorStatus().isNotFound());
}
// Insert a batch of some valid and some invalid.
for (int key = 90; key < 110; key++) {
session.apply(createBasicSchemaInsert(table, key));
}
session.flush().join(5000);
errors = session.getPendingErrors();
assertEquals(10, errors.getRowErrors().length);
for (RowError error : errors.getRowErrors()) {
assertTrue(error.getErrorStatus().isNotFound());
}
}
private Insert createInsert(KuduTable table, int key) {
return createBasicSchemaInsert(table, key);
}
private Upsert createUpsert(KuduTable table, int key, int secondVal, boolean hasNull) {
Upsert upsert = table.newUpsert();
populateUpdateRow(upsert.getRow(), key, secondVal, hasNull);
return upsert;
}
private UpdateIgnore createUpdateIgnore(KuduTable table, int key, int secondVal,
boolean hasNull) {
UpdateIgnore updateIgnore = table.newUpdateIgnore();
populateUpdateRow(updateIgnore.getRow(), key, secondVal, hasNull);
return updateIgnore;
}
private void populateUpdateRow(PartialRow row, int key, int secondVal, boolean hasNull) {
row.addInt(0, key);
row.addInt(1, secondVal);
row.addInt(2, 3);
if (hasNull) {
row.setNull(3);
} else {
row.addString(3, "a string");
}
row.addBoolean(4, true);
}
private Delete createDelete(KuduTable table, int key) {
Delete delete = table.newDelete();
PartialRow row = delete.getRow();
row.addInt(0, key);
return delete;
}
private DeleteIgnore createDeleteIgnore(KuduTable table, int key) {
DeleteIgnore deleteIgnore = table.newDeleteIgnore();
PartialRow row = deleteIgnore.getRow();
row.addInt(0, key);
return deleteIgnore;
}
protected InsertIgnore createInsertIgnore(KuduTable table, int key) {
InsertIgnore insertIgnore = table.newInsertIgnore();
PartialRow row = insertIgnore.getRow();
row.addInt(0, key);
row.addInt(1, 2);
row.addInt(2, 3);
row.addString(3, "a string");
row.addBoolean(4, true);
return insertIgnore;
}
}