blob: 5b0171718b668bcafd37398d7d797bca9bccca5e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.datasketches.theta.UpdateSketch;
import org.apache.datasketches.theta.UpdateSketchBuilder;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
public class ThetaSketchIntegrationTest extends BaseClusterIntegrationTest {
private static final String DIM_NAME = "dimName";
private static final String DIM_VALUE = "dimValue";
private static final String SHARD_ID = "shardId";
private static final String THETA_SKETCH = "thetaSketchCol";
@BeforeClass
public void setup()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
// Start the Pinot cluster
startZk();
startController();
startBroker();
startServer();
// create & upload schema AND table config
Schema schema = new Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
.addSingleValueDimension(DIM_NAME, FieldSpec.DataType.STRING)
.addSingleValueDimension(DIM_VALUE, FieldSpec.DataType.STRING)
.addSingleValueDimension(SHARD_ID, FieldSpec.DataType.INT)
.addSingleValueDimension(THETA_SKETCH, FieldSpec.DataType.BYTES).build();
addSchema(schema);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME).build();
addTableConfig(tableConfig);
// create & upload segments
File avroFile = createAvroFile();
ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, 0, _segmentDir, _tarDir);
uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
waitForAllDocsLoaded(60_000);
}
@Override
protected long getCountStarResult() {
/*
Uploaded table content:
row# dimName dimValue shardId thetaSketchCol
---- ======= ======== ======= ==============
1 Course Math 1 ...
2 Course History 1 ...
3 Course Biology 1 ...
4 Gender Female 1 ...
5 Gender Male 1 ...
6 Course Math 2 ...
7 Course History 2 ...
8 Course Biology 2 ...
9 Gender Female 2 ...
10 Gender Male 2 ...
*/
return 10;
}
@Test
public void testThetaSketchQuery()
throws Exception {
/*
Original data:
Gender Course Shard#1 Shard#2
-------- ------- ------- -------
Female Math 50 110
Female History 60 120
Female Biology 70 130
Male Math 80 140
Male History 90 150
Male Biology 100 160
*/
// gender = female
{
String query = "select distinctCountThetaSketch(thetaSketchCol) from " + DEFAULT_TABLE_NAME
+ " where dimName = 'gender' and dimValue = 'Female'";
int expected = 50 + 60 + 70 + 110 + 120 + 130;
runAndAssert(query, expected);
query = "select distinctCountThetaSketch(thetaSketchCol, '', 'dimName = ''gender'' and dimValue = ''Female''', "
+ "'$1') from " + DEFAULT_TABLE_NAME;
runAndAssert(query, expected);
query = "select distinctCountThetaSketch(thetaSketchCol, '', "
+ "'dimName = ''gender''', 'dimValue = ''Female''', 'SET_INTERSECT($1, $2)') from " + DEFAULT_TABLE_NAME;
runAndAssert(query, expected);
}
// gender = male
{
String query = "select distinctCountThetaSketch(thetaSketchCol) from " + DEFAULT_TABLE_NAME
+ " where dimName = 'gender' and dimValue = 'Male'";
int expected = 80 + 90 + 100 + 140 + 150 + 160;
runAndAssert(query, expected);
query =
"select distinctCountThetaSketch(thetaSketchCol, '', 'dimName = ''gender'' and dimValue = ''Male''', '$1') "
+ "from " + DEFAULT_TABLE_NAME;
runAndAssert(query, expected);
query = "select distinctCountThetaSketch(thetaSketchCol, '', "
+ "'dimName = ''gender''', 'dimValue = ''Male''', 'SET_INTERSECT($1, $2)') from " + DEFAULT_TABLE_NAME;
runAndAssert(query, expected);
}
// course = math
{
String query = "select distinctCountThetaSketch(thetaSketchCol) from " + DEFAULT_TABLE_NAME
+ " where dimName = 'course' AND dimValue = 'Math'";
int expected = 50 + 80 + 110 + 140;
runAndAssert(query, expected);
query =
"select distinctCountThetaSketch(thetaSketchCol, '', 'dimName = ''course'' and dimValue = ''Math''', '$1') "
+ "from " + DEFAULT_TABLE_NAME;
runAndAssert(query, expected);
query = "select distinctCountThetaSketch(thetaSketchCol, '', "
+ "'dimName = ''course''', 'dimValue = ''Math''', 'SET_INTERSECT($1, $2)') from " + DEFAULT_TABLE_NAME;
runAndAssert(query, expected);
}
// gender = female INTERSECT course = math
{
String query = "select distinctCountThetaSketch(thetaSketchCol, '', "
+ "'dimName = ''gender'' and dimValue = ''Female''', 'dimName = ''course'' and dimValue = ''Math''', "
+ "'SET_INTERSECT($1, $2)') from " + DEFAULT_TABLE_NAME;
int expected = 50 + 110;
runAndAssert(query, expected);
query = "select distinctCountThetaSketch(thetaSketchCol, '', "
+ "'dimName = ''gender''', 'dimValue = ''Female''', 'dimName = ''course''', 'dimValue = ''Math''', "
+ "'SET_INTERSECT($1, $2, $3, $4)') from " + DEFAULT_TABLE_NAME;
runAndAssert(query, expected);
query = "select distinctCountThetaSketch(thetaSketchCol, '', "
+ "'dimName = ''gender''', 'dimValue = ''Female''', 'dimName = ''course''', 'dimValue = ''Math''', "
+ "'SET_INTERSECT(SET_INTERSECT($1, $2), SET_INTERSECT($3, $4))') from " + DEFAULT_TABLE_NAME;
runAndAssert(query, expected);
}
// gender = male UNION course = biology
{
String query = "select distinctCountThetaSketch(thetaSketchCol, '', "
+ "'dimName = ''gender'' and dimValue = ''Male''', 'dimName = ''course'' and dimValue = ''Biology''', "
+ "'SET_UNION($1, $2)') from " + DEFAULT_TABLE_NAME;
int expected = 70 + 80 + 90 + 100 + 130 + 140 + 150 + 160;
runAndAssert(query, expected);
query = "select distinctCountThetaSketch(thetaSketchCol, '', "
+ "'dimName = ''gender''', 'dimValue = ''Male''', 'dimName = ''course''', 'dimValue = ''Biology''', "
+ "'SET_UNION(SET_INTERSECT($1, $2), SET_INTERSECT($3, $4))') from " + DEFAULT_TABLE_NAME;
runAndAssert(query, expected);
}
// group by gender
{
String query = "select dimValue, distinctCountThetaSketch(thetaSketchCol) from " + DEFAULT_TABLE_NAME
+ " where dimName = 'gender' group by dimValue";
ImmutableMap<String, Integer> expected =
ImmutableMap.of("Female", 50 + 60 + 70 + 110 + 120 + 130, "Male", 80 + 90 + 100 + 140 + 150 + 160);
runAndAssert(query, expected);
}
}
private void runAndAssert(String query, int expected)
throws Exception {
JsonNode jsonNode = postQuery(query, _brokerBaseApiUrl);
int actual = Integer.parseInt(jsonNode.get("resultTable").get("rows").get(0).get(0).asText());
assertEquals(actual, expected);
}
private void runAndAssert(String query, Map<String, Integer> expectedGroupToValueMap)
throws Exception {
Map<String, Integer> actualGroupToValueMap = new HashMap<>();
JsonNode jsonNode = postQuery(query, _brokerBaseApiUrl);
jsonNode.get("resultTable").get("rows").forEach(node -> {
String group = node.get(0).textValue();
int value = node.get(1).intValue();
actualGroupToValueMap.put(group, value);
});
assertEquals(actualGroupToValueMap, expectedGroupToValueMap);
}
private File createAvroFile()
throws IOException {
// create avro schema
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
avroSchema.setFields(ImmutableList.of(new Field(DIM_NAME, org.apache.avro.Schema.create(Type.STRING), null, null),
new Field(DIM_VALUE, org.apache.avro.Schema.create(Type.STRING), null, null),
new Field(SHARD_ID, org.apache.avro.Schema.create(Type.INT), null, null),
new Field(THETA_SKETCH, org.apache.avro.Schema.create(Type.BYTES), null, null)));
// create avro file
File avroFile = new File(_tempDir, "data.avro");
try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
fileWriter.create(avroSchema, avroFile);
int studentId = 0;
int cardinality = 50;
for (int shardId = 0; shardId < 2; shardId++) {
// populate student-course data (studentId, gender, course) for this shard id
String[] allGenders = {"Female", "Male"};
String[] allCountries = {"Math", "History", "Biology"};
Map<Pair<String, String>, List<Integer>> genderCourseToStudentIds = new HashMap<>();
for (String gender : allGenders) {
for (String course : allCountries) {
List<Integer> studentIds =
genderCourseToStudentIds.computeIfAbsent(ImmutablePair.of(gender, course), key -> new ArrayList<>());
for (int i = 0; i < cardinality; i++) {
studentIds.add(studentId++);
}
cardinality += 10;
}
}
// [gender dimension] calculate theta sketches & add them to avro file
for (String gender : allGenders) {
// calculate theta sketch
UpdateSketch sketch = new UpdateSketchBuilder().build();
genderCourseToStudentIds.forEach((genderCourse, studentIds) -> {
if (gender.equals(genderCourse.getLeft())) {
studentIds.forEach(sketch::update);
}
});
// create avro record
GenericData.Record record = new GenericData.Record(avroSchema);
record.put(DIM_NAME, "gender");
record.put(DIM_VALUE, gender);
record.put(SHARD_ID, shardId);
record.put(THETA_SKETCH, ByteBuffer.wrap(sketch.compact().toByteArray()));
// add avro record to file
fileWriter.append(record);
}
// [course dimension] calculate theta sketches & add them to avro file
for (String course : allCountries) {
// calculate theta sketch
UpdateSketch sketch = new UpdateSketchBuilder().build();
genderCourseToStudentIds.forEach((genderCourse, studentIds) -> {
if (course.equals(genderCourse.getRight())) {
studentIds.forEach(sketch::update);
}
});
// create avro record
GenericData.Record record = new GenericData.Record(avroSchema);
record.put(DIM_NAME, "course");
record.put(DIM_VALUE, course);
record.put(SHARD_ID, shardId);
record.put(THETA_SKETCH, ByteBuffer.wrap(sketch.compact().toByteArray()));
// add avro record to file
fileWriter.append(record);
}
}
}
return avroFile;
}
@AfterClass
public void tearDown()
throws IOException {
dropOfflineTable(DEFAULT_TABLE_NAME);
stopServer();
stopBroker();
stopController();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}
}