| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| import { useState, useEffect, useCallback, useMemo } from 'react' |
| import { DEVLAKE_ENDPOINT } from '@/utils/config' |
| import request from '@/utils/request' |
| import { NullPipelineRun } from '@/data/NullPipelineRun' |
| import { ToastNotification } from '@/components/Toast' |
| import { Providers } from '@/data/Providers' |
| import { Intent } from '@blueprintjs/core' |
| // import { integrationsData } from '@/data/integrations' |
| |
| function usePipelineManager (myPipelineName = `COLLECTION ${Date.now()}`, initialTasks = []) { |
| // const [integrations, setIntegrations] = useState(integrationsData) |
| const [pipelineName, setPipelineName] = useState(myPipelineName ?? `COLLECTION ${Date.now()}`) |
| const [isFetching, setIsFetching] = useState(false) |
| const [isFetchingAll, setIsFetchingAll] = useState(false) |
| const [isRunning, setIsRunning] = useState(false) |
| const [isCancelling, setIsCancelling] = useState(false) |
| const [errors, setErrors] = useState([]) |
| const [settings, setSettings] = useState({ |
| name: pipelineName, |
| plan: [ |
| [...initialTasks] |
| ] |
| }) |
| |
| const [pipelines, setPipelines] = useState([]) |
| const [pipelineCount, setPipelineCount] = useState(0) |
| const [activePipeline, setActivePipeline] = useState(NullPipelineRun) |
| const [lastRunId, setLastRunId] = useState(null) |
| const [pipelineRun, setPipelineRun] = useState(NullPipelineRun) |
| const [allowedProviders, setAllowedProviders] = useState([ |
| Providers.JIRA, |
| Providers.GITLAB, |
| Providers.JENKINS, |
| Providers.GITHUB, |
| Providers.REFDIFF, |
| Providers.GITEXTRACTOR, |
| Providers.FEISHU, |
| Providers.AE, |
| Providers.DBT, |
| Providers.TAPD |
| ]) |
| |
| const PIPELINES_ENDPOINT = useMemo(() => `${DEVLAKE_ENDPOINT}/pipelines`, [DEVLAKE_ENDPOINT]) |
| const [logfile, setLogfile] = useState('logging.tar.gz') |
| |
| const runPipeline = useCallback((runSettings = null) => { |
| console.log('>> RUNNING PIPELINE....') |
| try { |
| setIsRunning(true) |
| setErrors([]) |
| ToastNotification.clear() |
| console.log('>> DISPATCHING PIPELINE REQUEST', runSettings || settings) |
| const run = async () => { |
| // @todo: remove "ID" fallback key when no longer needed |
| const p = await request.post(`${DEVLAKE_ENDPOINT}/pipelines`, runSettings || settings) |
| const t = await request.get(`${DEVLAKE_ENDPOINT}/pipelines/${p.data?.ID || p.data?.id}/tasks`) |
| console.log('>> RAW PIPELINE DATA FROM API...', p.data) |
| setPipelineRun({ ...p.data, ID: p.data?.ID || p.data?.id, tasks: [...t.data.tasks] }) |
| setLastRunId(p.data?.ID || p.data?.id) |
| ToastNotification.show({ message: `Created New Pipeline - ${pipelineName}.`, intent: Intent.SUCCESS, icon: 'small-tick' }) |
| setTimeout(() => { |
| setIsRunning(false) |
| }, 500) |
| } |
| run() |
| } catch (e) { |
| setIsRunning(false) |
| setErrors([e.message]) |
| console.log('>> FAILED TO RUN PIPELINE!!', e) |
| } |
| }, [pipelineName, settings]) |
| |
| const cancelPipeline = useCallback((pipelineID) => { |
| try { |
| setIsCancelling(true) |
| setErrors([]) |
| ToastNotification.clear() |
| console.log('>> DISPATCHING CANCEL PIPELINE REQUEST, RUN ID =', pipelineID) |
| const cancel = async () => { |
| const c = await request.delete(`${DEVLAKE_ENDPOINT}/pipelines/${pipelineID}`) |
| console.log('>> RAW PIPELINE CANCEL RUN RESPONSE FROM API...', c) |
| setPipelineRun(NullPipelineRun) |
| ToastNotification.show({ message: `Pipeline RUN ID - ${pipelineID} Cancelled`, intent: 'danger', icon: 'small-tick' }) |
| setTimeout(() => { |
| setIsCancelling(false) |
| }, 500) |
| } |
| cancel() |
| } catch (e) { |
| setIsCancelling(false) |
| setErrors([e.message]) |
| console.log('>> FAILED TO FETCH CANCEL PIPELINE RUN!!', pipelineID, e) |
| } |
| }, []) |
| |
| const fetchPipeline = useCallback((pipelineID, refresh = false) => { |
| if (!pipelineID) { |
| console.log('>> !ABORT! Pipeline ID Missing! Aborting Fetch...') |
| // return ToastNotification.show({ message: 'Pipeline ID Missing! Aborting Fetch...', intent: 'danger', icon: 'warning-sign' }) |
| } |
| try { |
| setIsFetching(true) |
| setErrors([]) |
| ToastNotification.clear() |
| console.log('>> FETCHING PIPELINE RUN DETAILS...') |
| const fetch = async () => { |
| const p = await request.get(`${DEVLAKE_ENDPOINT}/pipelines/${pipelineID}`) |
| const t = await request.get(`${DEVLAKE_ENDPOINT}/pipelines/${pipelineID}/tasks`) |
| console.log('>> RAW PIPELINE RUN DATA FROM API...', p.data) |
| console.log('>> RAW PIPELINE TASKS DATA FROM API...', t.data) |
| setActivePipeline({ |
| ...p.data, |
| id: p.data.id, |
| tasks: [...t.data.tasks] |
| }) |
| setPipelineRun((pR) => refresh ? { ...p.data, ID: p.data.id, tasks: [...t.data.tasks] } : pR) |
| setLastRunId((lrId) => refresh ? p.data?.id : lrId) |
| setTimeout(() => { |
| setIsFetching(false) |
| }, 500) |
| } |
| fetch() |
| } catch (e) { |
| setIsFetching(false) |
| setErrors([e.message]) |
| setActivePipeline(NullPipelineRun) |
| console.log('>> FAILED TO FETCH PIPELINE RUN!!', e) |
| } |
| }, []) |
| |
| const fetchPipelineTasks = useCallback(() => { |
| try { |
| setIsFetching(true) |
| setErrors([]) |
| ToastNotification.clear() |
| } catch (e) { |
| setIsFetching(false) |
| setErrors([e.message]) |
| console.log('>> FAILED TO FETCH PIPELINE RUN TASKS!!', e) |
| } |
| }, []) |
| |
| const fetchAllPipelines = useCallback((status = null, fetchTimeout = 500) => { |
| try { |
| setIsFetchingAll(true) |
| setErrors([]) |
| ToastNotification.clear() |
| console.log('>> FETCHING ALL PIPELINE RUNS...') |
| const fetchAll = async () => { |
| let queryParams = '?' |
| queryParams += status && ['TASK_COMPLETED', 'TASK_RUNNING', 'TASK_FAILED'].includes(status) |
| ? `status=${status}&` |
| : '' |
| const p = await request.get(`${DEVLAKE_ENDPOINT}/pipelines${queryParams}`) |
| console.log('>> RAW PIPELINES RUN DATA FROM API...', p.data?.pipelines) |
| let pipelines = p.data && p.data.pipelines ? [...p.data.pipelines] : [] |
| // @todo: remove "ID" fallback key when no longer needed |
| pipelines = pipelines.map(p => ({ ...p, ID: p.id })) |
| setPipelines(pipelines) |
| setPipelineCount(p.data ? p.data.count : 0) |
| setTimeout(() => { |
| setIsFetchingAll(false) |
| }, fetchTimeout) |
| } |
| fetchAll() |
| } catch (e) { |
| setIsFetchingAll(false) |
| setErrors([e.message]) |
| setPipelines([]) |
| setPipelineCount(0) |
| console.log('>> FAILED TO FETCH ALL PIPELINE RUNS!!', e) |
| } |
| }, []) |
| |
| const buildPipelineStages = useCallback((tasks = [], outputArray = false) => { |
| let stages = {} |
| let stagesArray = [] |
| tasks?.forEach(tS => { |
| stages = { |
| ...stages, |
| [tS.pipelineRow]: tasks?.filter(t => t.pipelineRow === tS.pipelineRow) |
| } |
| }) |
| stagesArray = Object.values(stages) |
| console.log('>>> BUILDING PIPELINE STAGES...', tasks, stages, stagesArray) |
| return outputArray ? stagesArray : stages |
| }, []) |
| |
| const detectPipelineProviders = useCallback((tasks, providers = allowedProviders) => { |
| return [...tasks?.flat().filter(aT => providers.includes(aT.Plugin || aT.plugin)).map(p => p.Plugin || p.plugin)] |
| }, [allowedProviders]) |
| |
| useEffect(() => { |
| console.log('>> PIPELINE MANAGER - RECEIVED RUN/TASK SETTINGS', settings) |
| }, [settings]) |
| |
| useEffect(() => { |
| |
| }, [pipelineName, initialTasks]) |
| |
| const getPipelineLogfile = useCallback((pipelineId = 0) => { |
| return `${PIPELINES_ENDPOINT}/${pipelineId}/${logfile}` |
| }, [PIPELINES_ENDPOINT, logfile]) |
| |
| return { |
| errors, |
| isRunning, |
| isFetching, |
| isFetchingAll, |
| isCancelling, |
| pipelineName, |
| settings, |
| setSettings, |
| setPipelineName, |
| pipelineRun, |
| activePipeline, |
| pipelines, |
| pipelineCount, |
| lastRunId, |
| logfile, |
| runPipeline, |
| cancelPipeline, |
| fetchPipeline, |
| fetchPipelineTasks, |
| fetchAllPipelines, |
| buildPipelineStages, |
| detectPipelineProviders, |
| allowedProviders, |
| setAllowedProviders, |
| getPipelineLogfile |
| } |
| } |
| |
| export default usePipelineManager |