BoltDB 使用入门实践 [图片] Abser Cat TechCats 成员/朋克程序员:看看,这就叫专业 Getting Started 安装 go get go.etcd.io/bbolt/... 会 get 两项 go package -> $GOPATH bolt command line -> ..

BoltDB 使用入门实践教程

BoltDB 使用入门实践

Abser Cat

Abser Cat

TechCats 成员/朋克程序员:看看,这就叫专业

Getting Started

安装

go get go.etcd.io/bbolt/...

会 get 两项

  1. go package -> $GOPATH
  2. bolt command line -> $GOBIN

Open Database

使用 kv 数据库都很简单,只需要一个文件路径即可搭建完成环境。

package main

import (
    "log"

    bolt "go.etcd.io/bbolt"
)

func main() {
    // Open the my.db data file in your current directory.
    // It will be created if it doesn't exist.
    db, err := bolt.Open("my.db", 0600, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ...
}

这里到 db 不支持多链接。这是因为对于 database file 一个链接保持了一个文件锁 file lock

如果并发,后续链接会阻塞。

可以为单个链接添加 超时控制

db, err := bolt.Open("my.db", 0600, &bolt.Options{Timeout: 1 * time.Second})

Transaction

本文无关

与 google 的 levelDB 不同,bbolt 支持事务。 detail bolt 优缺点:detail 同时 bbolt 出自 bolt ,没太多不同,只是 bbolt 目前还在维护。

事务

并发读写

同时只能有

actions ⚠️:在事务开始时,会保持一个数据视图 这意味着事务处理过程中不会由于别处更改而改变

线程安全

单个事务和它所创建的所有对象(桶,键)都不是线程安全的。

建议加锁 或者 每一个 goroutine 并发都开启 一个 事务

当然,从 db 这个 bbolt 的顶级结构创建 事务 是 线程安全 的

死锁

前面提到的 读写事务 和 只读事务 拒绝相互依赖。当然也不能在同一个 goroutine 里。

死锁原因是 读写事务 需要周期性重新映射 data 文件(即database)。这在开启只读事务时是不可行的。

读写事务

使用 db.Update开启一个读写事务

err := db.Update(func(tx *bolt.Tx) error{
    ···
    return nil
})

上文提过,在一个事务中 ,数据视图是一样的。 (详细解释就是,在这个函数作用域中,数据对你呈现最终一致性)

返回值

bboltdb 根据你的返回值判断事务状态,你可以添加任意逻辑并认为出错时返回 return err bboltdb 会回滚,如果 return nil 则提交你的事务。

建议永远检查 Update 的返回值,因为他会返回如 硬盘压力 等造成事务失败的信息(这是在你的逻辑之外的情况)

⚠️:你自定义返回 error 的 error 信息同样会被传递出来。

只读事务

使用 db.View 来新建一个 只读事务

err := db.View(func(tx *bolt.Tx) error {
    ···
    return nil
})

同上,你会获得一个一致性的数据视图。

当然,只读事务 只能检索信息,不会有任何更改。(btw,但是你可以 copy 一个 database 的副本,毕竟这只需要读数据)

批量读写事务

读写事务 db.Update 最后需要对 database提交更改,这会等待硬盘就绪。

每一次文件读写都是和磁盘交互。这不是一个小开销。

你可以使用 db.Batch 开启一个 批处理事务。他会在最后批量提交(其实是多个 goroutines 开启 db.Batch 事务时有机会合并之后一起提交)从而减小了开销。 ⚠️:db.Batch 只对 goroutine 起效

使用 批处理事务 需要做取舍,用 幂等函数 换取 速度 ⚠️: db.Batch 在一部分事务失败的时候会尝试多次调用那些事务函数,如果不是幂等会造成不可预知的非最终一致性。

例:使用事务外的变量来使你的日志不那么奇怪

var id uint64
err := db.Batch(func(tx *bolt.Tx) error {
    // Find last key in bucket, decode as bigendian uint64, increment
    // by one, encode back to []byte, and add new key.
    ...
    id = newValue
    return nil
})
if err != nil {
    return ...
}
fmt.Println("Allocated ID %d", id)

手动事务

可以手动进行事务的 开启 ,回滚,新建对象,提交等操作。因为本身 db.Updatedb.View 就是他们的包装 ⚠️:手动事务记得 关闭 (Close)

开启事务使用 db.Begin(bool) 同时参数代表着是否可以写操作。如下:

// Start a writable transaction.
tx, err := db.Begin(true)
if err != nil {
    return err
}
defer tx.Rollback()

// Use the transaction...
_, err := tx.CreateBucket([]byte("MyBucket"))
if err != nil {
    return err
}

// Commit the transaction and check for error.
if err := tx.Commit(); err != nil {
    return err
}

Using Store ?

Using Buckets

桶是键值对的集合。在一个桶中,键值唯一。

创建

使用 Tx.CreateBucket()Tx.CreateBucketIfNotExists() 建立一个新桶(推荐使用第二个) 接受参数是 桶的名字

删除

使用 Tx.DeleteBucket() 根据桶的名字来删除

例子

func main() {
    db, err := bbolt.Open("./data", 0666, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    db.Update(func(tx *bbolt.Tx) error {
        b, err := tx.CreateBucketIfNotExists([]byte("MyBucket"))
        if err != nil {
            return fmt.Errorf("create bucket: %v", err)
        }

        if err = tx.DeleteBucket([]byte("MyBucket")); err != nil {
            return err
        }

        return nil
    })

}

Using key/value pairs ?

最重要的部分,就是 kv 存储怎么使用了,首先需要一个 桶 来存储键值对。

Put

使用Bucket.Put()来存储键值对,接收两个 []byte 类型的参数

db.Update(func(tx *bolt.Tx) error {
    b := tx.Bucket([]byte("MyBucket"))
    err := b.Put([]byte("answer"), []byte("42"))
    return err
})

很明显,上面的例子设置了 Pair: key:answer value:42

Get

使用 Bucket.Get() 来查询键值。参数是一个 []byte(别忘了这次我们只是查询,可以使用 只读事务)

db.View(func(tx *bolt.Tx) error {
    b := tx.Bucket([]byte("MyBucket"))
    v := b.Get([]byte("answer"))
    fmt.Printf("The answer is: %s\n", v)
    return nil
})

细心会注意到,Get是不会返回 error 的,这是因为 Get() 一定能正常工作(除非系统错误),相应的,当返回 nil 时,查询的键值对不存在。 ⚠️:注意 0 长度的值 和 不存在键值对 的行为是不一样的。(一个返回是 nil, 一个不是)

func main() {
    db, err := bolt.Open("./data.db", 0666, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    err = db.Update(func(tx *bolt.Tx) error {
        b, err := tx.CreateBucketIfNotExists([]byte("MyBucket"))
        if err != nil {
            return fmt.Errorf("create bucket: %v", err)
        }

        if err = b.Put([]byte("answer"), []byte("42")); err != nil {
            return err
        }

        if err = b.Put([]byte("zero"), []byte("")); err != nil {
            return err
        }

        return nil
    })

    db.View(func(tx *bolt.Tx) error {
        b := tx.Bucket([]byte("MyBucket"))
        v := b.Get([]byte("noexists"))
        fmt.Println(reflect.DeepEqual(v, nil)) // false
        fmt.Println(v == nil)                  // true

        v = b.Get([]byte("zero"))
        fmt.Println(reflect.DeepEqual(v, nil)) // false
        fmt.Println(v == nil)                  // true
        return nil
    })
}

Delete

使用 Bucket.Delete() 删除键值对

db.View(func(tx *bolt.Tx) error {
    b := tx.Bucket([]byte("MyBucket"))
    fmt.Println(b.Get([]byte("answer")))

    err := b.Delete([]byte("answer"))
    if err != nil {
        return err
    }
    return nil
})

⚠️: Get() 获取到的字节切片值只在当前事务(当前函数作用域)有效,如果要在其他事务中使用需要使用 copy() 将其拷贝到其他的字节切片

Tricks

桶的自增键

使用 NextSequence()来创建自增键,见下例

// CreateUser saves u to the store. The new user ID is set on u once the data is persisted.
func (s *Store) CreateUser(u *User) error {
    return s.db.Update(func(tx *bolt.Tx) error {
        // Retrieve the users bucket.
        // This should be created when the DB is first opened.
        b := tx.Bucket([]byte("users"))

        // Generate ID for the user.
        // This returns an error only if the Tx is closed or not writeable.
        // That can't happen in an Update() call so I ignore the error check.
        id, _ := b.NextSequence()
        u.ID = int(id)

        // Marshal user data into bytes.
        buf, err := json.Marshal(u)
        if err != nil {
            return err
        }

        // Persist bytes to users bucket.
        return b.Put(itob(u.ID), buf)
    })
}

// itob returns an 8-byte big endian representation of v.
func itob(v int) []byte {
    b := make([]byte, 8)
    binary.BigEndian.PutUint64(b, uint64(v))
    return b
}

type User struct {
    ID int
    ...
}

嵌套桶

很简单的,桶可以实现嵌套存储

func (*Bucket) CreateBucket(key []byte) (*Bucket, error)
func (*Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error)
func (*Bucket) DeleteBucket(key []byte) error

例子

假设您有一个多租户应用程序,其中根级别存储桶是帐户存储桶。该存储桶内部有一系列帐户的序列,这些帐户本身就是存储桶。在序列存储桶(子桶)中,可能有许多相关的存储桶(Users,Note 等)。

// createUser creates a new user in the given account.
func createUser(accountID int, u *User) error {
    // Start the transaction.
    tx, err := db.Begin(true)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Retrieve the root bucket for the account.
    // Assume this has already been created when the account was set up.
    root := tx.Bucket([]byte(strconv.FormatUint(accountID, 10)))

    // Setup the users bucket.
    bkt, err := root.CreateBucketIfNotExists([]byte("USERS"))
    if err != nil {
        return err
    }

    // Generate an ID for the new user.
    userID, err := bkt.NextSequence()
    if err != nil {
        return err
    }
    u.ID = userID

    // Marshal and save the encoded user.
    if buf, err := json.Marshal(u); err != nil {
        return err
    } else if err := bkt.Put([]byte(strconv.FormatUint(u.ID, 10)), buf); err != nil {
        return err
    }

    // Commit the transaction.
    if err := tx.Commit(); err != nil {
        return err
    }

    return nil
}

遍历键值

在桶中,键值对根据 键 的 值是有字节序的。 使用 Bucket.Cursor()对其进行迭代

db.View(func(tx *bolt.Tx) error {
    // Assume bucket exists and has keys
    b := tx.Bucket([]byte("MyBucket"))

    c := b.Cursor()

    for k, v := c.First(); k != nil; k, v = c.Next() {
        fmt.Printf("key=%s, value=%s\n", k, v)
    }

    return nil
})

Cursor 有 5 种方法进行迭代

  1. First() Move to the first key.

  2. Last() Move to the last key.

  3. Seek() Move to a specific key.\

  4. Next() Move to the next key.\

  5. Prev() Move to the previous key.

每一个方法都返回 (key []byte, value []byte) 两个值 当方法所指值不存在时返回 两个 nil 值,发生在以下情况:

  1. 迭代到最后一个键值对时,再一次调用 Cursor.Next()
  2. 当前所指为第一个键值对时,调用 Cursor.Prev()
  3. 当使用 4.Next() 和 5. Prev()方法而未使用 1.First() 2.Last() 3. Seek()指定初始位置时

⚠️特殊情况:当 key 为 非 nilvaluenil 是,说明这是嵌套桶,value 值是子桶,使用 Bucket.Bucket() 方法访问 子桶,参数是 key

db.View(func(tx *bolt.Tx) error {
    c := b.Cursor()
    fmt.Println(c.First())
    k, v := c.Prev()
    fmt.Println(k == nil, v == nil) // true,true

    if k != nil && v == nil {
        subBucket := b.Bucket()
        // doanything
    }
    return nil
})

前缀遍历

通过使用 Cursor我们能够做到一些特殊的遍历,如:遍历拥有特定前缀的 键值对

db.View(func(tx *bolt.Tx) error {
    // Assume bucket exists and has keys
    c := tx.Bucket([]byte("MyBucket")).Cursor()

    prefix := []byte("1234")
    for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
        fmt.Printf("key=%s, value=%s\n", k, v)
    }

    return nil
})

范围遍历

在一个范围里遍历,如:使用可排序的时间编码(RFC3339)可以遍历特定日期范围的数据

db.View(func(tx *bolt.Tx) error {
    // Assume our events bucket exists and has RFC3339 encoded time keys.
    c := tx.Bucket([]byte("Events")).Cursor()

    // Our time range spans the 90's decade.
    min := []byte("1990-01-01T00:00:00Z")
    max := []byte("2000-01-01T00:00:00Z")

    // Iterate over the 90's.
    for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
        fmt.Printf("%s: %s\n", k, v)
    }

    return nil
})

⚠️:Golang 实现的 RFC3339Nano 是不可排序的

ForEach

在桶中有值的情况下,可以使用 ForEach()遍历

db.View(func(tx *bolt.Tx) error {
    // Assume bucket exists and has keys
    b := tx.Bucket([]byte("MyBucket"))

    b.ForEach(func(k, v []byte) error {
        fmt.Printf("key=%s, value=%s\n", k, v)
        return nil
    })
    return nil
})

⚠️:在 ForEach()中遍历的键值对需要copy()到事务外才能在事务外使用

Advance

Backup

boltdb 是一个单一的文件,所以很容易备份。你可以使用Tx.writeto()函数写一致的数据库。如果从只读事务调用这个函数,它将执行热备份,而不会阻塞其他数据库的读写操作。

默认情况下,它将使用一个常规文件句柄,该句柄将利用操作系统的页面缓存。

有关优化大于 RAM 数据集的信息,请参见[Tx](https://link.zhihu.com/?target=https%3A//godoc.org/go.etcd.io/bbolt%23Tx)文档。

一个常见的用例是在 HTTP 上进行备份,这样您就可以使用像cURL这样的工具来进行数据库备份:

func BackupHandleFunc(w http.ResponseWriter, req *http.Request) {
    err := db.View(func(tx *bolt.Tx) error {
        w.Header().Set("Content-Type", "application/octet-stream")
        w.Header().Set("Content-Disposition", `attachment; filename="my.db"`)
        w.Header().Set("Content-Length", strconv.Itoa(int(tx.Size())))
        _, err := tx.WriteTo(w)
        return err
    })
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
    }
}

然后您可以使用此命令进行备份:

$ curl http://localhost/backup > my.db

或者你可以打开你的浏览器以http://localhost/backup,它会自动下载。

如果你想备份到另一个文件,你可以使用TX.copyfile()辅助功能。

Statistics

数据库对运行的许多内部操作保持一个运行计数,这样您就可以更好地了解发生了什么。通过捕捉两个时间点数据的快照,我们可以看到在这个时间范围内执行了哪些操作。

例如,我们可以用一个 goroutine 里记录统计每一个 10 秒:

go func() {
    // Grab the initial stats.
    prev := db.Stats()
    for {
        // Wait for 10s.
        time.Sleep(10 * time.Second)
        // Grab the current stats and diff them.
        stats := db.Stats()
        diff := stats.Sub(&prev)
        // Encode stats to JSON and print to STDERR.
        json.NewEncoder(os.Stderr).Encode(diff)
        // Save stats for the next loop.
        prev = stats
    }
}()

将这些信息通过管道输出到监控也很有用。

Read-Only Mode

可以开启只读模式防止错误更改

db, err := bolt.Open("my.db", 0666, &bolt.Options{ReadOnly: true})
if err != nil {
    log.Fatal(err)
}

现在使用 db.Update() 等开启读写事务 将会阻塞

Mobile Use

移动端支持由 gomobile 工具提供

Create a struct that will contain your database logic and a reference to a *bolt.DB with a initializing constructor that takes in a filepath where the database file will be stored. Neither Android nor iOS require extra permissions or cleanup from using this method.

func NewBoltDB(filepath string) *BoltDB {
 db, err := bolt.Open(filepath+"/demo.db", 0600, nil)
 if err != nil {
    log.Fatal(err)
 }
 return &BoltDB{db}
}
type BoltDB struct {
 db *bolt.DB
 ...
}
func (b *BoltDB) Path() string {
 return b.db.Path()
}
func (b *BoltDB) Close() {
 b.db.Close()
}

Database logic should be defined as methods on this wrapper struct. To initialize this struct from the native language (both platforms now sync their local storage to the cloud. These snippets disable that functionality for the database file):

Android

String path;
if (android.os.Build.VERSION.SDK_INT >=android.os.Build.VERSION_CODES.LOLLIPOP){
    path = getNoBackupFilesDir().getAbsolutePath();
} else{
    path = getFilesDir().getAbsolutePath();
}
Boltmobiledemo.BoltDB boltDB = Boltmobiledemo.NewBoltDB(path)

iOS

- (void)demo {
    NSString* path = [NSSearchPathForDirectoriesInDomains(NSLibraryDirectory,
                                                          NSUserDomainMask,
                                                          YES) objectAtIndex:0];
 GoBoltmobiledemoBoltDB * demo = GoBoltmobiledemoNewBoltDB(path);
 [self addSkipBackupAttributeToItemAtPath:demo.path];
 //Some DB Logic would go here
 [demo close];
}
- (BOOL)addSkipBackupAttributeToItemAtPath:(NSString *) filePathString
{
    NSURL* URL= [NSURL fileURLWithPath: filePathString];
    assert([[NSFileManager defaultManager] fileExistsAtPath: [URL path]]);
    NSError *error = nil;
    BOOL success = [URL setResourceValue: [NSNumber numberWithBool: YES]
                                  forKey: NSURLIsExcludedFromBackupKey error: &error];
    if(!success){
        NSLog(@"Error excluding %@ from backup %@", [URL lastPathComponent], error);
    }
    return success;
}

扩展阅读

更多指导

For more information on getting started with Bolt, check out the following articles:

与其他数据库的比较

Postgres,MySQL 和其他关系数据库

关系数据库将数据组织成行,并且只能通过使用 SQL 进行访问。这种方法在存储和查询数据方面提供了灵活性,但是在解析和计划 SQL 语句时也会产生开销。Bolt 通过字节切片键访问所有数据。这使得 Bolt 可以快速地通过键读取和写入数据,但是不提供将值连接在一起的内置支持。 大多数关系数据库(SQLite 除外)都是独立于应用程序运行的独立服务器。这使您的系统具有将多个应用程序服务器连接到单个数据库服务器的灵活性,但同时也增加了在网络上序列化和传输数据的开销。Bolt 作为应用程序中包含的库运行,因此所有数据访问都必须经过应用程序的过程。这使数据更接近您的应用程序,但限制了对数据的多进程访问。

LevelDB,RocksDB

LevelDB 及其派生类(RocksDB,HyperLevelDB)与 Bolt 类似,因为它们是捆绑到应用程序中的库,但是它们的底层结构是日志结构的合并树(LSM 树)。LSM 树通过使用预写日志和称为 SSTables 的多层排序文件来优化随机写入。Bolt 在内部使用 B + 树,并且仅使用一个文件。两种方法都需要权衡。 如果您需要较高的随机写入吞吐量(> 10,000 w / sec),或者需要使用旋转磁盘,那么 LevelDB 可能是一个不错的选择。如果您的应用程序是大量读取或进行大量范围扫描,那么 Bolt 可能是一个不错的选择。 另一个重要的考虑因素是 LevelDB 没有事务。它支持键/值对的批量写入,并且支持读取快照,但不能使您安全地执行比较和交换操作。Bolt 支持完全可序列化的 ACID 事务。

LMDB

Bolt 最初是 LMDB 的端口,因此在架构上相似。两者都使用 B + 树,具有 ACID 语义和完全可序列化的事务,并支持使用单个写入器和多个读取器的无锁 MVCC。 这两个项目有些分歧。LMDB 专注于原始性能,而 Bolt 专注于简单性和易用性。例如,出于性能考虑,LMDB 允许执行几种不安全的操作,例如直接写入。Bolt 选择禁止可能使数据库处于损坏状态的操作。Bolt 唯一的例外是DB.NoSync。 API 也有一些区别。打开 LMDB 时需要最大的 mmap 大小,mdb_env而 Bolt 会自动处理增量 mmap 的大小。LMDB 使用多个标志来重载 getter 和 setter 函数,而 Bolt 将这些特殊情况拆分为自己的函数。

注意事项和局限性

选择合适的工具来完成这项工作很重要,而 Bolt 也不例外。在评估和使用 Bolt 时,需要注意以下几点:

阅读资料

对于嵌入式,可序列化的事务性键/值数据库,Bolt 是一个相对较小的代码库(<5KLOC),因此对于那些对数据库的工作方式感兴趣的人来说,Bolt 可能是一个很好的起点。

最佳起点是 Bolt 的主要切入点:

如果您还有其他可能对他人有用的注释,请通过请求请求将其提交。

其他使用螺栓的项目

以下是使用 Bolt 的公共开源项目的列表:

If you are using Bolt in a project please send a pull request to add it to the list.

1 回帖
请输入回帖内容...