// blob: e6f5ff248f6aa5683d092970dabce3b35f430e64 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrationTestSet {
// Number of Pinot servers started for the test cluster
private static final int NUM_SERVERS = 2;
// Primary key column passed to the upsert table config; also returned as the partition column
private static final String PRIMARY_KEY_COL = "clientId";
// Type-qualified realtime table name, used to look up the table's ideal state
private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME);
// Names of the uploaded (non-LLC) segments; getSegmentPartitionId() maps each of them to a partition id.
// NOTE(review): the trailing " %" presumably exercises special characters in segment names - confirm before changing.
// Segment 1 contains records of pk value 100000 (partition 0)
private static final String UPLOADED_SEGMENT_1 = "mytable_10027_19736_0 %";
// Segment 2 contains records of pk value 100001 (partition 1)
private static final String UPLOADED_SEGMENT_2 = "mytable_10072_19919_1 %";
// Segment 3 contains records of pk value 100002 (partition 1)
private static final String UPLOADED_SEGMENT_3 = "mytable_10158_19938_2 %";
/**
* Brings up the test cluster (ZooKeeper, controller, broker, servers, Kafka), creates the upsert table, and loads
* the same Avro data twice: once pushed into Kafka and once as segments uploaded to the REALTIME table.
*
* @throws Exception if any setup step fails or the documents are not loaded within the timeout
*/
@BeforeClass
public void setUp()
throws Exception {
// Start from empty temp, segment and tar directories
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
// Start the Pinot cluster
startZk();
// Start the controller.
// NOTE(review): this comment used to say "customized controller with more frequent realtime segment validation",
// but no customization is visible at this call site - confirm whether an override exists elsewhere.
startController();
startBroker();
startServers(NUM_SERVERS);
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
// Start Kafka and push data into Kafka
startKafka();
pushAvroIntoKafka(avroFiles);
// Create and upload schema and table config (upsert table keyed on PRIMARY_KEY_COL)
Schema schema = createSchema();
addSchema(schema);
TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, getNumKafkaPartitions());
addTableConfig(tableConfig);
// Build segments from the same Avro files and upload them to the REALTIME table
ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
// Wait for all documents loaded (timeout argument is in milliseconds)
waitForAllDocsLoaded(600_000L);
}
/**
 * Drops the realtime table, stops every cluster component started in {@code setUp()} and deletes the temporary
 * directory.
 *
 * <p>The steps are chained with try/finally so that a failure while dropping the table still stops the cluster,
 * and a failure while stopping the cluster still removes the temporary directory. Without this, a single failing
 * step would leave components running and files on disk for subsequent test classes.
 *
 * @throws IOException if dropping the table or deleting the temporary directory fails
 */
@AfterClass
public void tearDown()
    throws IOException {
  try {
    dropRealtimeTable(getTableName());
  } finally {
    try {
      // Same shutdown order as before: servers, broker, controller, Kafka, then ZooKeeper
      stopServer();
      stopBroker();
      stopController();
      stopKafka();
      stopZk();
    } finally {
      FileUtils.deleteDirectory(_tempDir);
    }
  }
}
/**
* Returns the schema file name used by this test, {@code upsert_table_test.schema}.
* NOTE(review): presumably resolved by the base class as a test resource - confirm.
*/
@Override
protected String getSchemaFileName() {
return "upsert_table_test.schema";
}
/**
* Returns the schema name used by this test, {@code upsertSchema}.
*/
@Override
protected String getSchemaName() {
return "upsertSchema";
}
/**
* Returns the name of the Avro data archive used by this test, {@code upsert_test.tar.gz}.
* NOTE(review): presumably the archive unpacked by {@code unpackAvroData} in setUp() - confirm against the base class.
*/
@Override
protected String getAvroTarFileName() {
return "upsert_test.tar.gz";
}
/**
* Always returns {@code true}.
* NOTE(review): presumably selects the low-level (LLC) stream consumer in the base class - confirm.
*/
@Override
protected boolean useLlc() {
return true;
}
/**
* Returns the partition column, which for this test is the primary key column {@code PRIMARY_KEY_COL}.
*/
@Override
protected String getPartitionColumn() {
return PRIMARY_KEY_COL;
}
/**
* Returns the expected {@code COUNT(*)} result for a regular query against the table.
*/
@Override
protected long getCountStarResult() {
// Three distinct records are expected with pk values of 100000, 100001, 100002
return 3;
}
/**
 * Waits until the {@code skipUpsert=true} document count reaches {@link #getCountStarResultWithoutUpsert()}, then
 * asserts that the regular count equals {@link #getCountStarResult()}.
 *
 * @param timeoutMs maximum time to wait, in milliseconds
 * @throws Exception if the condition is not met in time or the final count assertion fails
 */
@Override
protected void waitForAllDocsLoaded(long timeoutMs)
    throws Exception {
  TestUtils.waitForCondition(aVoid -> {
    try {
      return getCurrentCountStarResultWithoutUpsert() == getCountStarResultWithoutUpsert();
    } catch (Exception ignored) {
      // Intentionally ignored: a failed query just means "not ready yet", so keep polling.
      // Return false rather than null so the waiter can never hit an NPE when unboxing the result.
      return false;
    }
  }, 100L, timeoutMs, "Failed to load all documents");
  assertEquals(getCurrentCountStarResult(), getCountStarResult());
}
/**
 * Runs {@code COUNT(*)} against the table with the {@code skipUpsert=true} query option and returns the value in
 * the first column of the first result set.
 */
private long getCurrentCountStarResultWithoutUpsert() {
  String countQuery = "SELECT COUNT(*) FROM " + getTableName() + " OPTION(skipUpsert=true)";
  return getPinotConnection().execute(countQuery).getResultSet(0).getLong(0);
}
/**
 * Returns the expected document count when upsert is skipped (600).
 */
private long getCountStarResultWithoutUpsert() {
  // 3 Avro files x 100 documents each x 2 copies (one from the streaming source, one from the batch source)
  return 3 * 100 * 2;
}
/**
* Verifies the segment assignment in the ideal state at three points: right after setup, after running the
* realtime segment validation manager, and after restarting the servers. Document counts (with and without
* upsert) are checked after the validation run and again after the restart.
*
* @throws Exception if any verification fails or documents are not reloaded within the timeout
*/
@Test
public void testSegmentAssignment()
throws Exception {
verifyIdealState();
// Run the real-time segment validation and check again
_controllerStarter.getRealtimeSegmentValidationManager().run();
verifyIdealState();
assertEquals(getCurrentCountStarResult(), getCountStarResult());
assertEquals(getCurrentCountStarResultWithoutUpsert(), getCountStarResultWithoutUpsert());
// Restart the servers and check again
restartServers();
verifyIdealState();
// Wait for the documents to be loaded again after the restart (timeout in milliseconds)
waitForAllDocsLoaded(600_000L);
}
/**
 * Checks the ideal state of the realtime table:
 * <ul>
 *   <li>exactly 5 segments are assigned (presumably the 3 uploaded segments plus one consuming segment per
 *       partition - confirm),</li>
 *   <li>every segment has a single replica, CONSUMING for LLC-named segments and ONLINE otherwise,</li>
 *   <li>every segment belongs to partition 0 or 1, and all segments of a partition sit on the same server.</li>
 * </ul>
 */
private void verifyIdealState() {
  IdealState idealState = HelixHelper.getTableIdealState(_helixManager, REALTIME_TABLE_NAME);
  Map<String, Map<String, String>> assignment = idealState.getRecord().getMapFields();
  assertEquals(assignment.size(), 5);

  // Index is the partition id (0 or 1); value is the first server seen for that partition
  String[] serverByPartition = new String[2];
  for (Map.Entry<String, Map<String, String>> segmentEntry : assignment.entrySet()) {
    String segment = segmentEntry.getKey();
    Map<String, String> replicaStates = segmentEntry.getValue();

    // Each segment must have exactly one replica, in the state matching its kind
    assertEquals(replicaStates.size(), 1);
    Map.Entry<String, String> replica = replicaStates.entrySet().iterator().next();
    String expectedState = LLCSegmentName.isLowLevelConsumerSegmentName(segment)
        ? SegmentStateModel.CONSUMING
        : SegmentStateModel.ONLINE;
    assertEquals(replica.getValue(), expectedState);

    // Only partitions 0 and 1 are expected
    int partitionId = getSegmentPartitionId(segment);
    if (partitionId != 0) {
      assertEquals(partitionId, 1);
    }

    // All segments of one partition must live on the same server
    String server = replica.getKey();
    if (serverByPartition[partitionId] == null) {
      serverByPartition[partitionId] = server;
    } else {
      assertEquals(server, serverByPartition[partitionId]);
    }
  }
}
/**
 * Resolves the partition id of a segment. The three uploaded segments are mapped explicitly (segment 1 to
 * partition 0, segments 2 and 3 to partition 1); any other name is parsed as an LLC segment name and its
 * partition group id is returned.
 *
 * @param segmentName segment name from the ideal state; must not be null
 * @return the partition id of the segment
 */
private static int getSegmentPartitionId(String segmentName) {
  if (segmentName.equals(UPLOADED_SEGMENT_1)) {
    return 0;
  }
  if (segmentName.equals(UPLOADED_SEGMENT_2) || segmentName.equals(UPLOADED_SEGMENT_3)) {
    return 1;
  }
  return new LLCSegmentName(segmentName).getPartitionGroupId();
}
}