// blob: 20caad91406ae35b18143a148028518ba04e45dd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Dispatch, Middleware, MiddlewareAPI } from 'redux';
import { makeApi, SupersetClient } from '@superset-ui/core';
import { SupersetError } from 'src/components/ErrorMessage/types';
import { FeatureFlag, isFeatureEnabled } from '../featureFlags';
import {
getClientErrorObject,
parseErrorJson,
} from '../utils/getClientErrorObject';
/**
 * A single event returned by the async event polling endpoint.
 */
export type AsyncEvent = {
// Recorded as the "last received" event id after the event is handled and
// sent back as `last_id` on later polling requests.
id: string;
// Not read anywhere in this file.
channel_id: string;
// Matched against the `asyncJobId` of each pending component to find the
// component the event belongs to.
job_id: string;
// Not read anywhere in this file.
user_id: string;
// Compared against 'done' and 'error'; any other value is only logged.
status: string;
// Not read directly here; for 'error' events the whole event object is handed
// to parseErrorJson.
errors: SupersetError[];
// Endpoint requested through SupersetClient.get for events whose status is
// 'done'.
result_url: string;
};
/**
 * Options accepted by initAsyncEvents.
 */
type AsyncEventOptions = {
config: {
// Transport name; a falsy value falls back to 'polling'. Polling is only
// started when the resulting value is 'polling'.
GLOBAL_ASYNC_QUERIES_TRANSPORT: string;
// Delay passed to setTimeout between polling cycles; a falsy value
// (including 0) falls back to 500.
GLOBAL_ASYNC_QUERIES_POLLING_DELAY: number;
};
// Called with the current store state on every polling cycle. The returned
// items are read for their `asyncJobId` and `id` properties.
getPendingComponents: (state: any) => any[];
// Action creator whose result is dispatched once cached data for a 'done'
// event has been loaded successfully.
successAction: (componentId: number, componentData: any) => { type: string };
// Action creator whose result is dispatched for 'error' events and when
// loading cached data fails.
errorAction: (componentId: number, response: any) => { type: string };
// Invoked once at the end of every polling cycle, right before the next cycle
// is scheduled.
processEventsCallback?: (events: AsyncEvent[]) => void; // this is currently used only for tests
};
/**
 * Outcome of loading the cached result of a finished async job.
 */
type CachedDataResponse = {
// Id of the pending component the result belongs to.
componentId: number;
// 'success' when the result request resolved, 'error' when it threw.
status: string;
// On success: `json.result` when the response JSON has a `result` key,
// otherwise the whole JSON. On error: the value resolved by
// getClientErrorObject.
data: any;
};
/**
 * Creates the Redux middleware that drives global async queries via polling.
 *
 * When the GLOBAL_ASYNC_QUERIES feature flag is enabled and the configured
 * transport is 'polling', a polling loop is started as soon as the middleware
 * is bound to a store. Each polling cycle:
 *   1. reads the pending components with `getPendingComponents(state)`;
 *   2. if there are any, requests events from the async event endpoint,
 *      passing the last received event id (if known) as `last_id`;
 *   3. for 'done' events, loads the data at `result_url` and dispatches
 *      `successAction`, or `errorAction` when loading fails;
 *   4. for 'error' events, dispatches `errorAction` directly;
 *   5. calls `processEventsCallback` (if provided) with the events fetched in
 *      this cycle, then schedules the next cycle.
 *
 * The middleware itself forwards every action to `next` unchanged.
 *
 * @param options - transport configuration, pending-component selector,
 *   action creators and an optional per-cycle callback.
 * @returns the Redux middleware.
 */
const initAsyncEvents = (options: AsyncEventOptions) => {
  // TODO: implement websocket support
  const TRANSPORT_POLLING = 'polling';
  const {
    config,
    getPendingComponents,
    successAction,
    errorAction,
    processEventsCallback,
  } = options;
  const transport = config.GLOBAL_ASYNC_QUERIES_TRANSPORT || TRANSPORT_POLLING;
  // `||` rather than `??`: a configured delay of 0 falls back to 500 instead
  // of being passed to setTimeout as-is.
  const pollingDelay = config.GLOBAL_ASYNC_QUERIES_POLLING_DELAY || 500;

  const middleware: Middleware = (store: MiddlewareAPI) => (next: Dispatch) => {
    const JOB_STATUS = {
      PENDING: 'pending',
      RUNNING: 'running',
      ERROR: 'error',
      DONE: 'done',
    };
    const LOCALSTORAGE_KEY = 'last_async_event_id';
    const POLLING_URL = '/api/v1/async_event/';

    // Initialised explicitly so the variable has a defined value even when
    // reading from localStorage throws.
    let lastReceivedEventId: string | null = null;
    try {
      lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY);
    } catch (err) {
      console.warn('Failed to fetch last event Id from localStorage', err);
    }

    const fetchEvents = makeApi<
      { last_id?: string | null },
      { result: AsyncEvent[] }
    >({
      method: 'GET',
      endpoint: POLLING_URL,
    });

    // Loads the cached result of a finished job. Never rejects: a failed
    // request is reported through `status: 'error'` instead.
    const fetchCachedData = async (
      asyncEvent: AsyncEvent,
      componentId: number,
    ): Promise<CachedDataResponse> => {
      let status = 'success';
      let data;
      try {
        const { json } = await SupersetClient.get({
          endpoint: asyncEvent.result_url,
        });
        data = 'result' in json ? json.result : json;
      } catch (response) {
        status = 'error';
        data = await getClientErrorObject(response);
      }
      return { componentId, status, data };
    };

    // Remembers the id of the given event, in memory and (best effort) in
    // localStorage, so later requests can pass it as `last_id`.
    const setLastId = (asyncEvent: AsyncEvent) => {
      lastReceivedEventId = asyncEvent.id;
      try {
        localStorage.setItem(LOCALSTORAGE_KEY, asyncEvent.id);
      } catch (err) {
        console.warn('Error saving event Id to localStorage', err);
      }
    };

    // Runs one polling cycle and schedules the next one.
    const processEvents = async () => {
      const state = store.getState();
      const queuedComponents = getPendingComponents(state);
      const eventArgs = lastReceivedEventId
        ? { last_id: lastReceivedEventId }
        : {};
      // Events fetched in this cycle. This must be the variable handed to
      // processEventsCallback below; binding the response to a separate,
      // identically named inner constant would leave this array empty.
      let events: AsyncEvent[] = [];

      if (queuedComponents && queuedComponents.length) {
        try {
          const { result: fetchedEvents } = await fetchEvents(eventArgs);
          if (fetchedEvents && fetchedEvents.length) {
            events = fetchedEvents;

            const componentsByJobId = queuedComponents.reduce((acc, item) => {
              acc[item.asyncJobId] = item;
              return acc;
            }, {});
            const fetchDataEvents: Promise<CachedDataResponse>[] = [];

            fetchedEvents.forEach((asyncEvent: AsyncEvent) => {
              const component = componentsByJobId[asyncEvent.job_id];
              if (!component) {
                console.warn(
                  'Component not found for job_id',
                  asyncEvent.job_id,
                );
                setLastId(asyncEvent);
                return;
              }
              const componentId = component.id;
              switch (asyncEvent.status) {
                case JOB_STATUS.DONE:
                  fetchDataEvents.push(
                    fetchCachedData(asyncEvent, componentId),
                  );
                  break;
                case JOB_STATUS.ERROR:
                  store.dispatch(
                    errorAction(componentId, [parseErrorJson(asyncEvent)]),
                  );
                  break;
                default:
                  console.warn('Received event with status', asyncEvent.status);
              }
              setLastId(asyncEvent);
            });

            const fetchResults = await Promise.all(fetchDataEvents);
            fetchResults.forEach(result => {
              // Action creators always receive an array payload.
              const data = Array.isArray(result.data)
                ? result.data
                : [result.data];
              if (result.status === 'success') {
                store.dispatch(successAction(result.componentId, data));
              } else {
                store.dispatch(errorAction(result.componentId, data));
              }
            });
          }
        } catch (err) {
          console.warn(err);
        }
      }

      if (processEventsCallback) processEventsCallback(events);
      return setTimeout(processEvents, pollingDelay);
    };

    if (
      isFeatureEnabled(FeatureFlag.GLOBAL_ASYNC_QUERIES) &&
      transport === TRANSPORT_POLLING
    ) {
      processEvents();
    }

    return action => next(action);
  };

  return middleware;
};
export default initAsyncEvents;