LevelDB 源代码阅读(三):预写日志

在前面两篇文章中,我们介绍了 LevelDB 的写流程和读流程。原本准备这篇文章介绍 LevelDB 的 MemTable 结构,但是在 LevelDB Handbook 的内容编排中,MemTable 被放在了预写日志之后介绍。于是我也决定按照 Handbook 的顺序,先介绍预写日志,再介绍 MemTable 的结构。

WAL:预写日志

WAL 的英文全称为 Write-Ahead Log ,翻译过来就是预写日志。所谓预写,就是在正式地执行对数据的写操作前(插入或者删除),先将操作的内容通过日志记录下来,这么做的目的是为了防止因进程异常或者服务器异常导致的数据库异常。

我们考虑下面两种情况:

  • 日志未写完或者未开始写进程就异常退出
  • 日志写完了,数据没写完或者没开始写进程异常退出
  • 日志写完了,数据也写完了,进程异常退出

第一种情况下,在数据库恢复时会看到一条不完整的日志(写到一半)或者看不到该条日志(没开始写),这种情况下可以认为这次写操作失败,数据库也不会恢复这次操作,保证了写操作的原子性。

第二种情况下,数据库在恢复时可以通过读取到的日志内容来恢复未完成的写操作,同样保证了原子性。

第三种情况就是正常的情况,数据库在恢复时无须进行任何操作。

因此,WAL 在这三种情况下都可以保证数据库写操作的原子性。对于 WAL 的更多知识,可以查看这篇文章。下面我们看看 WAL 在 LevelDB 中是如何实现的。

LevelDB 中的 WAL 实现

LevelDB 的日志结构

上图是 LevelDB 中日志的结构。日志被存储的最小单位是 Chunk,一个 Chunk 包含了四个部分:校验和 Checksum(4 个字节)、数据长度 Length (2 个字节),Chunk 类型 Type(1 个字节)以及数据内容,前三个部分被称为 ChunkHeader 。校验和是由数据类型以及数据内容计算出来的,使用的算法是 CRC 校验算法。Chunk 类型一共有 4 种:Full、First、Midlle、Last 。之所以需要有四种 Chunk 类型,是因为一条日志有可能因为大小太大而被存在多个 Chunk 当中。假如一条日志被存放在了一个 Chunk 当中,那么这个 Chunk 的类型就是 Full ;假如一条日志被存放在了多个 Chunk 当中,那么第一个 Chunk 的类型为 First ,最后一个 Chunk 的类型为 Last ,First 和 Last 之间包含了零个或多个 Middle 类型的 Chunk ,这些所有的 Chunk 中的数据组合起来就是日志的原始数据。

一个或多个 Chunk 会组成一个 Block 。一个 Block 是有大小限制的,上限是 32 KB,即 32768 个字节。当一个 Block 的大小达到了阈值后,这个 Block 就会被关闭,然后开启一个新的 Block 。如果我们往一个 Block 中写日志时,Block 中剩余的空间足够写下一个 ChunkHeader, 但无法容下这条日志的所有内容,那么就会写入一个类型为 First 的 Chunk,然后开启一个新的 Block ,将剩余的数据写在新 Block 中。我们可以推理出,假如一个 Chunk 的类型为 Middle,那么这个 Chunk 一定占据了一整个 Block (想想这是为什么)。

写日志

LevelDB 中所有的写操作都会在 DBImpl::Write 函数中被处理,这当中也包含了对 WAL 的处理,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// db_impl.cc
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
... // some code prepare for writing
// Write WAL
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
// insert the data to the memtable
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}

... // some code after write

return status;
}

可以看到,WAL 确实是在正式写数据之前完成的。 WriteBatchInternal::Contents 函数会将 WriteBatch 中的 rep_ 包装成 Slice 对象(关于 WriteBatch 和 Slice 的介绍,可以参考前面介绍 LevelDB 写流程的文章),然后传入 Writer 的 AddRecord 函数中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// log_writer.cc
Status Writer::AddRecord(const Slice& slice) {
// the data and length of the log
const char* ptr = slice.data();
size_t left = slice.size();

// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
bool begin = true;
do {
// ------------ Begin of Code Fragment A -------------
// the left size of current block
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < kHeaderSize) {
// If the left space cannot write a chunk header
// Switch to a new block
if (leftover > 0) {
// Fill the trailer (literal below relies on kHeaderSize being 7)
static_assert(kHeaderSize == 7, "");
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
}
// when switching to a new block, just set the offset in the block to 0
block_offset_ = 0;
}
// ------------ End of Code Fragment A -------------

// ------------ Begin of Code Fragment B -------------
// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

// the size of space used for writing data
const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
// if the fragment_length == left, current log can be completely written into this block
// else current log should be divided into several fragment
const size_t fragment_length = (left < avail) ? left : avail;

RecordType type;
const bool end = (left == fragment_length);
if (begin && end) {
type = kFullType;
} else if (begin) {
type = kFirstType;
} else if (end) {
type = kLastType;
} else {
type = kMiddleType;
}
// ------------ End of Code Fragment B -------------

// ------------ Begin of Code Fragment C -------------
// write current chunk to disk
s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length;
left -= fragment_length;
begin = false;
// ------------ End of Code Fragment C -------------
} while (s.ok() && left > 0);
return s;
}

这段代码还是比较清晰的,A 段代码主要是在检查当前 Block 剩余空间大小:如果当前 Block 剩余的空间小于一个 ChunkHeader 的大小,那么就会把 Block 剩下的空间用 0 填满,然后切换到一个新的 Block 中。这里切换新 Block 的操作非常简单,就是将记录当前数据 offset 的变量置为 0 。 B 段代码则是在检查当前这条日志的写入情况:如果 Block 剩余的空间大小可以将这条日志写完,那么新加入的 Chunk 类型就为 Full ;否则一条日志可能要写在多个 Chunk 中,那么就根据当前写的情况将 Chunk 标志为 First、Middle 或者 Last 。C 段代码则是调用函数将当前这条 Chunk 提交并写入到磁盘中。值得注意的是,假如我们当前日志的内容为空,即变量 left 一开始就为 0,那么这段代码仍然会提交一个 data 段为空的 Chunk 。

下面我们来看看一个 Chunk 是如何被提交的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// log_writer.cc
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
size_t length) {
assert(length <= 0xffff); // Must fit in two bytes
assert(block_offset_ + kHeaderSize + length <= kBlockSize);

// Format the header
// The structure of Header
// CheckSum 4 Byte
// DataLength 2 Byte
// ChunkType 1 Byte
char buf[kHeaderSize];
// ------------ Start of Code Fragment A -------------
// Encode the DataLength
buf[4] = static_cast<char>(length & 0xff);
buf[5] = static_cast<char>(length >> 8);
// Encode the ChunkType
buf[6] = static_cast<char>(t);

// Compute the crc of the record type and the payload.
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
crc = crc32c::Mask(crc); // Adjust for storage
EncodeFixed32(buf, crc);
// ------------ End of Code Fragment A -------------

// ------------ Start of Code Fragment B -------------
// Write the header and the payload
Status s = dest_->Append(Slice(buf, kHeaderSize));
if (s.ok()) {
s = dest_->Append(Slice(ptr, length));
if (s.ok()) {
// flush the chunk to disk
s = dest_->Flush();
}
}
// ------------ End of Code Fragment B -------------
block_offset_ += kHeaderSize + length;
return s;
}

这一段代码分为两个部分:A 部分代码编码了 ChunkHeader ,我们可以看到校验和是通过 crc32c::Extendcrc32c::Mask 方法计算得到的,然后它们和 DataLength、 ChunkType 一起被编码进了 buf 中。B 部分将 ChunkHeader 和 Data 分别添加到了 dest_ 中。这里的 dest_ 是一个 WritableFile 对象,其 Append 的方法并不会将数据刷到文件中,而是保存在一个缓冲区中。当 Chunk Header 以及 Data 部分的数据都成功添加到了 dest_ 中后,就会调用其 Flush 方法,这个方法会将其缓冲区中的 Chunk 刷到磁盘上。有一点需要注意的是,在不同的操作系统下,dest_ 的具体类型会不同。WritableFile 是一个抽象类,在 Windows 下其实现为 WindowsWritableFile;在 Mac 和 Linux 系统下,其实现是 PosixWritableFile 。它们的区别在于写入数据时调用的系统调用不一样,编译时编译器会根据操作系统的信息来选择不同的实现。

读日志

当系统在恢复时,需要读取硬盘上的日志然后恢复之前由于进程异常退出丢失的操作。下面我们来讲讲 LevelDB 是如何读取日志的。在前面写入日志时,代码都是中 log_writer.cc 中的。对应的,在读取日志时,也有一个 log_reader.cc 中的 Reader 类负责将日志读出并解析。在这个类中有一个函数 ReadRecord 直接读取下一条日志并返回,下面我们看看这个函数的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// log_reader.cc
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
// ------------ Start of Code Fragment A -------------
// 如果 last_record_offset_ 小于 initial_offset_
// 那么 initial_offset 大于 0,并且第一个 chunk 还没有被读取
// 所以直接跳转到第一个 chunk 开始的位置
if (last_record_offset_ < initial_offset_) {
if (!SkipToInitialBlock()) {
return false;
}
}

// scratch 用于缓存一条日志的数据
scratch->clear();
record->clear();
bool in_fragmented_record = false;
// Record offset of the logical record that we're reading
// 0 is a dummy value to make compilers happy
// 记录当前正在读的这条日志在文件中的 offset
uint64_t prospective_record_offset = 0;

Slice fragment;
// ------------ End of Code Fragment A -------------


// ------------ Start of Code Fragment B -------------
while (true) {
const unsigned int record_type = ReadPhysicalRecord(&fragment);

// ReadPhysicalRecord may have only had an empty trailer remaining in its
// internal buffer. Calculate the offset of the next physical record now
// that it has returned, properly accounting for its header size.
// physical_record_offset 就是当前读出来的这个 chunk 在文件中开始的位置
uint64_t physical_record_offset =
end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();

if (resyncing_) {
if (record_type == kMiddleType) {
continue;
} else if (record_type == kLastType) {
resyncing_ = false;
continue;
} else {
resyncing_ = false;
}
}

switch (record_type) {
case kFullType:
// 如果读到了 FullType 的 Chunk,那么就已经读到了一整条日志,直接返回
if (in_fragmented_record) {
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (!scratch->empty()) {
ReportCorruption(scratch->size(), "partial record without end(1)");
}
}
prospective_record_offset = physical_record_offset;
scratch->clear();
*record = fragment;
last_record_offset_ = prospective_record_offset;
return true;

case kFirstType:
// 如果读到了 firstType 的 Chunk,那么就要将 in_fragmented_record 置为 true
// 然后继续往下读,直到读到了一条 Last 类型的 chunk
if (in_fragmented_record) {
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (!scratch->empty()) {
ReportCorruption(scratch->size(), "partial record without end(2)");
}
}
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
break;

case kMiddleType:
// 如果读到了 Middle 类型的 Chunk,将内容添加进 scratch 中,然后接着往下读
// 直到读到了一条 Last 类型的 Chunk
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
} else {
scratch->append(fragment.data(), fragment.size());
}
break;

case kLastType:
// 读到了一条 Last 类型的日志,将这条日志的内容以及前面缓存的日志一起拼接起来
// 形成一条完整的日志并且返回
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
} else {
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
return true;
}
break;

case kEof:
if (in_fragmented_record) {
// This can be caused by the writer dying immediately after
// writing a physical record but before completing the next; don't// scratch 用于缓存一条日志的数据
// treat it as a corruption, just ignore the entire logical record.
scratch->clear();
}
return false;

case kBadRecord:
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "error in middle of record");
in_fragmented_record = false;
scratch->clear();
}
break;

default: {
char buf[40];
std::snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
ReportCorruption(
(fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
buf);
in_fragmented_record = false;
scratch->clear();
break;
}
}
}
// ------------ End of Code Fragment B -------------
return false;
}

这个函数中主要分为两个部分,A 部分的代码主要是一些准备工作,例如将文件的 offset 移动到第一个 Chunk 的起始位置,清空缓冲区等;B 部分代码的工作是不断地读出下一个 Chunk 的内容,根据这个 Chunk 的类型来判断是否需要继续读下一个 Chunk(读到了 First 类型或者 Middle 类型的 Chunk),如果一条日志的内容读取完了(读到了 Full 类型或者 Last 类型的 Chunk),就将缓冲区中的数据组装成一个 Slice 对象返回,这个对象中的内容就是一条逻辑日志的完整内容。

读取下一个 Chunk 的执行逻辑主要在 ReadPhysicalRecord 函数中,它的内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// log_reader.cc
// 从 buffer 中读取一个 Chunk,返回这个 chunk 的类型
// 如果当前 buffer 已经读完,则会从文件中读取下一个 Block 的内容到 buffer 中
// 如果文件的内容也被读完了,则返回 eof
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
while (true) {
// ------------ Start of Code Fragment A -------------
// 如果 buffer 中剩余的数据大小小于一个 ChunkHeader 的大小
// 则说明剩下的都是 trailer,直接跳到下一个 Block
if (buffer_.size() < kHeaderSize) {
if (!eof_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
// 读取下一个 Block
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
eof_ = true;
return kEof;
} else if (buffer_.size() < kBlockSize) {
eof_ = true;
}
continue;
} else {
// Note that if buffer_ is non-empty, we have a truncated header at the
// end of the file, which can be caused by the writer crashing in the
// middle of writing the header. Instead of considering this an error,
// just report EOF.
buffer_.clear();
return kEof;
}
}
// ------------ End of Code Fragment A -------------

// ------------ Start of Code Fragment B -------------
// Parse the header
// 获取当前 Chunk 的 type 和 data length
const char* header = buffer_.data();
const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
const unsigned int type = header[6];
const uint32_t length = a | (b << 8);

// 如果 Header 的大小加上数据部分的大小比 buffer 中数据的大小还大
// 说明数据的读取发生了异常,报告异常并返回
if (kHeaderSize + length > buffer_.size()) {
size_t drop_size = buffer_.size();
buffer_.clear();
if (!eof_) {
ReportCorruption(drop_size, "bad record length");
return kBadRecord;
}
// If the end of the file has been reached without reading |length| bytes
// of payload, assume the writer died in the middle of writing the record.
// Don't report a corruption.
return kEof;
}

if (type == kZeroType && length == 0) {
// Skip zero length record without reporting any drops since
// such records are produced by the mmap based writing code in
// env_posix.cc that preallocates file regions.
buffer_.clear();
return kBadRecord;
}

// Check crc
// 检查校验和,如果校验和不对,那么也报告异常
if (checksum_) {
uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
if (actual_crc != expected_crc) {
// Drop the rest of the buffer since "length" itself may have
// been corrupted and if we trust it, we could find some
// fragment of a real log record that just happens to look
// like a valid log record.
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "checksum mismatch");
return kBadRecord;
}
}
// ------------ End of Code Fragment B -------------

// ------------ Start of Code Fragment C -------------
// 读取成功,将当前 Block 的 offset 往前移动到下一条 Chunk 的开头
buffer_.remove_prefix(kHeaderSize + length);

// Skip physical record that started before initial_offset_
if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
initial_offset_) {
result->clear();
return kBadRecord;
}

*result = Slice(header + kHeaderSize, length);
return type;
// ------------ End of Code Fragment C -------------
}
}

这段代码中,主要包含三部分逻辑。A 部分主要判断当前 Block 中是否还有下一条日志,如果没有则切换到下一个 Block 。如果无法读取下一个 Block 或者下一个 Block 的大小小于一个标准 Block 的大小(32 KB),则说明文件已经读完,将 eof 标志置为 true 。B 部分代码主要是根据还原 ChunkHeader,并且根据数据长度、校验和等信息判断 Chunk 中数据的正确性。程序运行到 C 部分代码则表示前面的读取和校验都没有问题,那么只需要将 buffer 的 offset 前移,然后将读取到的内容包装成一个 Slice 返回即可。以上三段代码便组成了从 Block 中读取一个 Chunk 的逻辑。

在 A 部分代码中,如果当前 buffer(即当前内存中的 Block)已经读到了末尾,那么就需要从文件中载入下一个 Block 。这里使用的函数是 SequentialFile::Read ,这个函数就是简单地使用系统调用从文件中读取了一段数据出来,因此也就不展开讲了。

到这里,LevelDB 的 WAL 日志就讲完了。应该说整个逻辑还是比较清晰易懂的,主要就是一些编解码的过程。仔细体会这个过程,对我们理解数据编码解码还是很有帮助的。


LevelDB 源代码阅读(三):预写日志
https://thumarklau.github.io/2021/07/11/leveldb-source-code-reading-3/
作者
MarkLau
发布于
2021年7月11日
许可协议