| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pinot.integration.tests; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import javax.annotation.Nullable; |
| import org.apache.avro.file.DataFileWriter; |
| import org.apache.avro.generic.GenericData; |
| import org.apache.avro.generic.GenericDatumWriter; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.pinot.spi.config.table.FieldConfig; |
| import org.apache.pinot.spi.data.FieldSpec; |
| import org.apache.pinot.spi.data.Schema; |
| import org.apache.pinot.util.TestUtils; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.Test; |
| |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertTrue; |
| import static org.testng.AssertJUnit.fail; |
| |
| |
| /** |
| * Cluster integration test for near realtime text search (lucene) and realtime text search (native). |
| */ |
| public class TextIndicesRealtimeClusterIntegrationTest extends BaseClusterIntegrationTest { |
| private static final String TEXT_COLUMN_NAME = "skills"; |
| private static final String TEXT_COLUMN_NAME_NATIVE = "skills_native"; |
| private static final String TIME_COLUMN_NAME = "millisSinceEpoch"; |
| private static final int NUM_SKILLS = 24; |
| private static final int NUM_MATCHING_SKILLS = 4; |
| private static final int NUM_RECORDS = NUM_SKILLS * 1000; |
| private static final int NUM_MATCHING_RECORDS = NUM_MATCHING_SKILLS * 1000; |
| private static final int NUM_MATCHING_RECORDS_NATIVE = 7000; |
| |
| private static final String TEST_TEXT_COLUMN_QUERY = |
| "SELECT COUNT(*) FROM mytable WHERE TEXT_MATCH(skills, '\"machine learning\" AND spark')"; |
| |
| private static final String TEST_TEXT_COLUMN_QUERY_NATIVE = |
| "SELECT COUNT(*) FROM mytable WHERE TEXT_CONTAINS(skills_native, 'm.*') AND TEXT_CONTAINS(skills_native, " |
| + "'spark')"; |
| |
| @Override |
| public String getTimeColumnName() { |
| return TIME_COLUMN_NAME; |
| } |
| |
| // TODO: Support Lucene index on HLC consuming segments |
| @Override |
| protected boolean useLlc() { |
| return true; |
| } |
| |
| @Nullable |
| @Override |
| protected String getSortedColumn() { |
| return null; |
| } |
| |
| @Nullable |
| @Override |
| protected List<String> getInvertedIndexColumns() { |
| return Collections.singletonList(TEXT_COLUMN_NAME_NATIVE); |
| } |
| |
| @Override |
| protected List<String> getNoDictionaryColumns() { |
| return Collections.singletonList(TEXT_COLUMN_NAME); |
| } |
| |
| @Nullable |
| @Override |
| protected List<String> getRangeIndexColumns() { |
| return null; |
| } |
| |
| @Nullable |
| @Override |
| protected List<String> getBloomFilterColumns() { |
| return null; |
| } |
| |
| @Override |
| protected List<FieldConfig> getFieldConfigs() { |
| Map<String, String> propertiesMap = new HashMap<>(); |
| propertiesMap.put(FieldConfig.TEXT_FST_TYPE, FieldConfig.TEXT_NATIVE_FST_LITERAL); |
| |
| return Arrays.asList( |
| new FieldConfig(TEXT_COLUMN_NAME, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, null), |
| new FieldConfig(TEXT_COLUMN_NAME_NATIVE, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, |
| propertiesMap)); |
| } |
| |
| @BeforeClass |
| public void setUp() |
| throws Exception { |
| TestUtils.ensureDirectoriesExistAndEmpty(_tempDir); |
| |
| // Start the Pinot cluster |
| startZk(); |
| startController(); |
| startBroker(); |
| startServer(); |
| |
| // Start Kafka |
| startKafka(); |
| |
| // Create the Avro file |
| File avroFile = createAvroFile(); |
| |
| // Create and upload the schema and table config |
| Schema schema = new Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME) |
| .addSingleValueDimension(TEXT_COLUMN_NAME, FieldSpec.DataType.STRING) |
| .addSingleValueDimension(TEXT_COLUMN_NAME_NATIVE, FieldSpec.DataType.STRING) |
| .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build(); |
| addSchema(schema); |
| addTableConfig(createRealtimeTableConfig(avroFile)); |
| |
| // Push data into Kafka |
| pushAvroIntoKafka(Collections.singletonList(avroFile)); |
| |
| // Wait until the table is queryable |
| TestUtils.waitForCondition(aVoid -> { |
| try { |
| return getCurrentCountStarResult() >= 0; |
| } catch (Exception e) { |
| return null; |
| } |
| }, 10_000L, "Failed to get COUNT(*) result"); |
| } |
| |
| @AfterClass |
| public void tearDown() |
| throws Exception { |
| dropRealtimeTable(getTableName()); |
| stopServer(); |
| stopBroker(); |
| stopController(); |
| stopKafka(); |
| stopZk(); |
| FileUtils.deleteDirectory(_tempDir); |
| } |
| |
| private File createAvroFile() |
| throws Exception { |
| // Read all skills from the skill file |
| InputStream inputStream = getClass().getClassLoader().getResourceAsStream("data/text_search_data/skills.txt"); |
| assertNotNull(inputStream); |
| List<String> skills = new ArrayList<>(NUM_SKILLS); |
| try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { |
| String line; |
| while ((line = reader.readLine()) != null) { |
| skills.add(line); |
| } |
| } |
| assertEquals(skills.size(), NUM_SKILLS); |
| |
| File avroFile = new File(_tempDir, "data.avro"); |
| org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); |
| avroSchema.setFields(Arrays.asList(new org.apache.avro.Schema.Field(TEXT_COLUMN_NAME, |
| org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), null, null), |
| new org.apache.avro.Schema.Field(TEXT_COLUMN_NAME_NATIVE, |
| org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), null, null), |
| new org.apache.avro.Schema.Field(TIME_COLUMN_NAME, |
| org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), null, null))); |
| try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) { |
| fileWriter.create(avroSchema, avroFile); |
| for (int i = 0; i < NUM_RECORDS; i++) { |
| GenericData.Record record = new GenericData.Record(avroSchema); |
| record.put(TEXT_COLUMN_NAME, skills.get(i % NUM_SKILLS)); |
| record.put(TEXT_COLUMN_NAME_NATIVE, skills.get(i % NUM_SKILLS)); |
| record.put(TIME_COLUMN_NAME, System.currentTimeMillis()); |
| fileWriter.append(record); |
| } |
| } |
| return avroFile; |
| } |
| |
| @Test |
| public void testTextSearchCountQuery() |
| throws Exception { |
| // Keep posting queries until all records are consumed |
| long previousResult = 0; |
| while (getCurrentCountStarResult() < NUM_RECORDS) { |
| long result = getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY); |
| assertTrue(result >= previousResult); |
| previousResult = result; |
| Thread.sleep(100); |
| } |
| |
| //Lucene index on consuming segments to update the latest records |
| TestUtils.waitForCondition(aVoid -> { |
| try { |
| return getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY) == NUM_MATCHING_RECORDS; |
| } catch (Exception e) { |
| fail("Caught exception while getting text column query result"); |
| return false; |
| } |
| }, 10_000L, "Failed to reach expected number of matching records"); |
| } |
| |
| @Test |
| public void testTextSearchCountQueryNative() |
| throws Exception { |
| // Keep posting queries until all records are consumed |
| long previousResult = 0; |
| while (getCurrentCountStarResult() < NUM_RECORDS) { |
| long result = getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY_NATIVE); |
| assertTrue(result >= previousResult); |
| previousResult = result; |
| Thread.sleep(100); |
| } |
| |
| assertTrue(getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY_NATIVE) == NUM_MATCHING_RECORDS_NATIVE); |
| } |
| |
| private long getTextColumnQueryResult(String query) |
| throws Exception { |
| return postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(); |
| } |
| } |