blob: 0bb78b2109b7cba1fc29fa9be937e31ee0dffca9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.write;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.field.FieldReaders;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.IndexedInts;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Utility methods used by {@link FrameWriter} implementations.
*/
public class FrameWriterUtils
{
public static final byte NULL_STRING_MARKER = (byte) 0xFF; /* cannot appear in a valid utf-8 byte sequence */
public static final byte[] NULL_STRING_MARKER_ARRAY = new byte[]{NULL_STRING_MARKER};
public static final String RESERVED_FIELD_PREFIX = "___druid";
/**
* Writes a frame header to a memory locations.
*/
public static long writeFrameHeader(
final WritableMemory memory,
final long startPosition,
final FrameType frameType,
final long totalSize,
final int numRows,
final int numRegions,
final boolean permuted
)
{
long currentPosition = startPosition;
memory.putByte(currentPosition, frameType.version());
currentPosition += Byte.BYTES;
memory.putLong(currentPosition, totalSize);
currentPosition += Long.BYTES;
memory.putInt(currentPosition, numRows);
currentPosition += Integer.BYTES;
memory.putInt(currentPosition, numRegions);
currentPosition += Integer.BYTES;
memory.putByte(currentPosition, permuted ? (byte) 1 : (byte) 0);
currentPosition += Byte.BYTES;
return currentPosition - startPosition;
}
/**
* Retrieves UTF-8 byte buffers from a {@link DimensionSelector}, which is expected to be the kind of
* selector you get for an {@code STRING} column.
*
* Null strings are returned as {@link #NULL_STRING_MARKER_ARRAY}.
*
* @param selector the selector
* @param multiValue if true, return an array that corresponds exactly to {@link DimensionSelector#getRow()}.
* if false, always return a single-valued array. In particular, this means [] is
* returned as [NULL_STRING_MARKER_ARRAY].
*
* @return UTF-8 strings. The list itself is never null.
*/
public static List<ByteBuffer> getUtf8ByteBuffersFromStringSelector(
final DimensionSelector selector,
final boolean multiValue
)
{
final IndexedInts row = selector.getRow();
final int size = row.size();
if (multiValue) {
final List<ByteBuffer> retVal = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
retVal.add(getUtf8ByteBufferFromStringSelector(selector, row.get(i)));
}
return retVal;
} else {
// If !multivalue, always return exactly one buffer.
if (size == 0) {
return Collections.singletonList(ByteBuffer.wrap(NULL_STRING_MARKER_ARRAY));
} else if (size == 1) {
return Collections.singletonList(getUtf8ByteBufferFromStringSelector(selector, row.get(0)));
} else {
throw new ISE("Encountered unexpected multi-value row");
}
}
}
/**
* Retrieves UTF-8 byte buffers from a {@link ColumnValueSelector}, which is expected to be the kind of
* selector you get for an {@code ARRAY<STRING>} column.
*
* Null strings are returned as {@code null}.
*
* If the entire array returned by {@link BaseObjectColumnValueSelector#getObject()} is null, returns either
* null or {@link #NULL_STRING_MARKER_ARRAY} depending on the value of "useNullArrays".
*
* @param selector array selector
*
* @return UTF-8 strings. The list itself may be null.
*/
@Nullable
public static List<ByteBuffer> getUtf8ByteBuffersFromStringArraySelector(
@SuppressWarnings("rawtypes") final BaseObjectColumnValueSelector selector
)
{
Object row = selector.getObject();
if (row == null) {
return null;
} else if (row instanceof String) {
return Collections.singletonList(getUtf8ByteBufferFromString((String) row));
}
final List<ByteBuffer> retVal = new ArrayList<>();
if (row instanceof List) {
for (int i = 0; i < ((List<?>) row).size(); i++) {
retVal.add(getUtf8ByteBufferFromString(((List<String>) row).get(i)));
}
} else if (row instanceof Object[]) {
for (Object value : (Object[]) row) {
retVal.add(getUtf8ByteBufferFromString((String) value));
}
} else {
throw new ISE("Unexpected type %s found", row.getClass().getName());
}
return retVal;
}
/**
* Retrieves a numeric list from a Java object, given that the object is an instance of something that can be returned
* from {@link ColumnValueSelector#getObject()} of valid numeric array selectors representations
*
* While {@link BaseObjectColumnValueSelector} specifies that only instances of {@code Object[]} can be returned from
* the numeric array selectors, this method also handles a few more cases which can be encountered if the selector is
* directly implemented on top of the group by stuff
*/
@Nullable
public static List<? extends Number> getNumericArrayFromObject(Object row)
{
if (row == null) {
return null;
} else if (row instanceof Number) {
return Collections.singletonList((Number) row);
}
final List<Number> retVal = new ArrayList<>();
if (row instanceof List) {
for (int i = 0; i < ((List<?>) row).size(); i++) {
retVal.add((Number) ((List<?>) row).get(i));
}
} else if (row instanceof Object[]) {
for (Object value : (Object[]) row) {
retVal.add((Number) value);
}
} else {
throw new ISE("Unexpected type %s found", row.getClass().getName());
}
return retVal;
}
/**
* Checks the provided signature for any disallowed field names. Returns any that are found.
*/
public static Set<String> findDisallowedFieldNames(final RowSignature signature)
{
return signature.getColumnNames()
.stream()
.filter(s -> s.startsWith(RESERVED_FIELD_PREFIX))
.collect(Collectors.toSet());
}
/**
* Verifies whether the provided sortColumns are all sortable, and are a prefix of the signature. This is required
* because it allows us to treat the sort key as a chunk of bytes.
*
* Exits quietly if the sort columns are OK. Throws an exception if there is a problem.
*
* @throws IllegalArgumentException if there is a problem
*/
public static void verifySortColumns(
final List<KeyColumn> keyColumns,
final RowSignature signature
)
{
if (!areKeyColumnsPrefixOfSignature(keyColumns, signature)) {
throw new IAE(
"Sort column [%s] must be a prefix of the signature",
keyColumns.stream().map(KeyColumn::columnName).collect(Collectors.joining(", "))
);
}
// Verify that all sort columns are comparable.
for (final KeyColumn keyColumn : keyColumns) {
final ColumnType columnType = signature.getColumnType(keyColumn.columnName()).orElse(null);
if (columnType == null || !FieldReaders.create(keyColumn.columnName(), columnType).isComparable()) {
throw new IAE("Sort column [%s] is not comparable (type = %s)", keyColumn.columnName(), columnType);
}
}
}
/**
* Copies "len" bytes from {@code src.position()} to "dstPosition" in "memory". Does not update the position of src.
*
* @throws InvalidNullByteException if "allowNullBytes" is false and a null byte is encountered
*/
public static void copyByteBufferToMemory(
final ByteBuffer src,
final WritableMemory dst,
final long dstPosition,
final int len,
final boolean allowNullBytes
)
{
if (src.remaining() < len) {
throw new ISE("Insufficient source space available");
}
if (dst.getCapacity() - dstPosition < len) {
throw new ISE("Insufficient destination space available");
}
final int srcEnd = src.position() + len;
long q = dstPosition;
for (int p = src.position(); p < srcEnd; p++, q++) {
final byte b = src.get(p);
if (!allowNullBytes && b == 0) {
ByteBuffer duplicate = src.duplicate();
duplicate.limit(srcEnd);
throw InvalidNullByteException.builder()
.value(StringUtils.fromUtf8(duplicate))
.position(p - src.position())
.build();
}
dst.putByte(q, b);
}
}
/**
* Extracts a ByteBuffer from the provided dictionary selector.
*
* Null strings are returned as {@link #NULL_STRING_MARKER_ARRAY}.
*/
private static ByteBuffer getUtf8ByteBufferFromStringSelector(
final DimensionDictionarySelector selector,
final int dictionaryId
)
{
if (selector.supportsLookupNameUtf8()) {
final ByteBuffer buf = selector.lookupNameUtf8(dictionaryId);
if (buf == null || (NullHandling.replaceWithDefault() && buf.remaining() == 0)) {
return ByteBuffer.wrap(NULL_STRING_MARKER_ARRAY);
} else {
return buf;
}
} else {
return FrameWriterUtils.getUtf8ByteBufferFromString(selector.lookupName(dictionaryId));
}
}
/**
* Extracts a ByteBuffer from the string. Null strings are returned as {@link #NULL_STRING_MARKER_ARRAY}.
*/
private static ByteBuffer getUtf8ByteBufferFromString(@Nullable final String data)
{
if (NullHandling.isNullOrEquivalent(data)) {
return ByteBuffer.wrap(NULL_STRING_MARKER_ARRAY);
} else {
return ByteBuffer.wrap(StringUtils.toUtf8(data));
}
}
/**
* Returns whether the provided key columns are all a prefix of the signature.
*/
private static boolean areKeyColumnsPrefixOfSignature(
final List<KeyColumn> keyColumns,
final RowSignature signature
)
{
if (keyColumns.size() > signature.size()) {
return false;
}
for (int i = 0; i < keyColumns.size(); i++) {
if (!keyColumns.get(i).columnName().equals(signature.getColumnName(i))) {
return false;
}
}
return true;
}
public static RowSignature replaceUnknownTypesWithNestedColumns(final RowSignature rowSignature)
{
RowSignature.Builder retBuilder = RowSignature.builder();
for (int i = 0; i < rowSignature.size(); ++i) {
String columnName = rowSignature.getColumnName(i);
ColumnType columnType = rowSignature.getColumnType(i).orElse(ColumnType.NESTED_DATA);
retBuilder.add(columnName, columnType);
}
return retBuilder.build();
}
}