blob: 4c4f5ee3730426e061fafa9ef53ec10b8201fdf3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package datastore
import (
"bytes"
"encoding/gob"
"fmt"
"math/rand"
"time"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
)
// BoltBackend implements Backend interface with embedded Bolt storage
type BoltBackend struct {
db *bolt.DB
}
// NewBoltBackend creates a new Bolt backend using CoreOS bbolt implementation
func NewBoltBackend(filename string) (Backend, error) {
db, err := bolt.Open(
filename,
0600,
&bolt.Options{
Timeout: 1 * time.Second,
},
)
if err != nil {
return nil, errors.Wrap(err, "initialize bolt datastore backend")
}
return &BoltBackend{
db: db,
}, nil
}
// Set stores the value for a key of datatype using bolt datastore
func (bb *BoltBackend) Set(datatype, key string, value []byte, indexData map[string]string) error {
return bb.db.Update(func(tx *bolt.Tx) error {
// Get or create the root bucket for the datatype
bk, err := tx.CreateBucketIfNotExists([]byte(datatype))
if err != nil {
return errors.Wrap(err, "failed to create root bucket")
}
// Get or crerate the data bucket for storing the key/values
dataBk, err := bk.CreateBucketIfNotExists([]byte("data"))
if err != nil {
return errors.Wrap(err, "failed to create data bucket")
}
// Store data in the data bucket
if err := dataBk.Put([]byte(key), value); err != nil {
return errors.Wrap(err, "failed to put data")
}
// Delete the previous indexes if they exists
if err := deleteIndexes(key, bk); err != nil {
return errors.Wrap(err, "delete old indexes")
}
// Perform indexing
if err := createIndexes(key, indexData, bk); err != nil {
return errors.Wrap(err, "create indexes")
}
return nil
})
}
// Get retreives the value for specified key and datatyoe
// Returns ErrKeyNotFound if the key has no value set
func (bb *BoltBackend) Get(datatype, key string) (data []byte, err error) {
err = bb.db.View(func(tx *bolt.Tx) error {
// Get the root bucket
bk := tx.Bucket([]byte(datatype))
if bk == nil {
return ErrKeyNotFound
}
dataBk := bk.Bucket([]byte("data"))
if dataBk == nil {
return ErrKeyNotFound
}
data = dataBk.Get([]byte(key))
if data == nil {
return ErrKeyNotFound
}
return nil
})
return
}
// Del deletes a key and all the indexes
func (bb *BoltBackend) Del(datatype, key string) error {
return bb.db.Update(func(tx *bolt.Tx) error {
// Get the root bucket
bk := tx.Bucket([]byte(datatype))
if bk == nil {
return nil
}
dataBk := bk.Bucket([]byte("data"))
if dataBk == nil {
return nil
}
if err := deleteIndexes(key, bk); err != nil {
return errors.Wrap(err, "delete indexes")
}
return dataBk.Delete([]byte(key))
})
}
type iterFunc func() ([]byte, []byte)
// ListKeys lists all keys for specified datatype
func (bb *BoltBackend) ListKeys(datatype, index string, skip, limit int, reverse bool) (keys []string, err error) {
err = bb.db.View(func(tx *bolt.Tx) error {
// Get the root bucket
bk := tx.Bucket([]byte(datatype))
if bk == nil {
return nil
}
indexBk := bk.Bucket([]byte(fmt.Sprintf("index-%s", index)))
if indexBk == nil {
return nil
}
c := indexBk.Cursor()
var first, next iterFunc
switch reverse {
default:
first = c.First
next = c.Next
case true:
first = c.Last
next = c.Prev
}
i := 0
for k, v := first(); k != nil; k, v = next() {
i++
if i <= skip {
continue
}
keys = append(keys, string(v))
if limit > 0 && len(keys) >= limit {
break
}
}
return nil
})
return
}
// Close closes the database
func (bb *BoltBackend) Close() error {
return errors.Wrap(bb.db.Close(), "close bolt datastore backend database")
}
func createIndexes(key string, indexData map[string]string, rootBucket *bolt.Bucket) error {
// Iterate over index values
for indexName, v := range indexData {
indexBk, err := rootBucket.CreateBucketIfNotExists([]byte(fmt.Sprintf("index-%s", indexName)))
if err != nil {
return errors.Wrapf(err, "failed to create index bucket for index %s", indexName)
}
// generate unique and sortable key
valueKey := createIndexValueKey(v)
if err := indexBk.Put([]byte(valueKey), []byte(key)); err != nil {
return errors.Wrap(err, "failed to put index data")
}
// update the index data with the actual key
indexData[indexName] = valueKey
}
b := new(bytes.Buffer)
if err := gob.NewEncoder(b).Encode(&indexData); err != nil {
return errors.Wrap(err, "failed to encode index data")
}
return rootBucket.Put([]byte(fmt.Sprintf("indexes-%s", key)), b.Bytes())
}
func deleteIndexes(key string, rootBucket *bolt.Bucket) error {
kiData := rootBucket.Get([]byte(fmt.Sprintf("indexes-%s", key)))
if kiData == nil {
return nil
}
indexData := map[string]string{}
b := bytes.NewBuffer(kiData)
if err := gob.NewDecoder(b).Decode(&indexData); err != nil {
return errors.Wrap(err, "invalid index data")
}
for indexName, v := range indexData {
indexBk := rootBucket.Bucket([]byte(fmt.Sprintf("index-%s", indexName)))
if indexBk == nil {
return errors.Errorf("index not found: %s", indexName)
}
if err := indexBk.Delete([]byte(v)); err != nil {
return nil
}
}
return rootBucket.Delete([]byte(fmt.Sprintf("indexes-%s", key)))
}
func createIndexValueKey(v string) string {
randBytes := make([]byte, 8)
rand.Read(randBytes)
return fmt.Sprintf("%s\x00%s%x", v, time.Now().UTC().Format(time.RFC3339), randBytes)
}
func init() {
rand.Seed(time.Now().UnixNano())
}