| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import { Code } from '@blueprintjs/core'; |
| import React from 'react'; |
| |
| import { AutoForm, ExternalLink, Field } from '../components'; |
| import { getLink } from '../links'; |
| import { |
| deepDelete, |
| deepGet, |
| deepMove, |
| deepSet, |
| deepSetIfUnset, |
| EMPTY_ARRAY, |
| EMPTY_OBJECT, |
| filterMap, |
| oneOf, |
| typeIs, |
| } from '../utils'; |
| import { HeaderAndRows } from '../utils/sampler'; |
| |
| import { |
| DimensionsSpec, |
| getDimensionSpecName, |
| getDimensionSpecs, |
| getDimensionSpecType, |
| } from './dimension-spec'; |
| import { InputFormat, issueWithInputFormat } from './input-format'; |
| import { InputSource, issueWithInputSource } from './input-source'; |
| import { |
| getMetricSpecOutputType, |
| getMetricSpecs, |
| getMetricSpecSingleFieldName, |
| MetricSpec, |
| } from './metric-spec'; |
| import { TimestampSpec } from './timestamp-spec'; |
| import { TransformSpec } from './transform-spec'; |
| |
/**
 * Length limit associated with inline data (65536 = 64 * 1024).
 * NOTE(review): the code that enforces this limit is not in this section - confirm the unit
 * (characters vs bytes) against the caller.
 */
export const MAX_INLINE_DATA_LENGTH = 65536;

/**
 * The UTC calendar year at the moment this module is evaluated.
 * Used to build the example interval placeholder for the Druid input source form.
 */
const CURRENT_YEAR = new Date().getUTCFullYear();
| |
/**
 * The outer envelope of an ingestion spec: a `type` tag plus the nested `spec` body.
 */
export interface IngestionSpec {
  /** The kind of ingestion this spec describes. */
  readonly type: IngestionType;

  /** The nested body that holds the ioConfig, dataSchema and optional tuningConfig. */
  readonly spec: IngestionSpecInner;
}
| |
/**
 * The body of an ingestion spec (the value found under the `spec` key of an IngestionSpec).
 */
export interface IngestionSpecInner {
  /** Where and how the data is read. */
  readonly ioConfig: IoConfig;

  /** Datasource name, timestamp, dimensions, metrics, etc. */
  readonly dataSchema: DataSchema;

  /** Optional tuning section. */
  readonly tuningConfig?: TuningConfig;
}
| |
/**
 * Reports whether the given (possibly partial) spec has no own enumerable keys.
 *
 * @param spec the spec to inspect
 * @returns `true` when `Object.keys(spec)` yields nothing
 */
export function isEmptyIngestionSpec(spec: Partial<IngestionSpec>) {
  const keyCount = Object.keys(spec).length;
  return keyCount === 0;
}
| |
/** The ingestion types this module knows how to handle. */
export type IngestionType =
  | 'kafka'
  | 'kinesis'
  | 'index_parallel';
| |
/**
 * A combination of IngestionType and inputSourceType.
 *
 * The streaming types stand alone; for `index_parallel` the input source type is appended
 * after a colon.
 */
export type IngestionComboType =
  // streaming
  | 'kafka'
  | 'kinesis'
  // index_parallel + input source type
  | 'index_parallel:http'
  | 'index_parallel:local'
  | 'index_parallel:druid'
  | 'index_parallel:inline'
  | 'index_parallel:s3'
  | 'index_parallel:azure'
  | 'index_parallel:google'
  | 'index_parallel:hdfs';
| |
/**
 * Every IngestionComboType plus some extra values that can be selected in the initial screen.
 */
export type IngestionComboTypeWithExtra =
  | IngestionComboType
  // extras that are not real combo types
  | 'azure-event-hubs'
  | 'hadoop'
  | 'example'
  | 'other';
| |
/**
 * Maps an ingestion type onto the `type` string used for its ioConfig / tuningConfig.
 * Every known ingestion type maps onto itself.
 *
 * @param ingestionType the ingestion type to translate
 * @returns the same string that was passed in, when it is a known type
 * @throws Error when the value is not one of the known ingestion types
 */
function ingestionTypeToIoAndTuningConfigType(ingestionType: IngestionType): string {
  const isKnownType =
    ingestionType === 'kafka' || ingestionType === 'kinesis' || ingestionType === 'index_parallel';

  if (!isKnownType) {
    throw new Error(`unknown type '${ingestionType}'`);
  }

  return ingestionType;
}
| |
/**
 * Derives the combo type of a spec from `spec.ioConfig.type` and, for `index_parallel`,
 * from `spec.ioConfig.inputSource.type`.
 *
 * @param spec the (possibly partial) ingestion spec
 * @returns the combo type, or `undefined` when the ioConfig type or the input source type
 *          is not one of the recognized values
 */
export function getIngestionComboType(
  spec: Partial<IngestionSpec>,
): IngestionComboType | undefined {
  const ioConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT;
  const ioConfigType = ioConfig.type;

  // The streaming types are their own combo type
  if (ioConfigType === 'kafka' || ioConfigType === 'kinesis') return ioConfigType;

  // Anything other than index_parallel is not recognized
  if (ioConfigType !== 'index_parallel') return;

  const inputSource = deepGet(spec, 'spec.ioConfig.inputSource') || EMPTY_OBJECT;
  const recognizedInputSourceTypes = [
    'local',
    'http',
    'druid',
    'inline',
    's3',
    'azure',
    'google',
    'hdfs',
  ];
  if (recognizedInputSourceTypes.indexOf(inputSource.type) === -1) return;

  return `${ioConfigType}:${inputSource.type}` as IngestionComboType;
}
| |
/** Human readable title for every selectable ingestion combo type. */
const INGESTION_TITLES: Record<IngestionComboTypeWithExtra, string> = {
  'index_parallel:local': 'Local disk',
  'index_parallel:http': 'HTTP(s)',
  'index_parallel:druid': 'Reindex from Druid',
  'index_parallel:inline': 'Paste data',
  'index_parallel:s3': 'Amazon S3',
  'index_parallel:azure': 'Azure Data Lake',
  'index_parallel:google': 'Google Cloud Storage',
  'index_parallel:hdfs': 'HDFS',
  'kafka': 'Apache Kafka',
  'kinesis': 'Amazon Kinesis',
  'hadoop': 'HDFS',
  'azure-event-hubs': 'Azure Event Hub',
  'example': 'Example data',
  'other': 'Other',
};

/**
 * Returns the display title for an ingestion combo type.
 *
 * @param ingestionType the combo type (or one of the extra initial-screen values)
 * @returns the title, or `'Unknown ingestion'` for any value that is not recognized
 */
export function getIngestionTitle(ingestionType: IngestionComboTypeWithExtra): string {
  // Own-property check so that keys inherited from Object.prototype are never treated as titles
  const isKnown = Object.prototype.hasOwnProperty.call(INGESTION_TITLES, ingestionType);
  return isKnown ? INGESTION_TITLES[ingestionType] : 'Unknown ingestion';
}
| |
/**
 * Returns the image name for an ingestion combo type.
 *
 * When the value consists of exactly two colon separated segments the lower-cased second
 * segment is returned; otherwise the value is returned unchanged.
 *
 * @param ingestionType the combo type (or one of the extra initial-screen values)
 * @returns the image name
 */
export function getIngestionImage(ingestionType: IngestionComboTypeWithExtra): string {
  const segments = ingestionType.split(':');
  return segments.length === 2 ? segments[1].toLowerCase() : ingestionType;
}
| |
/**
 * Returns the documentation URL that matches the type of the given spec.
 *
 * @param spec the (possibly partial) ingestion spec
 * @returns the Kafka or Kinesis ingestion page for those spec types, and the native batch
 *          input sources page for everything else
 */
export function getIngestionDocLink(spec: Partial<IngestionSpec>): string {
  const specType = getSpecType(spec);
  const docsRoot = getLink('DOCS');

  if (specType === 'kafka') {
    return `${docsRoot}/development/extensions-core/kafka-ingestion.html`;
  }

  if (specType === 'kinesis') {
    return `${docsRoot}/development/extensions-core/kinesis-ingestion.html`;
  }

  return `${docsRoot}/ingestion/native-batch.html#input-sources`;
}
| |
/** The module name associated with each combo type that has one. */
const REQUIRED_MODULES: Partial<Record<IngestionComboTypeWithExtra, string>> = {
  'index_parallel:s3': 'druid-s3-extensions',
  'index_parallel:azure': 'druid-azure-extensions',
  'index_parallel:google': 'druid-google-extensions',
  'index_parallel:hdfs': 'druid-hdfs-storage',
  'kafka': 'druid-kafka-indexing-service',
  'kinesis': 'druid-kinesis-indexing-service',
};

/**
 * Returns the name of the module that the given ingestion combo type requires.
 *
 * @param ingestionType the combo type (or one of the extra initial-screen values)
 * @returns the module name, or `undefined` when no module is listed for the type
 */
export function getRequiredModule(ingestionType: IngestionComboTypeWithExtra): string | undefined {
  // Own-property check so that keys inherited from Object.prototype never leak out
  if (!Object.prototype.hasOwnProperty.call(REQUIRED_MODULES, ingestionType)) return;
  return REQUIRED_MODULES[ingestionType];
}
| |
/**
 * Checks that the pieces of the dataSchema this module insists on are present.
 *
 * The paths are checked in order (dataSource, timestampSpec, dimensionsSpec) and the first
 * one whose value is falsy is reported.
 *
 * @param spec the (possibly partial) ingestion spec
 * @returns a `missing <path>` message, or `undefined` when nothing is missing
 */
export function getIssueWithSpec(spec: Partial<IngestionSpec>): string | undefined {
  const requiredPaths = [
    'spec.dataSchema.dataSource',
    'spec.dataSchema.timestampSpec',
    'spec.dataSchema.dimensionsSpec',
  ];

  for (const requiredPath of requiredPaths) {
    if (!deepGet(spec, requiredPath)) {
      return `missing ${requiredPath}`;
    }
  }

  return;
}
| |
| // -------------- |
| |
/**
 * The `dataSchema` section of an ingestion spec.
 */
export interface DataSchema {
  /** Name of the datasource. */
  dataSource: string;

  /** How the timestamp is obtained. */
  timestampSpec: TimestampSpec;

  /** Optional transforms. */
  transformSpec?: TransformSpec;

  /** Optional granularity settings (rollup, query / segment granularity, intervals). */
  granularitySpec?: GranularitySpec;

  /** The dimensions. */
  dimensionsSpec: DimensionsSpec;

  /** Optional list of metrics. */
  metricsSpec?: MetricSpec[];
}
| |
/**
 * How the dimensions of a spec are determined: listed explicitly (`specific`) or left empty
 * (`auto-detect`).
 */
export type DimensionMode =
  | 'specific'
  | 'auto-detect';
| |
/**
 * Determines the dimension mode of a spec from `spec.dataSchema.dimensionsSpec.dimensions`.
 *
 * @param spec the (possibly partial) ingestion spec
 * @returns `'auto-detect'` when the dimensions are missing (falsy) or an empty array,
 *          `'specific'` otherwise
 */
export function getDimensionMode(spec: Partial<IngestionSpec>): DimensionMode {
  const dimensions = deepGet(spec, 'spec.dataSchema.dimensionsSpec.dimensions') || EMPTY_ARRAY;

  // A non-array value counts as an explicit (specific) definition
  if (!Array.isArray(dimensions)) return 'specific';

  return dimensions.length ? 'specific' : 'auto-detect';
}
| |
/**
 * Reads the rollup flag from `spec.dataSchema.granularitySpec.rollup`.
 *
 * @param spec the (possibly partial) ingestion spec
 * @returns the flag when it is a boolean, `true` in every other case
 */
export function getRollup(spec: Partial<IngestionSpec>): boolean {
  const rollup = deepGet(spec, 'spec.dataSchema.granularitySpec.rollup');
  if (typeof rollup !== 'boolean') return true;
  return rollup;
}
| |
/**
 * Resolves the type of a spec.
 *
 * The root `type`, `spec.ioConfig.type` and `spec.tuningConfig.type` are consulted in that
 * order and the first truthy value wins.
 *
 * @param spec the (possibly partial) ingestion spec
 * @returns the resolved type, or `'index_parallel'` when none of the locations has a value
 */
export function getSpecType(spec: Partial<IngestionSpec>): IngestionType {
  const typePaths = ['type', 'spec.ioConfig.type', 'spec.tuningConfig.type'];

  for (const typePath of typePaths) {
    const foundType = deepGet(spec, typePath);
    if (foundType) return foundType;
  }

  return 'index_parallel';
}
| |
/**
 * Decides whether a spec describes a task, judging by its resolved type.
 *
 * @param spec the (possibly partial) ingestion spec
 * @returns truthy when the type starts with `index_` or is one of `index`, `compact`,
 *          `kill`, `append`, `merge`, `same_interval_merge`
 */
export function isTask(spec: Partial<IngestionSpec>) {
  const type = String(getSpecType(spec));

  if (type.startsWith('index_')) return true;

  return oneOf(type, 'index', 'compact', 'kill', 'append', 'merge', 'same_interval_merge');
}
| |
/**
 * Tells whether the input source of the spec is of type `druid`.
 *
 * @param spec the (possibly partial) ingestion spec
 * @returns `true` when `spec.ioConfig.inputSource.type` is exactly `'druid'`
 */
export function isDruidSource(spec: Partial<IngestionSpec>): boolean {
  const inputSourceType = deepGet(spec, 'spec.ioConfig.inputSource.type');
  return inputSourceType === 'druid';
}
| |
/**
 * Make sure that the types are set in the root, ioConfig, and tuningConfig.
 *
 * Steps, in order:
 *  1. anything that is not a non-null object is replaced by an empty object;
 *  2. an object that has no `spec` object but does have an `ioConfig` object is treated as
 *     the inner spec and wrapped in `{ spec: ... }`;
 *  3. the first type found in the root, ioConfig or tuningConfig is passed to
 *     `deepSetIfUnset` for all three locations (presumably only filling the ones that are
 *     still unset - confirm against that helper).
 *
 * @param spec the spec (or task payload) to normalize
 * @returns the normalized spec; returned without type changes when no type is found anywhere
 */
export function normalizeSpec(spec: Partial<IngestionSpec>): IngestionSpec {
  // This does not match the type of IngestionSpec but this dialog is robust enough to deal with
  // anything, as long as the spec is an object
  let normalized: Partial<IngestionSpec> = spec && typeof spec === 'object' ? spec : {};

  // Make sure that if we actually get a task payload we extract the spec
  if (typeof normalized.spec !== 'object' && typeof (normalized as any).ioConfig === 'object') {
    normalized = { spec: normalized as any };
  }

  const detectedType =
    deepGet(normalized, 'type') ||
    deepGet(normalized, 'spec.ioConfig.type') ||
    deepGet(normalized, 'spec.tuningConfig.type');

  if (detectedType) {
    for (const typePath of ['type', 'spec.ioConfig.type', 'spec.tuningConfig.type']) {
      normalized = deepSetIfUnset(normalized, typePath, detectedType);
    }
  }

  return normalized as IngestionSpec;
}
| |
/**
 * Make sure that any extra junk in the spec other than 'type' and 'spec' is removed.
 *
 * @param spec the spec to clean
 * @returns a new object that carries only the `type` and `spec` of the input (both keys are
 *          always present, possibly with the value `undefined`)
 */
export function cleanSpec(spec: Partial<IngestionSpec>): Partial<IngestionSpec> {
  const { type, spec: innerSpec } = spec;
  return { type, spec: innerSpec };
}
| |
/**
 * The `granularitySpec` section of a dataSchema. Every member is optional.
 */
export interface GranularitySpec {
  /** Type tag of the granularity spec. */
  type?: string;

  /** Granularity name for queries. */
  queryGranularity?: string;

  /** Granularity name for segments. */
  segmentGranularity?: string;

  /** Rollup flag; `getRollup` treats anything that is not a boolean as `true`. */
  rollup?: boolean;

  /** A single interval string or a list of them. */
  intervals?: string | string[];
}
| |
| // -------------- |
| |
/**
 * The `ioConfig` section of an ingestion spec. Only `type` is mandatory; which of the other
 * members apply depends on that type.
 */
export interface IoConfig {
  /** Type tag, e.g. `index_parallel`, `kafka` or `kinesis`. */
  type: string;

  // --- batch (index / index_parallel) ---
  inputSource?: InputSource;
  inputFormat?: InputFormat;
  appendToExisting?: boolean;

  // --- kafka ---
  /** The topic; `issueWithIoConfig` requires it for the `kafka` type. */
  topic?: string;
  consumerProperties?: any;

  // --- shared supervisor settings (offered for both kafka and kinesis) ---
  replicas?: number;
  taskCount?: number;
  taskDuration?: string;
  startDelay?: string;
  period?: string;

  /** Must be non-null for the `kafka` type (see `invalidIoConfig`). */
  useEarliestOffset?: boolean;

  // --- kinesis ---
  /** The stream; `issueWithIoConfig` requires it for the `kinesis` type. */
  stream?: string;
  endpoint?: string;

  /** Must be non-null for the `kinesis` type (see `invalidIoConfig`). */
  useEarliestSequenceNumber?: boolean;
}
| |
/**
 * Flags streaming ioConfigs that have not yet chosen where to start reading from.
 *
 * @param ioConfig the ioConfig to check
 * @returns `true` for a `kafka` config whose `useEarliestOffset` is null / undefined or a
 *          `kinesis` config whose `useEarliestSequenceNumber` is null / undefined;
 *          `false` for everything else
 */
export function invalidIoConfig(ioConfig: IoConfig): boolean {
  switch (ioConfig.type) {
    case 'kafka':
      return ioConfig.useEarliestOffset == null;

    case 'kinesis':
      return ioConfig.useEarliestSequenceNumber == null;

    default:
      return false;
  }
}
| |
/**
 * Builds the list of form fields used to edit the connection part of an ioConfig.
 *
 * Every `index_parallel:*` combo starts with the shared "Source type" field followed by the
 * fields specific to that input source; `kafka` and `kinesis` get their own field lists.
 * A fresh array is returned on every call.
 *
 * @param ingestionComboType the combo type to build the fields for
 * @returns the field definitions for that combo type
 * @throws Error when the combo type is not one of the handled values
 */
export function getIoConfigFormFields(ingestionComboType: IngestionComboType): Field<IoConfig>[] {
  // Shared first field of every index_parallel form: lets the user switch the input source type
  const inputSourceType: Field<IoConfig> = {
    name: 'inputSource.type',
    label: 'Source type',
    type: 'string',
    suggestions: ['local', 'http', 'inline', 's3', 'azure', 'google', 'hdfs'],
    info: (
      <p>
        Druid connects to raw data through{' '}
        <ExternalLink href={`${getLink('DOCS')}/ingestion/native-batch.html#input-sources`}>
          inputSources
        </ExternalLink>
        . You can change your selected inputSource here.
      </p>
    ),
  };

  switch (ingestionComboType) {
    case 'index_parallel:http':
      return [
        inputSourceType,
        {
          name: 'inputSource.uris',
          label: 'URIs',
          type: 'string-array',
          placeholder:
            'https://example.com/path/to/file1.ext, https://example.com/path/to/file2.ext',
          required: true,
          info: (
            <p>
              The full URI of your file. To ingest from multiple URIs, use commas to separate each
              individual URI.
            </p>
          ),
        },
        {
          name: 'inputSource.httpAuthenticationUsername',
          label: 'HTTP auth username',
          type: 'string',
          placeholder: '(optional)',
          info: <p>Username to use for authentication with specified URIs</p>,
        },
        {
          name: 'inputSource.httpAuthenticationPassword',
          label: 'HTTP auth password',
          type: 'string',
          placeholder: '(optional)',
          info: <p>Password to use for authentication with specified URIs</p>,
        },
      ];

    case 'index_parallel:local':
      return [
        inputSourceType,
        {
          name: 'inputSource.baseDir',
          label: 'Base directory',
          type: 'string',
          placeholder: '/path/to/files/',
          required: true,
          info: (
            <>
              <ExternalLink href={`${getLink('DOCS')}/ingestion/native-batch.html#input-sources`}>
                inputSource.baseDir
              </ExternalLink>
              <p>Specifies the directory to search recursively for files to be ingested.</p>
            </>
          ),
        },
        {
          name: 'inputSource.filter',
          label: 'File filter',
          type: 'string',
          required: true,
          suggestions: [
            '*',
            '*.json',
            '*.json.gz',
            '*.csv',
            '*.tsv',
            '*.parquet',
            '*.orc',
            '*.avro',
          ],
          info: (
            <>
              <ExternalLink
                href={`${getLink('DOCS')}/ingestion/native-batch.html#local-input-source`}
              >
                inputSource.filter
              </ExternalLink>
              <p>
                A wildcard filter for files. See{' '}
                <ExternalLink href="https://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html">
                  here
                </ExternalLink>{' '}
                for format information.
              </p>
            </>
          ),
        },
      ];

    case 'index_parallel:druid':
      return [
        inputSourceType,
        {
          name: 'inputSource.dataSource',
          label: 'Datasource',
          type: 'string',
          required: true,
          info: <p>The datasource to fetch rows from.</p>,
        },
        {
          name: 'inputSource.interval',
          label: 'Interval',
          type: 'interval',
          placeholder: `${CURRENT_YEAR}-01-01/${CURRENT_YEAR + 1}-01-01`,
          required: true,
          info: (
            <p>
              A String representing ISO-8601 Interval. This defines the time range to fetch the data
              over.
            </p>
          ),
        },
        {
          name: 'inputSource.filter',
          label: 'Filter',
          type: 'json',
          placeholder: '(optional)',
          hideInMore: true,
          info: (
            <p>
              The{' '}
              <ExternalLink href={`${getLink('DOCS')}/querying/filters.html`}>filter</ExternalLink>{' '}
              to apply to the data as part of querying.
            </p>
          ),
        },
      ];

    case 'index_parallel:inline':
      return [
        inputSourceType,
        // do not add 'data' here as it has special handling in the load-data view
      ];

    case 'index_parallel:s3':
      return [
        inputSourceType,
        // uris / prefixes / objects are alternatives: a field is only shown while the other
        // two are unset (objects only once it already has a value)
        {
          name: 'inputSource.uris',
          label: 'S3 URIs',
          type: 'string-array',
          placeholder: 's3://your-bucket/some-file1.ext, s3://your-bucket/some-file2.ext',
          required: true,
          defined: ioConfig =>
            !deepGet(ioConfig, 'inputSource.prefixes') && !deepGet(ioConfig, 'inputSource.objects'),
          info: (
            <>
              <p>
                The full S3 URI of your file. To ingest from multiple URIs, use commas to separate
                each individual URI.
              </p>
              <p>Either S3 URIs or prefixes or objects must be set.</p>
            </>
          ),
        },
        {
          name: 'inputSource.prefixes',
          label: 'S3 prefixes',
          type: 'string-array',
          placeholder: 's3://your-bucket/some-path1, s3://your-bucket/some-path2',
          required: true,
          defined: ioConfig =>
            !deepGet(ioConfig, 'inputSource.uris') && !deepGet(ioConfig, 'inputSource.objects'),
          info: (
            <>
              <p>A list of paths (with bucket) where your files are stored.</p>
              <p>Either S3 URIs or prefixes or objects must be set.</p>
            </>
          ),
        },
        {
          name: 'inputSource.objects',
          label: 'S3 objects',
          type: 'json',
          placeholder: '{"bucket":"your-bucket", "path":"some-file.ext"}',
          required: true,
          defined: ioConfig => deepGet(ioConfig, 'inputSource.objects'),
          info: (
            <>
              <p>
                JSON array of{' '}
                <ExternalLink href={`${getLink('DOCS')}/development/extensions-core/s3.html`}>
                  S3 Objects
                </ExternalLink>
                .
              </p>
              <p>Either S3 URIs or prefixes or objects must be set.</p>
            </>
          ),
        },

        {
          name: 'inputSource.properties.accessKeyId.type',
          label: 'Access key ID type',
          type: 'string',
          suggestions: [undefined, 'environment', 'default'],
          placeholder: '(none)',
          info: (
            <>
              <p>S3 access key type.</p>
              <p>Setting this will override the default configuration provided in the config.</p>
              <p>
                The access key can be pulled from an environment variable or inlined in the
                ingestion spec (default).
              </p>
              <p>
                Note: Inlining the access key into the ingestion spec is dangerous as it might
                appear in server log files and can be seen by anyone accessing this console.
              </p>
            </>
          ),
          // Keep the secret key type in step with whatever the access key type currently is
          adjustment: ioConfig => {
            return deepSet(
              ioConfig,
              'inputSource.properties.secretAccessKey.type',
              deepGet(ioConfig, 'inputSource.properties.accessKeyId.type'),
            );
          },
        },
        {
          name: 'inputSource.properties.accessKeyId.variable',
          label: 'Access key ID environment variable',
          type: 'string',
          placeholder: '(environment variable name)',
          defined: ioConfig =>
            deepGet(ioConfig, 'inputSource.properties.accessKeyId.type') === 'environment',
          info: <p>The environment variable containing the S3 access key for this S3 bucket.</p>,
        },
        {
          name: 'inputSource.properties.accessKeyId.password',
          label: 'Access key ID value',
          type: 'string',
          placeholder: '(access key)',
          defined: ioConfig =>
            deepGet(ioConfig, 'inputSource.properties.accessKeyId.type') === 'default',
          info: (
            <>
              <p>S3 access key for this S3 bucket.</p>
              <p>
                Note: Inlining the access key into the ingestion spec is dangerous as it might
                appear in server log files and can be seen by anyone accessing this console.
              </p>
            </>
          ),
        },

        {
          name: 'inputSource.properties.secretAccessKey.type',
          label: 'Secret key type',
          type: 'string',
          suggestions: [undefined, 'environment', 'default'],
          placeholder: '(none)',
          info: (
            <>
              <p>S3 secret key type.</p>
              <p>Setting this will override the default configuration provided in the config.</p>
              <p>
                The secret key can be pulled from an environment variable or inlined in the
                ingestion spec (default).
              </p>
              <p>
                Note: Inlining the secret key into the ingestion spec is dangerous as it might
                appear in server log files and can be seen by anyone accessing this console.
              </p>
            </>
          ),
        },
        {
          name: 'inputSource.properties.secretAccessKey.variable',
          // Labelled after the environment variable (not the value) to mirror the access key
          // field and to stay distinguishable from the password field below
          label: 'Secret key environment variable',
          type: 'string',
          placeholder: '(environment variable name)',
          defined: ioConfig =>
            deepGet(ioConfig, 'inputSource.properties.secretAccessKey.type') === 'environment',
          info: <p>The environment variable containing the S3 secret key for this S3 bucket.</p>,
        },
        {
          name: 'inputSource.properties.secretAccessKey.password',
          label: 'Secret key value',
          type: 'string',
          placeholder: '(secret key)',
          defined: ioConfig =>
            deepGet(ioConfig, 'inputSource.properties.secretAccessKey.type') === 'default',
          info: (
            <>
              <p>S3 secret key for this S3 bucket.</p>
              <p>
                Note: Inlining the secret key into the ingestion spec is dangerous as it might
                appear in server log files and can be seen by anyone accessing this console.
              </p>
            </>
          ),
        },
      ];

    case 'index_parallel:azure':
      return [
        inputSourceType,
        {
          name: 'inputSource.uris',
          label: 'Azure URIs',
          type: 'string-array',
          placeholder:
            'azure://your-container/some-file1.ext, azure://your-container/some-file2.ext',
          required: true,
          defined: ioConfig =>
            !deepGet(ioConfig, 'inputSource.prefixes') && !deepGet(ioConfig, 'inputSource.objects'),
          info: (
            <>
              <p>
                The full Azure URI of your file. To ingest from multiple URIs, use commas to
                separate each individual URI.
              </p>
              <p>Either Azure URIs or prefixes or objects must be set.</p>
            </>
          ),
        },
        {
          name: 'inputSource.prefixes',
          label: 'Azure prefixes',
          type: 'string-array',
          placeholder: 'azure://your-container/some-path1, azure://your-container/some-path2',
          required: true,
          defined: ioConfig =>
            !deepGet(ioConfig, 'inputSource.uris') && !deepGet(ioConfig, 'inputSource.objects'),
          info: (
            <>
              <p>A list of paths (with bucket) where your files are stored.</p>
              <p>Either Azure URIs or prefixes or objects must be set.</p>
            </>
          ),
        },
        {
          name: 'inputSource.objects',
          label: 'Azure objects',
          type: 'json',
          placeholder: '{"bucket":"your-container", "path":"some-file.ext"}',
          required: true,
          defined: ioConfig => deepGet(ioConfig, 'inputSource.objects'),
          info: (
            <>
              <p>
                JSON array of{' '}
                <ExternalLink href={`${getLink('DOCS')}/development/extensions-core/azure.html`}>
                  Azure Objects
                </ExternalLink>
                .
              </p>
              <p>Either Azure URIs or prefixes or objects must be set.</p>
            </>
          ),
        },
      ];

    case 'index_parallel:google':
      return [
        inputSourceType,
        {
          name: 'inputSource.uris',
          label: 'Google Cloud Storage URIs',
          type: 'string-array',
          placeholder: 'gs://your-bucket/some-file1.ext, gs://your-bucket/some-file2.ext',
          required: true,
          defined: ioConfig =>
            !deepGet(ioConfig, 'inputSource.prefixes') && !deepGet(ioConfig, 'inputSource.objects'),
          info: (
            <>
              <p>
                The full Google Cloud Storage URI of your file. To ingest from multiple URIs, use
                commas to separate each individual URI.
              </p>
              <p>Either Google Cloud Storage URIs or prefixes or objects must be set.</p>
            </>
          ),
        },
        {
          name: 'inputSource.prefixes',
          label: 'Google Cloud Storage prefixes',
          type: 'string-array',
          placeholder: 'gs://your-bucket/some-path1, gs://your-bucket/some-path2',
          required: true,
          defined: ioConfig =>
            !deepGet(ioConfig, 'inputSource.uris') && !deepGet(ioConfig, 'inputSource.objects'),
          info: (
            <>
              <p>A list of paths (with bucket) where your files are stored.</p>
              <p>Either Google Cloud Storage URIs or prefixes or objects must be set.</p>
            </>
          ),
        },
        {
          name: 'inputSource.objects',
          label: 'Google Cloud Storage objects',
          type: 'json',
          placeholder: '{"bucket":"your-bucket", "path":"some-file.ext"}',
          required: true,
          defined: ioConfig => deepGet(ioConfig, 'inputSource.objects'),
          info: (
            <>
              <p>
                JSON array of{' '}
                <ExternalLink href={`${getLink('DOCS')}/development/extensions-core/google.html`}>
                  Google Cloud Storage Objects
                </ExternalLink>
                .
              </p>
              <p>Either Google Cloud Storage URIs or prefixes or objects must be set.</p>
            </>
          ),
        },
      ];

    case 'index_parallel:hdfs':
      return [
        inputSourceType,
        {
          name: 'inputSource.paths',
          label: 'Paths',
          type: 'string',
          placeholder: '/path/to/file.ext',
          required: true,
        },
      ];

    case 'kafka':
      return [
        {
          name: 'consumerProperties.{bootstrap.servers}',
          label: 'Bootstrap servers',
          type: 'string',
          required: true,
          info: (
            <>
              <ExternalLink
                href={`${getLink(
                  'DOCS',
                )}/development/extensions-core/kafka-ingestion#kafkasupervisorioconfig`}
              >
                consumerProperties
              </ExternalLink>
              <p>
                A list of Kafka brokers in the form:{' '}
                <Code>{`<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`}</Code>
              </p>
            </>
          ),
        },
        {
          name: 'topic',
          type: 'string',
          required: true,
          defined: typeIs('kafka'),
        },
        {
          name: 'consumerProperties',
          type: 'json',
          defaultValue: {},
          info: (
            <>
              <ExternalLink
                href={`${getLink(
                  'DOCS',
                )}/development/extensions-core/kafka-ingestion#kafkasupervisorioconfig`}
              >
                consumerProperties
              </ExternalLink>
              <p>A map of properties to be passed to the Kafka consumer.</p>
            </>
          ),
        },
      ];

    case 'kinesis':
      return [
        {
          name: 'stream',
          type: 'string',
          placeholder: 'your-kinesis-stream',
          required: true,
          info: <>The Kinesis stream to read.</>,
        },
        {
          name: 'endpoint',
          type: 'string',
          defaultValue: 'kinesis.us-east-1.amazonaws.com',
          suggestions: [
            'kinesis.us-east-2.amazonaws.com',
            'kinesis.us-east-1.amazonaws.com',
            'kinesis.us-west-1.amazonaws.com',
            'kinesis.us-west-2.amazonaws.com',
            'kinesis.ap-east-1.amazonaws.com',
            'kinesis.ap-south-1.amazonaws.com',
            'kinesis.ap-northeast-3.amazonaws.com',
            'kinesis.ap-northeast-2.amazonaws.com',
            'kinesis.ap-southeast-1.amazonaws.com',
            'kinesis.ap-southeast-2.amazonaws.com',
            'kinesis.ap-northeast-1.amazonaws.com',
            'kinesis.ca-central-1.amazonaws.com',
            // The China regions live under the amazonaws.com.cn domain
            'kinesis.cn-north-1.amazonaws.com.cn',
            'kinesis.cn-northwest-1.amazonaws.com.cn',
            'kinesis.eu-central-1.amazonaws.com',
            'kinesis.eu-west-1.amazonaws.com',
            'kinesis.eu-west-2.amazonaws.com',
            'kinesis.eu-west-3.amazonaws.com',
            'kinesis.eu-north-1.amazonaws.com',
            'kinesis.sa-east-1.amazonaws.com',
            'kinesis.us-gov-east-1.amazonaws.com',
            'kinesis.us-gov-west-1.amazonaws.com',
          ],
          info: (
            <>
              The Amazon Kinesis stream endpoint for a region. You can find a list of endpoints{' '}
              <ExternalLink href="https://docs.aws.amazon.com/general/latest/gr/ak.html">
                here
              </ExternalLink>
              .
            </>
          ),
        },
        {
          name: 'awsAssumedRoleArn',
          label: 'AWS assumed role ARN',
          type: 'string',
          placeholder: 'optional',
          info: <>The AWS assumed role to use for additional permissions.</>,
        },
        {
          name: 'awsExternalId',
          label: 'AWS external ID',
          type: 'string',
          placeholder: 'optional',
          info: <>The AWS external id to use for additional permissions.</>,
        },
      ];
  }

  // Only reachable when a value outside of IngestionComboType sneaks in at runtime
  throw new Error(`unknown input type ${ingestionComboType}`);
}
| |
/**
 * Validates an ioConfig and describes the first problem found.
 *
 * Checks, in order: the config exists, it has a type, the type specific requirement holds
 * (a valid input source for `index` / `index_parallel`, a topic for `kafka`, a stream for
 * `kinesis`) and finally - unless suppressed - that the input format is valid.
 *
 * @param ioConfig the ioConfig to validate (may be missing)
 * @param ignoreInputFormat when `true` the input format check is skipped
 * @returns a short description of the issue, or `undefined` when no issue was found
 */
export function issueWithIoConfig(
  ioConfig: IoConfig | undefined,
  ignoreInputFormat = false,
): string | undefined {
  if (!ioConfig) return 'does not exist';
  if (!ioConfig.type) return 'missing a type';

  switch (ioConfig.type) {
    case 'index':
    case 'index_parallel': {
      // Run the validator once and reuse its message
      const inputSourceIssue = issueWithInputSource(ioConfig.inputSource);
      if (inputSourceIssue) {
        return `inputSource: '${inputSourceIssue}'`;
      }
      break;
    }

    case 'kafka':
      if (!ioConfig.topic) return 'must have a topic';
      break;

    case 'kinesis':
      if (!ioConfig.stream) return 'must have a stream';
      break;
  }

  if (!ignoreInputFormat) {
    // Run the validator once and reuse its message
    const inputFormatIssue = issueWithInputFormat(ioConfig.inputFormat);
    if (inputFormatIssue) {
      return `inputFormat: '${inputFormatIssue}'`;
    }
  }

  return;
}
| |
/**
 * Builds the list of form fields used to tune an ioConfig.
 *
 * - `http`, `s3`, `azure`, `google` and `hdfs` input sources share the fetch timeout and
 *   max fetch retry fields;
 * - `local` and `inline` input sources have nothing to tune;
 * - the `druid` input source has a single max fetch capacity field;
 * - `kafka` and `kinesis` share one list in which the type specific fields are gated by
 *   `typeIs(...)`.
 *
 * A fresh array is returned on every call.
 *
 * @param ingestionComboType the combo type to build the fields for
 * @returns the tuning field definitions for that combo type
 * @throws Error when the combo type is not one of the handled values
 */
export function getIoConfigTuningFormFields(
  ingestionComboType: IngestionComboType,
): Field<IoConfig>[] {
  if (
    ingestionComboType === 'index_parallel:http' ||
    ingestionComboType === 'index_parallel:s3' ||
    ingestionComboType === 'index_parallel:azure' ||
    ingestionComboType === 'index_parallel:google' ||
    ingestionComboType === 'index_parallel:hdfs'
  ) {
    return [
      {
        name: 'inputSource.fetchTimeout',
        label: 'Fetch timeout',
        type: 'number',
        defaultValue: 60000,
        info: (
          <>
            <p>Timeout for fetching the object.</p>
          </>
        ),
      },
      {
        name: 'inputSource.maxFetchRetry',
        label: 'Max fetch retry',
        type: 'number',
        defaultValue: 3,
        info: (
          <>
            <p>Maximum retry for fetching the object.</p>
          </>
        ),
      },
    ];
  }

  if (
    ingestionComboType === 'index_parallel:local' ||
    ingestionComboType === 'index_parallel:inline'
  ) {
    return [];
  }

  if (ingestionComboType === 'index_parallel:druid') {
    return [
      {
        name: 'inputSource.maxFetchCapacityBytes',
        label: 'Max fetch capacity bytes',
        type: 'number',
        defaultValue: 157286400,
        info: (
          <p>
            When used with the native parallel index task, the maximum number of bytes of input
            segments to process in a single task. If a single segment is larger than this number,
            it will be processed by itself in a single task (input segments are never split across
            tasks). Defaults to 150MB.
          </p>
        ),
      },
    ];
  }

  if (ingestionComboType === 'kafka' || ingestionComboType === 'kinesis') {
    return [
      {
        name: 'useEarliestOffset',
        type: 'boolean',
        defined: typeIs('kafka'),
        required: true,
        info: (
          <>
            <p>
              If a supervisor is managing a dataSource for the first time, it will obtain a set of
              starting offsets from Kafka. This flag determines whether it retrieves the earliest
              or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start
              from where the previous segments ended so this flag will only be used on first run.
            </p>
          </>
        ),
      },
      {
        name: 'useEarliestSequenceNumber',
        type: 'boolean',
        defined: typeIs('kinesis'),
        required: true,
        info: (
          <>
            If a supervisor is managing a dataSource for the first time, it will obtain a set of
            starting sequence numbers from Kinesis. This flag determines whether it retrieves the
            earliest or latest sequence numbers in Kinesis. Under normal circumstances, subsequent
            tasks will start from where the previous segments ended so this flag will only be used
            on first run.
          </>
        ),
      },
      {
        name: 'taskDuration',
        type: 'duration',
        defaultValue: 'PT1H',
        info: (
          <>
            <p>
              The length of time before tasks stop reading and begin publishing their segment.
            </p>
          </>
        ),
      },
      {
        name: 'taskCount',
        type: 'number',
        defaultValue: 1,
        info: (
          <>
            <p>
              The maximum number of reading tasks in a replica set. This means that the maximum
              number of reading tasks will be <Code>taskCount * replicas</Code> and the total
              number of tasks (reading + publishing) will be higher than this. See 'Capacity
              Planning' below for more details.
            </p>
          </>
        ),
      },
      {
        name: 'replicas',
        type: 'number',
        defaultValue: 1,
        info: (
          <>
            <p>
              The number of replica sets, where 1 means a single set of tasks (no replication).
              Replica tasks will always be assigned to different workers to provide resiliency
              against process failure.
            </p>
          </>
        ),
      },
      {
        name: 'completionTimeout',
        type: 'duration',
        defaultValue: 'PT30M',
        info: (
          <>
            <p>
              The length of time to wait before declaring a publishing task as failed and
              terminating it. If this is set too low, your tasks may never publish. The publishing
              clock for a task begins roughly after taskDuration elapses.
            </p>
          </>
        ),
      },
      {
        name: 'recordsPerFetch',
        type: 'number',
        defaultValue: 2000,
        defined: typeIs('kinesis'),
        info: <>The number of records to request per GetRecords call to Kinesis.</>,
      },
      {
        name: 'pollTimeout',
        type: 'number',
        defaultValue: 100,
        defined: typeIs('kafka'),
        info: (
          <>
            <p>
              The length of time to wait for the kafka consumer to poll records, in milliseconds.
            </p>
          </>
        ),
      },
      {
        name: 'fetchDelayMillis',
        type: 'number',
        defaultValue: 1000,
        defined: typeIs('kinesis'),
        info: <>Time in milliseconds to wait between subsequent GetRecords calls to Kinesis.</>,
      },
      {
        name: 'deaggregate',
        type: 'boolean',
        defaultValue: false,
        defined: typeIs('kinesis'),
        info: <>Whether to use the de-aggregate function of the KCL.</>,
      },
      {
        name: 'startDelay',
        type: 'duration',
        defaultValue: 'PT5S',
        info: (
          <>
            <p>The period to wait before the supervisor starts managing tasks.</p>
          </>
        ),
      },
      {
        name: 'period',
        label: 'Management period',
        type: 'duration',
        defaultValue: 'PT30S',
        info: (
          <>
            <p>How often the supervisor will execute its management logic.</p>
            <p>
              Note that the supervisor will also run in response to certain events (such as tasks
              succeeding, failing, and reaching their taskDuration) so this value specifies the
              maximum time between iterations.
            </p>
          </>
        ),
      },
      {
        name: 'lateMessageRejectionPeriod',
        type: 'string',
        placeholder: '(none)',
        info: (
          <>
            <p>
              Configure tasks to reject messages with timestamps earlier than this period before
              the task was created; for example if this is set to PT1H and the supervisor creates
              a task at 2016-01-01T12:00Z, messages with timestamps earlier than 2016-01-01T11:00Z
              will be dropped.
            </p>
            <p>
              This may help prevent concurrency issues if your data stream has late messages and
              you have multiple pipelines that need to operate on the same segments (e.g. a
              realtime and a nightly batch ingestion pipeline).
            </p>
          </>
        ),
      },
      {
        name: 'earlyMessageRejectionPeriod',
        type: 'string',
        placeholder: '(none)',
        info: (
          <>
            <p>
              Configure tasks to reject messages with timestamps later than this period after the
              task reached its taskDuration; for example if this is set to PT1H, the taskDuration
              is set to PT1H and the supervisor creates a task at 2016-01-01T12:00Z, messages with
              timestamps later than 2016-01-01T14:00Z will be dropped.
            </p>
          </>
        ),
      },
      {
        name: 'skipOffsetGaps',
        type: 'boolean',
        defaultValue: false,
        defined: typeIs('kafka'),
        info: (
          <>
            <p>
              Whether or not to allow gaps of missing offsets in the Kafka stream. This is
              required for compatibility with implementations such as MapR Streams which does not
              guarantee consecutive offsets. If this is false, an exception will be thrown if
              offsets are not consecutive.
            </p>
          </>
        ),
      },
    ];
  }

  // Only reachable when a value outside of IngestionComboType sneaks in at runtime
  throw new Error(`unknown ingestion combo type ${ingestionComboType}`);
}
| |
| // --------------------------------------- |
| |
/**
 * Tells whether a file filter refers to one concrete file name, i.e. it contains
 * neither the `*` nor the `?` wildcard character.
 *
 * @param filter - the filter string to inspect
 * @returns `true` when no wildcard character is present
 */
function filterIsFilename(filter: string): boolean {
  const wildcardMatch = /[*?]/.exec(filter);
  return wildcardMatch === null;
}
| |
/**
 * Extracts the extension-less name of the last segment of a path or URI.
 *
 * The pattern captures the first run of characters that are neither `/` nor `.`
 * in the final segment; one trailing `/` is tolerated.
 *
 * @param path - a file path or URI
 * @returns the captured name, or `undefined` when the pattern does not match
 *   (e.g. for `''` or `'/'`)
 */
function filenameFromPath(path: string): string | undefined {
  const match = /([^/.]+)[^/]*?\/?$/.exec(path);
  return match ? match[1] : undefined;
}
| |
/**
 * Returns the part of a file name that precedes its first `.`.
 *
 * @param filename - a bare file name such as `wikipedia.json`
 * @returns everything before the first dot (the whole name when there is no dot,
 *   the empty string when the name starts with a dot)
 */
function basenameFromFilename(filename: string): string | undefined {
  const [basename] = filename.split('.');
  return basename;
}
| |
/**
 * Fills `spec.dataSchema.dataSource` with a name guessed from the spec's input,
 * using `deepSetIfUnset` so the guess is only applied through that helper.
 *
 * @param spec - the (possibly partial) ingestion spec
 * @returns the spec unchanged when no name can be guessed, otherwise the result
 *   of `deepSetIfUnset` for the data source path
 */
export function fillDataSourceNameIfNeeded(spec: Partial<IngestionSpec>): Partial<IngestionSpec> {
  const guessedName = guessDataSourceName(spec);
  return guessedName ? deepSetIfUnset(spec, 'spec.dataSchema.dataSource', guessedName) : spec;
}
| |
/**
 * Guesses a data source name from the input described by `spec.ioConfig`.
 *
 * - `index` / `index_parallel`: derived from the input source
 *   (`local`, `s3`, `azure`, `google`, `http`, `druid`, `inline`).
 * - `kafka`: the topic name.
 * - `kinesis`: the stream name.
 *
 * @param spec - the (possibly partial) ingestion spec
 * @returns the guessed name, or `undefined` when nothing usable is configured
 */
export function guessDataSourceName(spec: Partial<IngestionSpec>): string | undefined {
  const ioConfig = deepGet(spec, 'spec.ioConfig');
  if (!ioConfig) return;

  switch (ioConfig.type) {
    case 'index':
    case 'index_parallel': {
      const inputSource = ioConfig.inputSource;
      if (!inputSource) return;

      switch (inputSource.type) {
        case 'local':
          // A wildcard-free filter is an actual file name; otherwise fall back to the directory.
          if (inputSource.filter && filterIsFilename(inputSource.filter)) {
            return basenameFromFilename(inputSource.filter);
          } else if (inputSource.baseDir) {
            return filenameFromPath(inputSource.baseDir);
          } else {
            return;
          }

        case 's3':
        case 'azure':
        case 'google': {
          // Explicit objects win over uris, which win over prefixes.
          // NOTE(review): the object's `path` is returned as-is (not reduced with
          // filenameFromPath like the uri/prefix case) - confirm this is intended.
          const actualPath = (inputSource.objects || EMPTY_ARRAY)[0];
          const uriPath =
            (inputSource.uris || EMPTY_ARRAY)[0] || (inputSource.prefixes || EMPTY_ARRAY)[0];
          return actualPath ? actualPath.path : uriPath ? filenameFromPath(uriPath) : undefined;
        }

        case 'http': {
          // Guard against an empty `uris` array: passing `undefined` to filenameFromPath
          // would be coerced to the text "undefined" by the regex and returned as the name.
          const firstUri = Array.isArray(inputSource.uris) ? inputSource.uris[0] : undefined;
          return firstUri ? filenameFromPath(firstUri) : undefined;
        }

        case 'druid':
          return inputSource.dataSource;

        case 'inline':
          return 'inline_data';
      }

      return;
    }

    case 'kafka':
      return ioConfig.topic;

    case 'kinesis':
      return ioConfig.stream;

    default:
      return;
  }
}
| |
| // -------------- |
| |
/**
 * Shape of the `tuningConfig` section of an ingestion spec.
 * Only `type` is required; every other setting is optional.
 */
export interface TuningConfig {
  type: string;

  // In-memory limits and persistence
  maxRowsInMemory?: number;
  maxBytesInMemory?: number;
  maxPendingPersists?: number;
  pushTimeout?: number;
  segmentWriteOutMediumFactory?: any;

  // Partitioning and indexing
  partitionsSpec?: PartitionsSpec;
  indexSpec?: IndexSpec;
  forceExtendableShardSpecs?: boolean;
  forceGuaranteedRollup?: boolean;

  // Parse exception handling
  reportParseExceptions?: boolean;
  maxParseExceptions?: number;
  maxSavedParseExceptions?: number;

  // Settings the tuning form offers for streaming (kafka and/or kinesis) specs
  intermediateHandoffPeriod?: string;
  handoffConditionTimeout?: number;
  resetOffsetAutomatically?: boolean;
  workerThreads?: number;
  chatThreads?: number;
  chatRetries?: number;
  httpTimeout?: string;
  shutdownTimeout?: string;
  offsetFetchPeriod?: string;

  // Settings the tuning form offers for kinesis specs only
  recordBufferSize?: number;
  recordBufferOfferTimeout?: number;
  recordBufferFullWait?: number;
  fetchSequenceNumberTimeout?: number;
  fetchThreads?: number;
}
| |
/**
 * Secondary partitioning settings stored at `spec.tuningConfig.partitionsSpec`.
 * Which optional members are meaningful depends on `type`; the groupings below
 * follow the secondary-partitioning form fields defined in this file.
 */
export interface PartitionsSpec {
  // Name of the partitioning scheme ('dynamic', 'hashed' and 'single_dim' are the values
  // suggested by the form). This must be the `string` type: the former string-literal
  // type 'string' rejected every real scheme name.
  type: string;

  // For type: dynamic
  maxTotalRows?: number;

  // For type: hashed
  numShards?: number;
  partitionDimensions?: string[];

  // For type: hashed and single_dim
  targetRowsPerSegment?: number;

  // For type: dynamic and single_dim
  maxRowsPerSegment?: number;

  // For type: single_dim
  partitionDimension?: string;
  assumeGrouped?: boolean;
}
| |
/**
 * Keeps `spec.tuningConfig.forceGuaranteedRollup` in step with the partitioning type
 * of an `index_parallel` spec:
 *
 * - `dynamic` (also assumed when no type is set): the flag is removed;
 * - `hashed` / `single_dim`: the flag is set to `true`;
 * - any other type: the spec is left as it is.
 *
 * Specs of any other spec type are returned untouched.
 */
export function adjustForceGuaranteedRollup(spec: Partial<IngestionSpec>) {
  if (getSpecType(spec) !== 'index_parallel') return spec;

  let adjustedSpec = spec;
  const partitioningType = deepGet(spec, 'spec.tuningConfig.partitionsSpec.type') || 'dynamic';

  if (partitioningType === 'dynamic') {
    adjustedSpec = deepDelete(adjustedSpec, 'spec.tuningConfig.forceGuaranteedRollup');
  } else if (oneOf(partitioningType, 'hashed', 'single_dim')) {
    adjustedSpec = deepSet(adjustedSpec, 'spec.tuningConfig.forceGuaranteedRollup', true);
  }

  return adjustedSpec;
}
| |
/**
 * Reports whether the partitioning configuration of a spec is unusable.
 *
 * @param spec - the (possibly partial) ingestion spec
 * @returns `true` when the segment granularity (primary partitioning) is missing, or
 *   when the secondary partitioning form fields do not validate against the spec
 */
export function invalidPartitionConfig(spec: Partial<IngestionSpec>): boolean {
  // Bad primary partitioning: nothing else needs to be checked
  const segmentGranularity = deepGet(spec, 'spec.dataSchema.granularitySpec.segmentGranularity');
  if (!segmentGranularity) return true;

  // Bad secondary partitioning
  const secondaryFields = getSecondaryPartitionRelatedFormFields(spec, undefined);
  return !AutoForm.isValidModel(spec, secondaryFields);
}
| |
/**
 * Form fields for primary (time based) partitioning: the required segment granularity
 * and, for `index_parallel` specs only, the optional list of time intervals.
 */
export const PRIMARY_PARTITION_RELATED_FORM_FIELDS: Field<IngestionSpec>[] = [
  // Required for every spec type
  {
    name: 'spec.dataSchema.granularitySpec.segmentGranularity',
    label: 'Segment granularity',
    type: 'string',
    suggestions: ['hour', 'day', 'week', 'month', 'year'],
    required: true,
    info: (
      <>
        The granularity to create time chunks at. Multiple segments can be created per time chunk.
        For example, with 'DAY' segmentGranularity, the events of the same day fall into
        the same time chunk which can be optionally further partitioned into multiple segments based
        on other configurations and input size.
      </>
    ),
  },
  // Only shown for index_parallel specs
  {
    name: 'spec.dataSchema.granularitySpec.intervals',
    label: 'Time intervals',
    type: 'string-array',
    placeholder: '(auto determine)',
    defined: ingestionSpec => getSpecType(ingestionSpec) === 'index_parallel',
    info: (
      <>
        <p>
          A list of intervals describing what time chunks of segments should be created. This list
          will be broken up and rounded-off based on the segmentGranularity.
        </p>
        <p>
          If not provided, batch ingestion tasks will generally determine which time chunks to
          output based on what timestamps are found in the input data.
        </p>
        <p>
          If specified, batch ingestion tasks may be able to skip a determining-partitions phase,
          which can result in faster ingestion. Batch ingestion tasks may also be able to request
          all their locks up-front instead of one by one. Batch ingestion tasks will throw away any
          records with timestamps outside of the specified intervals.
        </p>
      </>
    ),
  },
];
| |
/**
 * Builds the form fields that configure secondary partitioning for a spec.
 *
 * - `index_parallel`: the `partitionsSpec` type selector followed by the settings of the
 *   `dynamic`, `hashed` and `single_dim` types (each shown only for its own type).
 * - `kafka` / `kinesis`: `maxRowsPerSegment` and `maxTotalRows` directly on the tuning config.
 *
 * @param spec - the spec whose type selects the set of fields
 * @param dimensionSuggestions - dimension names offered for `partitionDimension`; the first
 *   one is filled in automatically when the partitioning type is changed to `single_dim`
 * @returns the field definitions for the spec type
 * @throws Error when the spec type is none of the types listed above
 */
export function getSecondaryPartitionRelatedFormFields(
  spec: Partial<IngestionSpec>,
  dimensionSuggestions: string[] | undefined,
): Field<IngestionSpec>[] {
  const specType = getSpecType(spec);

  if (specType === 'index_parallel') {
    // Predicate factory: does the model use the given partitionsSpec type?
    const partitioningTypeIs = (type: string) => (s: Partial<IngestionSpec>) =>
      deepGet(s, 'spec.tuningConfig.partitionsSpec.type') === type;
    const isDynamic = partitioningTypeIs('dynamic');
    const isHashed = partitioningTypeIs('hashed');
    const isSingleDim = partitioningTypeIs('single_dim');

    // single_dim needs one of targetRowsPerSegment / maxRowsPerSegment to be set
    const lacksRowLimit = (s: Partial<IngestionSpec>) =>
      !deepGet(s, 'spec.tuningConfig.partitionsSpec.targetRowsPerSegment') &&
      !deepGet(s, 'spec.tuningConfig.partitionsSpec.maxRowsPerSegment');

    return [
      {
        name: 'spec.tuningConfig.partitionsSpec.type',
        label: 'Partitioning type',
        type: 'string',
        required: true,
        suggestions: ['dynamic', 'hashed', 'single_dim'],
        info: (
          <p>
            For perfect rollup, you should use either <Code>hashed</Code> (partitioning based on
            the hash of dimensions in each row) or <Code>single_dim</Code> (based on ranges of a
            single dimension). For best-effort rollup, you should use <Code>dynamic</Code>.
          </p>
        ),
        // When single_dim is picked, pre-select the first suggested dimension (if any)
        adjustment: s => {
          if (!isSingleDim(s)) return s;
          if (!Array.isArray(dimensionSuggestions) || !dimensionSuggestions.length) return s;

          return deepSet(
            s,
            'spec.tuningConfig.partitionsSpec.partitionDimension',
            dimensionSuggestions[0],
          );
        },
      },
      // partitionsSpec type: dynamic
      {
        name: 'spec.tuningConfig.partitionsSpec.maxRowsPerSegment',
        type: 'number',
        defaultValue: 5000000,
        defined: isDynamic,
        info: <>Determines how many rows are in each segment.</>,
      },
      {
        name: 'spec.tuningConfig.partitionsSpec.maxTotalRows',
        type: 'number',
        defaultValue: 20000000,
        defined: isDynamic,
        info: <>Total number of rows in segments waiting for being pushed.</>,
      },
      // partitionsSpec type: hashed
      {
        name: 'spec.tuningConfig.partitionsSpec.targetRowsPerSegment',
        type: 'number',
        zeroMeansUndefined: true,
        defaultValue: 5000000,
        // Hidden once numShards is set
        defined: s => isHashed(s) && !deepGet(s, 'spec.tuningConfig.partitionsSpec.numShards'),
        info: (
          <>
            <p>
              If the segments generated are a sub-optimal size for the requested partition
              dimensions, consider setting this field.
            </p>
            <p>
              A target row count for each partition. Each partition will have a row count close to
              the target assuming evenly distributed keys. Defaults to 5 million if numShards is
              null.
            </p>
          </>
        ),
      },
      {
        name: 'spec.tuningConfig.partitionsSpec.numShards',
        type: 'number',
        zeroMeansUndefined: true,
        hideInMore: true,
        // Hidden once targetRowsPerSegment is set
        defined: s =>
          isHashed(s) && !deepGet(s, 'spec.tuningConfig.partitionsSpec.targetRowsPerSegment'),
        info: (
          <>
            <p>
              If you know the optimal number of shards and want to speed up the time it takes for
              compaction to run, set this field.
            </p>
            <p>
              Directly specify the number of shards to create. If this is specified and
              'intervals' is specified in the granularitySpec, the index task can skip
              the determine intervals/partitions pass through the data.
            </p>
          </>
        ),
      },
      {
        name: 'spec.tuningConfig.partitionsSpec.partitionDimensions',
        type: 'string-array',
        placeholder: '(all dimensions)',
        defined: isHashed,
        info: <p>The dimensions to partition on. Leave blank to select all dimensions.</p>,
      },
      // partitionsSpec type: single_dim
      {
        name: 'spec.tuningConfig.partitionsSpec.partitionDimension',
        type: 'string',
        defined: isSingleDim,
        required: true,
        suggestions: dimensionSuggestions,
        info: (
          <>
            <p>The dimension to partition on.</p>
            <p>
              This should be the first dimension in your schema which would make it first in the
              sort order. As{' '}
              <ExternalLink href={`${getLink('DOCS')}/ingestion/index.html#why-partition`}>
                Partitioning and sorting are best friends!
              </ExternalLink>
            </p>
          </>
        ),
      },
      {
        name: 'spec.tuningConfig.partitionsSpec.targetRowsPerSegment',
        type: 'number',
        zeroMeansUndefined: true,
        // Hidden once maxRowsPerSegment is set
        defined: s =>
          isSingleDim(s) && !deepGet(s, 'spec.tuningConfig.partitionsSpec.maxRowsPerSegment'),
        required: lacksRowLimit,
        info: (
          <p>
            Target number of rows to include in a partition, should be a number that targets
            segments of 500MB~1GB.
          </p>
        ),
      },
      {
        name: 'spec.tuningConfig.partitionsSpec.maxRowsPerSegment',
        type: 'number',
        zeroMeansUndefined: true,
        // Hidden once targetRowsPerSegment is set
        defined: s =>
          isSingleDim(s) && !deepGet(s, 'spec.tuningConfig.partitionsSpec.targetRowsPerSegment'),
        required: lacksRowLimit,
        info: <p>Maximum number of rows to include in a partition.</p>,
      },
      {
        name: 'spec.tuningConfig.partitionsSpec.assumeGrouped',
        type: 'boolean',
        defaultValue: false,
        hideInMore: true,
        defined: isSingleDim,
        info: (
          <p>
            Assume that input data has already been grouped on time and dimensions. Ingestion will
            run faster, but may choose sub-optimal partitions if this assumption is violated.
          </p>
        ),
      },
    ];
  }

  if (specType === 'kafka' || specType === 'kinesis') {
    return [
      {
        name: 'spec.tuningConfig.maxRowsPerSegment',
        type: 'number',
        defaultValue: 5000000,
        info: <>Determines how many rows are in each segment.</>,
      },
      {
        name: 'spec.tuningConfig.maxTotalRows',
        type: 'number',
        defaultValue: 20000000,
        info: <>Total number of rows in segments waiting for being pushed.</>,
      },
    ];
  }

  throw new Error(`unknown spec type ${specType}`);
}
| |
/**
 * Form fields for the `spec.tuningConfig` section of an ingestion spec.
 *
 * Each field is evaluated against the whole spec (field names are full paths from the
 * spec root); `defined` restricts a field to the spec types it applies to, and
 * `hideInMore` marks the less commonly used settings.
 */
const TUNING_FORM_FIELDS: Field<IngestionSpec>[] = [
  {
    name: 'spec.tuningConfig.maxNumConcurrentSubTasks',
    type: 'number',
    defaultValue: 1,
    min: 1,
    defined: typeIs('index_parallel'),
    info: (
      <>
        Maximum number of tasks which can be run at the same time. The supervisor task would spawn
        worker tasks up to maxNumConcurrentSubTasks regardless of the available task slots. If this
        value is set to 1, the supervisor task processes data ingestion on its own instead of
        spawning worker tasks. If this value is set to too large, too many worker tasks can be
        created which might block other ingestion.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.maxRetry',
    type: 'number',
    defaultValue: 3,
    defined: typeIs('index_parallel'),
    hideInMore: true,
    info: <>Maximum number of retries on task failures.</>,
  },
  {
    name: 'spec.tuningConfig.taskStatusCheckPeriodMs',
    type: 'number',
    defaultValue: 1000,
    defined: typeIs('index_parallel'),
    hideInMore: true,
    info: <>Polling period in milliseconds to check running task statuses.</>,
  },
  {
    name: 'spec.tuningConfig.totalNumMergeTasks',
    type: 'number',
    defaultValue: 10,
    min: 1,
    // Only relevant for the partitioning types that shuffle and merge (hashed, single_dim)
    defined: s =>
      Boolean(
        s.type === 'index_parallel' &&
          oneOf(deepGet(s, 'spec.tuningConfig.partitionsSpec.type'), 'hashed', 'single_dim'),
      ),
    info: <>Number of tasks to merge partial segments after shuffle.</>,
  },
  {
    name: 'spec.tuningConfig.maxNumSegmentsToMerge',
    type: 'number',
    defaultValue: 100,
    // Only relevant for the partitioning types that shuffle and merge (hashed, single_dim)
    defined: s =>
      Boolean(
        s.type === 'index_parallel' &&
          oneOf(deepGet(s, 'spec.tuningConfig.partitionsSpec.type'), 'hashed', 'single_dim'),
      ),
    info: (
      <>
        Max limit for the number of segments a single task can merge at the same time after shuffle.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.maxRowsInMemory',
    type: 'number',
    defaultValue: 1000000,
    info: <>Used in determining when intermediate persists to disk should occur.</>,
  },
  {
    name: 'spec.tuningConfig.maxBytesInMemory',
    type: 'number',
    placeholder: 'Default: 1/6 of max JVM memory',
    info: <>Used in determining when intermediate persists to disk should occur.</>,
  },
  {
    name: 'spec.tuningConfig.resetOffsetAutomatically',
    type: 'boolean',
    defaultValue: false,
    defined: typeIs('kafka', 'kinesis'),
    info: (
      <>
        Whether to reset the consumer offset if the next offset that it is trying to fetch is less
        than the earliest available offset for that particular partition.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.skipSequenceNumberAvailabilityCheck',
    type: 'boolean',
    defaultValue: false,
    defined: typeIs('kinesis'),
    info: (
      <>
        Whether to enable checking if the current sequence number is still available in a particular
        Kinesis shard. If set to false, the indexing task will attempt to reset the current sequence
        number (or not), depending on the value of <Code>resetOffsetAutomatically</Code>.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.intermediatePersistPeriod',
    type: 'duration',
    defaultValue: 'PT10M',
    defined: typeIs('kafka', 'kinesis'),
    info: <>The period that determines the rate at which intermediate persists occur.</>,
  },
  {
    name: 'spec.tuningConfig.intermediateHandoffPeriod',
    type: 'duration',
    defaultValue: 'P2147483647D',
    defined: typeIs('kafka', 'kinesis'),
    info: (
      <>
        How often the tasks should hand off segments. Handoff will happen either if
        maxRowsPerSegment or maxTotalRows is hit or every intermediateHandoffPeriod, whichever
        happens earlier.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.maxPendingPersists',
    type: 'number',
    hideInMore: true,
    info: (
      <>
        Maximum number of persists that can be pending but not started. If this limit would be
        exceeded by a new intermediate persist, ingestion will block until the currently-running
        persist finishes.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.pushTimeout',
    type: 'number',
    defaultValue: 0,
    hideInMore: true,
    info: (
      <>
        Milliseconds to wait for pushing segments. It must be &gt;= 0, where 0 means to wait
        forever.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.handoffConditionTimeout',
    type: 'number',
    defaultValue: 0,
    defined: typeIs('kafka', 'kinesis'),
    hideInMore: true,
    info: <>Milliseconds to wait for segment handoff. 0 means to wait forever.</>,
  },
  {
    name: 'spec.tuningConfig.indexSpec.bitmap.type',
    label: 'Index bitmap type',
    type: 'string',
    defaultValue: 'roaring',
    suggestions: ['concise', 'roaring'],
    hideInMore: true,
    info: <>Compression format for bitmap indexes.</>,
  },
  {
    name: 'spec.tuningConfig.indexSpec.dimensionCompression',
    label: 'Index dimension compression',
    type: 'string',
    defaultValue: 'lz4',
    suggestions: ['lz4', 'lzf', 'uncompressed'],
    hideInMore: true,
    info: <>Compression format for dimension columns.</>,
  },
  {
    name: 'spec.tuningConfig.indexSpec.metricCompression',
    label: 'Index metric compression',
    type: 'string',
    defaultValue: 'lz4',
    suggestions: ['lz4', 'lzf', 'uncompressed'],
    hideInMore: true,
    info: <>Compression format for primitive type metric columns.</>,
  },
  {
    name: 'spec.tuningConfig.indexSpec.longEncoding',
    label: 'Index long encoding',
    type: 'string',
    defaultValue: 'longs',
    suggestions: ['longs', 'auto'],
    hideInMore: true,
    info: (
      <>
        Encoding format for long-typed columns. Applies regardless of whether they are dimensions or
        metrics. <Code>auto</Code> encodes the values using offset or lookup table depending on
        column cardinality, and store them with variable size. <Code>longs</Code> stores the value
        as-is with 8 bytes each.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.splitHintSpec.maxSplitSize',
    type: 'number',
    defaultValue: 1073741824,
    min: 1000000,
    // 'http' is an input *source* type (not an input format), so the check has to look at
    // ioConfig.inputSource.type for the field to actually be hidden for http inputs.
    defined: s =>
      s.type === 'index_parallel' && deepGet(s, 'spec.ioConfig.inputSource.type') !== 'http',
    hideInMore: true,
    // The model is the whole spec, so the hint type must be written next to the value under
    // spec.tuningConfig (a bare 'splitHintSpec.type' would create a stray top-level key).
    adjustment: s => deepSet(s, 'spec.tuningConfig.splitHintSpec.type', 'maxSize'),
    info: (
      <>
        Maximum number of bytes of input files to process in a single subtask. If a single file is
        larger than this number, it will be processed by itself in a single subtask (Files are never
        split across tasks yet).
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.splitHintSpec.maxNumFiles',
    type: 'number',
    defaultValue: 1000,
    min: 1,
    defined: typeIs('index_parallel'),
    hideInMore: true,
    // Same as for maxSplitSize: keep the hint type under spec.tuningConfig.splitHintSpec
    adjustment: s => deepSet(s, 'spec.tuningConfig.splitHintSpec.type', 'maxSize'),
    info: (
      <>
        Maximum number of input files to process in a single subtask. This limit is to avoid task
        failures when the ingestion spec is too long. There are two known limits on the max size of
        serialized ingestion spec, i.e., the max ZNode size in ZooKeeper (
        <Code>jute.maxbuffer</Code>) and the max packet size in MySQL (
        <Code>max_allowed_packet</Code>). These can make ingestion tasks fail if the serialized
        ingestion spec size hits one of them.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.chatHandlerTimeout',
    type: 'duration',
    defaultValue: 'PT10S',
    defined: typeIs('index_parallel'),
    hideInMore: true,
    info: <>Timeout for reporting the pushed segments in worker tasks.</>,
  },
  {
    name: 'spec.tuningConfig.chatHandlerNumRetries',
    type: 'number',
    defaultValue: 5,
    defined: typeIs('index_parallel'),
    hideInMore: true,
    info: <>Retries for reporting the pushed segments in worker tasks.</>,
  },
  {
    name: 'spec.tuningConfig.workerThreads',
    type: 'number',
    placeholder: 'min(10, taskCount)',
    defined: typeIs('kafka', 'kinesis'),
    info: (
      <>The number of threads that will be used by the supervisor for asynchronous operations.</>
    ),
  },
  {
    name: 'spec.tuningConfig.chatThreads',
    type: 'number',
    placeholder: 'min(10, taskCount * replicas)',
    defined: typeIs('kafka', 'kinesis'),
    hideInMore: true,
    info: <>The number of threads that will be used for communicating with indexing tasks.</>,
  },
  {
    name: 'spec.tuningConfig.chatRetries',
    type: 'number',
    defaultValue: 8,
    defined: typeIs('kafka', 'kinesis'),
    hideInMore: true,
    info: (
      <>
        The number of times HTTP requests to indexing tasks will be retried before considering tasks
        unresponsive.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.httpTimeout',
    type: 'duration',
    defaultValue: 'PT10S',
    defined: typeIs('kafka', 'kinesis'),
    info: <>How long to wait for a HTTP response from an indexing task.</>,
  },
  {
    name: 'spec.tuningConfig.shutdownTimeout',
    type: 'duration',
    defaultValue: 'PT80S',
    defined: typeIs('kafka', 'kinesis'),
    hideInMore: true,
    info: (
      <>
        How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.offsetFetchPeriod',
    type: 'duration',
    defaultValue: 'PT30S',
    defined: typeIs('kafka'),
    info: (
      <>
        How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and
        calculate lag.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.recordBufferSize',
    type: 'number',
    defaultValue: 10000,
    defined: typeIs('kinesis'),
    info: (
      <>
        Size of the buffer (number of events) used between the Kinesis fetch threads and the main
        ingestion thread.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.recordBufferOfferTimeout',
    type: 'number',
    defaultValue: 5000,
    defined: typeIs('kinesis'),
    hideInMore: true,
    info: (
      <>
        Length of time in milliseconds to wait for space to become available in the buffer before
        timing out.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.recordBufferFullWait',
    hideInMore: true,
    type: 'number',
    defaultValue: 5000,
    defined: typeIs('kinesis'),
    info: (
      <>
        Length of time in milliseconds to wait for the buffer to drain before attempting to fetch
        records from Kinesis again.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.fetchSequenceNumberTimeout',
    type: 'number',
    defaultValue: 60000,
    defined: typeIs('kinesis'),
    hideInMore: true,
    info: (
      <>
        Length of time in milliseconds to wait for Kinesis to return the earliest or latest sequence
        number for a shard. Kinesis will not return the latest sequence number if no data is
        actively being written to that shard. In this case, this fetch call will repeatedly timeout
        and retry until fresh data is written to the stream.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.fetchThreads',
    type: 'number',
    placeholder: 'max(1, {numProcessors} - 1)',
    defined: typeIs('kinesis'),
    hideInMore: true,
    info: (
      <>
        Size of the pool of threads fetching data from Kinesis. There is no benefit in having more
        threads than Kinesis shards.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.maxRecordsPerPoll',
    type: 'number',
    defaultValue: 100,
    defined: typeIs('kinesis'),
    hideInMore: true,
    info: (
      <>
        The maximum number of records/events to be fetched from buffer per poll. The actual maximum
        will be <Code>max(maxRecordsPerPoll, max(bufferSize, 1))</Code>.
      </>
    ),
  },
  {
    name: 'spec.tuningConfig.repartitionTransitionDuration',
    type: 'duration',
    defaultValue: 'PT2M',
    defined: typeIs('kinesis'),
    hideInMore: true,
    info: (
      <>
        <p>
          When shards are split or merged, the supervisor will recompute shard, task group mappings,
          and signal any running tasks created under the old mappings to stop early at{' '}
          <Code>(current time + repartitionTransitionDuration)</Code>. Stopping the tasks early
          allows Druid to begin reading from the new shards more quickly.
        </p>
        <p>
          The repartition transition wait time controlled by this property gives the stream
          additional time to write records to the new shards after the split/merge, which helps
          avoid the issues with empty shard handling described at{' '}
          <ExternalLink href="https://github.com/apache/druid/issues/7600">#7600</ExternalLink>.
        </p>
      </>
    ),
  },
];
| |
/**
 * Accessor for the tuning config form fields.
 *
 * @returns the module-level `TUNING_FORM_FIELDS` array itself (not a copy)
 */
export function getTuningFormFields() {
  const tuningFormFields = TUNING_FORM_FIELDS;
  return tuningFormFields;
}
| |
/**
 * Shape of `tuningConfig.indexSpec`; every member is optional.
 */
export interface IndexSpec {
  // Bitmap index settings
  bitmap?: Bitmap;

  // Column compression formats
  dimensionCompression?: string;
  metricCompression?: string;

  // Encoding format for long-typed columns
  longEncoding?: string;
}
| |
/**
 * Shape of `indexSpec.bitmap`.
 */
export interface Bitmap {
  // Bitmap implementation name (the tuning form suggests 'concise' and 'roaring')
  type: string;

  // NOTE(review): not referenced anywhere in the visible part of this file; presumably only
  // meaningful for some bitmap types - confirm against the Druid documentation.
  compressRunOnSerialization?: boolean;
}
| |
| // -------------- |
| |
/**
 * Applies an ingestion combo type (`<ingestionType>` or `<ingestionType>:<inputSourceType>`)
 * to a spec.
 *
 * Sets the top-level `type`, then `spec.ioConfig.type` and `spec.tuningConfig.type` (both to
 * the value derived from the ingestion type). When the combo type carries an input source
 * type, `spec.ioConfig.inputSource` is replaced by `{ type: <inputSourceType> }`.
 *
 * @param spec - the spec to update
 * @param comboType - the ingestion combo type to apply
 * @returns the updated spec
 */
export function updateIngestionType(
  spec: Partial<IngestionSpec>,
  comboType: IngestionComboType,
): Partial<IngestionSpec> {
  const [ingestionType, inputSourceType] = comboType.split(':');
  const configType = ingestionTypeToIoAndTuningConfigType(ingestionType as IngestionType);

  const withSpecType = deepSet(spec, 'type', ingestionType);
  const withIoConfigType = deepSet(withSpecType, 'spec.ioConfig.type', configType);
  const withTuningConfigType = deepSet(withIoConfigType, 'spec.tuningConfig.type', configType);

  return inputSourceType
    ? deepSet(withTuningConfigType, 'spec.ioConfig.inputSource', { type: inputSourceType })
    : withTuningConfigType;
}
| |
/**
 * Checks the first sampled line for signs of multi-line JSON, which cannot be parsed
 * one row per line.
 *
 * @param sampleData - the raw sampled lines
 * @returns a message to show when the first line is exactly `{` (pretty-printed object) or
 *   `[` / `[]` (JSON array); `undefined` when there is no data or no issue was spotted
 */
export function issueWithSampleData(sampleData: string[]): JSX.Element | undefined {
  if (!sampleData.length) return;

  switch (sampleData[0]) {
    case '{':
      return (
        <>
          This data looks like regular JSON object. For Druid to parse a text file it must have one
          row per event. Maybe look at{' '}
          <ExternalLink href="http://ndjson.org/">newline delimited JSON</ExternalLink> instead.
        </>
      );

    case '[':
    case '[]':
      return (
        <>
          This data looks like a multi-line JSON array. For Druid to parse a text file it must have
          one row per event. Maybe look at{' '}
          <ExternalLink href="http://ndjson.org/">newline delimited JSON</ExternalLink> instead.
        </>
      );

    default:
      return;
  }
}
| |
/**
 * Gives the spec an input format guessed from the sample data, unless
 * `spec.ioConfig.inputFormat.type` is already set.
 *
 * @param spec - the spec to complete
 * @param sampleData - raw sampled lines used for the guess
 * @returns the spec itself when a format type exists, otherwise a spec whose
 *   `spec.ioConfig.inputFormat` is the guessed format
 */
export function fillInputFormatIfNeeded(
  spec: Partial<IngestionSpec>,
  sampleData: string[],
): Partial<IngestionSpec> {
  const existingFormatType = deepGet(spec, 'spec.ioConfig.inputFormat.type');
  return existingFormatType
    ? spec
    : deepSet(spec, 'spec.ioConfig.inputFormat', guessInputFormat(sampleData));
}
| |
/**
 * Guesses the input format from the first sampled datum.
 *
 * Binary magic headers are tested first (Parquet, ORC, Avro OCF), then text heuristics
 * (JSON, TSV, CSV). When there is no usable first datum, or nothing matches, a catch-all
 * `regex` format is returned.
 *
 * @param sampleData - raw sampled lines; only the first entry is inspected
 * @returns the guessed input format
 */
export function guessInputFormat(sampleData: string[]): InputFormat {
  const firstDatum = sampleData[0];
  if (!firstDatum) return inputFormatFromType('regex');

  const datum = String(firstDatum); // Really ensure it is a string

  // Magic byte sequences come first as they rarely yield false positives

  // Parquet 4 byte magic header: https://github.com/apache/parquet-format#file-format
  if (datum.startsWith('PAR1')) return inputFormatFromType('parquet');

  // ORC 3 byte magic header: https://orc.apache.org/specification/ORCv1/
  if (datum.startsWith('ORC')) return inputFormatFromType('orc');

  // Avro OCF 4 byte magic header: https://avro.apache.org/docs/current/spec.html#Object+Container+Files
  if (datum.startsWith('Obj') && datum.charCodeAt(3) === 1) {
    return inputFormatFromType('avro_ocf');
  }

  // No magic bytes found: fall back to heuristics for text formats

  // Starts and ends with curly braces: assume JSON
  if (datum.startsWith('{') && datum.endsWith('}')) return inputFormatFromType('json');

  // More than 3 tab separated values (i.e. at least 3 tabs): assume TSV.
  // findColumnsFromHeader is turned on only when no all-digit value sits between two tabs.
  const tabSeparatedValues = datum.split('\t');
  if (tabSeparatedValues.length > 3) {
    const hasNumericValue = /\t\d+\t/.test(datum);
    return inputFormatFromType('tsv', !hasNumericValue);
  }

  // More than 3 comma separated values (i.e. at least 3 commas): assume CSV.
  // findColumnsFromHeader is turned on only when no all-digit value sits between two commas.
  const commaSeparatedValues = datum.split(',');
  if (commaSeparatedValues.length > 3) {
    const hasNumericValue = /,\d+,/.test(datum);
    return inputFormatFromType('csv', !hasNumericValue);
  }

  return inputFormatFromType('regex');
}
| |
/**
 * Creates an input format object of the given type.
 *
 * A `regex` format additionally gets a catch-all `pattern` of `(.*)` and a single column
 * named `column1`. `findColumnsFromHeader` is only written when a boolean is passed.
 *
 * @param type - the input format type
 * @param findColumnsFromHeader - optional flag to store on the format
 * @returns the new input format
 */
function inputFormatFromType(type: string, findColumnsFromHeader?: boolean): InputFormat {
  const bareFormat: InputFormat = { type };

  const baseFormat: InputFormat =
    type === 'regex'
      ? deepSet(deepSet(bareFormat, 'pattern', '(.*)'), 'columns', ['column1'])
      : bareFormat;

  return typeof findColumnsFromHeader === 'boolean'
    ? deepSet(baseFormat, 'findColumnsFromHeader', findColumnsFromHeader)
    : baseFormat;
}
| |
| // ------------------------ |
| |
/**
 * Guesses a column type from sampled values.
 *
 * `null` / `undefined` entries are ignored. The remaining values are numeric when each one
 * is a non-NaN number or a non-blank string that converts to a number.
 *
 * @param sample - the sampled values of one column
 * @returns `'long'` when all defined values are whole numbers, `'double'` when all are
 *   numeric but at least one is not whole, and `'string'` otherwise (including when no
 *   defined value is left)
 */
export function guessTypeFromSample(sample: any[]): string {
  const isNumeric = (v: any): boolean => {
    if (typeof v === 'number') return !isNaN(v);
    // Blank strings convert to 0 and would otherwise pass the isNaN check, turning a
    // column of empty values into a 'long' column; they must not count as numbers.
    if (typeof v === 'string') return v.trim() !== '' && !isNaN(Number(v));
    return false;
  };

  const definedValues = sample.filter(v => v != null);
  if (!definedValues.length || !definedValues.every(isNumeric)) return 'string';

  return definedValues.every(v => Number(v) % 1 === 0) ? 'long' : 'double';
}
| |
/**
 * Guesses the type of one column from sampler output.
 *
 * @param headerAndRows - the sampled rows; rows without a `parsed` object yield `undefined`
 *   for the column before the values are passed through `filterMap`
 * @param column - the name of the column to look up in each row's `parsed` object
 * @returns the type guessed by `guessTypeFromSample` for the collected values
 */
export function getColumnTypeFromHeaderAndRows(
  headerAndRows: HeaderAndRows,
  column: string,
): string {
  const columnValueOf = (row: any) => (row.parsed ? row.parsed[column] : undefined);
  const columnValues = filterMap(headerAndRows.rows, columnValueOf);
  return guessTypeFromSample(columnValues);
}
| |
/**
 * Collects column type hints from the dimensions and metrics already present in a spec.
 *
 * Each dimension contributes `name -> type`. Each metric that has both a single field name
 * and an output type contributes `fieldName -> outputType`; metrics are processed after the
 * dimensions, so a metric entry replaces a dimension entry with the same key.
 *
 * @param spec - the spec to read the current schema from
 * @returns a map from column name to type
 */
function getTypeHintsFromSpec(spec: Partial<IngestionSpec>): Record<string, string> {
  const hints: Record<string, string> = {};

  const dimensions = deepGet(spec, 'spec.dataSchema.dimensionsSpec.dimensions') || [];
  for (const dimension of dimensions) {
    hints[getDimensionSpecName(dimension)] = getDimensionSpecType(dimension);
  }

  const metrics = deepGet(spec, 'spec.dataSchema.metricsSpec') || [];
  for (const metric of metrics) {
    const fieldName = getMetricSpecSingleFieldName(metric);
    const outputType = getMetricSpecOutputType(metric);
    if (!fieldName || !outputType) continue;
    hints[fieldName] = outputType;
  }

  return hints;
}
| |
/**
 * Rebuilds the schema related parts of a spec from sampled data.
 *
 * - Dimensions: in `auto-detect` mode the explicit dimension list is removed and
 *   `dimensionExclusions` is set to `[]`; otherwise the exclusions are removed and the
 *   dimensions are derived from the sample (using the types already in the spec as hints).
 * - Rollup on: query granularity `hour` and metrics derived from the sample.
 *   Rollup off: query granularity `none` and no metrics.
 * - For `index_parallel` specs without a `partitionsSpec` (or when
 *   `forcePartitionInitialization` is set) the partitioning is initialized to `hashed`
 *   (rollup) or `dynamic` (no rollup) and `forceGuaranteedRollup` is adjusted to match.
 * - Finally `granularitySpec.rollup` is set.
 *
 * @param spec - the spec to update
 * @param headerAndRows - the sampler output to derive dimensions and metrics from
 * @param dimensionMode - how dimensions are determined
 * @param rollup - whether rollup is enabled
 * @param forcePartitionInitialization - re-initialize `partitionsSpec` even if one exists
 * @returns the updated spec
 */
export function updateSchemaWithSample(
  spec: Partial<IngestionSpec>,
  headerAndRows: HeaderAndRows,
  dimensionMode: DimensionMode,
  rollup: boolean,
  forcePartitionInitialization = false,
): Partial<IngestionSpec> {
  // Hints come from the incoming spec, before any of its schema is rewritten
  const typeHints = getTypeHintsFromSpec(spec);

  let updatedSpec = spec;

  // Dimensions
  if (dimensionMode === 'auto-detect') {
    updatedSpec = deepDelete(updatedSpec, 'spec.dataSchema.dimensionsSpec.dimensions');
    updatedSpec = deepSet(updatedSpec, 'spec.dataSchema.dimensionsSpec.dimensionExclusions', []);
  } else {
    updatedSpec = deepDelete(updatedSpec, 'spec.dataSchema.dimensionsSpec.dimensionExclusions');

    const dimensions = getDimensionSpecs(headerAndRows, typeHints, rollup);
    if (dimensions) {
      updatedSpec = deepSet(updatedSpec, 'spec.dataSchema.dimensionsSpec.dimensions', dimensions);
    }
  }

  // Query granularity and metrics
  updatedSpec = deepSet(
    updatedSpec,
    'spec.dataSchema.granularitySpec.queryGranularity',
    rollup ? 'hour' : 'none',
  );
  if (rollup) {
    const metrics = getMetricSpecs(headerAndRows, typeHints);
    if (metrics) {
      updatedSpec = deepSet(updatedSpec, 'spec.dataSchema.metricsSpec', metrics);
    }
  } else {
    updatedSpec = deepDelete(updatedSpec, 'spec.dataSchema.metricsSpec');
  }

  // Secondary partitioning (index_parallel only)
  if (
    getSpecType(updatedSpec) === 'index_parallel' &&
    (!deepGet(updatedSpec, 'spec.tuningConfig.partitionsSpec') || forcePartitionInitialization)
  ) {
    const withPartitionsSpec = deepSet(updatedSpec, 'spec.tuningConfig.partitionsSpec', {
      type: rollup ? 'hashed' : 'dynamic',
    });
    updatedSpec = adjustForceGuaranteedRollup(withPartitionsSpec);
  }

  return deepSet(updatedSpec, 'spec.dataSchema.granularitySpec.rollup', rollup);
}
| |
| // ------------------------ |
| |
/**
 * Converts a firehose / parser based spec into the inputSource / inputFormat layout.
 *
 * Specs without `spec.ioConfig.firehose` are returned untouched. Otherwise:
 *  - legacy firehose types are renamed (`static-s3` → `s3`, `static-google-blobstore` →
 *    `google`, the latter also moving `blobs` to `objects`),
 *  - `firehose` is moved to `inputSource`,
 *  - `timestampSpec` and `dimensionsSpec` are lifted out of `parser.parseSpec` into `dataSchema`,
 *  - the remaining `parseSpec` becomes `ioConfig.inputFormat` with `format` renamed to `type`,
 *  - `dataSchema.parser` is removed.
 *
 * @param spec - a spec in either layout
 * @returns the spec in the inputSource / inputFormat layout
 */
export function upgradeSpec(spec: any): Partial<IngestionSpec> {
  if (deepGet(spec, 'spec.ioConfig.firehose')) {
    // The deep* helpers hand back the updated spec, so every result must be reassigned;
    // dropping it would lose the firehose type rename.
    switch (deepGet(spec, 'spec.ioConfig.firehose.type')) {
      case 'static-s3':
        spec = deepSet(spec, 'spec.ioConfig.firehose.type', 's3');
        break;

      case 'static-google-blobstore':
        spec = deepSet(spec, 'spec.ioConfig.firehose.type', 'google');
        spec = deepMove(spec, 'spec.ioConfig.firehose.blobs', 'spec.ioConfig.firehose.objects');
        break;
    }

    spec = deepMove(spec, 'spec.ioConfig.firehose', 'spec.ioConfig.inputSource');
    spec = deepMove(
      spec,
      'spec.dataSchema.parser.parseSpec.timestampSpec',
      'spec.dataSchema.timestampSpec',
    );
    spec = deepMove(
      spec,
      'spec.dataSchema.parser.parseSpec.dimensionsSpec',
      'spec.dataSchema.dimensionsSpec',
    );
    // timestampSpec / dimensionsSpec were moved out above, so what is left of parseSpec
    // is the format description.
    spec = deepMove(spec, 'spec.dataSchema.parser.parseSpec', 'spec.ioConfig.inputFormat');
    spec = deepDelete(spec, 'spec.dataSchema.parser');
    spec = deepMove(spec, 'spec.ioConfig.inputFormat.format', 'spec.ioConfig.inputFormat.type');
  }
  return spec;
}
| |
/**
 * Converts an inputSource / inputFormat based spec back into the firehose / parser layout.
 *
 * Specs without `spec.ioConfig.inputSource` are returned untouched. Otherwise:
 *  - `inputFormat.type` is renamed to `format`,
 *  - a `{ type: 'string' }` parser is created and `inputFormat` becomes its `parseSpec`,
 *  - `dimensionsSpec` and `timestampSpec` are moved from `dataSchema` into that `parseSpec`,
 *  - `inputSource` is moved to `firehose`,
 *  - firehose types are renamed to their legacy names (`s3` → `static-s3`, `google` →
 *    `static-google-blobstore`, the latter also moving `objects` to `blobs`).
 *
 * @param spec - a spec in either layout
 * @returns the spec in the firehose / parser layout
 */
export function downgradeSpec(spec: Partial<IngestionSpec>): Partial<IngestionSpec> {
  if (deepGet(spec, 'spec.ioConfig.inputSource')) {
    spec = deepMove(spec, 'spec.ioConfig.inputFormat.type', 'spec.ioConfig.inputFormat.format');
    spec = deepSet(spec, 'spec.dataSchema.parser', { type: 'string' });
    spec = deepMove(spec, 'spec.ioConfig.inputFormat', 'spec.dataSchema.parser.parseSpec');
    spec = deepMove(
      spec,
      'spec.dataSchema.dimensionsSpec',
      'spec.dataSchema.parser.parseSpec.dimensionsSpec',
    );
    spec = deepMove(
      spec,
      'spec.dataSchema.timestampSpec',
      'spec.dataSchema.parser.parseSpec.timestampSpec',
    );
    spec = deepMove(spec, 'spec.ioConfig.inputSource', 'spec.ioConfig.firehose');

    // The deep* helpers hand back the updated spec, so every result must be reassigned;
    // dropping it would lose the legacy firehose type rename.
    switch (deepGet(spec, 'spec.ioConfig.firehose.type')) {
      case 's3':
        spec = deepSet(spec, 'spec.ioConfig.firehose.type', 'static-s3');
        break;

      case 'google':
        spec = deepSet(spec, 'spec.ioConfig.firehose.type', 'static-google-blobstore');
        spec = deepMove(spec, 'spec.ioConfig.firehose.objects', 'spec.ioConfig.firehose.blobs');
        break;
    }
  }
  return spec;
}
| |
/**
 * Sanitises a user-entered id.
 *
 * - every `/` is removed,
 * - all leading `.` characters are removed (checked after `/` removal, so `"/.x"` becomes `"x"`),
 * - every run of whitespace is replaced by a single space.
 *
 * @param id - the raw id text
 * @returns the sanitised id
 */
export function adjustId(id: string): string {
  return id
    .replace(/\//g, '') // Can not have /
    .replace(/^\.+/, '') // Can not have leading . (strip the whole run, not just the first one)
    .replace(/\s+/g, ' '); // Can not have whitespaces other than space
}