blob: 01625855bda16630deb885c36a997bbd85569f6c [file] [log] [blame]
package integration
import (
"fmt"
"github.com/greenplum-db/gpbackup/backup"
"github.com/greenplum-db/gpbackup/testutils"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("synchronized snapshot integration tests", func() {
// Suite-wide guard hook. The condition is hard-coded to false, so Skip is never
// reached and every spec in this Describe always runs.
// NOTE(review): presumably a placeholder for a real "snapshots unsupported"
// check (e.g. a database version test) — confirm, then either restore the
// condition or remove this hook.
BeforeEach(func() {
if false {
Skip("snapshot feature not supported")
}
})
Describe("GetSynchronizedSnapshot", func() {
// Asks for a snapshot through the shared connection pool and checks the shape
// of the identifier: non-empty, containing a hex run followed by "-<digit>".
It("returns a correctly formatted snapshot identifier", func() {
	id, exportErr := backup.GetSynchronizedSnapshot(connectionPool)
	Expect(exportErr).NotTo(HaveOccurred())

	Expect(id).NotTo(BeEmpty())
	Expect(id).To(MatchRegexp("[0-9a-fA-F]+-\\d"))
})
// Exports a snapshot from one open transaction and feeds the identifier to a
// raw SET TRANSACTION SNAPSHOT on a second connection; the statement must be
// accepted without error.
It("returns a valid snapshot identifier that can be used to set a snapshot", func() {
	exportConn := testutils.SetupTestDbConn("testdb")
	// Deferred so the connection is released even when an assertion below
	// fails and unwinds the spec before the final statements run.
	defer exportConn.Close()
	importConn := testutils.SetupTestDbConn("testdb")
	defer importConn.Close()

	exportConn.MustBegin()
	importConn.MustBegin()

	snapshotID, err := backup.GetSynchronizedSnapshot(exportConn)
	Expect(err).ToNot(HaveOccurred())

	_, err = importConn.Exec(fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", snapshotID), 0)
	Expect(err).ToNot(HaveOccurred())

	exportConn.MustCommit()
	importConn.MustCommit()
})
})
Describe("SetSynchronizedSnapshot", func() {
// Exports a snapshot directly with pg_export_snapshot() on one connection and
// verifies that backup.SetSynchronizedSnapshot applies it on connection 0 of a
// second DBConn without error.
It("sets snapshot to snapshotId", func() {
	var snapshotID string

	exportConn := testutils.SetupTestDbConn("testdb")
	// Deferred so the connection is released even when an assertion below
	// fails and unwinds the spec before the final statements run.
	defer exportConn.Close()
	importConn := testutils.SetupTestDbConn("testdb")
	defer importConn.Close()

	exportConn.MustBegin()
	importConn.MustBegin()

	err := exportConn.Get(&snapshotID, "SELECT pg_catalog.pg_export_snapshot()", 0)
	Expect(err).ToNot(HaveOccurred())

	err = backup.SetSynchronizedSnapshot(importConn, 0, snapshotID)
	Expect(err).ToNot(HaveOccurred())

	exportConn.MustCommit()
	importConn.MustCommit()
})
})
Describe("functionality tests", func() {
// Gives each functionality spec an existing, empty snapshot_test table.
// Setup errors are asserted on so a failed CREATE/TRUNCATE fails the spec
// here rather than surfacing later as a confusing row-count mismatch.
BeforeEach(func() {
	_, err := connectionPool.Exec("CREATE TABLE IF NOT EXISTS snapshot_test (a int)")
	Expect(err).ToNot(HaveOccurred())

	_, err = connectionPool.Exec("TRUNCATE TABLE snapshot_test")
	Expect(err).ToNot(HaveOccurred())
})
// Scenario: a row is inserted after a snapshot has been exported.
//   - the exporting transaction keeps seeing 1 row,
//   - a connection outside any transaction sees 2 rows,
//   - a transaction that imports the snapshot sees 1 row again,
//   - after both transactions commit, both connections see 2 rows.
//
// Every Exec/Get error is checked so a failed statement is reported at its
// source instead of as a stale value in result.
It("handles concurrent insert", func() {
	query := "SELECT count(*) FROM snapshot_test"
	var result int

	exportConn := testutils.SetupTestDbConn("testdb")
	// Deferred so both connections are released even if an assertion fails
	// partway through the spec.
	defer exportConn.Close()
	importConn := testutils.SetupTestDbConn("testdb")
	defer importConn.Close()

	_, err := connectionPool.Exec("INSERT INTO snapshot_test values(1)")
	Expect(err).ToNot(HaveOccurred())

	exportConn.MustBegin(0)
	snapshotID, err := backup.GetSynchronizedSnapshot(exportConn)
	Expect(err).ToNot(HaveOccurred())

	// External command inserts row
	_, err = connectionPool.Exec("INSERT INTO snapshot_test values(2)")
	Expect(err).ToNot(HaveOccurred())
	err = connectionPool.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(2))

	// export should not see inserted row
	err = exportConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(1))

	// import should see inserted row
	err = importConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(2))

	// Once the snapshot is set, import should see the snapshot with 1 row
	importConn.MustBegin(0)
	err = backup.SetSynchronizedSnapshot(importConn, 0, snapshotID)
	Expect(err).ToNot(HaveOccurred())
	err = importConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(1))

	// after commits, inserted row is visible to both conns
	err = exportConn.Commit(0)
	Expect(err).ToNot(HaveOccurred())
	err = importConn.Commit(0)
	Expect(err).ToNot(HaveOccurred())

	err = exportConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(2))
	err = importConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(2))
})
// Scenario: one of five rows is updated to a=99 after a snapshot has been
// exported. The query counts rows with a=99.
//   - the connection that ran the UPDATE sees 1 such row,
//   - the exporting transaction sees 0,
//   - a connection outside any transaction sees 1,
//   - a transaction that imports the snapshot sees 0,
//   - after both transactions commit, both connections see 1.
It("handles concurrent update", func() {
	var result int
	query := "SELECT count(*) from snapshot_test WHERE a=99"

	exportConn := testutils.SetupTestDbConn("testdb")
	// Deferred so both connections are released even if an assertion fails
	// partway through the spec.
	defer exportConn.Close()
	importConn := testutils.SetupTestDbConn("testdb")
	defer importConn.Close()

	_, err := connectionPool.Exec("INSERT INTO snapshot_test SELECT a FROM generate_series(1,5) a")
	Expect(err).ToNot(HaveOccurred())

	// export the snapshot
	exportConn.MustBegin(0)
	snapshotID, err := backup.GetSynchronizedSnapshot(exportConn)
	Expect(err).ToNot(HaveOccurred())
	Expect(snapshotID).ToNot(BeEmpty())

	// external command updates row; confirm through the same connection that
	// the update took effect (mirrors the insert and delete specs)
	_, err = connectionPool.Exec("UPDATE snapshot_test SET a=99 WHERE a=1")
	Expect(err).ToNot(HaveOccurred())
	err = connectionPool.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(1))

	// export should not see updated row
	err = exportConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(0))

	// import should see updated row
	err = importConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(1))

	// now set snapshot, import should not see updated row
	importConn.MustBegin(0)
	err = backup.SetSynchronizedSnapshot(importConn, 0, snapshotID)
	Expect(err).ToNot(HaveOccurred())
	err = importConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(0))

	// after commits, updated row is visible to both conns
	err = exportConn.Commit(0)
	Expect(err).ToNot(HaveOccurred())
	err = importConn.Commit(0)
	Expect(err).ToNot(HaveOccurred())

	err = exportConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(1))
	err = importConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(1))
})
// Scenario: one of five rows is deleted after a snapshot has been exported.
//   - the exporting transaction keeps seeing 5 rows,
//   - a connection outside any transaction sees 4 rows,
//   - a transaction that imports the snapshot sees 5 rows again,
//   - after both transactions commit, both connections see 4 rows.
//
// Every Exec/Get error is checked so a failed statement is reported at its
// source instead of as a stale value in result.
It("handles concurrent delete", func() {
	query := "SELECT count(*) FROM snapshot_test"
	var result int

	exportConn := testutils.SetupTestDbConn("testdb")
	// Deferred so both connections are released even if an assertion fails
	// partway through the spec.
	defer exportConn.Close()
	importConn := testutils.SetupTestDbConn("testdb")
	defer importConn.Close()

	_, err := connectionPool.Exec("INSERT INTO snapshot_test SELECT a FROM generate_series(1,5) a")
	Expect(err).ToNot(HaveOccurred())

	exportConn.MustBegin(0)
	snapshotID, err := backup.GetSynchronizedSnapshot(exportConn)
	Expect(err).ToNot(HaveOccurred())

	// External command deletes row
	_, err = connectionPool.Exec("DELETE FROM snapshot_test where a=1")
	Expect(err).ToNot(HaveOccurred())
	err = connectionPool.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(4))

	// export should see all 5 rows
	err = exportConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(5))

	// import should see 4 rows
	err = importConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(4))

	// Once the snapshot is set, import should see the snapshot with 5 rows
	importConn.MustBegin(0)
	err = backup.SetSynchronizedSnapshot(importConn, 0, snapshotID)
	Expect(err).ToNot(HaveOccurred())
	err = importConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(5))

	// after commits, deleted row is not visible to either conn
	err = exportConn.Commit(0)
	Expect(err).ToNot(HaveOccurred())
	err = importConn.Commit(0)
	Expect(err).ToNot(HaveOccurred())

	err = exportConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(4))
	err = importConn.Get(&result, query)
	Expect(err).ToNot(HaveOccurred())
	Expect(result).To(Equal(4))
})
})
})