blob: 3da20ee8ec3c94ee3280098777690d67aea0def2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Icon, Intent, Menu, MenuItem, Popover, Position, Tag } from '@blueprintjs/core';
import { IconNames } from '@blueprintjs/icons';
import * as JSONBig from 'json-bigint-native';
import memoize from 'memoize-one';
import type { JSX } from 'react';
import React, { createContext, useContext } from 'react';
import type { Column, Filter, SortingRule } from 'react-table';
import ReactTable from 'react-table';
import type { TableColumnSelectorColumn } from '../../components';
import {
ACTION_COLUMN_ID,
ACTION_COLUMN_LABEL,
ACTION_COLUMN_WIDTH,
ActionCell,
MoreButton,
RefreshButton,
TableClickableCell,
TableColumnSelector,
TableFilterableCell,
ViewControlBar,
} from '../../components';
import {
AlertDialog,
AsyncActionDialog,
SpecDialog,
SupervisorResetOffsetsDialog,
SupervisorTableActionDialog,
TaskGroupHandoffDialog,
} from '../../dialogs';
import type {
IngestionSpec,
QueryWithContext,
RowStatsKey,
SupervisorStats,
SupervisorStatus,
SupervisorStatusTask,
} from '../../druid-models';
import { getConsoleViewIcon, getTotalSupervisorStats } from '../../druid-models';
import type { Capabilities } from '../../helpers';
import {
SMALL_TABLE_PAGE_SIZE,
SMALL_TABLE_PAGE_SIZE_OPTIONS,
sqlQueryCustomTableFilters,
suggestibleFilterInput,
} from '../../react-table';
import { Api, AppToaster } from '../../singletons';
import type { AuxiliaryQueryFn, TableState } from '../../utils';
import {
assemble,
checkedCircleIcon,
deepGet,
filterMap,
formatByteRate,
formatBytes,
formatInteger,
formatRate,
getApiArray,
getDruidErrorMessage,
hasOverlayOpen,
isNumberLike,
LocalStorageBackedVisibility,
LocalStorageKeys,
nonEmptyArray,
oneOf,
pluralIfNeeded,
queryDruidSql,
QueryManager,
QueryState,
ResultWithAuxiliaryWork,
sortedToOrderByClause,
twoLines,
} from '../../utils';
import type { BasicAction } from '../../utils/basic-action';
import './supervisors-view.scss';
const SUPERVISOR_TABLE_COLUMNS: TableColumnSelectorColumn[] = [
'Supervisor ID',
'Datasource',
'Type',
'Topic/Stream',
'Status',
'Configured tasks',
{ text: 'Running tasks', label: 'status API' },
{ text: 'Aggregate lag', label: 'status API' },
{ text: 'Stats', label: 'stats API' },
{ text: 'Recent errors', label: 'status API' },
];
const ROW_STATS_KEYS: RowStatsKey[] = ['1m', '5m', '15m'];
const STATUS_API_TIMEOUT = 5000;
const STATS_API_TIMEOUT = 5000;
function getRowStatsKeyTitle(key: RowStatsKey) {
return `Rate over past ${pluralIfNeeded(parseInt(key, 10), 'minute')}`;
}
function getRowStatsKeySeconds(key: RowStatsKey): number {
return parseInt(key, 10) * 60;
}
interface SupervisorQuery extends TableState {
capabilities: Capabilities;
visibleColumns: LocalStorageBackedVisibility;
}
interface SupervisorQueryResultRow {
readonly supervisor_id: string;
readonly datasource: string;
readonly type: string;
readonly source: string;
readonly detailed_state: string;
readonly spec?: IngestionSpec;
readonly suspended: boolean;
}
interface SupervisorsWithAuxiliaryInfo {
readonly supervisors: SupervisorQueryResultRow[];
readonly count: number;
readonly status: Record<string, SupervisorStatus>;
readonly stats: Record<string, SupervisorStats>;
}
export const StatusContext = createContext<Record<string, SupervisorStatus>>({});
export const StatsContext = createContext<{
stats: Record<string, SupervisorStats>;
statsKey: RowStatsKey;
}>({ stats: {}, statsKey: '5m' });
interface HeaderStatsKeySelectorProps {
changeStatsKey(statsKey: RowStatsKey): void;
}
function HeaderStatsKeySelector({ changeStatsKey }: HeaderStatsKeySelectorProps) {
const { statsKey } = useContext(StatsContext);
return (
<Popover
position={Position.BOTTOM}
content={
<Menu>
{ROW_STATS_KEYS.map(k => (
<MenuItem
key={k}
icon={checkedCircleIcon(k === statsKey)}
text={getRowStatsKeyTitle(k)}
onClick={() => changeStatsKey(k)}
/>
))}
</Menu>
}
>
<i className="title-button">
{getRowStatsKeyTitle(statsKey)} <Icon icon={IconNames.CARET_DOWN} />
</i>
</Popover>
);
}
export interface SupervisorsViewProps {
filters: Filter[];
onFiltersChange(filters: Filter[]): void;
openSupervisorDialog: boolean | undefined;
goToDatasource(datasource: string): void;
goToQuery(queryWithContext: QueryWithContext): void;
goToStreamingDataLoader(supervisorId: string): void;
goToTasks(supervisorId: string, type: string | undefined): void;
capabilities: Capabilities;
}
export interface SupervisorsViewState {
supervisorsState: QueryState<SupervisorsWithAuxiliaryInfo>;
statsKey: RowStatsKey;
resumeSupervisorId?: string;
suspendSupervisorId?: string;
handoffSupervisorId?: string;
resetOffsetsSupervisorInfo?: { id: string; type: string };
resetSupervisorId?: string;
terminateSupervisorId?: string;
showResumeAllSupervisors: boolean;
showSuspendAllSupervisors: boolean;
showTerminateAllSupervisors: boolean;
supervisorSpecDialogOpen: boolean;
alertErrorMsg?: string;
supervisorTableActionDialogId?: string;
supervisorTableActionDialogActions: BasicAction[];
visibleColumns: LocalStorageBackedVisibility;
page: number;
pageSize: number;
sorted: SortingRule[];
}
function detailedStateToColor(detailedState: string): string {
switch (detailedState) {
case 'UNHEALTHY_SUPERVISOR':
case 'UNHEALTHY_TASKS':
case 'UNABLE_TO_CONNECT_TO_STREAM':
case 'LOST_CONTACT_WITH_STREAM':
case 'INVALID_SPEC':
return '#d5100a';
case 'PENDING':
return '#00eaff';
case 'DISCOVERING_INITIAL_TASKS':
case 'CREATING_TASKS':
case 'CONNECTING_TO_STREAM':
case 'RUNNING':
return '#2167d5';
case 'IDLE':
return '#44659d';
case 'STOPPING':
return '#e75c06';
case 'SUSPENDED':
case 'SCHEDULER_STOPPED':
return '#ffbf00';
default:
return '#0a1500';
}
}
export class SupervisorsView extends React.PureComponent<
SupervisorsViewProps,
SupervisorsViewState
> {
private readonly supervisorQueryManager: QueryManager<
SupervisorQuery,
SupervisorsWithAuxiliaryInfo
>;
constructor(props: SupervisorsViewProps) {
super(props);
this.state = {
supervisorsState: QueryState.INIT,
statsKey: '5m',
showResumeAllSupervisors: false,
showSuspendAllSupervisors: false,
showTerminateAllSupervisors: false,
supervisorSpecDialogOpen: Boolean(props.openSupervisorDialog),
supervisorTableActionDialogActions: [],
visibleColumns: new LocalStorageBackedVisibility(
LocalStorageKeys.SUPERVISOR_TABLE_COLUMN_SELECTION,
),
page: 0,
pageSize: SMALL_TABLE_PAGE_SIZE,
sorted: [],
};
this.supervisorQueryManager = new QueryManager({
processQuery: async (
{ capabilities, visibleColumns, filtered, sorted, page, pageSize },
signal,
setIntermediateQuery,
) => {
let supervisors: SupervisorQueryResultRow[];
let count = -1;
const auxiliaryQueries: AuxiliaryQueryFn<SupervisorsWithAuxiliaryInfo>[] = [];
if (capabilities.hasSql()) {
const whereExpression = sqlQueryCustomTableFilters(filtered);
let filterClause = '';
if (whereExpression.toString() !== 'TRUE') {
filterClause = whereExpression.toString();
}
const sqlQuery = assemble(
'WITH s AS (SELECT',
' "supervisor_id",',
' "datasource",',
' "type",',
' "source",',
` CASE WHEN "suspended" = 0 THEN "detailed_state" ELSE 'SUSPENDED' END AS "detailed_state",`,
visibleColumns.shown('Configured tasks') ? ' "spec",' : undefined,
' "suspended" = 1 AS "suspended"',
'FROM "sys"."supervisors")',
'SELECT *',
'FROM s',
filterClause ? `WHERE ${filterClause}` : undefined,
sortedToOrderByClause(sorted),
`LIMIT ${pageSize}`,
page ? `OFFSET ${page * pageSize}` : undefined,
).join('\n');
setIntermediateQuery(sqlQuery);
supervisors = (
await queryDruidSql<SupervisorQueryResultRow>(
{
query: sqlQuery,
},
signal,
)
).map(supervisor => {
const spec: any = supervisor.spec;
if (typeof spec !== 'string') return supervisor;
return { ...supervisor, spec: JSONBig.parse(spec) };
});
auxiliaryQueries.push(async (supervisorsWithAuxiliaryInfo, signal) => {
const sqlQuery = assemble(
'SELECT COUNT(*) AS "cnt"',
'FROM "sys"."supervisors"',
filterClause ? `WHERE ${filterClause}` : undefined,
).join('\n');
const cnt: any = (
await queryDruidSql<{ cnt: number }>(
{
query: sqlQuery,
},
signal,
)
)[0].cnt;
return {
...supervisorsWithAuxiliaryInfo,
count: typeof cnt === 'number' ? cnt : -1,
};
});
} else if (capabilities.hasOverlordAccess()) {
supervisors = (await getApiArray('/druid/indexer/v1/supervisor?full', signal)).map(
(sup: any) => {
return {
supervisor_id: deepGet(sup, 'id'),
datasource: deepGet(sup, 'dataSource'),
type: deepGet(sup, 'spec.tuningConfig.type'),
source:
deepGet(sup, 'spec.ioConfig.topic') ||
deepGet(sup, 'spec.ioConfig.stream') ||
'n/a',
state: deepGet(sup, 'state'),
detailed_state: deepGet(sup, 'detailedState'),
spec: sup.spec,
suspended: Boolean(deepGet(sup, 'suspended')),
};
},
);
count = supervisors.length;
const firstSorted = sorted[0];
if (firstSorted) {
const { id, desc } = firstSorted;
supervisors.sort((s1: any, s2: any) => {
return (
String(s1[id]).localeCompare(String(s2[id]), undefined, { numeric: true }) *
(desc ? -1 : 1)
);
});
}
} else {
throw new Error(`must have SQL or overlord access`);
}
if (capabilities.hasOverlordAccess()) {
if (visibleColumns.shown('Running tasks', 'Aggregate lag', 'Recent errors')) {
auxiliaryQueries.push(
...supervisors.map(
(supervisor): AuxiliaryQueryFn<SupervisorsWithAuxiliaryInfo> =>
async (supervisorsWithAuxiliaryInfo, signal) => {
const status = (
await Api.instance.get(
`/druid/indexer/v1/supervisor/${Api.encodePath(
supervisor.supervisor_id,
)}/status`,
{ signal, timeout: STATUS_API_TIMEOUT },
)
).data;
return {
...supervisorsWithAuxiliaryInfo,
status: {
...supervisorsWithAuxiliaryInfo.status,
[supervisor.supervisor_id]: status,
},
};
},
),
);
}
if (visibleColumns.shown('Stats')) {
auxiliaryQueries.push(
...filterMap(
supervisors,
(supervisor): AuxiliaryQueryFn<SupervisorsWithAuxiliaryInfo> | undefined => {
if (oneOf(supervisor.type, 'autocompact', 'scheduled_batch')) return; // These supervisors do not report stats
return async (supervisorsWithAuxiliaryInfo, signal) => {
const stats = (
await Api.instance.get(
`/druid/indexer/v1/supervisor/${Api.encodePath(
supervisor.supervisor_id,
)}/stats`,
{ signal, timeout: STATS_API_TIMEOUT },
)
).data;
return {
...supervisorsWithAuxiliaryInfo,
stats: {
...supervisorsWithAuxiliaryInfo.stats,
[supervisor.supervisor_id]: stats,
},
};
};
},
),
);
}
}
return new ResultWithAuxiliaryWork<SupervisorsWithAuxiliaryInfo>(
{ supervisors, count, status: {}, stats: {} },
auxiliaryQueries,
);
},
onStateChange: supervisorsState => {
this.setState({
supervisorsState,
});
},
});
}
componentDidMount() {
this.fetchData();
}
componentWillUnmount(): void {
this.supervisorQueryManager.terminate();
}
componentDidUpdate(
prevProps: Readonly<SupervisorsViewProps>,
prevState: Readonly<SupervisorsViewState>,
) {
const { filters } = this.props;
const { page, pageSize, sorted } = this.state;
if (
!sqlQueryCustomTableFilters(filters).equals(sqlQueryCustomTableFilters(prevProps.filters)) ||
page !== prevState.page ||
pageSize !== prevState.pageSize ||
sortedToOrderByClause(sorted) !== sortedToOrderByClause(prevState.sorted)
) {
this.fetchData();
}
}
private readonly fetchData = () => {
const { capabilities, filters } = this.props;
const { visibleColumns, page, pageSize, sorted } = this.state;
this.supervisorQueryManager.runQuery({
page,
pageSize,
filtered: filters,
sorted,
visibleColumns,
capabilities,
});
};
private readonly handleFilterChange = (filters: Filter[]) => {
this.goToFirstPage();
this.props.onFiltersChange(filters);
};
private goToFirstPage() {
if (this.state.page) {
this.setState({ page: 0 });
}
}
private readonly closeSpecDialogs = () => {
this.setState({
supervisorSpecDialogOpen: false,
});
};
private readonly submitSupervisor = async (spec: JSON) => {
try {
await Api.instance.post('/druid/indexer/v1/supervisor', spec);
} catch (e) {
AppToaster.show({
message: `Failed to submit supervisor: ${getDruidErrorMessage(e)}`,
intent: Intent.DANGER,
});
return;
}
AppToaster.show({
message: 'Supervisor submitted successfully',
intent: Intent.SUCCESS,
});
this.supervisorQueryManager.rerunLastQuery();
};
private getSupervisorActions(supervisor: SupervisorQueryResultRow): BasicAction[] {
const { supervisor_id, datasource, suspended, type } = supervisor;
const { goToDatasource, goToStreamingDataLoader } = this.props;
const actions: BasicAction[] = [];
if (oneOf(type, 'kafka', 'kinesis')) {
actions.push(
{
icon: IconNames.MULTI_SELECT,
title: 'Go to datasource',
onAction: () => goToDatasource(datasource),
},
{
icon: IconNames.CLOUD_UPLOAD,
title: 'Open in data loader',
onAction: () => goToStreamingDataLoader(supervisor_id),
},
);
}
actions.push(
{
icon: suspended ? IconNames.PLAY : IconNames.PAUSE,
title: suspended ? 'Resume' : 'Suspend',
onAction: () =>
suspended
? this.setState({ resumeSupervisorId: supervisor_id })
: this.setState({ suspendSupervisorId: supervisor_id }),
},
{
icon: getConsoleViewIcon('tasks'),
title: 'Go to tasks',
onAction: () => this.goToTasksForSupervisor(supervisor),
},
);
if (!suspended) {
actions.push({
icon: IconNames.AUTOMATIC_UPDATES,
title: 'Handoff early',
onAction: () => this.setState({ handoffSupervisorId: supervisor_id }),
});
}
actions.push(
{
icon: IconNames.STEP_BACKWARD,
title: `Set ${type === 'kinesis' ? 'sequence numbers' : 'offsets'}`,
onAction: () => this.setState({ resetOffsetsSupervisorInfo: { id: supervisor_id, type } }),
},
{
icon: IconNames.STEP_BACKWARD,
title: 'Hard reset',
intent: Intent.DANGER,
onAction: () => this.setState({ resetSupervisorId: supervisor_id }),
},
{
icon: IconNames.CROSS,
title: 'Terminate',
intent: Intent.DANGER,
onAction: () => this.setState({ terminateSupervisorId: supervisor_id }),
},
);
return actions;
}
renderResumeSupervisorAction() {
const { resumeSupervisorId } = this.state;
if (!resumeSupervisorId) return;
return (
<AsyncActionDialog
action={async () => {
const resp = await Api.instance.post(
`/druid/indexer/v1/supervisor/${Api.encodePath(resumeSupervisorId)}/resume`,
{},
);
return resp.data;
}}
confirmButtonText="Resume supervisor"
successText="Supervisor has been resumed"
failText="Could not resume supervisor"
intent={Intent.PRIMARY}
onClose={() => {
this.setState({ resumeSupervisorId: undefined });
}}
onSuccess={() => {
this.supervisorQueryManager.rerunLastQuery();
}}
>
<p>
Are you sure you want to resume supervisor <Tag minimal>{resumeSupervisorId}</Tag>?
</p>
</AsyncActionDialog>
);
}
renderSuspendSupervisorAction() {
const { suspendSupervisorId } = this.state;
if (!suspendSupervisorId) return;
return (
<AsyncActionDialog
action={async () => {
const resp = await Api.instance.post(
`/druid/indexer/v1/supervisor/${Api.encodePath(suspendSupervisorId)}/suspend`,
{},
);
return resp.data;
}}
confirmButtonText="Suspend supervisor"
successText="Supervisor has been suspended"
failText="Could not suspend supervisor"
intent={Intent.DANGER}
onClose={() => {
this.setState({ suspendSupervisorId: undefined });
}}
onSuccess={() => {
this.supervisorQueryManager.rerunLastQuery();
}}
>
<p>
Are you sure you want to suspend supervisor <Tag minimal>{suspendSupervisorId}</Tag>?
</p>
</AsyncActionDialog>
);
}
renderTaskGroupHandoffAction() {
const { handoffSupervisorId } = this.state;
if (!handoffSupervisorId) return;
return (
<TaskGroupHandoffDialog
supervisorId={handoffSupervisorId}
onClose={() => {
this.setState({ handoffSupervisorId: undefined });
}}
onSuccess={() => {
this.supervisorQueryManager.rerunLastQuery();
}}
/>
);
}
renderResetOffsetsSupervisorAction() {
const { resetOffsetsSupervisorInfo } = this.state;
if (!resetOffsetsSupervisorInfo) return;
return (
<SupervisorResetOffsetsDialog
supervisorId={resetOffsetsSupervisorInfo.id}
supervisorType={resetOffsetsSupervisorInfo.type}
onClose={() => {
this.setState({ resetOffsetsSupervisorInfo: undefined });
}}
/>
);
}
renderResetSupervisorAction() {
const { resetSupervisorId } = this.state;
if (!resetSupervisorId) return;
return (
<AsyncActionDialog
action={async () => {
const resp = await Api.instance.post(
`/druid/indexer/v1/supervisor/${Api.encodePath(resetSupervisorId)}/reset`,
'',
);
return resp.data;
}}
confirmButtonText="Hard reset supervisor"
successText="Supervisor has been hard reset"
failText="Could not hard reset supervisor"
intent={Intent.DANGER}
onClose={() => {
this.setState({ resetSupervisorId: undefined });
}}
onSuccess={() => {
this.supervisorQueryManager.rerunLastQuery();
}}
warningChecks={[
<>
I understand that resetting <Tag minimal>{resetSupervisorId}</Tag> will clear
checkpoints and may lead to data loss or duplication.
</>,
'I understand that this operation cannot be undone.',
]}
>
<p>
Are you sure you want to hard reset supervisor <Tag minimal>{resetSupervisorId}</Tag>?
</p>
<p>Hard resetting a supervisor may lead to data loss or data duplication.</p>
<p>
Use this operation to restore functionality when the supervisor stops operating due to
missing offsets.
</p>
</AsyncActionDialog>
);
}
renderTerminateSupervisorAction() {
const { terminateSupervisorId } = this.state;
if (!terminateSupervisorId) return;
return (
<AsyncActionDialog
action={async () => {
const resp = await Api.instance.post(
`/druid/indexer/v1/supervisor/${Api.encodePath(terminateSupervisorId)}/terminate`,
{},
);
return resp.data;
}}
confirmButtonText="Terminate supervisor"
successText="Supervisor has been terminated"
failText="Could not terminate supervisor"
intent={Intent.DANGER}
onClose={() => {
this.setState({ terminateSupervisorId: undefined });
}}
onSuccess={() => {
this.supervisorQueryManager.rerunLastQuery();
}}
>
<p>
Are you sure you want to terminate supervisor <Tag minimal>{terminateSupervisorId}</Tag>?
</p>
<p>This action is not reversible.</p>
</AsyncActionDialog>
);
}
private renderSupervisorFilterableCell(field: string) {
const { filters } = this.props;
const { handleFilterChange } = this;
return function SupervisorFilterableCell(row: { value: any }) {
return (
<TableFilterableCell
field={field}
value={row.value}
filters={filters}
onFiltersChange={handleFilterChange}
/>
);
};
}
private onSupervisorDetail(supervisor: SupervisorQueryResultRow) {
this.setState({
supervisorTableActionDialogId: supervisor.supervisor_id,
supervisorTableActionDialogActions: this.getSupervisorActions(supervisor),
});
}
private goToTasksForSupervisor(supervisor: SupervisorQueryResultRow) {
const { goToTasks } = this.props;
switch (supervisor.type) {
case 'kafka':
case 'kinesis':
goToTasks(
`index_${supervisor.type}_${supervisor.supervisor_id}`,
`index_${supervisor.type}`,
);
return;
case 'autocompact':
goToTasks(supervisor.supervisor_id.replace(/^autocompact__/, ''), 'compact');
return;
case 'scheduled_batch':
goToTasks(
supervisor.supervisor_id.replace(/^scheduled_batch__/, '').replace(/__[0-9a-f-]+$/, ''),
'query_controller',
);
return;
default:
goToTasks(supervisor.supervisor_id, undefined);
return;
}
}
private renderSupervisorTable() {
const { filters } = this.props;
const { supervisorsState, page, pageSize, sorted, statsKey, visibleColumns } = this.state;
const { supervisors, count, status, stats } = supervisorsState.data || {
supervisors: [],
count: -1,
status: {},
stats: {},
};
return (
<StatusContext.Provider value={status}>
<StatsContext.Provider value={{ stats, statsKey }}>
<ReactTable
data={supervisors}
pages={count >= 0 ? Math.ceil(count / pageSize) : 10000000} // We are hiding the page selector
loading={supervisorsState.loading}
noDataText={
supervisorsState.isEmpty()
? 'No supervisors'
: supervisorsState.getErrorMessage() || ''
}
manual
filterable
filtered={filters}
onFilteredChange={this.handleFilterChange}
sorted={sorted}
onSortedChange={sorted => this.setState({ sorted })}
page={page}
onPageChange={page => this.setState({ page })}
pageSize={pageSize}
onPageSizeChange={pageSize => this.setState({ pageSize })}
pageSizeOptions={SMALL_TABLE_PAGE_SIZE_OPTIONS}
showPagination
showPageJump={false}
ofText={count >= 0 ? `of ${formatInteger(count)}` : ''}
columns={this.getTableColumns(visibleColumns, filters)}
/>
</StatsContext.Provider>
</StatusContext.Provider>
);
}
private readonly getTableColumns = memoize(
(
visibleColumns: LocalStorageBackedVisibility,
filters: Filter[],
): Column<SupervisorQueryResultRow>[] => {
return [
{
Header: 'Supervisor ID',
id: 'supervisor_id',
accessor: 'supervisor_id',
width: 280,
show: visibleColumns.shown('Supervisor ID'),
Cell: ({ value, original }) => (
<TableClickableCell
tooltip="Show detail"
onClick={() => this.onSupervisorDetail(original)}
hoverIcon={IconNames.SEARCH_TEMPLATE}
>
{value}
</TableClickableCell>
),
},
{
Header: 'Datasource',
accessor: 'datasource',
width: 280,
Cell: this.renderSupervisorFilterableCell('datasource'),
show: visibleColumns.shown('Datasource'),
},
{
Header: 'Type',
accessor: 'type',
width: 120,
Cell: this.renderSupervisorFilterableCell('type'),
show: visibleColumns.shown('Type'),
},
{
Header: 'Topic/Stream',
accessor: 'source',
width: 200,
Cell: this.renderSupervisorFilterableCell('source'),
show: visibleColumns.shown('Topic/Stream'),
},
{
Header: 'Status',
id: 'detailed_state',
width: 170,
Filter: suggestibleFilterInput([
'CONNECTING_TO_STREAM',
'CREATING_TASKS',
'DISCOVERING_INITIAL_TASKS',
'IDLE',
'INVALID_SPEC',
'LOST_CONTACT_WITH_STREAM',
'PENDING',
'RUNNING',
'SCHEDULER_STOPPED',
'STOPPING',
'SUSPENDED',
'UNABLE_TO_CONNECT_TO_STREAM',
'UNHEALTHY_SUPERVISOR',
'UNHEALTHY_TASKS',
]),
accessor: 'detailed_state',
Cell: ({ value }) => (
<TableFilterableCell
field="detailed_state"
value={value}
filters={filters}
onFiltersChange={this.handleFilterChange}
>
<span>
<span style={{ color: detailedStateToColor(value) }}>&#x25cf;&nbsp;</span>
{value}
</span>
</TableFilterableCell>
),
show: visibleColumns.shown('Status'),
},
{
Header: 'Configured tasks',
id: 'configured_tasks',
width: 130,
accessor: 'spec',
filterable: false,
sortable: false,
className: 'padded',
Cell: ({ value }) => {
if (!value) return null;
const taskCount = deepGet(value, 'spec.ioConfig.taskCount');
const replicas = deepGet(value, 'spec.ioConfig.replicas');
if (typeof taskCount !== 'number' || typeof replicas !== 'number') return null;
return (
<div>
<div>{formatInteger(taskCount * replicas)}</div>
<div className="detail-line">
{replicas === 1
? '(no replication)'
: `(${pluralIfNeeded(taskCount, 'task')} × ${pluralIfNeeded(
replicas,
'replica',
)})`}
</div>
</div>
);
},
show: visibleColumns.shown('Configured tasks'),
},
{
Header: 'Running tasks',
id: 'running_tasks',
accessor: 'supervisor_id',
width: 150,
filterable: false,
sortable: false,
Cell: ({ value, original }) => {
const status = useContext(StatusContext);
if (original.suspended) return;
const { activeTasks, publishingTasks } = status[value]?.payload || {};
let label: string | JSX.Element;
if (Array.isArray(activeTasks)) {
label = pluralIfNeeded(activeTasks.length, 'active task');
if (nonEmptyArray(publishingTasks)) {
label = (
<>
<div>{label}</div>
<div>{pluralIfNeeded(publishingTasks.length, 'publishing task')}</div>
</>
);
}
} else {
label = '';
}
return (
<TableClickableCell
tooltip="Go to tasks"
onClick={() => this.goToTasksForSupervisor(original)}
hoverIcon={IconNames.ARROW_TOP_RIGHT}
>
{label}
</TableClickableCell>
);
},
show: visibleColumns.shown('Running tasks'),
},
{
Header: 'Aggregate lag',
accessor: 'supervisor_id',
width: 200,
filterable: false,
sortable: false,
className: 'padded',
show: visibleColumns.shown('Aggregate lag'),
Cell: ({ value }) => {
const status = useContext(StatusContext);
const aggregateLag = status[value]?.payload?.aggregateLag;
return isNumberLike(aggregateLag) ? formatInteger(aggregateLag) : null;
},
},
{
Header: twoLines(
'Stats',
<HeaderStatsKeySelector changeStatsKey={statsKey => this.setState({ statsKey })} />,
),
id: 'stats',
accessor: 'supervisor_id',
width: 300,
filterable: false,
sortable: false,
className: 'padded',
show: visibleColumns.shown('Stats'),
Cell: ({ value, original }) => {
const { stats, statsKey } = useContext(StatsContext);
const s = stats[value];
if (!s) return;
const activeTasks: SupervisorStatusTask[] | undefined = deepGet(
original,
'status.payload.activeTasks',
);
const activeTaskIds: string[] | undefined = Array.isArray(activeTasks)
? activeTasks.map((t: SupervisorStatusTask) => t.id)
: undefined;
const c = getTotalSupervisorStats(s, statsKey, activeTaskIds);
const seconds = getRowStatsKeySeconds(statsKey);
const issues = (c.processedWithError || 0) + (c.thrownAway || 0) + (c.unparseable || 0);
const totalLabel = `Total (past ${statsKey}): `;
return issues ? (
<div>
<div
data-tooltip={`${totalLabel}${formatBytes(c.processedBytes * seconds)}`}
>{`Input: ${formatByteRate(c.processedBytes)}`}</div>
{Boolean(c.processed) && (
<div
data-tooltip={`${totalLabel}${formatInteger(c.processed * seconds)} events`}
>{`Processed: ${formatRate(c.processed)}`}</div>
)}
{Boolean(c.processedWithError) && (
<div
className="warning-line"
data-tooltip={`${totalLabel}${formatInteger(
c.processedWithError * seconds,
)} events`}
>
Processed with error: {formatRate(c.processedWithError)}
</div>
)}
{Boolean(c.thrownAway) && (
<div
className="warning-line"
data-tooltip={`${totalLabel}${formatInteger(c.thrownAway * seconds)} events`}
>
Thrown away: {formatRate(c.thrownAway)}
</div>
)}
{Boolean(c.unparseable) && (
<div
className="warning-line"
data-tooltip={`${totalLabel}${formatInteger(c.unparseable * seconds)} events`}
>
Unparseable: {formatRate(c.unparseable)}
</div>
)}
</div>
) : c.processedBytes ? (
<div
data-tooltip={`${totalLabel}${formatInteger(
c.processed * seconds,
)} events, ${formatBytes(c.processedBytes * seconds)}`}
>{`Processed: ${formatRate(c.processed)} (${formatByteRate(c.processedBytes)})`}</div>
) : (
<div>No activity</div>
);
},
},
{
Header: 'Recent errors',
accessor: 'supervisor_id',
width: 150,
filterable: false,
sortable: false,
show: visibleColumns.shown('Recent errors'),
Cell: ({ value, original }) => {
const status = useContext(StatusContext);
const recentErrors = status[value]?.payload?.recentErrors;
if (!recentErrors) return null;
return (
<TableClickableCell
tooltip="Show errors"
onClick={() => this.onSupervisorDetail(original)}
hoverIcon={IconNames.SEARCH_TEMPLATE}
>
{pluralIfNeeded(recentErrors.length, 'error')}
</TableClickableCell>
);
},
},
{
Header: ACTION_COLUMN_LABEL,
id: ACTION_COLUMN_ID,
accessor: 'supervisor_id',
width: ACTION_COLUMN_WIDTH,
filterable: false,
sortable: false,
Cell: ({ value: id, original }) => {
const supervisorActions = this.getSupervisorActions(original);
return (
<ActionCell
onDetail={() => this.onSupervisorDetail(original)}
actions={supervisorActions}
menuTitle={id}
/>
);
},
},
];
},
);
renderBulkSupervisorActions() {
const { capabilities, goToQuery } = this.props;
const lastSupervisorQuery = this.supervisorQueryManager.getLastIntermediateQuery();
return (
<>
<MoreButton>
{capabilities.hasSql() && (
<MenuItem
icon={getConsoleViewIcon('workbench')}
text="View SQL query for table"
disabled={typeof lastSupervisorQuery !== 'string'}
onClick={() => {
if (typeof lastSupervisorQuery !== 'string') return;
goToQuery({ queryString: lastSupervisorQuery });
}}
/>
)}
<MenuItem
icon={IconNames.MANUALLY_ENTERED_DATA}
text="Submit JSON supervisor"
onClick={() => this.setState({ supervisorSpecDialogOpen: true })}
/>
<MenuItem
icon={IconNames.PLAY}
text="Resume all supervisors"
onClick={() => this.setState({ showResumeAllSupervisors: true })}
/>
<MenuItem
icon={IconNames.PAUSE}
text="Suspend all supervisors"
onClick={() => this.setState({ showSuspendAllSupervisors: true })}
/>
<MenuItem
icon={IconNames.CROSS}
text="Terminate all supervisors"
intent={Intent.DANGER}
onClick={() => this.setState({ showTerminateAllSupervisors: true })}
/>
</MoreButton>
{this.renderResumeAllSupervisorAction()}
{this.renderSuspendAllSupervisorAction()}
{this.renderTerminateAllSupervisorAction()}
</>
);
}
renderResumeAllSupervisorAction() {
const { showResumeAllSupervisors } = this.state;
if (!showResumeAllSupervisors) return;
return (
<AsyncActionDialog
action={async () => {
const resp = await Api.instance.post(`/druid/indexer/v1/supervisor/resumeAll`, {});
return resp.data;
}}
confirmButtonText="Resume all supervisors"
successText="All supervisors have been resumed"
failText="Could not resume all supervisors"
intent={Intent.PRIMARY}
onClose={() => {
this.setState({ showResumeAllSupervisors: false });
}}
onSuccess={() => {
this.supervisorQueryManager.rerunLastQuery();
}}
>
<p>Are you sure you want to resume all the supervisors?</p>
</AsyncActionDialog>
);
}
renderSuspendAllSupervisorAction() {
const { showSuspendAllSupervisors } = this.state;
if (!showSuspendAllSupervisors) return;
return (
<AsyncActionDialog
action={async () => {
const resp = await Api.instance.post(`/druid/indexer/v1/supervisor/suspendAll`, {});
return resp.data;
}}
confirmButtonText="Suspend all supervisors"
successText="All supervisors have been suspended"
failText="Could not suspend all supervisors"
intent={Intent.DANGER}
onClose={() => {
this.setState({ showSuspendAllSupervisors: false });
}}
onSuccess={() => {
this.supervisorQueryManager.rerunLastQuery();
}}
>
<p>Are you sure you want to suspend all the supervisors?</p>
</AsyncActionDialog>
);
}
renderTerminateAllSupervisorAction() {
const { showTerminateAllSupervisors } = this.state;
if (!showTerminateAllSupervisors) return;
return (
<AsyncActionDialog
action={async () => {
const resp = await Api.instance.post(`/druid/indexer/v1/supervisor/terminateAll`, {});
return resp.data;
}}
confirmButtonText="Terminate all supervisors"
successText="All supervisors have been terminated"
failText="Could not terminate all supervisors"
intent={Intent.DANGER}
onClose={() => {
this.setState({ showTerminateAllSupervisors: false });
}}
onSuccess={() => {
this.supervisorQueryManager.rerunLastQuery();
}}
>
<p>Are you sure you want to terminate all the supervisors?</p>
</AsyncActionDialog>
);
}
render() {
const {
supervisorSpecDialogOpen,
alertErrorMsg,
supervisorTableActionDialogId,
supervisorTableActionDialogActions,
visibleColumns,
} = this.state;
return (
<div className="supervisors-view app-view">
<ViewControlBar label="Supervisors">
<RefreshButton
localStorageKey={LocalStorageKeys.SUPERVISORS_REFRESH_RATE}
onRefresh={auto => {
if (auto && hasOverlayOpen()) return;
this.supervisorQueryManager.rerunLastQuery(auto);
}}
/>
{this.renderBulkSupervisorActions()}
<TableColumnSelector
columns={SUPERVISOR_TABLE_COLUMNS}
onChange={column =>
this.setState(prevState => ({
visibleColumns: prevState.visibleColumns.toggle(column),
}))
}
onClose={this.fetchData}
tableColumnsHidden={visibleColumns.getHiddenColumns()}
/>
</ViewControlBar>
{this.renderSupervisorTable()}
{this.renderResumeSupervisorAction()}
{this.renderSuspendSupervisorAction()}
{this.renderTaskGroupHandoffAction()}
{this.renderResetOffsetsSupervisorAction()}
{this.renderResetSupervisorAction()}
{this.renderTerminateSupervisorAction()}
{supervisorSpecDialogOpen && (
<SpecDialog
onClose={this.closeSpecDialogs}
onSubmit={this.submitSupervisor}
title="Submit supervisor"
/>
)}
<AlertDialog
icon={IconNames.ERROR}
intent={Intent.PRIMARY}
isOpen={Boolean(alertErrorMsg)}
confirmButtonText="OK"
onConfirm={() => this.setState({ alertErrorMsg: undefined })}
>
<p>{alertErrorMsg}</p>
</AlertDialog>
{supervisorTableActionDialogId && (
<SupervisorTableActionDialog
supervisorId={supervisorTableActionDialogId}
actions={supervisorTableActionDialogActions}
onClose={() => this.setState({ supervisorTableActionDialogId: undefined })}
/>
)}
</div>
);
}
}