blob: 72bf46fdce00e372a98259c810d3c3555edf145d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.app.function;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import com.teradata.tpcds.Results;
import com.teradata.tpcds.Session;
import com.teradata.tpcds.Table;
/**
* Each partition will be running a TPCDS data generator reader instance. The data generator will parallelize its work
* based on the number of partitions available. The reader is passed the parallelism level based on the number of
* partition instances.
*
* The function automatically handles generating the data for a single specified table or for all the tables.
*/
public class TPCDSDataGeneratorReader extends FunctionReader {
private final FunctionIdentifier functionIdentifier;
// Table name will be added to each generated record
private final static String TABLE_NAME_FIELD_NAME = "table_name";
// When generating the values, a list is created, at index 0, all the values for the parent record exist, if a
// child record is created, it is at index 1 in the list
private static final int PARENT_VALUES_INDEX = 0;
private static final int CHILD_VALUES_INDEX = 1;
// Table members
private final List<Table> selectedTables;
private final StringBuilder builder = new StringBuilder();
private final List<Iterator<List<List<String>>>> tableIterators = new ArrayList<>();
private Table currentTable;
private final int tableCount;
private int currentTableIndex;
// Parent tables will generate 2 records, 1 for the main, and another for the child's row, we'll push the parent
// record, and in the next rotation we'll push the child row, then go to next row generation
private String childRow = null;
// This will guide the next() method to know where to get the next record from (Parent or child record)
private boolean getFromParent = true;
private final boolean generateAllTables;
TPCDSDataGeneratorReader(String tableName, double scalingFactor, int parallelism, int partitionNumber,
FunctionIdentifier functionIdentifier) throws HyracksDataException {
this.functionIdentifier = functionIdentifier;
// Note: chunk numbers start with 1, so each chunk will be partition number + 1
// Create the session with the specified properties, the sessions also specifies the chunk to be generated
Session session = Session.getDefaultSession().withScale(scalingFactor).withParallelism(parallelism)
.withChunkNumber(partitionNumber + 1);
// If the tableName is null, then we're generating all the tables
generateAllTables = tableName == null;
// Get the table(s)
selectedTables = getTableFromStringTableName(tableName);
// These variables will monitor and assist with each table's data generation
currentTableIndex = 0;
tableCount = selectedTables.size();
currentTable = selectedTables.get(currentTableIndex);
// Iterators for the tables to generate the data for
for (Table table : selectedTables) {
Results result = Results.constructResults(table, session);
tableIterators.add(result.iterator());
}
}
/**
* Gets the table matching the provided string table name, throws an exception if no table is returned.
*
* @param tableName String table name to search for.
* @return Table if found, throws an exception otherwise.
*/
private List<Table> getTableFromStringTableName(String tableName) throws HyracksDataException {
// Get all the tables
if (generateAllTables) {
// Remove the DBGEN_VERSION table and all children tables, parent tables will generate them
return Table.getBaseTables().stream()
.filter(table -> !table.equals(Table.DBGEN_VERSION) && !table.isChild())
.collect(Collectors.toList());
}
// Search for the table
List<Table> matchedTables = Table.getBaseTables().stream()
.filter(table -> tableName.equalsIgnoreCase(table.getName())).collect(Collectors.toList());
// Ensure the table was found
if (matchedTables.isEmpty()) {
throw new RuntimeDataException(ErrorCode.TPCDS_INVALID_TABLE_NAME, getFunctionIdentifier().getName(),
tableName);
}
return matchedTables;
}
/**
* Gets the function identifier
*
* @return function identifier
*/
private FunctionIdentifier getFunctionIdentifier() {
return functionIdentifier;
}
@Override
public boolean hasNext() {
// Return children generated records of current table
if (generateAllTables && childRow != null) {
getFromParent = false;
return true;
}
// Current table still has more
if (tableIterators.get(currentTableIndex).hasNext()) {
return true;
}
// We went over all the tables
if (currentTableIndex == tableCount - 1) {
return false;
}
// Go to the next table
currentTableIndex++;
currentTable = selectedTables.get(currentTableIndex);
return hasNext();
}
@Override
public IRawRecord<char[]> next() throws IOException, InterruptedException {
CharArrayRecord record = new CharArrayRecord();
if (getFromParent) {
record.append((formatRecord(tableIterators.get(currentTableIndex).next())).toCharArray());
} else {
record.append(childRow.toCharArray());
childRow = null; // Always reset the child row after writing it
getFromParent = true;
}
record.endRecord();
return record;
}
/**
* Builds the string record from the generated values by the data generator. The field name for each value is
* extracted from the table from which the data is being generated.
*
* @param values List containing all the generated column values
*
* @return The built string record from the generated values
*/
private String formatRecord(List<List<String>> values) {
// Clear the builder (This is faster than re-creating the builder each iteration)
builder.setLength(0);
// Construct the record
constructRecord(values.get(PARENT_VALUES_INDEX), currentTable);
// Reference to the parent row to be returned, before resetting the builder again
String parentRow = builder.toString();
// In some cases, the generate generates 2 records, one for current table, and one for child table. If a child
// record is generated, we're gonna add it to a list, and start pushing it once all the parent records
// are done
if (generateAllTables && values.size() > 1) {
builder.setLength(0);
// Construct the record
constructRecord(values.get(CHILD_VALUES_INDEX), currentTable.getChild());
// Add it to the children rows list
childRow = builder.toString();
}
// Return parent row
return parentRow;
}
/**
* Constructs the record with the appropriate data types.
*
* @param values list containing all the generated values for all columns in a string format.
* @param table Table the record is being constructed for
*/
private void constructRecord(List<String> values, Table table) {
// Add the table name to the record
builder.append("{\"").append(TABLE_NAME_FIELD_NAME).append("\":\"").append(table.getName()).append("\"");
// Build the record data
for (int counter = 0; counter < values.size(); counter++) {
// If the value is null, no need to check for the column type
if (values.get(counter) == null) {
builder.append(",\"");
builder.append(table.getColumns()[counter].getName());
builder.append("\":");
builder.append(values.get(counter));
continue;
}
String fieldName = table.getColumns()[counter].getName();
String stringValue = values.get(counter);
// Convert the value to the appropriate type based on the column type
switch (table.getColumns()[counter].getType().getBase()) {
case INTEGER:
builder.append(",\"").append(fieldName).append("\":").append(Integer.valueOf(stringValue));
break;
case DECIMAL:
builder.append(",\"").append(fieldName).append("\":").append(Double.valueOf(stringValue));
break;
// IDENTIFIER type could be any value, so we're taking it as a string
// DATE and TIME are not supported, they are stored as strings and can be modified with date functions
// CHAR and VARCHAR are handled as strings
// any other type (default case) is handled as a string value
case IDENTIFIER:
case DATE:
case TIME:
case CHAR:
case VARCHAR:
default:
builder.append(",\"").append(fieldName).append("\":\"").append(stringValue).append("\"");
break;
}
}
builder.append("}");
}
}