/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.enumerable;

import org.apache.calcite.DataContext;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.util.BuiltInMethod;

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.compile.ClassBuilder;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.planner.sql.SchemaUtilites;
import org.apache.drill.exec.record.ColumnConverter;
import org.apache.drill.exec.record.ColumnConverterFactory;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.TupleSchema;

import org.codehaus.commons.compiler.CompileException;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.StreamSupport;

/**
 * {@link ManagedReader} implementation that compiles the specified generated code,
 * invokes its bind method to obtain the row values, and reads the results using
 * column converters.
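 * <p>
 * A minimal construction sketch with hypothetical values ({@code generatedCode}
 * stands for the Java source produced by Calcite's enumerable plan translation):
 * <pre>{@code
 *   List<SchemaPath> columns = Collections.singletonList(SchemaPath.STAR_COLUMN);
 *   Map<String, Integer> fieldsMap = Collections.singletonMap(SchemaPath.DYNAMIC_STAR, 0);
 *   ManagedReader<SchemaNegotiator> reader =
 *       new EnumerableRecordReader(columns, fieldsMap, generatedCode, "some.schema");
 * }</pre>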
*/
public class EnumerableRecordReader implements ManagedReader<SchemaNegotiator> {

  // Calcite names the classes it generates for Bindable implementations "Baz"
  private static final String CLASS_NAME = "Baz";

  private final List<SchemaPath> columns;
  private final Map<String, Integer> fieldsMap;
  private final String code;
  private final String schemaPath;

  private ColumnConverter converter;
  private Iterator<Map<String, Object>> records;
  private ResultSetLoader loader;

  public EnumerableRecordReader(List<SchemaPath> columns, Map<String, Integer> fieldsMap,
      String code, String schemaPath) {
    this.columns = columns;
    this.fieldsMap = fieldsMap;
    this.code = code;
    this.schemaPath = schemaPath;
  }

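  /**
   * Compiles the generated code, instantiates the resulting class, and binds it to a
   * {@link DataContext} that exposes the resolved schema tree, producing the iterator
   * of rows for this scan.
   */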
  @SuppressWarnings("unchecked")
  private void setup(OperatorContext context) {
    SchemaPlus rootSchema = context.getFragmentContext().getFullRootSchema();
    DataContext root = new DrillDataContext(
        schemaPath != null
            ? SchemaUtilites.searchSchemaTree(rootSchema, SchemaUtilites.getSchemaPathAsList(schemaPath))
            : rootSchema,
        new JavaTypeFactoryImpl(),
        Collections.emptyMap());
    try {
      Class<?> implementationClass = ClassBuilder.getCompiledClass(code, CLASS_NAME,
          context.getFragmentContext().getConfig(), context.getFragmentContext().getOptions());
      // the generated class implements Calcite's Bindable; invoking its bind() method
      // with the data context produces the iterable over the query results
      Iterable<?> iterable = (Iterable<Map<String, Object>>) implementationClass
          .getMethod(BuiltInMethod.BINDABLE_BIND.method.getName(), DataContext.class)
          .invoke(implementationClass.getDeclaredConstructor().newInstance(), root);
      if (fieldsMap.size() == 1) {
        // when a single column is projected, its value is returned directly
        records = StreamSupport.stream(iterable.spliterator(), false)
            .map(this::wrap)
            .iterator();
      } else {
        // when several columns are projected, an array of values is returned
        records = StreamSupport.stream(iterable.spliterator(), false)
            .map(row -> wrap((Object[]) row))
            .iterator();
      }
    } catch (CompileException | IOException | ClassTransformationException | ReflectiveOperationException e) {
      throw new RuntimeException("Exception occurred while executing generated code", e);
    }
  }

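  /**
   * Wraps a projected row, represented as an array of values, into a map keyed by
   * field name. A dynamic star entry is itself a map of columns and is flattened
   * into the row.
   */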
  @SuppressWarnings("unchecked")
  private Map<String, Object> wrap(Object[] values) {
    Map<String, Object> row = new HashMap<>();
    columns.stream()
        .map(SchemaPath::getRootSegmentPath)
        .forEach(fieldName -> {
          if (fieldName.equals(SchemaPath.DYNAMIC_STAR)) {
            row.putAll((Map<? extends String, ?>) values[fieldsMap.get(fieldName)]);
          } else {
            row.put(fieldName, values[fieldsMap.get(fieldName)]);
          }
        });
    return row;
  }

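  /**
   * Wraps a single projected value into a row map: for a star projection the value
   * is already the full row map, otherwise it is keyed by its column name.
   */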
  @SuppressWarnings("unchecked")
  private Map<String, Object> wrap(Object value) {
    SchemaPath schemaPath = columns.iterator().next();
    if (schemaPath.equals(SchemaPath.STAR_COLUMN)) {
      return (Map<String, Object>) value;
    }
    return Collections.singletonMap(schemaPath.getRootSegmentPath(), value);
  }

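  // Builds the result set loader, compiles and binds the generated code, and creates
  // the converters that copy each row map into the loader's column writers.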
  @Override
  public boolean open(SchemaNegotiator negotiator) {
    TupleMetadata providedSchema = negotiator.providedSchema();
    loader = negotiator.build();
    setup(negotiator.context());
    ColumnConverterFactory factory = new ColumnConverterFactory(providedSchema);
    converter = factory.getRootConverter(providedSchema, new TupleSchema(), loader.writer());
    return true;
  }

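  // Fills the current batch until it is full or the source iterator is exhausted;
  // returning false signals end of data, and rows already written in this call still
  // form the final batch.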
  @Override
  public boolean next() {
    RowSetLoader rowWriter = loader.writer();
    while (!rowWriter.isFull()) {
      if (records.hasNext()) {
        processRecord(rowWriter, records.next());
      } else {
        return false;
      }
    }
    return true;
  }

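  // Writes a single row: the converter distributes the map entries to the
  // corresponding column writers between start() and save().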
  private void processRecord(RowSetLoader writer, Map<String, Object> record) {
    writer.start();
    converter.convert(record);
    writer.save();
  }

  @Override
  public void close() {
    // no resources to release
  }
}