blob: d65e60be9d6636633d9aa2dde286e90e6e3eff1f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.repository.schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
 * Serializes {@link Record} instances to an {@link OutputStream}, field by field, in the order
 * defined by the record's {@link RecordSchema}.
 *
 * <p>Each record is preceded by a one-byte indicator so that a reader can distinguish "another
 * record follows" from end-of-stream. References to external files are written with a different
 * indicator byte via {@link #writeExternalFileReference(DataOutputStream, File)}.</p>
 */
public class SchemaRecordWriter {
    /** Indicator byte written immediately before a record that is serialized inline. */
    static final int INLINE_RECORD_INDICATOR = 1;
    /** Indicator byte written immediately before the path of an external file. */
    static final int EXTERNAL_FILE_INDICATOR = 8;

    /** Largest encoded length, in bytes, that {@link DataOutputStream#writeUTF(String)} can write. */
    public static final int MAX_ALLOWED_UTF_LENGTH = 65_535;

    private static final Logger logger = LoggerFactory.getLogger(SchemaRecordWriter.class);
    private static final int CACHE_BUFFER_SIZE = 65536;
    /** Shared pool of scratch buffers; one is checked out for the duration of each {@link #writeRecord} call. */
    private static final ByteArrayCache byteArrayCache = new ByteArrayCache(32, CACHE_BUFFER_SIZE);

    /**
     * Writes the inline-record indicator followed by every field of the given record.
     *
     * @param record the record to serialize; its own schema determines the field order
     * @param out the stream to write to
     * @throws IOException if the stream cannot be written or a field value cannot be serialized
     */
    public void writeRecord(final Record record, final OutputStream out) throws IOException {
        // Write a sentinel value to indicate that there is a record. This allows the reader to read one
        // byte and check if it is -1. If so, the reader knows there are no more records. If not, the
        // reader knows that it should be able to continue reading.
        out.write(INLINE_RECORD_INDICATOR);

        final byte[] buffer = byteArrayCache.checkOut();
        try {
            writeRecordFields(record, out, buffer);
        } finally {
            // Always return the scratch buffer, even when serialization fails.
            byteArrayCache.checkIn(buffer);
        }
    }

    /** Writes all fields of the record using the schema that the record itself reports. */
    private void writeRecordFields(final Record record, final OutputStream out, final byte[] buffer) throws IOException {
        writeRecordFields(record, record.getSchema(), out, buffer);
    }

    /**
     * Writes the value of every field in {@code schema}, in schema order.
     *
     * @throws IOException wrapping any failure, with the name of the offending field in the message
     */
    private void writeRecordFields(final Record record, final RecordSchema schema, final OutputStream out, final byte[] buffer) throws IOException {
        // Reuse the caller's DataOutputStream when there is one so nested records do not stack wrappers.
        final DataOutputStream dos = out instanceof DataOutputStream ? (DataOutputStream) out : new DataOutputStream(out);

        for (final RecordField field : schema.getFields()) {
            final Object value = record.getFieldValue(field);

            try {
                writeFieldRepetitionAndValue(field, value, dos, buffer);
            } catch (final Exception e) {
                throw new IOException("Failed to write field '" + field.getFieldName() + "'", e);
            }
        }
    }

    /**
     * Writes the repetition prefix required by the field (nothing for {@code EXACTLY_ONE}, a presence
     * byte for {@code ZERO_OR_ONE}, an element count for {@code ZERO_OR_MORE}) followed by the value(s).
     *
     * @throws IllegalArgumentException if a required value is missing, or a repeated field's value is
     *         not a {@link Collection}
     */
    private void writeFieldRepetitionAndValue(final RecordField field, final Object value, final DataOutputStream dos, final byte[] buffer) throws IOException {
        switch (field.getRepetition()) {
            case EXACTLY_ONE: {
                if (value == null) {
                    throw new IllegalArgumentException("Record does not have a value for the '" + field.getFieldName() + "' but the field is required");
                }
                writeFieldValue(field, value, dos, buffer);
                break;
            }
            case ZERO_OR_MORE: {
                if (value == null) {
                    // A missing collection is written the same way as an empty one.
                    dos.writeInt(0);
                    break;
                }

                if (!(value instanceof Collection)) {
                    throw new IllegalArgumentException("Record contains a value of type '" + value.getClass() +
                        "' for the '" + field.getFieldName() + "' but expected a Collection because the Repetition for the field is " + field.getRepetition());
                }

                final Collection<?> collection = (Collection<?>) value;
                dos.writeInt(collection.size());
                for (final Object fieldValue : collection) {
                    writeFieldValue(field, fieldValue, dos, buffer);
                }
                break;
            }
            case ZERO_OR_ONE: {
                if (value == null) {
                    dos.write(0);
                    break;
                }
                dos.write(1);
                writeFieldValue(field, value, dos, buffer);
                break;
            }
        }
    }

    /**
     * Returns {@code true} if every character of the string is in the range 1-127, i.e. each
     * character can be stored as exactly one byte by a plain narrowing cast.
     */
    private static boolean allSingleByteInUtf8(final String value) {
        for (int i = 0; i < value.length(); i++) {
            final char ch = value.charAt(i);
            if (ch < 1 || ch > 127) {
                return false;
            }
        }
        return true;
    }

    /**
     * Writes a single, non-repeated value in the encoding that corresponds to the field's type.
     *
     * @param field describes the type (and, for maps, the key/value sub-fields) of the value
     * @param value the value to write; must be an instance of the Java type matching the field type
     * @param out the stream to write to
     * @param buffer scratch space used to encode ASCII-only long strings without allocating
     */
    private void writeFieldValue(final RecordField field, final Object value, final DataOutputStream out, final byte[] buffer) throws IOException {
        switch (field.getFieldType()) {
            case BOOLEAN: {
                out.writeBoolean((Boolean) value);
                break;
            }
            case BYTE_ARRAY: {
                final byte[] array = (byte[]) value;
                out.writeInt(array.length);
                out.write(array);
                break;
            }
            case INT: {
                out.writeInt((Integer) value);
                break;
            }
            case LONG: {
                out.writeLong((Long) value);
                break;
            }
            case STRING: {
                writeUTFLimited(out, (String) value, field.getFieldName());
                break;
            }
            case LONG_STRING: {
                writeLongString((String) value, out, buffer);
                break;
            }
            case MAP: {
                writeMap(field, (Map<?, ?>) value, out, buffer);
                break;
            }
            case UNION: {
                // A union is written as the name of the chosen alternative followed by that alternative's record.
                final NamedValue namedValue = (NamedValue) value;
                writeUTFLimited(out, namedValue.getName(), field.getFieldName());
                final Record childRecord = (Record) namedValue.getValue();
                writeRecordFields(childRecord, out, buffer);
                break;
            }
            case COMPLEX: {
                final Record record = (Record) value;
                writeRecordFields(record, out, buffer);
                break;
            }
        }
    }

    /** Writes a string as a 4-byte length followed by its UTF-8 bytes, so it is not bound by the writeUTF size limit. */
    private void writeLongString(final String string, final DataOutputStream out, final byte[] buffer) throws IOException {
        // In many cases, we will see a String value that consists solely of values in the range of
        // 1-127, which means that in UTF-8 they will translate into a single byte each. If all characters
        // in the string adhere to this, then we can skip calling String.getBytes() because that will allocate
        // a new byte[] every time, which results in a lot of pressure on the garbage collector.
        final int length = string.length();
        if (length <= buffer.length && allSingleByteInUtf8(string)) {
            out.writeInt(length);
            for (int i = 0; i < length; i++) {
                buffer[i] = (byte) string.charAt(i);
            }
            out.write(buffer, 0, length);
        } else {
            final byte[] encoded = string.getBytes(StandardCharsets.UTF_8);
            out.writeInt(encoded.length);
            out.write(encoded);
        }
    }

    /** Writes a map as its entry count followed by each key and value, encoded per the field's first and second sub-fields. */
    private void writeMap(final RecordField field, final Map<?, ?> map, final DataOutputStream out, final byte[] buffer) throws IOException {
        out.writeInt(map.size());

        final List<RecordField> subFields = field.getSubFields();
        final RecordField keyField = subFields.get(0);
        final RecordField valueField = subFields.get(1);

        for (final Map.Entry<?, ?> entry : map.entrySet()) {
            writeFieldRepetitionAndValue(keyField, entry.getKey(), out, buffer);
            writeFieldRepetitionAndValue(valueField, entry.getValue(), out, buffer);
        }
    }

    /**
     * Writes a string with {@link DataOutputStream#writeUTF(String)}, truncating it (and logging a
     * warning) if its encoded form would exceed {@link #MAX_ALLOWED_UTF_LENGTH} bytes.
     *
     * @param out the stream to write to
     * @param utfString the string to write
     * @param fieldName name of the field being written, used only for the log message; may be null
     */
    private void writeUTFLimited(final DataOutputStream out, final String utfString, final String fieldName) throws IOException {
        try {
            out.writeUTF(utfString);
        } catch (UTFDataFormatException e) {
            // writeUTF rejects the string before emitting any bytes, so it is safe to retry with a shorter one.
            final String truncated = utfString.substring(0, getCharsInUTF8Limit(utfString, MAX_ALLOWED_UTF_LENGTH));
            logger.warn("Truncating repository record value for field '{}'! Attempted to write {} chars that encode to a UTF8 byte length greater than "
                    + "supported maximum ({}), truncating to {} chars.",
                (fieldName == null) ? "" : fieldName, utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length());
            // The (potentially very large) value itself is only of interest when debugging.
            logger.debug("String value was:\n{}", truncated);
            out.writeUTF(truncated);
        }
    }

    /**
     * Calculates how many leading characters of {@code str} fit within {@code utf8Limit} bytes when
     * encoded the way {@link DataOutputStream#writeUTF(String)} encodes them (modified UTF-8).
     *
     * <p>Per-character cost: U+0001-U+007F take one byte; U+0000 and U+0080-U+07FF take two bytes;
     * everything else takes three bytes. Java strings store supplementary characters as two
     * surrogate chars, each of which is counted (and written by writeUTF) as three bytes, so no
     * four-byte form is considered. Note that the returned cut-off may fall between the two halves
     * of a surrogate pair.</p>
     *
     * @param str the string to measure
     * @param utf8Limit the maximum number of encoded bytes allowed
     * @return the number of leading chars of {@code str} whose encoding does not exceed the limit;
     *         {@code str.length()} if the whole string fits
     */
    static int getCharsInUTF8Limit(final String str, final int utf8Limit) {
        final int charsInOriginal = str.length();
        int bytesInUTF8 = 0;

        for (int i = 0; i < charsInOriginal; i++) {
            final int curr = str.charAt(i);
            if (curr >= 0x0001 && curr < 0x0080) {
                bytesInUTF8++;
            } else if (curr < 0x0800) {
                // Includes U+0000: writeUTF never emits a zero byte and encodes NUL as two bytes.
                bytesInUTF8 += 2;
            } else {
                bytesInUTF8 += 3;
            }

            if (bytesInUTF8 > utf8Limit) {
                return i;
            }
        }

        return charsInOriginal;
    }

    /**
     * Writes the external-file indicator followed by the absolute path of the given file.
     *
     * @param out the stream to write to
     * @param externalFile the file being referenced
     * @throws IOException if the stream cannot be written, including when the encoded path exceeds
     *         the size that {@link DataOutputStream#writeUTF(String)} supports
     */
    public void writeExternalFileReference(final DataOutputStream out, final File externalFile) throws IOException {
        out.write(EXTERNAL_FILE_INDICATOR);
        out.writeUTF(externalFile.getAbsolutePath());
    }
}