blob: e23413f9b6b1b850769bb71c5ed1e00df56c2a9a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.orc.impl.writer;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.orc.OrcProto;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.CryptoUtils;
import org.apache.orc.impl.IntegerWriter;
import org.apache.orc.impl.PositionRecorder;
import org.apache.orc.impl.SerializationUtils;
import org.apache.orc.impl.StreamName;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;
import java.util.function.Consumer;
/**
 * Tree writer for ORC timestamp columns.
 *
 * <p>Each non-null value is written to two integer streams. The DATA stream
 * receives the value's whole seconds minus the seconds of
 * {@link #BASE_TIMESTAMP_STRING}; that base is interpreted in UTC or in the JVM
 * default time zone (see the constructor). The SECONDARY stream receives the
 * nanoseconds encoded by {@link #formatNanos(int)}. Row-index statistics and
 * bloom filters are fed a millisecond value rebuilt from the same seconds/nanos
 * pair.
 */
public class TimestampTreeWriter extends TreeWriterBase {
  public static final int MILLIS_PER_SECOND = 1000;
  public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
  private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
  /** Seconds relative to {@link #epoch}; backs the DATA stream. */
  private final IntegerWriter seconds;
  /** Nanoseconds encoded by {@link #formatNanos(int)}; backs the SECONDARY stream. */
  private final IntegerWriter nanos;
  private final boolean isDirectV2;
  /** True when the base timestamp is interpreted in UTC and values are never zone-converted. */
  private final boolean alwaysUTC;
  /** JVM default zone captured at construction; {@code null} when {@link #alwaysUTC}. */
  private final TimeZone localTimezone;
  /** {@link #BASE_TIMESTAMP_STRING} expressed in whole seconds since the Unix epoch. */
  private final long epoch;
  private final boolean useProleptic;

  /**
   * Creates the writer together with its DATA (seconds) and SECONDARY (nanos) streams.
   *
   * @param schema the column type
   * @param encryption the encryption variant used when naming this column's streams
   * @param writer the shared writer context
   * @param instantType when true, values are always treated as UTC (this is also
   *     the case when {@code writer.getUseUTCTimestamp()} is true)
   * @throws IOException if the base timestamp cannot be parsed, or as propagated
   *     from the superclass / stream setup
   */
  public TimestampTreeWriter(TypeDescription schema,
                             WriterEncryptionVariant encryption,
                             WriterContext writer,
                             boolean instantType) throws IOException {
    super(schema, encryption, writer);
    this.isDirectV2 = isNewWriteFormat(writer);
    // Seconds are written as signed integers, nanos as unsigned.
    this.seconds = createIntegerWriter(writer.createStream(
        new StreamName(id, OrcProto.Stream.Kind.DATA, encryption)),
        true, isDirectV2, writer);
    this.nanos = createIntegerWriter(writer.createStream(
        new StreamName(id, OrcProto.Stream.Kind.SECONDARY, encryption)),
        false, isDirectV2, writer);
    if (rowIndexPosition != null) {
      recordPosition(rowIndexPosition);
    }
    this.alwaysUTC = instantType || writer.getUseUTCTimestamp();
    // Outside UTC mode, both the base timestamp and the conversion of non-UTC
    // vectors use the JVM default zone as it is when this writer is created.
    this.localTimezone = alwaysUTC ? null : TimeZone.getDefault();
    // A fresh, unshared formatter, so SimpleDateFormat's lack of thread-safety
    // does not matter here.
    DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    dateFormat.setTimeZone(alwaysUTC ? UTC : localTimezone);
    try {
      this.epoch = dateFormat.parse(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
    } catch (ParseException e) {
      throw new IOException("Unable to create base timestamp tree writer", e);
    }
    this.useProleptic = writer.getProlepticGregorian();
  }

  /** Reports DIRECT_V2 encoding for the new write format, DIRECT otherwise. */
  @Override
  OrcProto.ColumnEncoding.Builder getEncoding() {
    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
    result.setKind(isDirectV2 ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
        : OrcProto.ColumnEncoding.Kind.DIRECT);
    return result;
  }

  /**
   * Writes {@code length} rows of {@code vector} starting at row {@code offset}.
   *
   * <p>The vector is first switched, via {@code changeCalendar}, to the calendar
   * selected by {@code writer.getProlepticGregorian()}. This is called on the
   * caller's vector itself (presumably converting it in place — confirm against
   * the storage-api).
   */
  @Override
  public void writeBatch(ColumnVector vector, int offset,
                         int length) throws IOException {
    super.writeBatch(vector, offset, length);
    TimestampColumnVector vec = (TimestampColumnVector) vector;
    vec.changeCalendar(useProleptic, true);
    // Loop-invariant: whether values skip the local-zone -> UTC conversion.
    final boolean valuesInUtc = vec.isUTC() || alwaysUTC;
    if (vector.isRepeating) {
      if (vector.noNulls || !vector.isNull[0]) {
        // Ignore the millisecond digits of vec.time; the nanos carry the
        // complete sub-second part.
        final long secs = vec.time[0] / MILLIS_PER_SECOND;
        final int newNanos = vec.nanos[0];
        // A repeating value only needs to be added to stats/bloom filters once.
        updateTimestampStatistics(valuesInUtc, secs, newNanos);
        final long encodedNanos = formatNanos(newNanos);
        for (int i = 0; i < length; ++i) {
          seconds.write(secs - epoch);
          nanos.write(encodedNanos);
        }
      }
    } else {
      for (int i = 0; i < length; ++i) {
        final int row = i + offset;
        if (vec.noNulls || !vec.isNull[row]) {
          // Ignore the millisecond digits of vec.time; the nanos carry the
          // complete sub-second part.
          final long secs = vec.time[row] / MILLIS_PER_SECOND;
          final int newNanos = vec.nanos[row];
          seconds.write(secs - epoch);
          nanos.write(formatNanos(newNanos));
          updateTimestampStatistics(valuesInUtc, secs, newNanos);
        }
      }
    }
  }

  /**
   * Feeds one non-null timestamp into the row-index statistics and bloom filters.
   *
   * @param valuesInUtc true if the value needs no local-zone to UTC conversion
   * @param secs the value's {@code time} divided by {@link #MILLIS_PER_SECOND}
   *     (Java truncating division)
   * @param newNanos the value's nanoseconds field
   */
  private void updateTimestampStatistics(boolean valuesInUtc, long secs, int newNanos) {
    // Rebuild millis from the truncated seconds plus the top three digits of
    // the nanos. For negative results with at least one millisecond of nanos,
    // step back one second to undo the truncation toward zero.
    // NOTE(review): when time is in (-1000, 0), truncation gives secs == 0.
    // Millis is then non-negative (assuming nanos >= 0), so the adjustment
    // never fires. The same truncated secs are written to the DATA stream, so
    // this is presumably mirrored by the reader. Confirm before altering.
    long millis = secs * MILLIS_PER_SECOND + newNanos / 1_000_000;
    if (millis < 0 && newNanos > 999_999) {
      millis -= MILLIS_PER_SECOND;
    }
    long utc = valuesInUtc
        ? millis
        : SerializationUtils.convertToUtc(localTimezone, millis);
    indexStatistics.updateTimestamp(utc);
    if (createBloomFilter) {
      // The legacy bloom filter (when present) gets the unconverted millis;
      // the UTF-8 bloom filter gets the UTC value.
      if (bloomFilter != null) {
        bloomFilter.addLong(millis);
      }
      bloomFilterUtf8.addLong(utc);
    }
  }

  /** Finishes the stripe, then records the starting positions for the next one. */
  @Override
  public void writeStripe(int requiredIndexEntries) throws IOException {
    super.writeStripe(requiredIndexEntries);
    if (rowIndexPosition != null) {
      recordPosition(rowIndexPosition);
    }
  }

  /**
   * Encodes nanoseconds for the SECONDARY stream.
   *
   * <p>Zero encodes as 0. A value not divisible by 100 is stored as
   * {@code nanos << 3}, leaving the low 3 bits at 0. Otherwise between 2 and 8
   * trailing decimal zeros are stripped. The result is
   * {@code (stripped value << 3) | (zerosRemoved - 1)}.
   *
   * @param nanos the nanoseconds value
   * @return the encoded value
   */
  static long formatNanos(int nanos) {
    if (nanos == 0) {
      return 0;
    } else if (nanos % 100 != 0) {
      return ((long) nanos) << 3;
    } else {
      nanos /= 100;
      // Two zeros are already gone; the low bits hold zerosRemoved - 1.
      int trailingZeros = 1;
      while (nanos % 10 == 0 && trailingZeros < 7) {
        nanos /= 10;
        trailingZeros += 1;
      }
      return ((long) nanos) << 3 | trailingZeros;
    }
  }

  /** Records the current positions of both the seconds and nanos streams. */
  @Override
  void recordPosition(PositionRecorder recorder) throws IOException {
    super.recordPosition(recorder);
    seconds.getPosition(recorder);
    nanos.getPosition(recorder);
  }

  /** Returns the base estimate plus the buffered size of both integer streams. */
  @Override
  public long estimateMemory() {
    return super.estimateMemory() + seconds.estimateMemory() +
        nanos.estimateMemory();
  }

  /** Approximates raw size as value count times the Java in-memory timestamp size. */
  @Override
  public long getRawDataSize() {
    return fileStatistics.getNumberOfValues() *
        JavaDataModel.get().lengthOfTimestamp();
  }

  /** Flushes the base streams, then the seconds and nanos streams. */
  @Override
  public void flushStreams() throws IOException {
    super.flushStreams();
    seconds.flush();
    nanos.flush();
  }

  /** Applies the per-stripe encryption IV modification to both streams. */
  @Override
  public void prepareStripe(int stripeId) {
    super.prepareStripe(stripeId);
    Consumer<byte[]> updater = CryptoUtils.modifyIvForStripe(stripeId);
    seconds.changeIv(updater);
    nanos.changeIv(updater);
  }
}