blob: c7281ad480c6b10255b405e0d6d9677ea892dc5f [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package vector_sync
import (
"context"
"github.com/apache/answer/internal/base/data"
"github.com/apache/answer/internal/base/queue"
"github.com/apache/answer/internal/repo/vector_search_sync"
"github.com/apache/answer/pkg/uid"
"github.com/apache/answer/plugin"
"github.com/segmentfault/pacman/log"
)
// Actions a Task may request. Any other action value is ignored by the
// handler.
const (
	// ActionUpsert asks the handler to rebuild the object's content and push
	// it to the vector search plugin.
	ActionUpsert = "upsert"
	// ActionDelete asks the handler to remove the object's content from the
	// vector search plugin.
	ActionDelete = "delete"
)

// Object types an upsert Task may refer to. Any other object type is ignored
// by the handler.
const (
	// ObjectTypeQuestion marks the task's object as a question.
	ObjectTypeQuestion = "question"
	// ObjectTypeAnswer marks the task's object as an answer.
	ObjectTypeAnswer = "answer"
)
// maxRetry is the maximum number of times handle attempts a single task
// (the first attempt included) before giving up and returning the last error.
const maxRetry = 3
// Task describes one vector search synchronization job placed on the queue.
type Task struct {
	// Action selects the operation to perform. The handler acts on
	// ActionUpsert and ActionDelete; any other value is a no-op.
	Action string
	// ObjectType identifies what ObjectID refers to. It is only consulted for
	// upserts, where ObjectTypeQuestion and ObjectTypeAnswer are recognized;
	// any other value is a no-op.
	ObjectType string
	// ObjectID is the ID of the object to sync. The handler passes it through
	// uid.DeShortID before use and skips tasks whose ObjectID is empty.
	ObjectID string
}
// Service is the queue service that carries vector sync tasks (*Task).
type Service queue.Service[*Task]
// NewService builds the "vector_sync" queue and wires it to the vector sync
// handler.
//
// The queue is created with 128 as its second argument (presumably the buffer
// size; confirm against queue.New). Every task delivered by the queue is
// processed by handle using the supplied data handle.
func NewService(data *data.Data) Service {
	// Bind the data dependency into the handler so the queue only needs to
	// supply the context and the task.
	onTask := func(ctx context.Context, task *Task) error {
		return handle(ctx, data, task)
	}

	svc := queue.New[*Task]("vector_sync", 128)
	svc.RegisterHandler(onTask)
	return svc
}
// handle processes a single vector sync task, attempting it up to maxRetry
// times.
//
// It is a no-op (returns nil) when msg is nil, when msg.ObjectID is empty, or
// when no vector search plugin is handed to the CallVectorSearch callback.
// Otherwise the object ID is converted with uid.DeShortID and the task is
// executed via handleOnce. Each failed attempt is logged as a warning. Retrying
// stops early once ctx is done, since further attempts could not succeed and
// would only add latency and log noise.
//
// It returns nil on the first successful attempt, or the error of the last
// attempt that was made.
func handle(ctx context.Context, data *data.Data, msg *Task) error {
	if msg == nil || msg.ObjectID == "" {
		return nil
	}

	var vectorSearch plugin.VectorSearch
	// The callback only captures the plugin and never fails, so the returned
	// error is deliberately discarded; a missing plugin is detected through
	// the nil check below instead.
	_ = plugin.CallVectorSearch(func(vs plugin.VectorSearch) error {
		vectorSearch = vs
		return nil
	})
	if vectorSearch == nil {
		return nil
	}

	objectID := uid.DeShortID(msg.ObjectID)

	var lastErr error
	for attempt := 1; attempt <= maxRetry; attempt++ {
		err := handleOnce(ctx, data, vectorSearch, msg.Action, msg.ObjectType, objectID)
		if err == nil {
			return nil
		}
		lastErr = err
		log.Warnf("vector sync failed: action=%s object_type=%s object_id=%s attempt=%d err=%v",
			msg.Action, msg.ObjectType, objectID, attempt, err)

		// Do not keep retrying after the context has been cancelled or has
		// timed out.
		if ctx.Err() != nil {
			break
		}
	}
	return lastErr
}
// handleOnce performs a single, non-retried sync attempt for one object.
//
// Behavior by action:
//   - ActionDelete: removes the object from the vector search plugin.
//   - ActionUpsert: builds the content for a question or an answer (chosen by
//     objectType) and sends it to the plugin. If the builder yields no content,
//     the object is removed from the plugin instead.
//   - anything else: no-op.
//
// An unrecognized objectType on upsert is also a no-op. Errors from the
// content builder or from the plugin are returned unchanged.
func handleOnce(ctx context.Context, data *data.Data, vectorSearch plugin.VectorSearch,
	action, objectType, objectID string) error {
	switch action {
	case ActionDelete:
		return vectorSearch.DeleteContent(ctx, objectID)
	case ActionUpsert:
		// Continue below with the upsert path.
	default:
		return nil
	}

	var (
		doc      *plugin.VectorSearchContent
		buildErr error
	)
	switch objectType {
	case ObjectTypeQuestion:
		doc, buildErr = vector_search_sync.BuildQuestionContentByID(ctx, data, objectID)
	case ObjectTypeAnswer:
		doc, buildErr = vector_search_sync.BuildAnswerContentByID(ctx, data, objectID)
	default:
		return nil
	}

	switch {
	case buildErr != nil:
		return buildErr
	case doc == nil:
		// Nothing to index for this object, so make sure the plugin does not
		// keep an entry for it.
		return vectorSearch.DeleteContent(ctx, objectID)
	default:
		return vectorSearch.UpdateContent(ctx, doc)
	}
}