blob: 5de738a38cccc32202b6b911aa40ee154e2b29d4 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tasks
import (
"encoding/json"
"sort"
"strings"
"time"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models/common"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/argocd/models"
)
// Compile-time check that ExtractSyncOperations is assignable to plugin.SubTaskEntryPoint.
var _ plugin.SubTaskEntryPoint = ExtractSyncOperations
// ExtractSyncOperationsMeta describes the "extractSyncOperations" subtask:
// it is enabled by default, runs ExtractSyncOperations, reads from
// RAW_SYNC_OPERATION_TABLE and declares the ArgocdSyncOperation table as its product.
//
// NOTE(review): ExtractSyncOperations also reads models.ArgocdApplication and
// models.ArgocdRevisionImage and emits models.ArgocdRevisionImage rows; confirm
// whether those tables should be declared here as well.
var ExtractSyncOperationsMeta = plugin.SubTaskMeta{
Name: "extractSyncOperations",
EntryPoint: ExtractSyncOperations,
EnabledByDefault: true,
Description: "Extract sync operations from raw data",
DependencyTables: []string{RAW_SYNC_OPERATION_TABLE},
ProductTables: []string{models.ArgocdSyncOperation{}.TableName()},
}
// ArgocdApiSyncOperation is the JSON shape decoded from one raw sync-operation row.
// A single struct covers two payload variants: deployment history entries and the
// application's current operationState. ExtractSyncOperations treats a payload
// with a non-empty Phase as an operationState record and everything else as a
// history entry.
type ArgocdApiSyncOperation struct {
// For history entries
// NOTE(review): ID is decoded but not read anywhere in this file.
ID int64 `json:"id"`
// Revision is the deployed revision of a history entry.
Revision string `json:"revision"`
// DeployedAt is the completion time of a history entry; the zero value means it was absent.
DeployedAt time.Time `json:"deployedAt"`
// DeployStartedAt is optional; when missing, the extractor falls back to DeployedAt.
DeployStartedAt *time.Time `json:"deployStartedAt"`
Source struct {
RepoURL string `json:"repoURL"`
} `json:"source"`
// InitiatedBy identifies who triggered the sync; Automated takes precedence over Username in the extractor.
InitiatedBy struct {
Username string `json:"username"`
Automated bool `json:"automated"`
} `json:"initiatedBy"`
// Metadata and Operation are additional places where container images may be listed.
Metadata ArgocdApiSyncOperationMetadata `json:"metadata"`
Operation ArgocdApiSyncOperationDetails `json:"operation"`
// For operationState (current operation)
Phase string `json:"phase"` // Succeeded, Failed, Error, Running, Terminating
Message string `json:"message"`
// StartedAt is the operation start time; the zero value means it was absent.
StartedAt time.Time `json:"startedAt"`
// FinishedAt is nil (or zero) while the operation has no recorded finish time.
FinishedAt *time.Time `json:"finishedAt"`
// SyncResult carries the synced revision and the per-resource results.
SyncResult struct {
Revision string `json:"revision"`
Resources []ArgocdApiSyncResourceItem `json:"resources"`
} `json:"syncResult"`
}
// ArgocdApiSyncResourceItem is one resource entry inside a sync payload. The
// same shape is used for "syncResult.resources" and "operation.sync.resources".
type ArgocdApiSyncResourceItem struct {
Group string `json:"group"`
Version string `json:"version"`
// Kind is the resource kind; extractPrimaryDeploymentKind inspects it.
Kind string `json:"kind"`
Namespace string `json:"namespace"`
Name string `json:"name"`
Status string `json:"status"`
Message string `json:"message"`
// Images lists images attached to this resource; collectContainerImages gathers them.
Images []string `json:"images"`
}
// ArgocdApiSyncOperationMetadata is the "metadata" object of a sync payload. It
// appears both at the top level and nested under "operation".
type ArgocdApiSyncOperationMetadata struct {
// Images lists images attached directly to the metadata object.
Images []string `json:"images"`
// Resources lists per-resource image collections.
Resources []ArgocdApiSyncOperationMetadataResource `json:"resources"`
}
// ArgocdApiSyncOperationMetadataResource is one entry of "metadata.resources";
// only its image list is decoded.
type ArgocdApiSyncOperationMetadataResource struct {
Images []string `json:"images"`
}
// ArgocdApiSyncOperationDetails is the "operation" object of a sync payload:
// its own metadata plus the sync request details.
type ArgocdApiSyncOperationDetails struct {
Metadata ArgocdApiSyncOperationMetadata `json:"metadata"`
Sync ArgocdApiSyncOperationSync `json:"sync"`
}
// ArgocdApiSyncOperationSync is the "operation.sync" object; only its resource
// list is decoded.
type ArgocdApiSyncOperationSync struct {
Resources []ArgocdApiSyncResourceItem `json:"resources"`
}
// ExtractSyncOperations converts the raw sync-operation rows of one ArgoCD
// application into models.ArgocdSyncOperation records. It additionally emits a
// models.ArgocdRevisionImage record whenever the image list associated with a
// revision is newly learned or differs from the cached one.
//
// Before extracting, it loads two fallbacks for payloads that carry no images:
// the application's SummaryImages (a missing application is tolerated) and all
// stored revision-to-images records for the application.
//
// It returns a wrapped error when either lookup fails for a reason other than
// "not found", the error from api.NewApiExtractor when construction fails, and
// otherwise whatever extractor.Execute() returns.
func ExtractSyncOperations(taskCtx plugin.SubTaskContext) errors.Error {
// Unchecked type assertion: panics if the task data is not *ArgocdTaskData.
data := taskCtx.GetData().(*ArgocdTaskData)
var summaryImages []string
application := &models.ArgocdApplication{}
db := taskCtx.GetDal()
// Load the application row to obtain its summary images; "not found" leaves summaryImages empty.
if err := db.First(
application,
dal.Where("connection_id = ? AND name = ?", data.Options.ConnectionId, data.Options.ApplicationName),
); err != nil {
if !db.IsErrorNotFound(err) {
return errors.Default.Wrap(err, "error loading argocd application for summary images")
}
} else {
summaryImages = application.SummaryImages
}
summaryImages = normalizeImages(summaryImages)
// revisionImageCache maps revision -> normalized image list. It is seeded from
// stored records below and updated while rows are extracted.
revisionImageCache := make(map[string][]string)
revisionRecords := make([]models.ArgocdRevisionImage, 0)
err := db.All(
&revisionRecords,
dal.Where("connection_id = ? AND application_name = ?", data.Options.ConnectionId, data.Options.ApplicationName),
)
if err != nil && !db.IsErrorNotFound(err) {
return errors.Default.Wrap(err, "error loading argocd revision images")
}
// Records without a revision or without any usable image are not cached.
for _, record := range revisionRecords {
if record.Revision == "" {
continue
}
normalized := normalizeImages(record.Images)
if len(normalized) == 0 {
continue
}
revisionImageCache[record.Revision] = normalized
}
// revisionDirty holds revisions whose image list changed while processing the
// current row; entries are added and removed within the same Extract call.
revisionDirty := make(map[string][]string)
extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{
RawDataSubTaskArgs: api.RawDataSubTaskArgs{
Ctx: taskCtx,
Table: RAW_SYNC_OPERATION_TABLE,
Params: models.ArgocdApiParams{
ConnectionId: data.Options.ConnectionId,
Name: data.Options.ApplicationName,
},
},
Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
var apiOp ArgocdApiSyncOperation
err := json.Unmarshal(row.Data, &apiOp)
if err != nil {
return nil, errors.Default.Wrap(err, "error unmarshaling sync operation")
}
syncOp := &models.ArgocdSyncOperation{
ConnectionId: data.Options.ConnectionId,
ApplicationName: data.Options.ApplicationName,
NoPKModel: common.NewNoPKModel(),
}
// normalize turns a zero time into nil so absent timestamps are stored as NULL-able pointers.
normalize := func(t time.Time) *time.Time {
if t.IsZero() {
return nil
}
return &t
}
// normalizePtr does the same for optional timestamps that are already pointers.
normalizePtr := func(t *time.Time) *time.Time {
if t == nil {
return nil
}
if t.IsZero() {
return nil
}
return t
}
// A non-empty phase marks the payload as an operationState record; otherwise it is a history entry.
isOperationState := apiOp.Phase != ""
// A history entry with neither a deploy time nor a revision carries nothing to store; skip the row.
if !isOperationState && apiOp.DeployedAt.IsZero() && apiOp.Revision == "" {
return nil, nil
}
if isOperationState {
start := normalize(apiOp.StartedAt)
// The deployment id is the start time in Unix seconds.
// NOTE(review): the time.Now() fallback yields a different id on every run
// for rows without a start time — confirm this is acceptable.
if start != nil {
syncOp.DeploymentId = start.Unix()
} else {
syncOp.DeploymentId = time.Now().Unix()
}
syncOp.Revision = apiOp.SyncResult.Revision
syncOp.StartedAt = start
syncOp.FinishedAt = normalizePtr(apiOp.FinishedAt)
syncOp.Phase = apiOp.Phase
syncOp.Message = apiOp.Message
syncOp.ResourcesCount = len(apiOp.SyncResult.Resources)
} else {
deployedAt := normalize(apiOp.DeployedAt)
// History entries are keyed by their deploy time in Unix seconds.
// NOTE(review): apiOp.ID is not used here and the time.Now() fallback is
// not stable across runs — confirm intended.
if deployedAt != nil {
syncOp.DeploymentId = deployedAt.Unix()
} else {
syncOp.DeploymentId = time.Now().Unix()
}
syncOp.Revision = apiOp.Revision
syncOp.FinishedAt = deployedAt
// Without an explicit start time, the deploy time doubles as the start time.
start := normalizePtr(apiOp.DeployStartedAt)
if start == nil {
start = deployedAt
}
syncOp.StartedAt = start
// History entries are always recorded with phase "Succeeded".
syncOp.Phase = "Succeeded"
if apiOp.InitiatedBy.Automated {
syncOp.InitiatedBy = "automated"
} else {
syncOp.InitiatedBy = apiOp.InitiatedBy.Username
}
}
// Kind is derived from syncResult.resources only, for both payload variants.
syncOp.Kind = extractPrimaryDeploymentKind(apiOp.SyncResult.Resources)
// Image resolution order: images found in the payload, then the cached images
// for the revision, then the application's summary images.
payloadImages := collectContainerImages(&apiOp)
images := copyStringSlice(payloadImages)
if len(images) > 0 {
if syncOp.Revision != "" {
// Payload images win over the cache; record the change so it gets emitted below.
cached := revisionImageCache[syncOp.Revision]
if !stringSlicesEqual(cached, images) {
revisionImageCache[syncOp.Revision] = copyStringSlice(images)
revisionDirty[syncOp.Revision] = copyStringSlice(images)
}
}
} else if syncOp.Revision != "" {
if cached := revisionImageCache[syncOp.Revision]; len(cached) > 0 {
images = copyStringSlice(cached)
} else if len(summaryImages) > 0 {
// Fallback: use application summary images and cache for this revision.
images = copyStringSlice(summaryImages)
revisionImageCache[syncOp.Revision] = copyStringSlice(images)
revisionDirty[syncOp.Revision] = copyStringSlice(images)
}
}
if len(images) > 0 {
syncOp.ContainerImages = images
}
results := []interface{}{syncOp}
// Emit a revision-image record only when this row changed the cached images for its revision.
if syncOp.Revision != "" {
if dirtyImages, ok := revisionDirty[syncOp.Revision]; ok && len(dirtyImages) > 0 {
revision := &models.ArgocdRevisionImage{
ConnectionId: syncOp.ConnectionId,
ApplicationName: syncOp.ApplicationName,
Revision: syncOp.Revision,
Images: copyStringSlice(dirtyImages),
}
results = append(results, revision)
delete(revisionDirty, syncOp.Revision)
}
}
return results, nil
},
})
if err != nil {
return err
}
return extractor.Execute()
}
// extractPrimaryDeploymentKind returns the kind that best represents what a sync deployed.
// Workload kinds are preferred in a fixed order; if none of them is present, the kind of
// the first resource with a non-empty Kind is returned, and "" when there is no such resource.
func extractPrimaryDeploymentKind(resources []ArgocdApiSyncResourceItem) string {
	// Workload kinds, most preferred first.
	workloadKinds := [...]string{
		"Rollout",     // Argo Rollouts
		"Deployment",  // Standard K8s Deployment
		"StatefulSet", // Stateful applications
		"DaemonSet",   // Node-level deployments
		"ReplicaSet",  // Direct ReplicaSet management
		"Job",         // Batch jobs
		"CronJob",     // Scheduled jobs
	}

	bestRank := len(workloadKinds) // len(workloadKinds) means "no workload kind seen yet"
	firstNamed := ""
	for i := range resources {
		kind := resources[i].Kind
		if kind == "" {
			continue
		}
		if firstNamed == "" {
			firstNamed = kind
		}
		// Only ranks better than the current best can improve the answer.
		for rank := 0; rank < bestRank; rank++ {
			if workloadKinds[rank] == kind {
				bestRank = rank
				break
			}
		}
		if bestRank == 0 {
			// Nothing can outrank the top-priority kind.
			break
		}
	}

	if bestRank < len(workloadKinds) {
		return workloadKinds[bestRank]
	}
	return firstNamed
}
// collectContainerImages gathers every image reference found in a sync payload:
// the top-level metadata, the operation metadata, the requested sync resources and
// the sync result resources. The combined list is passed through normalizeImages
// before being returned. A nil payload yields nil.
func collectContainerImages(apiOp *ArgocdApiSyncOperation) []string {
	if apiOp == nil {
		return nil
	}

	var gathered []string

	// Both metadata objects share the same shape: direct images plus per-resource images.
	for _, meta := range []ArgocdApiSyncOperationMetadata{apiOp.Metadata, apiOp.Operation.Metadata} {
		gathered = append(gathered, meta.Images...)
		for _, res := range meta.Resources {
			gathered = append(gathered, res.Images...)
		}
	}

	// Resource lists from the sync request and from the sync result.
	for _, items := range [][]ArgocdApiSyncResourceItem{apiOp.Operation.Sync.Resources, apiOp.SyncResult.Resources} {
		for _, item := range items {
			gathered = append(gathered, item.Images...)
		}
	}

	return normalizeImages(gathered)
}
// normalizeImages returns the distinct, whitespace-trimmed, non-empty entries of
// images in ascending order. It returns nil when nothing remains.
func normalizeImages(images []string) []string {
	cleaned := make([]string, 0, len(images))
	for _, raw := range images {
		if img := strings.TrimSpace(raw); img != "" {
			cleaned = append(cleaned, img)
		}
	}
	if len(cleaned) == 0 {
		return nil
	}

	sort.Strings(cleaned)

	// After sorting, duplicates are adjacent; compact them in place.
	distinct := cleaned[:1]
	for _, img := range cleaned[1:] {
		if img != distinct[len(distinct)-1] {
			distinct = append(distinct, img)
		}
	}
	// Clip capacity so the result does not expose the discarded tail.
	return distinct[:len(distinct):len(distinct)]
}
// copyStringSlice returns an independent copy of values, or nil when values is empty.
func copyStringSlice(values []string) []string {
	n := len(values)
	if n == 0 {
		return nil
	}
	// Appending onto an exactly sized empty slice keeps len == cap.
	return append(make([]string, 0, n), values...)
}
// stringSlicesEqual reports whether a and b have the same length and hold equal
// strings at every index. Nil and empty slices compare equal.
func stringSlicesEqual(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for idx, left := range a {
		if left != b[idx] {
			return false
		}
	}
	return true
}