/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { dedupe, F, SqlExpression, SqlFunction } from 'druid-query-toolkit';
import * as JSONBig from 'json-bigint-native';
import type {
DimensionSpec,
DimensionsSpec,
IngestionSpec,
IngestionType,
InputFormat,
IoConfig,
MetricSpec,
TimestampSpec,
Transform,
TransformSpec,
} from '../druid-models';
import {
ALL_POSSIBLE_SYSTEM_FIELDS,
DETECTION_TIMESTAMP_SPEC,
getDimensionNamesFromTransforms,
getDimensionSpecName,
getFlattenSpec,
getSpecType,
getTimestampSchema,
isDruidSource,
isFixedFormatSource,
PLACEHOLDER_TIMESTAMP_SPEC,
REINDEX_TIMESTAMP_SPEC,
TIME_COLUMN,
} from '../druid-models';
import { Api } from '../singletons';
import { getDruidErrorMessage, queryDruidRune } from './druid-query';
import { EMPTY_ARRAY, filterMap } from './general';
import { allowKeys, deepGet, deepSet } from './object-change';
const BASE_SAMPLER_CONFIG: SamplerConfig = {
numRows: 500,
timeoutMs: 15000,
};
export type SampleSpec = IngestionSpec & {
samplerConfig: SamplerConfig;
};
export interface SamplerConfig {
numRows?: number;
timeoutMs?: number;
}
export interface SampleResponse {
data: SampleEntry[];
logicalSegmentSchema: { name: string; type: string }[];
logicalDimensions: DimensionSpec[];
physicalDimensions: DimensionSpec[];
numRowsIndexed: number;
numRowsRead: number;
}
export type TimeColumnAction = 'preserve' | 'ignore' | 'ignoreIfZero';
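/**
 * Returns just the column names of the sampled logical segment schema, applying the same
 * __time handling as getHeaderFromSampleResponse.
 */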
export function getHeaderNamesFromSampleResponse(
sampleResponse: SampleResponse,
timeColumnAction: TimeColumnAction = 'preserve',
): string[] {
return getHeaderFromSampleResponse(sampleResponse, timeColumnAction).map(s => s.name);
}
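/**
 * Returns the name/type pairs of the sampled logical segment schema. The __time column is
 * dropped when timeColumnAction is 'ignore', or when it is 'ignoreIfZero' and no sampled row
 * parsed to a positive __time value.
 */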
export function getHeaderFromSampleResponse(
sampleResponse: SampleResponse,
timeColumnAction: TimeColumnAction = 'preserve',
): { name: string; type: string }[] {
const ignoreTimeColumn =
timeColumnAction === 'ignore' ||
(timeColumnAction === 'ignoreIfZero' &&
!sampleResponse.data.some(d => {
const t = d.parsed?.[TIME_COLUMN];
return typeof t === 'number' && t > 0;
}));
return sampleResponse.logicalSegmentSchema.filter(
s => !ignoreTimeColumn || s.name !== TIME_COLUMN,
);
}
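/**
 * Derives a dimension list from the sampler response, correcting boolean columns that the
 * sampler misreports as "long".
 */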
export function guessDimensionsFromSampleResponse(sampleResponse: SampleResponse): DimensionSpec[] {
const { logicalDimensions, physicalDimensions, data } = sampleResponse;
return logicalDimensions.map(d => {
// Boolean columns are currently reported as "long" so let's turn them into "string"
if (
d.type === 'long' &&
physicalDimensions.find(_ => _.name === d.name)?.type === 'json' &&
typeof data[0]?.input?.[d.name] === 'boolean'
) {
return {
name: d.name,
type: 'string',
};
}
return d;
});
}
export type CacheRows = Record<string, any>[];
export interface SampleResponseWithExtraInfo extends SampleResponse {
columns?: string[];
columnInfo?: Record<string, any>;
aggregators?: Record<string, any>;
rollup?: boolean;
}
export interface SampleEntry {
input: Record<string, any>;
parsed?: Record<string, any>;
unparseable?: boolean;
error?: string;
}
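/**
 * Converts sampled rows into cache rows: each row's raw input merged with any parsed system
 * fields, capped at 20 rows. These rows can later be replayed with applyCache.
 */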
export function getCacheRowsFromSampleResponse(sampleResponse: SampleResponse): CacheRows {
return filterMap(sampleResponse.data, d => ({
...d.input,
...allowKeys<any>(d.parsed || {}, ALL_POSSIBLE_SYSTEM_FIELDS),
})).slice(0, 20);
}
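/**
 * Rewrites a sample spec so that it reads the given cached rows from an inline input source
 * instead of re-reading the original input source, letting later sampling steps replay rows
 * captured by an earlier step.
 *
 * A minimal (hypothetical) usage sketch:
 *   const cachedSpec = applyCache(sampleSpec, [{ page: 'Home', added: 3 }]);
 *   // cachedSpec now reads newline-delimited JSON from an inline inputSource
 */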
export function applyCache(sampleSpec: SampleSpec, cacheRows: CacheRows) {
if (!cacheRows) return sampleSpec;
// To prevent potential data loss, null columns should be kept by the sampler and shown in the ingestion flow
if (deepGet(sampleSpec, 'spec.ioConfig.inputFormat')) {
sampleSpec = deepSet(sampleSpec, 'spec.ioConfig.inputFormat.keepNullColumns', true);
}
// If this is already an inline spec, there is nothing to do
if (deepGet(sampleSpec, 'spec.ioConfig.inputSource.type') === 'inline') return sampleSpec;
// Make the spec into an inline json spec
sampleSpec = deepSet(sampleSpec, 'type', 'index');
sampleSpec = deepSet(sampleSpec, 'spec.type', 'index');
sampleSpec = deepSet(sampleSpec, 'spec.ioConfig.type', 'index');
sampleSpec = deepSet(sampleSpec, 'spec.ioConfig.inputSource', {
type: 'inline',
data: cacheRows.map(r => JSONBig.stringify(r)).join('\n'),
});
const flattenSpec = getFlattenSpec(sampleSpec);
let inputFormat: InputFormat = { type: 'json', keepNullColumns: true };
if (flattenSpec) {
inputFormat = deepSet(inputFormat, 'flattenSpec', flattenSpec);
}
sampleSpec = deepSet(sampleSpec, 'spec.ioConfig.inputFormat', inputFormat);
return sampleSpec;
}
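/**
 * Fetches the list of loaded extension artifacts from the Overlord status endpoint
 * (via the console proxy).
 */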
export async function getProxyOverlordModules(): Promise<string[]> {
let statusResp: any;
try {
statusResp = await Api.instance.get(`/proxy/overlord/status`);
} catch (e) {
throw new Error(getDruidErrorMessage(e));
}
const { modules } = statusResp.data;
if (!Array.isArray(modules)) throw new Error('unexpected result from overlord/status');
return modules.map((m: any) => m.artifact);
}
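/**
 * Posts a sample spec to the Druid sampler endpoint, first normalizing the spec types and
 * neutralizing lookup expressions. The forStr argument (e.g. 'connect', 'parser') is added
 * to the request URL to identify which step issued the sample.
 */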
export async function postToSampler(
sampleSpec: SampleSpec,
forStr: string,
signal?: AbortSignal,
): Promise<SampleResponse> {
sampleSpec = fixSamplerLookups(fixSamplerTypes(sampleSpec));
let sampleResp: any;
try {
sampleResp = await Api.instance.post(`/druid/indexer/v1/sampler?for=${forStr}`, sampleSpec, {
signal,
});
} catch (e) {
throw new Error(getDruidErrorMessage(e));
}
return sampleResp.data;
}
export type SampleStrategy = 'start' | 'end';
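// Prepares an ioConfig for sampling: sets the spec type, selects earliest vs. latest
// offsets for Kafka/Kinesis based on the sample strategy, and keeps null columns.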
function makeSamplerIoConfig(
ioConfig: IoConfig,
specType: IngestionType,
sampleStrategy: SampleStrategy,
): IoConfig {
ioConfig = deepSet(ioConfig || {}, 'type', specType);
if (specType === 'kafka') {
ioConfig = deepSet(ioConfig, 'useEarliestOffset', sampleStrategy === 'start');
} else if (specType === 'kinesis') {
ioConfig = deepSet(ioConfig, 'useEarliestSequenceNumber', sampleStrategy === 'start');
}
// To prevent potential data loss, null columns should be kept by the sampler and shown in the ingestion flow
if (ioConfig.inputFormat) {
ioConfig = deepSet(ioConfig, 'inputFormat.keepNullColumns', true);
}
return ioConfig;
}
/**
 * This is a hack to work around the fact that the sampler cannot handle the index_parallel type.
 */
function fixSamplerTypes(sampleSpec: SampleSpec): SampleSpec {
let samplerType: string = getSpecType(sampleSpec);
if (samplerType === 'index_parallel') {
samplerType = 'index';
}
sampleSpec = deepSet(sampleSpec, 'type', samplerType);
sampleSpec = deepSet(sampleSpec, 'spec.type', samplerType);
sampleSpec = deepSet(sampleSpec, 'spec.ioConfig.type', samplerType);
sampleSpec = deepSet(sampleSpec, 'spec.tuningConfig.type', samplerType);
return sampleSpec;
}
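// An input format that captures each row, unparsed, into a single "raw" column. Used by the
// "connect" sample so raw data can be shown before a real input format is chosen.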
const WHOLE_ROW_INPUT_FORMAT: InputFormat = {
type: 'regex',
pattern: '([\\s\\S]*)', // Match the entire line, every single character
listDelimiter: '56616469-6de2-9da4-efb8-8f416e6e6965', // Just a UUID to disable the list delimiter; let's hope we never see this UUID in the data
columns: ['raw'],
};
const KAFKA_SAMPLE_INPUT_FORMAT: InputFormat = {
type: 'kafka',
headerFormat: {
type: 'string',
encoding: 'UTF-8',
},
keyFormat: WHOLE_ROW_INPUT_FORMAT,
valueFormat: WHOLE_ROW_INPUT_FORMAT,
};
const KINESIS_SAMPLE_INPUT_FORMAT: InputFormat = {
type: 'kinesis',
valueFormat: WHOLE_ROW_INPUT_FORMAT,
};
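/**
 * Runs the initial "connect" sample. Non-fixed-format sources are read as whole raw rows.
 * When reindexing from an existing Druid datasource, the datasource's columns (scan query)
 * and rollup/aggregator info (segmentMetadata query) are also fetched.
 */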
export async function sampleForConnect(
spec: Partial<IngestionSpec>,
sampleStrategy: SampleStrategy,
): Promise<SampleResponseWithExtraInfo> {
const samplerType = getSpecType(spec);
let ioConfig: IoConfig = makeSamplerIoConfig(
deepGet(spec, 'spec.ioConfig'),
samplerType,
sampleStrategy,
);
if (!isFixedFormatSource(spec)) {
ioConfig = deepSet(
ioConfig,
'inputFormat',
samplerType === 'kafka'
? KAFKA_SAMPLE_INPUT_FORMAT
: samplerType === 'kinesis'
? KINESIS_SAMPLE_INPUT_FORMAT
: WHOLE_ROW_INPUT_FORMAT,
);
}
const reingestMode = isDruidSource(spec);
const sampleSpec: SampleSpec = {
type: samplerType,
spec: {
type: samplerType,
ioConfig,
dataSchema: {
dataSource: 'sample',
timestampSpec: reingestMode ? REINDEX_TIMESTAMP_SPEC : PLACEHOLDER_TIMESTAMP_SPEC,
dimensionsSpec: { useSchemaDiscovery: true },
granularitySpec: {
rollup: false,
},
},
} as any,
samplerConfig: BASE_SAMPLER_CONFIG,
};
const samplerResponse: SampleResponseWithExtraInfo = await postToSampler(sampleSpec, 'connect');
if (!samplerResponse.data.length) return samplerResponse;
if (reingestMode) {
const dataSource = deepGet(ioConfig, 'inputSource.dataSource');
const intervals = deepGet(ioConfig, 'inputSource.interval');
const scanResponse = await queryDruidRune({
queryType: 'scan',
dataSource,
intervals,
resultFormat: 'compactedList',
limit: 1,
columns: [],
granularity: 'all',
});
const columns = deepGet(scanResponse, '0.columns');
if (!Array.isArray(columns)) {
throw new Error(`unexpected response from scan query`);
}
samplerResponse.columns = columns;
const segmentMetadataResponse = await queryDruidRune({
queryType: 'segmentMetadata',
dataSource,
intervals,
merge: true,
aggregatorMergeStrategy: 'lenient',
analysisTypes: ['aggregators', 'rollup'],
});
if (!Array.isArray(segmentMetadataResponse) || segmentMetadataResponse.length !== 1) {
throw new Error(`unexpected response from segmentMetadata query`);
}
const segmentMetadataResponse0 = segmentMetadataResponse[0];
samplerResponse.rollup = segmentMetadataResponse0.rollup;
samplerResponse.columnInfo = segmentMetadataResponse0.columns;
samplerResponse.aggregators = segmentMetadataResponse0.aggregators;
}
return samplerResponse;
}
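/**
 * Runs the "parser" sample using the configured ioConfig, with schema discovery enabled and a
 * detection (or reindex) timestamp spec, so the parsed columns can be previewed.
 */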
export async function sampleForParser(
spec: Partial<IngestionSpec>,
sampleStrategy: SampleStrategy,
): Promise<SampleResponse> {
const samplerType = getSpecType(spec);
const ioConfig: IoConfig = makeSamplerIoConfig(
deepGet(spec, 'spec.ioConfig'),
samplerType,
sampleStrategy,
);
const reingestMode = isDruidSource(spec);
const sampleSpec: SampleSpec = {
type: samplerType,
spec: {
ioConfig,
dataSchema: {
dataSource: 'sample',
timestampSpec: reingestMode ? REINDEX_TIMESTAMP_SPEC : DETECTION_TIMESTAMP_SPEC,
dimensionsSpec: {
dimensions: deepGet(ioConfig, 'inputSource.systemFields'),
useSchemaDiscovery: true,
},
granularitySpec: {
rollup: false,
},
},
},
samplerConfig: BASE_SAMPLER_CONFIG,
};
return postToSampler(sampleSpec, 'parser');
}
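/**
 * Runs the "timestamp" sample. First samples with a static timestamp spec to get the columns;
 * if a timestamp column is being parsed, samples again with the real timestamp spec (and any
 * __time transforms) and merges the parsed __time back into the column sample.
 */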
export async function sampleForTimestamp(
spec: Partial<IngestionSpec>,
cacheRows: CacheRows,
): Promise<SampleResponse> {
const samplerType = getSpecType(spec);
const timestampSpec: TimestampSpec = deepGet(spec, 'spec.dataSchema.timestampSpec');
const timestampSchema = getTimestampSchema(spec);
// First do a query with a static timestamp spec
const sampleSpecColumns: SampleSpec = {
type: samplerType,
spec: {
ioConfig: deepGet(spec, 'spec.ioConfig'),
dataSchema: {
dataSource: 'sample',
dimensionsSpec: {
useSchemaDiscovery: true,
},
timestampSpec: timestampSchema === 'column' ? PLACEHOLDER_TIMESTAMP_SPEC : timestampSpec,
granularitySpec: {
rollup: false,
},
},
},
samplerConfig: BASE_SAMPLER_CONFIG,
};
const sampleColumns = await postToSampler(
applyCache(sampleSpecColumns, cacheRows),
'timestamp-columns',
);
// If we are not parsing a column then there is nothing left to do
if (timestampSchema === 'none') return sampleColumns;
const transforms: Transform[] =
deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || EMPTY_ARRAY;
// If we are trying to parse a column then get a bit fancy:
// Query the same sample again (same cache key)
const sampleSpec: SampleSpec = {
type: samplerType,
spec: {
ioConfig: deepGet(spec, 'spec.ioConfig'),
dataSchema: {
dataSource: 'sample',
dimensionsSpec: {
useSchemaDiscovery: true,
},
timestampSpec,
transformSpec: {
transforms: transforms.filter(transform => transform.name === TIME_COLUMN),
},
granularitySpec: {
rollup: false,
},
},
},
samplerConfig: BASE_SAMPLER_CONFIG,
};
const sampleTime = await postToSampler(applyCache(sampleSpec, cacheRows), 'timestamp-time');
if (sampleTime.data.length !== sampleColumns.data.length) {
// If the two responses did not come from the same cache (or for some reason have different lengths) then
// just return the one with the parsed time column.
return sampleTime;
}
const sampleTimeData = sampleTime.data;
return {
...sampleColumns,
data: sampleColumns.data.map((d, i) => {
// Merge the column sample with the time column sample
if (!d.parsed) return d;
const timeDatumParsed = sampleTimeData[i].parsed;
d.parsed.__time = timeDatumParsed ? timeDatumParsed.__time : null;
return d;
}),
};
}
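/**
 * Runs the "transform" sample. If transforms are defined, a pre-sample first discovers the
 * dimensions, which are then combined with the dimension names produced by the transforms so
 * that the transform output columns appear in the result.
 */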
export async function sampleForTransform(
spec: Partial<IngestionSpec>,
cacheRows: CacheRows,
forceSegmentSortByTime: boolean,
): Promise<SampleResponse> {
const samplerType = getSpecType(spec);
const timestampSpec: TimestampSpec = deepGet(spec, 'spec.dataSchema.timestampSpec');
const transforms: Transform[] = deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || [];
// Extra step to simulate auto-detecting dimensions with transforms
let specialDimensionSpec: DimensionsSpec = {
useSchemaDiscovery: true,
forceSegmentSortByTime,
};
if (transforms && transforms.length) {
const sampleSpecHack: SampleSpec = {
type: samplerType,
spec: {
ioConfig: deepGet(spec, 'spec.ioConfig'),
dataSchema: {
dataSource: 'sample',
timestampSpec,
dimensionsSpec: {
useSchemaDiscovery: true,
},
granularitySpec: {
rollup: false,
},
},
},
samplerConfig: BASE_SAMPLER_CONFIG,
};
const sampleResponseHack = await postToSampler(
applyCache(sampleSpecHack, cacheRows),
'transform-pre',
);
specialDimensionSpec = deepSet(
specialDimensionSpec,
'dimensions',
dedupe(
(
guessDimensionsFromSampleResponse(sampleResponseHack) as (DimensionSpec | string)[]
).concat(getDimensionNamesFromTransforms(transforms)),
getDimensionSpecName,
),
);
}
const sampleSpec: SampleSpec = {
type: samplerType,
spec: {
ioConfig: deepGet(spec, 'spec.ioConfig'),
dataSchema: {
dataSource: 'sample',
timestampSpec,
dimensionsSpec: specialDimensionSpec, // Hack Hack Hack
transformSpec: {
transforms,
},
granularitySpec: {
rollup: false,
},
},
},
samplerConfig: BASE_SAMPLER_CONFIG,
};
return postToSampler(applyCache(sampleSpec, cacheRows), 'transform');
}
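/**
 * Runs the "filter" sample, applying the configured transforms and filter. As in
 * sampleForTransform, a pre-sample discovers the dimensions when transforms are present.
 */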
export async function sampleForFilter(
spec: Partial<IngestionSpec>,
cacheRows: CacheRows,
): Promise<SampleResponse> {
const samplerType = getSpecType(spec);
const timestampSpec: TimestampSpec = deepGet(spec, 'spec.dataSchema.timestampSpec');
const transforms: Transform[] = deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || [];
const filter: any = deepGet(spec, 'spec.dataSchema.transformSpec.filter');
// Extra step to simulate auto-detecting dimensions with transforms
let specialDimensionSpec: DimensionsSpec = { useSchemaDiscovery: true };
if (transforms && transforms.length) {
const sampleSpecHack: SampleSpec = {
type: samplerType,
spec: {
ioConfig: deepGet(spec, 'spec.ioConfig'),
dataSchema: {
dataSource: 'sample',
timestampSpec,
dimensionsSpec: {
useSchemaDiscovery: true,
},
granularitySpec: {
rollup: false,
},
},
},
samplerConfig: BASE_SAMPLER_CONFIG,
};
const sampleResponseHack = await postToSampler(
applyCache(sampleSpecHack, cacheRows),
'filter-pre',
);
specialDimensionSpec = deepSet(
specialDimensionSpec,
'dimensions',
dedupe(
getHeaderNamesFromSampleResponse(sampleResponseHack, 'ignore').concat(
getDimensionNamesFromTransforms(transforms),
),
),
);
}
const sampleSpec: SampleSpec = {
type: samplerType,
spec: {
ioConfig: deepGet(spec, 'spec.ioConfig'),
dataSchema: {
dataSource: 'sample',
timestampSpec,
dimensionsSpec: specialDimensionSpec, // Hack Hack Hack
transformSpec: {
transforms,
filter,
},
granularitySpec: {
rollup: false,
},
},
},
samplerConfig: BASE_SAMPLER_CONFIG,
};
return postToSampler(applyCache(sampleSpec, cacheRows), 'filter');
}
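/**
 * Runs the "schema" sample with the full dataSchema (dimensions, metrics, query granularity,
 * and rollup) so the final shape of the ingested data can be previewed.
 */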
export async function sampleForSchema(
spec: Partial<IngestionSpec>,
cacheRows: CacheRows,
): Promise<SampleResponse> {
const samplerType = getSpecType(spec);
const timestampSpec: TimestampSpec = deepGet(spec, 'spec.dataSchema.timestampSpec');
const transformSpec: TransformSpec =
deepGet(spec, 'spec.dataSchema.transformSpec') || ({} as TransformSpec);
const dimensionsSpec: DimensionsSpec = deepGet(spec, 'spec.dataSchema.dimensionsSpec');
const metricsSpec: MetricSpec[] = deepGet(spec, 'spec.dataSchema.metricsSpec') || [];
const queryGranularity: string =
deepGet(spec, 'spec.dataSchema.granularitySpec.queryGranularity') || 'NONE';
const rollup = deepGet(spec, 'spec.dataSchema.granularitySpec.rollup') ?? true;
const sampleSpec: SampleSpec = {
type: samplerType,
spec: {
ioConfig: deepGet(spec, 'spec.ioConfig'),
dataSchema: {
dataSource: 'sample',
timestampSpec,
transformSpec,
granularitySpec: {
queryGranularity,
rollup,
},
dimensionsSpec,
metricsSpec,
},
},
samplerConfig: BASE_SAMPLER_CONFIG,
};
return postToSampler(applyCache(sampleSpec, cacheRows), 'schema');
}
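// Rewrites any lookup() calls inside transform expressions into placeholders, since lookups
// are not available in the sampler (see changeLookupInExpressionsSampling below).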
function fixSamplerLookups(sampleSpec: SampleSpec): SampleSpec {
const transforms: Transform[] | undefined = deepGet(
sampleSpec,
'spec.dataSchema.transformSpec.transforms',
);
if (!Array.isArray(transforms)) return sampleSpec;
return deepSet(
sampleSpec,
'spec.dataSchema.transformSpec.transforms',
transforms.map(transform => {
const { expression } = transform;
if (typeof expression !== 'string') return transform;
return { ...transform, expression: changeLookupInExpressionsSampling(expression) };
}),
);
}
/**
 * Lookups do not work in the sampler because they are not loaded in the Overlord.
 * To prevent the user from getting an error like "Unknown lookup [lookup name]", we
 * change the lookup expression to a placeholder:
 *
 * lookup("x", 'lookup_name') => concat('lookup_name', '[', "x", '] -- This is a placeholder, lookups are not supported in sampling')
 * lookup("x", 'lookup_name', 'replaceValue') => nvl(concat('lookup_name', '[', "x", '] -- This is a placeholder, lookups are not supported in sampling'), 'replaceValue')
 */
export function changeLookupInExpressionsSampling(druidExpression: string): string {
if (!druidExpression.includes('lookup')) return druidExpression;
// Druid expressions are very close to SQL, so try parsing the expression as SQL; if that parses, it is a more robust way to apply this transformation
const parsedDruidExpression = SqlExpression.maybeParse(druidExpression);
if (parsedDruidExpression) {
return String(
parsedDruidExpression.walk(ex => {
if (ex instanceof SqlFunction && ex.getEffectiveFunctionName() === 'LOOKUP') {
if (ex.numArgs() < 2 || ex.numArgs() > 3) return SqlExpression.parse('null');
const concat = F(
'concat',
ex.getArg(1),
'[',
ex.getArg(0),
'] -- This is a placeholder, lookups are not supported in sampling',
);
const replaceMissingValueWith = ex.getArg(2);
if (!replaceMissingValueWith) return concat;
return F('nvl', concat, replaceMissingValueWith);
}
return ex;
}),
);
}
// If we cannot parse the expression as SQL then bash it with a regexp
return druidExpression.replace(/lookup\s*\(([^)]+)\)/g, (_, argString: string) => {
const args = argString.trim().split(/\s*,\s*/);
if (args.length < 2 || args.length > 3) return 'null';
const concat = `concat(${args[1]},'[',${args[0]},'] -- This is a placeholder, lookups are not supported in sampling')`;
const replaceMissingValueWith = args[2];
if (!replaceMissingValueWith) return concat;
return `nvl(${concat},${replaceMissingValueWith})`;
});
}