/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Code } from '@blueprintjs/core';
import type { Field } from '../../components';
import { AutoForm, ExternalLink } from '../../components';
import { getLink } from '../../links';
import { compact, deepGet, deepSet, oneOf, oneOfKnown, typeIsKnown } from '../../utils';
import type { FlattenSpec } from '../flatten-spec/flatten-spec';
import {
AVRO_BYTES_DECODER_COMPLETIONS,
FEATURE_SPEC_COMPLETIONS,
PROTO_BYTES_DECODER_COMPLETIONS,
} from './input-format-completions';
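
/**
 * Shape of Druid's `inputFormat` ingestion spec object as edited by this form.
 * Only the properties the form needs to reason about are declared here; format-specific
 * properties (e.g. avroBytesDecoder, protoBytesDecoder) are read and written via
 * deepGet / deepSet.
 *
 * A minimal illustrative example (a sketch only, assuming a headerless CSV source):
 *
 *   const csvFormat: InputFormat = {
 *     type: 'csv',
 *     findColumnsFromHeader: false,
 *     columns: ['timestamp', 'page', 'user'],
 *   };
 */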
export interface InputFormat {
readonly type: string;
readonly findColumnsFromHeader?: boolean;
readonly skipHeaderRows?: number;
readonly columns?: string[];
readonly delimiter?: string;
readonly listDelimiter?: string | null;
readonly pattern?: string;
readonly function?: string;
readonly flattenSpec?: FlattenSpec | null;
readonly featureSpec?: Record<string, boolean>;
readonly keepNullColumns?: boolean;
readonly assumeNewlineDelimited?: boolean;
readonly useJsonNodeReader?: boolean;
// type: kafka & kinesis
readonly timestampColumnName?: string;
readonly partitionKeyColumnName?: string;
// type: kafka
readonly topicColumnName?: string;
readonly headerFormat?: { type: 'string'; encoding?: string };
readonly headerColumnPrefix?: string;
readonly keyFormat?: InputFormat;
readonly keyColumnName?: string;
readonly valueFormat?: InputFormat;
}
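
// Input format types that this form knows how to render fields for.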
const KNOWN_TYPES = [
'json',
'csv',
'tsv',
'parquet',
'orc',
'avro_ocf',
'avro_stream',
'protobuf',
'regex',
'javascript',
'kafka',
'kinesis',
];
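
/**
 * Builds the AutoForm field definitions for an inputFormat. When `streaming` is true the
 * JSON-only `assumeNewlineDelimited` and `useJsonNodeReader` fields are included, since
 * those options only apply to streaming ingestion.
 */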
function generateInputFormatFields(streaming: boolean) {
return compact([
{
name: 'type',
label: 'Input format',
type: 'string',
suggestions: [
'json',
'csv',
'tsv',
'parquet',
'orc',
'avro_ocf',
'avro_stream',
'protobuf',
'regex',
'javascript',
],
required: true,
info: (
<>
<p>The parser used to parse the data.</p>
<p>
For more information see{' '}
<ExternalLink href={`${getLink('DOCS')}/ingestion/data-formats`}>
the documentation
</ExternalLink>
.
</p>
</>
),
},
{
name: 'featureSpec',
label: 'JSON parser features',
type: 'json',
defined: typeIsKnown(KNOWN_TYPES, 'json'),
jsonCompletions: FEATURE_SPEC_COMPLETIONS,
info: (
<>
<p>
<ExternalLink href="https://github.com/FasterXML/jackson-core/wiki/JsonParser-Features">
JSON parser features
</ExternalLink>{' '}
supported by the Jackson library. These features will be applied when parsing the input
JSON data.
</p>
<p>
Example:{' '}
<Code>{`{ "ALLOW_SINGLE_QUOTES": true, "ALLOW_UNQUOTED_FIELD_NAMES": true }`}</Code>
</p>
</>
),
},
streaming
? {
name: 'assumeNewlineDelimited',
type: 'boolean',
defined: typeIsKnown(KNOWN_TYPES, 'json'),
disabled: inputFormat => Boolean(inputFormat.useJsonNodeReader),
defaultValue: false,
info: (
<>
<p>
In streaming ingestion, multi-line JSON events can be ingested (i.e. where a single
JSON event spans multiple lines). However, if a parsing exception occurs, all JSON
events that are present in the same streaming record will be discarded.
</p>
<p>
<Code>assumeNewlineDelimited</Code> and <Code>useJsonNodeReader</Code> (at most one
can be <Code>true</Code>) affect only how parsing exceptions are handled.
</p>
<p>
If the input is known to be newline delimited JSON (each individual JSON event is
contained in a single line, separated by newlines), setting this option to true
allows for more flexible parsing exception handling. Only the lines with invalid
JSON syntax will be discarded, while lines containing valid JSON events will still
be ingested.
</p>
</>
),
}
: undefined,
streaming
? {
name: 'useJsonNodeReader',
label: 'Use JSON node reader',
type: 'boolean',
defined: typeIsKnown(KNOWN_TYPES, 'json'),
disabled: inputFormat => Boolean(inputFormat.assumeNewlineDelimited),
defaultValue: false,
info: (
<>
<p>
In streaming ingestion, multi-line JSON events can be ingested (i.e. where a single
JSON event spans multiple lines). However, if a parsing exception occurs, all JSON
events that are present in the same streaming record will be discarded.
</p>
<p>
<Code>assumeNewlineDelimited</Code> and <Code>useJsonNodeReader</Code> (at most one
can be <Code>true</Code>) affect only how parsing exceptions are handled.
</p>
<p>
When ingesting multi-line JSON events, enabling this option switches to a JSON parser
that retains any valid JSON events encountered within a streaming record before the
point at which a parsing exception occurred.
</p>
</>
),
}
: undefined,
{
name: 'delimiter',
type: 'string',
defaultValue: '\t',
suggestions: ['\t', ';', '|', '#'],
defined: typeIsKnown(KNOWN_TYPES, 'tsv'),
info: <>A custom delimiter for data values.</>,
},
{
name: 'pattern',
type: 'string',
defined: typeIsKnown(KNOWN_TYPES, 'regex'),
required: true,
},
{
name: 'function',
type: 'string',
defined: typeIsKnown(KNOWN_TYPES, 'javascript'),
required: true,
},
{
name: 'skipHeaderRows',
type: 'number',
defaultValue: 0,
defined: typeIsKnown(KNOWN_TYPES, 'csv', 'tsv'),
min: 0,
info: (
<>
If this is set, skip the first <Code>skipHeaderRows</Code> rows from each file.
</>
),
},
{
name: 'findColumnsFromHeader',
type: 'boolean',
defined: typeIsKnown(KNOWN_TYPES, 'csv', 'tsv'),
required: true,
info: (
<>
If this is set, find the column names from the header row. Note that{' '}
<Code>skipHeaderRows</Code> will be applied before finding column names from the header.
For example, if you set <Code>skipHeaderRows</Code> to 2 and{' '}
<Code>findColumnsFromHeader</Code> to true, the task will skip the first two lines and
then extract column information from the third line.
</>
),
},
{
name: 'columns',
type: 'string-array',
required: true,
defined: p =>
(oneOf(p.type, 'csv', 'tsv') && p.findColumnsFromHeader === false) || p.type === 'regex',
info: (
<>
Specifies the columns of the data. The columns should be in the same order as the
columns in your data.
</>
),
},
{
name: 'listDelimiter',
type: 'string',
defaultValue: '\x01',
suggestions: ['\x01', '\x00'],
defined: typeIsKnown(KNOWN_TYPES, 'csv', 'tsv', 'regex'),
info: <>A custom delimiter for multi-value dimensions.</>,
},
{
name: 'avroBytesDecoder',
type: 'json',
defined: typeIsKnown(KNOWN_TYPES, 'avro_stream'),
required: true,
placeholder: `{ type: "schema_repo", ... }`,
jsonCompletions: AVRO_BYTES_DECODER_COMPLETIONS,
info: (
<>
<p>Specifies how to decode bytes into an Avro record.</p>
<p>
For more details refer to the{' '}
<ExternalLink href={`${getLink('DOCS')}/ingestion/data-formats/#avro-bytes-decoder`}>
documentation
</ExternalLink>
.
</p>
</>
),
},
{
name: 'schema',
type: 'json',
defined: typeIsKnown(KNOWN_TYPES, 'avro_ocf'),
info: (
<>
Define a reader schema to be used when parsing Avro records. This is useful when parsing
multiple versions of Avro OCF file data.
</>
),
},
{
name: 'protoBytesDecoder',
type: 'json',
defined: typeIsKnown(KNOWN_TYPES, 'protobuf'),
required: true,
placeholder: `{ ... }`,
jsonCompletions: PROTO_BYTES_DECODER_COMPLETIONS,
info: <>Specifies how to decode bytes into a Protobuf record.</>,
},
{
name: 'binaryAsString',
type: 'boolean',
defaultValue: false,
defined: typeIsKnown(KNOWN_TYPES, 'parquet', 'orc', 'avro_ocf', 'avro_stream'),
info: (
<>
Specifies whether a binary column that is not logically marked as a string should be
treated as a UTF-8 encoded string.
</>
),
},
] as (Field<InputFormat> | undefined)[]);
}
export const BATCH_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = generateInputFormatFields(false);
export const STREAMING_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = generateInputFormatFields(true);
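
/**
 * Extra fields shown when the outer input format is `kafka`. They expose Kafka record
 * metadata (timestamp, topic, headers, and key) as columns and let the user configure a
 * nested `keyFormat` for parsing the record key.
 *
 * A sketch of the kind of spec these fields produce (illustrative only):
 *
 *   {
 *     "type": "kafka",
 *     "valueFormat": { "type": "json" },
 *     "keyFormat": { "type": "tsv", "findColumnsFromHeader": false, "columns": ["x"] },
 *     "timestampColumnName": "kafka.timestamp"
 *   }
 */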
export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
{
name: 'timestampColumnName',
label: 'Kafka timestamp column name',
type: 'string',
defaultValue: 'kafka.timestamp',
defined: typeIsKnown(KNOWN_TYPES, 'kafka'),
info: `Name of the column for the Kafka record's timestamp.`,
},
{
name: 'topicColumnName',
label: 'Kafka topic column name',
type: 'string',
defaultValue: 'kafka.topic',
defined: typeIsKnown(KNOWN_TYPES, 'kafka'),
info: `Name of the column for the topic from which the Kafka record came.`,
},
// -----------------------------------------------------
// keyFormat fields
{
name: 'keyFormat.type',
label: 'Kafka key input format',
type: 'string',
suggestions: [
undefined,
'json',
'csv',
'tsv',
'parquet',
'orc',
'avro_ocf',
'avro_stream',
'regex',
],
placeholder: `(don't parse Kafka key)`,
defined: typeIsKnown(KNOWN_TYPES, 'kafka'),
info: (
<>
<p>The parser used to parse the key of the Kafka message.</p>
<p>
For more information see{' '}
<ExternalLink href={`${getLink('DOCS')}/ingestion/data-formats`}>
the documentation
</ExternalLink>
.
</p>
</>
),
adjustment: inputFormat => {
const keyFormatType = deepGet(inputFormat, 'keyFormat.type');
// If the user selects one of these formats then populate the columns (that are in any case meaningless in this context)
// with an initial value.
switch (keyFormatType) {
case 'regex':
inputFormat = deepSet(inputFormat, 'keyFormat.pattern', '([\\s\\S]*)');
inputFormat = deepSet(inputFormat, 'keyFormat.columns', ['x']);
break;
case 'csv':
case 'tsv':
inputFormat = deepSet(inputFormat, 'keyFormat.findColumnsFromHeader', false);
inputFormat = deepSet(inputFormat, 'keyFormat.columns', ['x']);
break;
}
return inputFormat;
},
},
{
name: 'keyFormat.featureSpec',
label: 'Kafka key JSON parser features',
type: 'json',
defined: inputFormat => oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'json'),
hideInMore: true,
info: (
<>
<p>
<ExternalLink href="https://github.com/FasterXML/jackson-core/wiki/JsonParser-Features">
JSON parser features
</ExternalLink>{' '}
supported by the Jackson library. These features will be applied when parsing the input
JSON data.
</p>
<p>
Example:{' '}
<Code>{`{ "ALLOW_SINGLE_QUOTES": true, "ALLOW_UNQUOTED_FIELD_NAMES": true }`}</Code>
</p>
</>
),
},
{
name: 'keyFormat.assumeNewlineDelimited',
label: 'Kafka key assume newline delimited',
type: 'boolean',
defined: inputFormat => oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'json'),
disabled: inputFormat => Boolean(inputFormat.useJsonNodeReader),
defaultValue: false,
hideInMore: true,
info: (
<>
<p>
In streaming ingestion, multi-line JSON events can be ingested (i.e. where a single JSON
event spans multiple lines). However, if a parsing exception occurs, all JSON events that
are present in the same streaming record will be discarded.
</p>
<p>
<Code>assumeNewlineDelimited</Code> and <Code>useJsonNodeReader</Code> (at most one can be{' '}
<Code>true</Code>) affect only how parsing exceptions are handled.
</p>
<p>
If the input is known to be newline delimited JSON (each individual JSON event is
contained in a single line, separated by newlines), setting this option to true allows for
more flexible parsing exception handling. Only the lines with invalid JSON syntax will be
discarded, while lines containing valid JSON events will still be ingested.
</p>
</>
),
},
{
name: 'keyFormat.useJsonNodeReader',
label: 'Kafka key use JSON node reader',
type: 'boolean',
defined: inputFormat => oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'json'),
disabled: inputFormat => Boolean(inputFormat.assumeNewlineDelimited),
defaultValue: false,
hideInMore: true,
info: (
<>
<p>
In streaming ingestion, multi-line JSON events can be ingested (i.e. where a single JSON
event spans multiple lines). However, if a parsing exception occurs, all JSON events that
are present in the same streaming record will be discarded.
</p>
<p>
<Code>assumeNewlineDelimited</Code> and <Code>useJsonNodeReader</Code> (at most one can be{' '}
<Code>true</Code>) affect only how parsing exceptions are handled.
</p>
<p>
When ingesting multi-line JSON events, enabling this option switches to a JSON parser
that retains any valid JSON events encountered within a streaming record before the
point at which a parsing exception occurred.
</p>
</>
),
},
{
name: 'keyFormat.delimiter',
label: 'Kafka key delimiter',
type: 'string',
defaultValue: '\t',
suggestions: ['\t', ';', '|', '#'],
defined: inputFormat => oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'tsv'),
info: <>A custom delimiter for data values.</>,
},
{
name: 'keyFormat.pattern',
label: 'Kafka key pattern',
type: 'string',
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'regex'),
required: true,
},
{
name: 'keyFormat.skipHeaderRows',
label: 'Kafka key skip header rows',
type: 'number',
defaultValue: 0,
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'csv', 'tsv'),
min: 0,
info: (
<>
If this is set, skip the first <Code>skipHeaderRows</Code> rows from each file.
</>
),
},
{
name: 'keyFormat.findColumnsFromHeader',
label: 'Kafka key find columns from header',
type: 'boolean',
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'csv', 'tsv'),
required: true,
hideInMore: true,
info: (
<>
If this is set, find the column names from the header row. Note that{' '}
<Code>skipHeaderRows</Code> will be applied before finding column names from the header. For
example, if you set <Code>skipHeaderRows</Code> to 2 and <Code>findColumnsFromHeader</Code>{' '}
to true, the task will skip the first two lines and then extract column information from the
third line.
</>
),
},
{
name: 'keyFormat.columns',
label: 'Kafka key columns',
type: 'string-array',
required: true,
defined: inputFormat =>
(oneOf(deepGet(inputFormat, 'keyFormat.type'), 'csv', 'tsv') &&
deepGet(inputFormat, 'keyFormat.findColumnsFromHeader') === false) ||
deepGet(inputFormat, 'keyFormat.type') === 'regex',
hideInMore: true,
info: (
<>
Only the value of the first column will be read; the column name itself is ignored, so
you can enter anything here.
</>
),
},
{
name: 'keyFormat.listDelimiter',
label: 'Kafka key list delimiter',
type: 'string',
defaultValue: '\x01',
suggestions: ['\x01', '\x00'],
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'csv', 'tsv', 'regex'),
info: <>A custom delimiter for multi-value dimensions.</>,
},
{
name: 'keyFormat.avroBytesDecoder',
label: 'Kafka key Avro bytes decoder',
type: 'json',
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'avro_stream'),
required: true,
placeholder: `{ type: "schema_repo", ... }`,
info: (
<>
<p>Specifies how to decode bytes into an Avro record.</p>
<p>
For more details refer to the{' '}
<ExternalLink href={`${getLink('DOCS')}/ingestion/data-formats/#avro-bytes-decoder`}>
documentation
</ExternalLink>
.
</p>
</>
),
},
{
name: 'keyFormat.schema',
label: 'Key format schema',
type: 'json',
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'avro_ocf'),
info: (
<>
Define a reader schema to be used when parsing Avro records. This is useful when parsing
multiple versions of Avro OCF file data.
</>
),
},
{
name: 'keyFormat.protoBytesDecoder',
label: 'Kafka key proto bytes decoder',
type: 'json',
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'protobuf'),
required: true,
placeholder: `{ ... }`,
info: <>Specifies how to decode bytes into a Protobuf record.</>,
},
{
name: 'keyFormat.binaryAsString',
label: 'Kafka key binary as string',
type: 'boolean',
defaultValue: false,
defined: inputFormat =>
oneOf(deepGet(inputFormat, 'keyFormat.type'), 'parquet', 'orc', 'avro_ocf', 'avro_stream'),
info: (
<>
Specifies whether a binary column that is not logically marked as a string should be
treated as a UTF-8 encoded string.
</>
),
},
// keyColumnName
{
name: 'keyColumnName',
label: 'Kafka key column name',
type: 'string',
defaultValue: 'kafka.key',
defined: inputFormat => Boolean(deepGet(inputFormat, 'keyFormat.type')),
info: `Name of the column for the Kafka record's key.`,
},
// -----------------------------------------------------
{
name: 'headerFormat.type',
label: 'Kafka header format type',
type: 'string',
defined: typeIsKnown(KNOWN_TYPES, 'kafka'),
placeholder: `(don't parse Kafka headers)`,
suggestions: [undefined, 'string'],
},
{
name: 'headerFormat.encoding',
label: 'Kafka header format encoding',
type: 'string',
defaultValue: 'UTF-8',
defined: inputFormat => deepGet(inputFormat, 'headerFormat.type') === 'string',
suggestions: ['UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16', 'US-ASCII', 'ISO-8859-1'],
},
{
name: 'headerColumnPrefix',
label: 'Kafka header column prefix',
type: 'string',
defaultValue: 'kafka.header.',
defined: inputFormat => deepGet(inputFormat, 'headerFormat.type') === 'string',
info: `Custom prefix for all the header columns.`,
},
];
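
/**
 * Extra fields shown when the outer input format is `kinesis`, exposing Kinesis record
 * metadata (the record timestamp and partition key) as columns.
 */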
export const KINESIS_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
{
name: 'timestampColumnName',
label: 'Kinesis timestamp column name',
type: 'string',
defaultValue: 'kinesis.timestamp',
defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
info: `The name of the column for the Kinesis timestamp.`,
},
{
name: 'partitionKeyColumnName',
label: 'Kinesis partition key column name',
type: 'string',
defaultValue: 'kinesis.partitionKey',
defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
info: `The name of the column for the Kinesis partition key. This field is useful when ingesting data from multiple partitions into the same datasource.`,
},
];
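
/**
 * Returns a human-readable issue with the given input format, if any, by validating it
 * against the batch field definitions.
 */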
export function issueWithInputFormat(inputFormat: InputFormat | undefined): string | undefined {
return AutoForm.issueWithModel(inputFormat, BATCH_INPUT_FORMAT_FIELDS);
}
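
/** Type guard for the metadata-wrapping input format types that carry a nested valueFormat. */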
export function isKafkaOrKinesis(type: string | undefined): type is 'kafka' | 'kinesis' {
return type === 'kafka' || type === 'kinesis';
}
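
/**
 * Whether the given input format can produce nested data. For the `kafka` and `kinesis`
 * wrapper formats this is determined by the nested valueFormat, since that is what parses
 * the record payload.
 */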
export function inputFormatCanProduceNestedData(inputFormat: InputFormat): boolean {
if (isKafkaOrKinesis(inputFormat.type)) {
return Boolean(
inputFormat.valueFormat && inputFormatCanProduceNestedData(inputFormat.valueFormat),
);
}
return oneOf(inputFormat.type, 'json', 'parquet', 'orc', 'avro_ocf', 'avro_stream', 'protobuf');
}