blob: a9ee538cf55d0bc6dfa575c488ebbda281f7e041 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.pojo;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.physical.impl.OutputMutator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.util.StdConverter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
 * Dynamically reads values from the given list of records.
 * Creates writers based on given schema.
 *
 * <p>Each record is a {@code List<T>}; a field value is looked up in the record by its
 * position (see {@link #getFieldValue(List, int)}), while writers are created in the
 * iteration order of the schema map. Records are therefore expected to list their values
 * in schema order (NOTE(review): the pairing of writer index and field position happens in
 * the superclass, which is not visible here - confirm).
 *
 * @param <T> type of given values, if contains various types, use Object class
 */
@JsonTypeName("dynamic-pojo-record-reader")
public class DynamicPojoRecordReader<T> extends AbstractPojoRecordReader<List<T>> {

  /** Field name to field type, in column order; serialized as the {@code schema} property. */
  @JsonProperty
  private LinkedHashMap<String, Class<?>> schema;

  /**
   * Creates a reader over the given records.
   *
   * @param schema field names mapped to their types, must be neither {@code null} nor empty
   * @param records records to read, passed to the superclass as is
   * @throws IllegalStateException if the schema is {@code null} or empty
   */
  public DynamicPojoRecordReader(LinkedHashMap<String, Class<?>> schema, List<List<T>> records) {
    super(records);
    validateAndSetSchema(schema);
  }

  /**
   * Creates a reader over the given records with an explicit read limit.
   *
   * @param schema field names mapped to their types, must be neither {@code null} nor empty
   * @param records records to read, passed to the superclass as is
   * @param maxRecordsToRead limit passed to the superclass; {@link Converter} restores it
   *                         from the {@code recordsPerBatch} property
   * @throws IllegalStateException if the schema is {@code null} or empty
   */
  public DynamicPojoRecordReader(LinkedHashMap<String, Class<?>> schema, List<List<T>> records, int maxRecordsToRead) {
    super(records, maxRecordsToRead);
    validateAndSetSchema(schema);
  }

  /**
   * Initiates writers based on given schema which contains field name and its type.
   * One writer is created per schema entry, in the iteration order of the schema map.
   *
   * @param output output mutator
   * @return list of pojo writers
   */
  @Override
  protected List<PojoWriter> setupWriters(OutputMutator output) throws ExecutionSetupException {
    List<PojoWriter> writers = new ArrayList<>(schema.size());
    for (Map.Entry<String, Class<?>> field : schema.entrySet()) {
      writers.add(initWriter(field.getValue(), field.getKey(), output));
    }
    return writers;
  }

  /**
   * Returns the value stored in the record at the given position.
   *
   * @param row record to read from
   * @param fieldPosition zero-based index of the value inside the record
   * @return value at the given position
   */
  @Override
  protected Object getFieldValue(List<T> row, int fieldPosition) {
    return row.get(fieldPosition);
  }

  @Override
  public String toString() {
    return "DynamicPojoRecordReader{" +
        "records = " + records +
        "}";
  }

  /**
   * Stores the schema after checking that it is defined.
   *
   * @param schema schema to validate and store
   * @throws IllegalStateException if the schema is {@code null} or empty
   */
  private void validateAndSetSchema(LinkedHashMap<String, Class<?>> schema) {
    Preconditions.checkState(schema != null && !schema.isEmpty(), "Undefined schema is not allowed.");
    this.schema = schema;
  }

  /**
   * A utility class that converts from {@link com.fasterxml.jackson.databind.JsonNode}
   * to DynamicPojoRecordReader during physical plan fragment deserialization.
   */
  public static class Converter<T> extends StdConverter<JsonNode, DynamicPojoRecordReader<T>> {

    /** Target type used to deserialize the {@code schema} property. */
    private static final TypeReference<LinkedHashMap<String, Class<?>>> SCHEMA_TYPE =
        new TypeReference<LinkedHashMap<String, Class<?>>>() {};

    private final ObjectMapper mapper;

    /**
     * @param mapper object mapper used to convert the schema and the individual field values
     */
    public Converter(ObjectMapper mapper) {
      this.mapper = mapper;
    }

    /**
     * Rebuilds the reader from its serialized form. Reads the {@code schema},
     * {@code records} and {@code recordsPerBatch} properties of the given node; every
     * element of a serialized record is converted to the type the schema declares for
     * the column at the same position.
     *
     * @param value serialized reader
     * @return reader holding the deserialized schema and records
     */
    @Override
    public DynamicPojoRecordReader<T> convert(JsonNode value) {
      // Column types are heterogeneous, so the schema is typed with wildcards
      // exactly as described by SCHEMA_TYPE rather than pretending every column is a T.
      LinkedHashMap<String, Class<?>> schema = mapper.convertValue(value.get("schema"), SCHEMA_TYPE);
      List<List<T>> records = new ArrayList<>();
      JsonNode serializedRecords = value.get("records");
      for (JsonNode serializedRecord : serializedRecords) {
        List<T> record = new ArrayList<>(schema.size());
        Iterator<JsonNode> fieldValues = serializedRecord.elements();
        for (Class<?> fieldType : schema.values()) {
          // T is only a compile-time view of the record values and is erased at runtime,
          // so this cast cannot fail here; the actual value type is dictated by the schema.
          @SuppressWarnings("unchecked")
          T fieldValue = (T) mapper.convertValue(fieldValues.next(), fieldType);
          record.add(fieldValue);
        }
        records.add(record);
      }
      int maxRecordsToRead = value.get("recordsPerBatch").asInt();
      return new DynamicPojoRecordReader<>(schema, records, maxRecordsToRead);
    }
  }
}