/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
<@pp.dropOutputFile />
<@pp.changeOutputFile name="org/apache/drill/exec/store/kudu/KuduRecordWriter.java" />
package org.apache.drill.exec.store.kudu;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.store.*;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.kudu.client.*;

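<#-- Expanded by Drill's code generation into the KuduRecordWriter.java file named in the
     changeOutputFile directive above. -->
/**
 * Base class for the Kudu record writer. For every Drill minor type with a direct Kudu
 * column mapping, the generated code below supplies a {@link FieldConverter} that copies
 * the value read from the Drill vectors into the current Kudu {@link PartialRow}.
 */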
public abstract class KuduRecordWriter extends AbstractRecordWriter implements RecordWriter {

  private PartialRow row;
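
  /** Binds the Kudu row that the generated converters populate for the current record. */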
  public void setUp(PartialRow row) {
    this.row = row;
  }
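
  <#-- Generate one converter per (mode, minor type) pair. Repeated (array) modes and
       Drill types with no direct Kudu column mapping are skipped: the first branch of
       the #if below emits nothing for them. -->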
  <#list vv.types as type>
  <#list type.minor as minor>
  <#list vv.modes as mode>
  <#if mode.prefix == "Repeated" ||
       minor.class == "TinyInt" ||
       minor.class == "UInt1" ||
       minor.class == "UInt2" ||
       minor.class == "SmallInt" ||
       minor.class == "Time" ||
       minor.class == "Decimal9" ||
       minor.class == "Decimal18" ||
       minor.class == "Date" ||
       minor.class == "UInt4" ||
       minor.class == "Decimal28Sparse" ||
       minor.class == "Decimal38Sparse" ||
       minor.class?contains("Interval")>
  <#else>

  @Override
  public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) {
    return new ${mode.prefix}${minor.class}KuduConverter(fieldId, fieldName, reader);
  }

  public class ${mode.prefix}${minor.class}KuduConverter extends FieldConverter {

    private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder();

    public ${mode.prefix}${minor.class}KuduConverter(int fieldId, String fieldName, FieldReader reader) {
      super(fieldId, fieldName, reader);
    }

    @Override
    public void writeField() throws IOException {
      <#if mode.prefix == "Nullable">
      if (!reader.isSet()) {
        return;
      }
      </#if>
      reader.read(holder);
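      <#-- Dispatch on the Drill minor type to the matching Kudu PartialRow setter. -->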
      <#if minor.class == "Float4">
      row.addFloat(fieldId, holder.value);
      <#elseif minor.class == "TimeStamp">
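      // The * 1000 converts Drill's millisecond timestamps to the microseconds Kudu expects.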
      row.addLong(fieldId, holder.value * 1000);
      <#elseif minor.class == "Int">
      row.addInt(fieldId, holder.value);
      <#elseif minor.class == "BigInt">
      row.addLong(fieldId, holder.value);
      <#elseif minor.class == "Float8">
      row.addDouble(fieldId, holder.value);
      <#elseif minor.class == "Bit">
      row.addBoolean(fieldId, holder.value == 1);
      <#elseif minor.class == "VarChar">
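      // Copy the value out of the Drill buffer and add it to the row as a UTF-8 string.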
      byte[] bytes = new byte[holder.end - holder.start];
      holder.buffer.getBytes(holder.start, bytes);
      row.addString(fieldId, new String(bytes, StandardCharsets.UTF_8));
      <#elseif minor.class == "VarBinary">
      byte[] bytes = new byte[holder.end - holder.start];
      holder.buffer.getBytes(holder.start, bytes);
      row.addBinary(fieldId, bytes);
      <#else>
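      // Remaining types have no Kudu column mapping, so the generated converter rejects them at write time.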
      throw new UnsupportedOperationException();
      </#if>
    }
  }
  </#if>
  </#list>
  </#list>
  </#list>
}