blob: 72946bedc0c027029c80c8cbb0fc81af57970657 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datastore
import (
"beam.apache.org/playground/backend/internal/constants"
"beam.apache.org/playground/backend/internal/db/entity"
"beam.apache.org/playground/backend/internal/logger"
"beam.apache.org/playground/backend/internal/utils"
"cloud.google.com/go/datastore"
"context"
"errors"
)
// GetCurrentDbMigrationVersion returns the current version of the schema
func (d *Datastore) GetCurrentDbMigrationVersion(ctx context.Context) (int, error) {
query := datastore.NewQuery(constants.SchemaKind).
Namespace(utils.GetNamespace(ctx)).Order("-version").Limit(1)
var schemas []*entity.SchemaEntity
if _, err := d.Client.GetAll(ctx, query, &schemas); err != nil {
logger.Errorf("Datastore: GetCurrentDbMigrationVersion(): error during getting current version, err: %s\n", err.Error())
return -1, err
}
if len(schemas) == 0 {
logger.Errorf("Datastore: GetCurrentDbMigrationVersion(): no schema versions found\n")
return -1, errors.New("no schema versions found")
}
return schemas[0].Version, nil
}
// HasSchemaVersion returns true if the schema version is applied
func (d *Datastore) hasSchemaVersion(ctx context.Context, version int) (bool, error) {
key := utils.GetSchemaVerKey(ctx, version)
schemaEntity := new(entity.SchemaEntity)
err := d.Client.Get(ctx, key, schemaEntity)
if err != nil {
if err == datastore.ErrNoSuchEntity {
return false, nil
}
logger.Errorf("Datastore: hasSchemaVersion(): error during getting schema version, err: %s\n", err.Error())
return false, err
}
logger.Infof("Datastore: hasSchemaVersion(): found SchemaEntity: %v\n", schemaEntity)
return true, nil
}
// applyMigration applies the given migration to the database.
func (d *Datastore) applyMigration(ctx context.Context, migration Migration, sdkConfigPath string) error {
logger.Infof("Datastore: applyMigration(): applying migration \"%d: %s\"\n", migration.GetVersion(), migration.GetDescription())
_, err := d.Client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
if err := migration.Apply(ctx, tx, sdkConfigPath); err != nil {
logger.Errorf("Datastore: applyMigration(): error during migration \"%d: %s\" applying, rolling back, err: %s\n",
migration.GetVersion(),
migration.GetDescription(),
err.Error())
return err
}
// Record the migration version
if err := putSchemaVersion(ctx, tx, migration.GetVersion(), migration.GetDescription()); err != nil {
logger.Errorf("Datastore: applyMigration(): error during migration \"%d: %s\" applying, rolling back, err: %s\n",
migration.GetVersion(),
migration.GetDescription(),
err.Error())
return err
}
return nil
})
if err != nil {
logger.Errorf("Datastore: applyMigration(): error during migration \"%d: %s\" applying, err: %s\n",
migration.GetVersion(),
migration.GetDescription(),
err.Error())
return err
}
logger.Infof("Datastore: applyMigration(): migration \"%d: %s\" applied successfully\n", migration.GetVersion(), migration.GetDescription())
return nil
}
// ApplyMigrations applies all migrations to the database.
func (d *Datastore) ApplyMigrations(ctx context.Context, migrations []Migration, sdkConfigPath string) error {
for _, migration := range migrations {
if applied, err := d.hasSchemaVersion(ctx, migration.GetVersion()); err != nil {
logger.Errorf("Datastore: ApplyMigrations(): Error checking migration \"%d: %s\" : %s", migration.GetVersion(), migration.GetDescription(), err.Error())
return err
} else if applied {
logger.Infof("Datastore: ApplyMigrations(): migration \"%d: %s\" already applied, skipping\n", migration.GetVersion(), migration.GetDescription())
continue
}
if err := d.applyMigration(ctx, migration, sdkConfigPath); err != nil {
logger.Errorf("Datastore: ApplyMigrations(): Error applying migration \"%d: %s\" : %s", migration.GetVersion(), migration.GetDescription(), err.Error())
return err
}
}
return nil
}
// putSchemaVersion puts the schema entity to datastore
func putSchemaVersion(ctx context.Context, tx *datastore.Transaction, version int, description string) error {
key := utils.GetSchemaVerKey(ctx, version)
if _, err := tx.Put(key, &entity.SchemaEntity{
Version: version,
Descr: description,
}); err != nil {
logger.Errorf("Datastore: putSchemaVersion(): error during entity saving, err: %s\n", err.Error())
return err
}
return nil
}
func (d *Datastore) PutSDKs(ctx context.Context, sdks []*entity.SDKEntity) error {
_, err := d.Client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
return TxPutSDKs(ctx, tx, sdks)
})
if err != nil {
logger.Errorf("Datastore: PutSDKs(): error during transaction, err: %s\n", err.Error())
return err
}
return nil
}
// TxPutSDKs puts the SDK entity to datastore in a transaction
func TxPutSDKs(ctx context.Context, tx *datastore.Transaction, sdks []*entity.SDKEntity) error {
if sdks == nil || len(sdks) == 0 {
logger.Errorf("Datastore: TxPutSDKs(): sdks are empty")
return nil
}
var keys []*datastore.Key
for _, sdk := range sdks {
keys = append(keys, utils.GetSdkKey(ctx, sdk.Name))
}
if _, err := tx.PutMulti(keys, sdks); err != nil {
logger.Errorf("Datastore: TxPutSDKs(): error during entity saving, err: %s\n", err.Error())
return err
}
return nil
}