Iawen's Blog

我喜欢这样自由的随手涂鸦, 因为我喜欢风......

BadgerDB 是一个用纯 Go 编写的可嵌入、持久且快速的键值 (KV) 数据库。它是Dgraph的底层数据库, 一个快速的分布式图数据库。它旨在成为 RocksDB 等非基于 Go 的键值存储的高性能替代品。

badger 是 dgraph 开源的 LSMTree 的 KV 引擎, 它相比 leveldb 有 KV 分离、事务、并发合并等增强, 是 go 生态中比较生产级的存储引擎了。市面上已经有一些知名的基于LSM tree的k/v数据库, 比如 leveldb、goleveldb、rocksdb、boltdb, 可是为什么还要创造新的轮子呢。我们不妨从LSM说起。

0. badger 的诞生

0.1 背景和动机

Badger 是为Dgraph而生的, 是Dgraph底层数据存储引擎。然而, 早期Badger的早期数据存储是使用的 rocksdb, 那么后来为什么又自己开发了一个KV数据库呢? 在 Dgraph 官方博客的一篇文章里有写到:

RocksDB 是市场上最流行, 也可能是最高效的键值对存储。RocksDB is the most popular and probably the most efficient key-value store in the market. 所以很自然, 如果你需要一个键值对存储, 你会倾向于 RocksDB。So naturally, if you need a key-value store, you’d gravitate towards RocksDB. 如果我用 C++ 编写 Dgraph, 我会很乐意使用它。And if I was writing Dgraph in C++, I’d happily use it.

0.2 Cgo

  • Go profiler 无法分析和监测 Cgo 代码段里的问题, 所有工具链都不起作用
  • 当涉及到 Cgo 时, 轻量级的 goroutine 会变成昂贵的 pthread
  • Cgo 造成了内存泄漏

0.3 WiscKey

WiscKey 论文引发里作者更好的想法。

We have built an efficient and persistent log structured merge (LSM) tree based key-value store, purely in Go language. It is based upon WiscKey paper included in USENIX FAST 2016. This design is highly SSD-optimized and separates keys from values to minimize I/O amplification; leveraging both the sequential and the random performance of SSDs.

为了实现 SSD 优化的键值存储, WiscKey 包含四个关键思想。首先, WiscKey 将键与值分开, 仅将 LSM 树中的键和值保留在单独的日志文件中。其次, 为了处理未排序的值(在范围查询期间需要随机访问), WiscKey 使用 SSD 设备的并行随机读取特性。第三, WiscKey 利用独特的崩溃一致性和垃圾收集技术来有效地管理值日志。最后, WiscKey 通过在不牺牲一致性的情况下删除 LSM-tree 日志来优化性能, 从而减少小型写入的系统调用开销。

WiscKey 相关介绍可以参考: WiscKey: Separating Keys from Values in SSD-Conscious Storage [归拢]

1. badger 简介使用

1.1 LSM 树

键值存储设计有两种流行的实现类型:

  • 基于日志结构合并 (LSM) 树
  • 基于 B+ 树

Log-structured merge-tree (简称 LSM tree) 可以追溯到1996年 Patrick O’Neil等人的论文。最简单的LSM tree是两层树状结构C0,C1。 C0比较小, 驻留在内存, 当C0超过一定的大小, 一些连续的片段会从C0移动到磁盘中的C1中, 这是一次merge的过程。在实际的应用中, 一般会分为更多的层级(level), 而层级C0都会驻留在内存中。 4

badger的设计基于一篇论文: WiscKey: Separating Keys from Values in SSD-conscious Storage(2016)。这篇论文提出了一种新的设计, 专门为SSD所优化, 将key和value分别存储以减少I/O放大。

LSM tree最主要的性能消耗在于compaction过程。在compaction的时候, 多个文件需要读进内存, 排序, 然后再写回。每个文件都固定大小, 如果文件中包含value, 文件大小会显著的增加, compaction会更频繁地发生。 Badger对key,value进行了分别的处理, 只有key存在LSM tree中, value存在WAL中, 叫做value log。通常情况下, key比较小, 所以LSM tree比较小, 当获取value值的时候, 再从SSD存储中读取, 适合key小value大的情况。

Badger对键使用 delta 编码以进一步减小有效大小。假设每个键 16 字节和每个值指针 16 字节, 一个 64MB 的文件可以存储 200 万个键值对。对于 1KB 的值和 7500 万个 22 字节的密钥, 整个数据集的原始大小为 72 GB。Badger用于此设置的 LSM 树大小仅为 1.7G, 可以轻松放入 RAM。

1.2 使用方式

1.2.1 内存模式/无盘模式

默认情况下, Badger 确保所有数据都保存在磁盘上。它还支持纯内存模式。当 Badger 在内存模式下运行时, 所有数据都存储在内存中。在内存模式下读写速度要快得多, 但在崩溃或关闭的情况下, 存储在 Badger 中的所有数据都会丢失。要在内存模式下打开 badger, 请设置InMemory选项。

opt := badger.DefaultOptions("").WithInMemory(true)

1.2.2 加密方式

如果在 Badger 上启用加密, 还需要设置索引缓存大小。拥有缓存可以提高性能。否则, 启用加密时, 您的读取速度会非常慢。

opts.IndexCache = 100 << 20 // 100 mb or some other size based on the amount of data

1.3 事务

Badger 事务参考了 Yabandeh 的 A Critique of Snapshot Isolation为允许并发执行事务、提供可序列化的快照隔离、避免写入倾斜提供了良好的基础。 Badger中的事务同时执行。当它们开始时, 它们会从内存中获取读取时间戳oracle。Badger支持MVCC, 因此所有读取都基于此时间戳完成。在执行读取时, 我们存储Key的指纹(指纹而不是Key以节省空间。在极少数情况下, 这可能导致错误的否定冲突检测, 中止事务, 需要重试)。在只读事务中, 我们完全避免跟踪读取。如果您正在拍摄Badger的快照或备份, 这可以防止内存爆炸。

Badger就是一个符合 ACID 的数据库, 提供并发事务和可序列化的快照隔离。

1.3.1 只读事务

要启动只读事务, 可以使用以下DB.View()方法:

err := db.View(func(txn *badger.Txn) error {
  // Your code here…
  return nil
})

您不能在此事务中执行任何写入或删除。Badger 确保您在此闭包中获得一致的数据库视图。事务开始后在其他地方发生的任何写入, 都不会被闭包内的调用看到。

1.3.1 读写事务

要启动读写事务, 可以使用以下DB.Update()方法:

err := db.Update(func(txn *badger.Txn) error {
  // Your code here…
  return nil
})

在读写事务中允许所有数据库操作。始终检查返回的错误值。如果您在闭包中返回错误, 它将被传递:

  • 发生ErrConflict冲突时会报错。根据您的应用程序的状态, 如果您收到此错误, 您可以选择重试该操作。
  • 如果ErrTxnTooBig事务中挂起的写入/删除的数量超过一定的限制, 将报告一个。在这种情况下, 最好立即提交事务并开始新事务。

1.3.3 手动管理事务

DB.View()和DB.Update()方法可以被DB.NewTransaction()和Txn.Commit()方法封装 (或者在只读事务下使用Txn.Discard())。这些辅助方法将启动事务, 执行一个函数, 然后在返回错误时安全地丢弃您的事务。这是使用 Badger 交易的推荐方式。

DB.NewTransaction(), 接受一个布尔参数来指定是否需要读写事务。对于读写事务, 需要调用Txn.Commit() 以确保事务被提交。对于只读事务, 调用 Txn.Discard()就足够了。Txn.Commit()也在内部调用Txn.Discard()以清理事务, 因此只需调用即可Txn.Commit()完成读写事务。但是, 如果由于某种原因没有调用 Txn.Commit()(例如, 它过早地返回错误), 那么请确保您Txn.Discard()在一个defer块中调用, 如:

// Start a writable transaction.
txn := db.NewTransaction(true)
defer txn.Discard()

// Use the transaction...
err := txn.Set([]byte("answer"), []byte("42"))
if err != nil {
    return err
}

// Commit the transaction and check for error.
if err := txn.Commit(); err != nil {
    return err
}

1.4 使用键/值对

要保存键/值对, 请使用以下Txn.Set()方法:

err := db.Update(func(txn *badger.Txn) error {
  err := txn.Set([]byte("answer"), []byte("42"))
  return err
})

键/值对也可以通过首先创建来保存Entry, 然后 Entry使用Txn.SetEntry(). Entry还公开了在其上设置属性的方法:

err := db.Update(func(txn *badger.Txn) error {
  e := badger.NewEntry([]byte("answer"), []byte("42"))
  err := txn.SetEntry(e)
  return err
})

要检索数据, 我们可以使用以下Txn.Get()方法:

err := db.View(func(txn *badger.Txn) error {
  item, err := txn.Get([]byte("answer"))
  handle(err)

  var valNot, valCopy []byte
  err := item.Value(func(val []byte) error {
    // This func with val would only be called if item.Value encounters no error.

    // Accessing val here is valid.
    fmt.Printf("The answer is: %s\n", val)

    // Copying or parsing val is valid.
    valCopy = append([]byte{}, val...)

    // Assigning val slice to another variable is NOT OK.
    valNot = val // Do not do this.
    return nil
  })
  handle(err)

  // DO NOT access val here. It is the most common cause of bugs.
  fmt.Printf("NEVER do this. %s\n", valNot)

  // You must copy it to use it outside item.Value(...).
  fmt.Printf("The answer is: %s\n", valCopy)

  // Alternatively, you could also use item.ValueCopy().
  valCopy, err = item.ValueCopy(nil)
  handle(err)
  fmt.Printf("The answer is: %s\n", valCopy)

  return nil
})

1.5 单调递增的整数

DB.GetSequence, 返回一个Sequence对象, 该对象是线程安全的, 可以通过各种 goroutine 并发使用。 Badger 将租用一系列整数从内存中分发, 并将带宽提供给DB.GetSequence. 完成磁盘写入的频率取决于此租用带宽和Next 调用频率。将带宽设置得太低会导致更多的磁盘写入, 如果将其设置得太高会导致在 Badger 关闭或崩溃时浪费整数。为避免浪费整数, 请Release在关闭 Badger 之前调用。

seq, err := db.GetSequence(key, 1000)
defer seq.Release()
for {
  num, err := seq.Next()
}

1.6 合并操作

Badger 提供对有序合并操作的支持:

  • 定义一个 MergeFunc接受现有值的类型的函数, 以及要与其合并的值。它返回一个新值, 这是合并操作的结果。所有值都在字节数组中指定。
  • 然后可以将此函数与一个键和一个持续时间值一起传递给 DB.GetMergeOperator()。持续时间指定合并函数对使用该方法MergeOperator.Add()添加的值运行的频率。
  • MergeOperator.Get()方法可用于检索与合并操作关联的键的累积值。
// Merge function to append one byte slice to another
func add(originalValue, newValue []byte) []byte {
  return append(originalValue, newValue...)
}

key := []byte("merge")

m := db.GetMergeOperator(key, add, 200*time.Millisecond)
defer m.Stop()

m.Add([]byte("A"))
m.Add([]byte("B"))
m.Add([]byte("C"))

res, _ := m.Get() // res should have value ABC encoded

然后可以将此函数DB.GetMergeOperator()与一个键和一个持续时间值一起传递给该方法。MergeOperator.Add() 持续时间指定合并函数对使用该方法添加的值运行的频率。

1.7 设置Key 的生存时间(TTL)和用户元数据

  • Entry.WithTTL()和Txn.SetEntry()
  • Entry.WithTTL()和Txn.SetEntry()
err := db.Update(func(txn *badger.Txn) error {
  e := badger.NewEntry([]byte("answer"), []byte("42")).WithMeta(byte(1)).WithTTL(time.Hour)
  err := txn.SetEntry(e)
  return err
})

1.8 遍历键

要迭代键, 我们可以使用Iterator, 可以使用 Txn.NewIterator()方法获得。迭代以按字节排序的字典顺序发生。 迭代器允许您移动到键列表中的特定点, 并一次一个地通过键向前或向后移动。 默认情况下, Badger 会预取接下来 100 个Items的值, 可以使用该IteratorOptions.PrefetchSize字段进行调整。但是, 将其设置为高于GOMAXPROCS(我们建议为 128 或更高)的值不应带来任何额外的好处。您还可以完全关闭值的获取。

err := db.View(func(txn *badger.Txn) error {
  opts := badger.DefaultIteratorOptions
  opts.PrefetchSize = 10
  it := txn.NewIterator(opts)
  defer it.Close()
  for it.Rewind(); it.Valid(); it.Next() {
    item := it.Item()
    k := item.Key()
    err := item.Value(func(v []byte) error {
      fmt.Printf("key=%s, value=%s\n", k, v)
      return nil
    })
    if err != nil {
      return err
    }
  }
  return nil
})

1.8.1 前缀扫描

要遍历一个键前缀, 可以组合Seek()and ValidForPrefix():

db.View(func(txn *badger.Txn) error {
  it := txn.NewIterator(badger.DefaultIteratorOptions)
  defer it.Close()
  prefix := []byte("1234")
  for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
    item := it.Item()
    k := item.Key()
    err := item.Value(func(v []byte) error {
      fmt.Printf("key=%s, value=%s\n", k, v)
      return nil
    })
    if err != nil {
      return err
    }
  }
  return nil
})

1.8.2 仅键迭代

Badger 支持一种独特的迭代模式, 称为key-only迭代。它比常规迭代快几个数量级, 因为它只涉及对 LSM 树的访问, 它通常完全驻留在 RAM 中。要启用仅键迭代, 您需要将该IteratorOptions.PrefetchValues 字段设置为false. 这也可用于在迭代期间对选定键进行稀疏读取, item.Value()仅在需要时调用。

err := db.View(func(txn *badger.Txn) error {
  opts := badger.DefaultIteratorOptions
  opts.PrefetchValues = false
  it := txn.NewIterator(opts)
  defer it.Close()
  for it.Rewind(); it.Valid(); it.Next() {
    item := it.Item()
    k := item.Key()
    fmt.Printf("key=%s\n", k)
  }
  return nil
})

1.9 Stream

Stream 使用 LSM 树中的 SSTables 创建的自然边界来快速生成键范围。然后每个 goroutine 选择一个范围并运行一个迭代器来迭代它。每个迭代器迭代所有版本的值并从同一个事务中创建, 从而在数据库的快照上工作。每次遇到新键时, 它都会调用ChooseKey(item), 然后调用KeyToList(key, itr)。这允许用户选择或拒绝该键, ​​如果选择, 则将值版本转换为自定义键值。goroutine 批量处理 4MB 的键值, 然后将其发送到通道。另一个 goroutine 使用智能批处理算法进一步批处理来自该通道的数据并Send串行调用。 该框架专为高吞吐量键值迭代而设计, 将迭代工作分散到许多 goroutine 中。DB.Backup使用此框架快速提供完整和增量备份。Dgraph 是这个框架的重度用户。事实上, 在移植到 Badger 之前, 这个框架是在 Dgraph 中开发和使用的。

stream := db.NewStream()
// db.NewStreamAt(readTs) for managed mode.

// -- Optional settings
stream.NumGo = 16                     // Set number of goroutines to use for iteration.
stream.Prefix = []byte("some-prefix") // Leave nil for iteration over the whole DB.
stream.LogPrefix = "Badger.Streaming" // For identifying stream logs. Outputs to Logger.

// ChooseKey is called concurrently for every key. If left nil, assumes true by default.
stream.ChooseKey = func(item *badger.Item) bool {
  return bytes.HasSuffix(item.Key(), []byte("er"))
}

// KeyToList is called concurrently for chosen keys. This can be used to convert
// Badger data into custom key-values. If nil, uses stream.ToList, a default
// implementation, which picks all valid key-values.
stream.KeyToList = nil

// -- End of optional settings.

// Send is called serially, while Stream.Orchestrate is running.
stream.Send = func(list *pb.KVList) error {
  return proto.MarshalText(w, list) // Write to w.
}

// Run the stream
if err := stream.Orchestrate(context.Background()); err != nil {
  return err
}

2. badger 进阶操作

2.1 垃圾收集

  • Badger 将值与 LSM 树分开保存。这意味着清理 LSM 树的压缩操作根本不会触及这些值。值需要单独清理。
  • 并发读/写事务可能会为单个键留下多个值, 因为它们以不同的版本存储。这些可能会累积, 并在需要这些旧版本之后占用不需要的空间。

Badger 依靠客户端在他们选择的时间执行垃圾收集。它提供了以下方法, 可以在适当的时候调用:

  • DB.RunValueLogGC()
  • DB.PurgeOlderVersions(): 此方法自 v1.5.0 起已弃用。现在, Badger 的 LSM 树会自动丢弃旧版本/无效版本的Key

2.2 数据库备份

  • DB.Backup() 和 DB.Load(): 可用于进行在线备份和恢复
  • 命令行工具 badger

2.3 crash 恢复

论文WiscKey: Separating Keys from Values in SSD-conscious Storage(2016)里讨论了有助于创建崩溃恢复系统的现代文件系统的属性:

[There is] an interesting property of modern file systems (such as ext4, btrfs, and xfs). … Consider a file that contains [a] sequence of bytes … and the user appends … to it. If a crash happens, … only some prefix of the appended bytes will be added to the end of the file. … It is not possible for random bytes or a non-prefix subset of the appended bytes to be added to the file. 现代文件系统(如ext4、btrfs和xfs)有一个有趣的特性…考虑一个包含[a]字节序列的文件, 用户将它附加到它。如果发生崩溃, …只会在文件末尾添加附加字节的一些前缀…不可能将随机字节或附加字节的非前缀子集添加到文件中。

3. 功能特性

3.1 功能对比

因为Badger不存储value, 而是存储value的指针, compaction的时候只移动key和value指针, 对于 1KB大小的value和16 byte的key, 写放大为(10*16 + 1024)/(16 + 1024) ~ 1.14。

因为Badger的LSM tree比较小, 所以它的层级相对于普通的LSM tree要少, 这也意味着查找会更少。例如1KB大小的value, 22byte的key, 7500万条数据的原始大小是 72GB,但是对于Badger的LSM tree来说, 只需要1.7G,完全可以放在内存中, 这也是Badger的随机读比RocksDB快3.5的原因。

Badger还使用技术对value值进行压缩, 以便使得log文件更小。

Badgerdb官网提供了一个功能对比:

功能 Badger RocksDB BoltDB
Design LSM tree with value log LSM tree only B+ tree
High Read throughput Yes No Yes
High Write throughput Yes Yes No
Designed for SSDs Yes (with latest research 1) Not specifically 2 No
Embeddable Yes Yes Yes
Sorted KV access Yes Yes Yes
Pure Go (no Cgo) Yes No Yes
Transactions Yes, ACID, concurrent with SSI3 Yes (but non-ACID) Yes, ACID
Snapshots Yes Yes Yes
TTL support Yes Yes No
3D access (key-value-version) Yes4 No No

3.2 BadgerDB 的回写策略

Badger的大value是存放在value log文件中, 它很聪明的一点是GC 接口只交给用户来调度, 而不是自己内部自主触发, 这样的责任划分就非常清晰了, 用户自己选择开启关闭GC, 来自己承担GC引入的读写问题, 真是机智。 当然BadgerDB 这里的GC回写并没有看到太亮眼的设计, 就是在对 value log 进行GC的时候和Titan不开启gc_merge_rewrite 逻辑差不多。

选择好了待GC的value-log文件, 先从lsm中尝试读取key, 存在则需要将value写入到新的value log中。 完成写入新的value-log之后, 会将最终的key, value-index 更新到lsm-tree中。 回写源代码基本在RunValueLogGC 函数中的rewrite处理逻辑中。

4. 事务

badger 中与事务相关的结构体包括 Txn 和 oracle 两个, Txn 内部的信息主要是开始时间戳、提交时间戳、读写的 key 列表, oracle 相当于事务管理器, 内部维护近期提交的事务列表、全局时间戳、当前活跃事务的最早时间戳等。 事务时间戳是逻辑时间戳, 每次事务提交时递增 1。 SSI 事务中冲突探测的逻辑就是, 找出在当前事务执行期间 Commit 的事务列表, 检查当前事务读取的 key 列表是否与这些事务的写入的 key 列表有重叠。 WaterMark 结构体内部是个堆, 用于管理、查找事务开始、结束的区段。oracle 的 txnMarker 主要用于协调等待 Commit 授时与落盘的时间窗口, readMarker 管理当前活跃事务的最早时间戳, 用于清理过期的 committedTxns。

Badger 支持事务, 且事务是基于MVCC实现的, 实现了 Serializable Snapshot 隔离级别(简称 SSI)的乐观并发控制的事务, 相比 Snapshot 隔离级别(简称 SI), SSI 除了跟踪写操作进行冲突检测, 也会对事务中的读操作进行跟踪, 在 Commit 时进行冲突检查, 当前事务读取过的数据, 如果在事务执行的期间被其他事务修改过, 则会提交失败: 0

4.1.1 事务的生命周期

乐观并发控制事务的生命周期大致上分为四段, 授时、跟踪读写、提交、清理:

  • 事务启动: 获取事务开始时刻的授时
  • 事务过程: 跟踪事务的读写操作涉及到的 key, 事务期间读操作按启动时刻的快照为准, 事务中的写入内容在内存中暂存
  • 事务提交: 根据事务中跟踪的 key 进行冲突检测, 获取事务提交时刻的授时, 使写入生效
  • 清理旧事务: 当活跃的事务完成后, 可以使已经不再需要的快照数据、冲突检测数据等事务相关数据得到释放

为了管理事务的生命周期, 需要为每个事务和全局层面记录两部分元信息:

  • 每个事务层面, 需要记录自己读写的 key 列表, 以及事务的开始时间戳和提交时间戳, 这部分信息维护在 Txn 结构体中
  • 全局层面, 需要管理全局时间戳, 以及最近提交的事务列表, 用于在新的事务提交中对事务开始与提交时间戳中间提交过的事务范围进行冲突检查, 乃至当前活跃的事务的最小时间戳, 用于清理旧事务信息, 这部分信息维护在 oracle 结构体中

这里授时得到的时间戳并非物理时间, 而是逻辑上的: 所有的数据变化均来自事务提交的时刻, 因此仅当事务提交时使时间戳递增。

1

以上面的图为例, 事务 4 在提交时需要与事务 3 和事务 1 进行冲突检测, 因为事务 3 和事务 1 的提交时间位于事务 4 的开始与提交之间, 事务 3 和事务 1 写入的 key 如果与事务 4 读写的 key 列表存在重叠, 则认为存在冲突。

4.1.2 事务开始

启动一个新事务的入口在 db.newTransaction() 函数。这个函数比较简单, 除了初始化几个字段, 唯一有行为语义的部分就是 txn.readTs = db.orc.readTs() 这一行申请授时的地方了。

看一下 readTs() 函数的实现:

func (o *oracle) readTs() uint64 {
	// 忽略 isManaged 部分逻辑

	var readTs uint64
	o.Lock()
	readTs = o.nextTxnTs - 1
	o.readMark.Begin(readTs)
	o.Unlock()

	// Wait for all txns which have no conflicts, have been assigned a commit
	// timestamp and are going through the write to value log and LSM tree
	// process. Not waiting here could mean that some txns which have been
	// committed would not be read.
	y.Check(o.txnMark.WaitForMark(context.Background(), readTs))
	return readTs
}

授时的逻辑很简单, 直接复制来自 oracle 对象的 nextTxnTs 字段中记录的当前时间戳即可。

这里有一个细节, 前面提到时间戳的递增发生于事务的提交, 会存在一个时间戳递增了但写入仍未落盘的时间窗口, 导致事务在这时开始的话, 会读到旧数据而非时间戳后的快照。解决办法就是启动事务前, 先等待当前时间戳的事务完成写入。

2

txnMark 字段是 WaterMark 结构体类型, 它内部会维护一个堆数据结构, 可以用于跟踪事务的时间戳区段的变化通知。

除了基于 txnMark 等待当前时间戳相关的事务完成写入, readTs 函数中还有一行 o.readMark.Begin(readTs)。readMark 与 txnMark 一样是一个 WaterMark 结构体, 但它没有利用 WaterMark 结构体等待点位的能力, 只利用它的堆数据结构来跟踪当前活跃的事务的时间戳范围, 用于找出哪些事务可以过期回收。

4.1.3 事务执行

事务执行期间, 写入会暂存在内存的 pendingWrites 缓冲中。managed 模式下, 如果在事务中对同一个 key 写入多次, 会将本事务内插入的历史版本数据存入 duplicateWrites 缓冲, 这里先忽略 duplicateWrites 字段。

事务期间的读取操作会首先读取 pendingWrites 缓冲, 随后再读取 LSM Tree 内的数据。badger 继承了 leveldb 中 iterator 组合的思想, 把 pendingWrites 的读取链路封装为了 Iterator, 并与 MemTableIterator、TableIterator 等 Iterator 通过 MergeIterator 组合为最终的 Iterator:

// NewIterator returns a new iterator. Depending upon the options, either only keys, or both
// key-value pairs would be fetched. The keys are returned in lexicographically sorted order.
// Using prefetch is recommended if you're doing a long running iteration, for performance.
//
// Multiple Iterators:
// For a read-only txn, multiple iterators can be running simultaneously.  However, for a read-write
// txn, iterators have the nuance of being a snapshot of the writes for the transaction at the time
// iterator was created. If writes are performed after an iterator is created, then that iterator
// will not be able to see those writes. Only writes performed before an iterator was created can be
// viewed.
func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
	if txn.discarded {
		panic("Transaction has already been discarded")
	}
	if txn.db.IsClosed() {
		panic(ErrDBClosed.Error())
	}

	// Keep track of the number of active iterators.
	atomic.AddInt32(&txn.numIterators, 1)

	// TODO: If Prefix is set, only pick those memtables which have keys with
	// the prefix.
	tables, decr := txn.db.getMemTables()
	defer decr()
	txn.db.vlog.incrIteratorCount()
	var iters []y.Iterator
	if itr := txn.newPendingWritesIterator(opt.Reverse); itr != nil {
		iters = append(iters, itr)
	}
	for i := 0; i < len(tables); i++ {
		iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
	}
	iters = append(iters, txn.db.lc.iterators(&opt)...) // This will increment references.
	res := &Iterator{
		txn:    txn,
		iitr:   table.NewMergeIterator(iters, opt.Reverse),
		opt:    opt,
		readTs: txn.readTs,
	}
	return res
}

badger 会将 commitTs 作为 key 的后缀存储到 LSM Tree 中, Iterator 在迭代中也会对时间戳有感知, 按 readTs 时刻的快照数据进行迭代。这里与 leveldb 的 sequence 号与 Snapshot 的迭代行为是一致的。

4.1.4 事务提交

事务的提交入口位于 Commit() 函数, 它调用的 commitAndSend() 函数是逻辑的主体。大致上的过程包括:

  • 通过 orc.newCommitTs(txn) 进行事务冲突检测, 如果无冲突, 获取授时 commitTs
  • 循环为 pendingWrites 和 duplicateWrites 中的 Entry 的 version 绑定 commitTs, 并使存储的 key 绑定 commitTs
  • 调用 txn.db.sendToWriteCh(entries) 使写入缓冲进入落盘写入
  • 等待落盘完成后, 通知 orc.doneCommit(commitTs), 移动 txnMark 的点位

newCommitTs 内部会发起冲突检测和过期事务清理, 并使事务跟踪到 commitedTxns 中:

func (o *oracle) newCommitTs(txn *Txn) uint64 {
	o.Lock()
	defer o.Unlock()

	if o.hasConflict(txn) {
		return 0
	}

	var ts uint64
        o.doneRead(txn)
	o.cleanupCommittedTransactions()

	// This is the general case, when user doesn't specify the read and commit ts.
	ts = o.nextTxnTs
	o.nextTxnTs++
	o.txnMark.Begin(ts)

	y.AssertTrue(ts >= o.lastCleanupTs)

	if o.detectConflicts {
		// We should ensure that txns are not added to o.committedTxns slice when
		// conflict detection is disabled otherwise this slice would keep growing.
		o.committedTxns = append(o.committedTxns, committedTxn{
			ts:           ts,
			conflictKeys: txn.conflictKeys,
		})
	}

	return ts
}

其中冲突检测的逻辑很简单, 遍历 committedTxns, 找出当前事务开始之后提交的事务, 判断自己读到的 key 中, 是否存在于其他事务的写列表中:

// hasConflict must be called while having a lock.
  func (o *oracle) hasConflict(txn *Txn) bool {
      if len(txn.reads) == 0 {
          return false
      }
      for _, committedTxn := range o.committedTxns {
          // If the committedTxn.ts is less than txn.readTs that implies that the
          // committedTxn finished before the current transaction started.
          // We don't need to check for conflict in that case.
          // This change assumes linearizability. Lack of linearizability could
          // cause the read ts of a new txn to be lower than the commit ts of
          // a txn before it (@mrjn).
          if committedTxn.ts <= txn.readTs {
              continue
          }

          for _, ro := range txn.reads {
              if _, has := committedTxn.conflictKeys[ro]; has {
                  return true
              }
          }
      }

      return false
  }

4.1.5 事务清理

前面提到事务在提交时会结合 committedTxns 数组中的信息, 进行冲突检测。committedTxns 数组记录近期的已提交事务的信息, 显然是不能无限增长的。那么何时可以对 committedTxns 数组进行清理呢?标准就是最早的活跃的事务的开始时间戳, 如果历史事务的提交时间戳早于当前活跃的事务的开始时间戳, 冲突检查时就不需要考虑它了, 也就可以在 committedTxns 中回收它了。

3

func (o *oracle) cleanupCommittedTransactions() {
        // Must be called under o.Lock
	if !o.detectConflicts {
		// When detectConflicts is set to false, we do not store any
		// committedTxns and so there's nothing to clean up.
		return
	}
	// Same logic as discardAtOrBelow but unlocked
	var maxReadTs uint64
	if o.isManaged {
		maxReadTs = o.discardTs
	} else {
		maxReadTs = o.readMark.DoneUntil() // 在 readMark 堆中获取当前活跃事务的最早 readTs
	}

	y.AssertTrue(maxReadTs >= o.lastCleanupTs)

	// do not run clean up if the maxReadTs (read timestamp of the
	// oldest transaction that is still in flight) has not increased
	if maxReadTs == o.lastCleanupTs {
		return
	}
	o.lastCleanupTs = maxReadTs

	tmp := o.committedTxns[:0]
	for _, txn := range o.committedTxns {
		if txn.ts <= maxReadTs {
			continue
		}
		tmp = append(tmp, txn)
	}
	o.committedTxns = tmp
}

oracle 会记录 lastCleanupTs 记录上次清理的时间戳, 避免不必要的清理操作。

参考 https://dgraph.io/blog/post/badger/ https://dgraph.io/blog/post/badger-txn/