// db_impl.cc Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates){ ... // some code prepare for writing // Write WAL status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); if (!status.ok()) { sync_error = true; } } if (status.ok()) { // insert the data to the memtable status = WriteBatchInternal::InsertInto(write_batch, mem_); }
// log_writer.cc Status Writer::AddRecord(const Slice& slice){ // the data and length of the log constchar* ptr = slice.data(); size_t left = slice.size();
// Fragment the record if necessary and emit it. Note that if slice // is empty, we still want to iterate once to emit a single // zero-length record Status s; bool begin = true; do { // ------------ Begin of Code Fragment A ------------- // the left size of current block constint leftover = kBlockSize - block_offset_; assert(leftover >= 0); if (leftover < kHeaderSize) { // If the left space cannot write a chunk header // Switch to a new block if (leftover > 0) { // Fill the trailer (literal below relies on kHeaderSize being 7) static_assert(kHeaderSize == 7, ""); dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover)); } // when switching to a new block, just set the offset in the block to 0 block_offset_ = 0; } // ------------ End of Code Fragment A -------------
// ------------ Begin of Code Fragment B ------------- // Invariant: we never leave < kHeaderSize bytes in a block. assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
// the size of space used for writing data constsize_t avail = kBlockSize - block_offset_ - kHeaderSize; // if the fragment_length == left, current log can be completely written into this block // else current log should be divided into several fragment constsize_t fragment_length = (left < avail) ? left : avail;
RecordType type; constbool end = (left == fragment_length); if (begin && end) { type = kFullType; } elseif (begin) { type = kFirstType; } elseif (end) { type = kLastType; } else { type = kMiddleType; } // ------------ End of Code Fragment B -------------
// ------------ Begin of Code Fragment C ------------- // write current chunk to disk s = EmitPhysicalRecord(type, ptr, fragment_length); ptr += fragment_length; left -= fragment_length; begin = false; // ------------ End of Code Fragment C ------------- } while (s.ok() && left > 0); return s; }
这段代码还是比较清晰的。A 段代码主要是在检查当前 Block 剩余空间的大小:如果当前 Block 剩余的空间小于一个 ChunkHeader 的大小(kHeaderSize,即 7 字节),那么就会把 Block 剩下的空间用 0 填满,然后切换到一个新的 Block 中。这里切换新 Block 的操作非常简单,就是将记录当前 Block 内偏移量的变量 block_offset_ 置为 0。B 段代码则是在检查当前这条日志的写入情况:如果这条日志从头到尾都能写进当前 Block 的剩余空间,那么新加入的 Chunk 类型就为 Full;否则一条日志要拆分写入多个 Chunk,此时根据当前写入的进度将 Chunk 标记为 First、Middle 或者 Last。C 段代码则是调用 EmitPhysicalRecord 函数将当前这个 Chunk 提交并写入到磁盘中。值得注意的是,假如当前日志的内容为空,即变量 left 一开始就为 0,那么由于这里使用的是 do-while 循环,这段代码仍然会提交一个 data 段为空的 Chunk。
// log_writer.cc Status Writer::EmitPhysicalRecord(RecordType t, constchar* ptr, size_t length){ assert(length <= 0xffff); // Must fit in two bytes assert(block_offset_ + kHeaderSize + length <= kBlockSize);
// Format the header // The structure of Header // CheckSum 4 Byte // DataLength 2 Byte // ChunkType 1 Byte char buf[kHeaderSize]; // ------------ Start of Code Fragment A ------------- // Encode the DataLength buf[4] = static_cast<char>(length & 0xff); buf[5] = static_cast<char>(length >> 8); // Encode the ChunkType buf[6] = static_cast<char>(t);
// Compute the crc of the record type and the payload. uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length); crc = crc32c::Mask(crc); // Adjust for storage EncodeFixed32(buf, crc); // ------------ End of Code Fragment A -------------
// ------------ Start of Code Fragment B ------------- // Write the header and the payload Status s = dest_->Append(Slice(buf, kHeaderSize)); if (s.ok()) { s = dest_->Append(Slice(ptr, length)); if (s.ok()) { // flush the chunk to disk s = dest_->Flush(); } } // ------------ End of Code Fragment B ------------- block_offset_ += kHeaderSize + length; return s; }
这一段代码分为两个部分:A 部分代码编码了 ChunkHeader,我们可以看到校验和是通过 crc32c::Extend 和 crc32c::Mask 方法计算得到的,然后它和 DataLength、ChunkType 一起被编码进了 buf 中。B 部分将 ChunkHeader 和 Data 依次添加到了 dest_ 中。这里的 dest_ 是一个 WritableFile 对象,其 Append 方法并不会立即将数据写到文件中,而是先保存在一个缓冲区中。当 ChunkHeader 以及 Data 部分的数据都成功添加到了 dest_ 中后,就会调用其 Flush 方法,这个方法会将缓冲区中的 Chunk 写入文件(数据此时交给了操作系统;若要保证真正落盘,还需要调用 Sync,即前面 DBImpl::Write 中 options.sync 为 true 时的分支)。有一点需要注意的是,在不同的操作系统下,dest_ 的具体类型会不同。WritableFile 是一个抽象类,在 Windows 下其实现为 WindowsWritableFile;在 Mac 和 Linux 系统下,其实现是 PosixWritableFile。它们的区别在于写入数据时使用的系统调用不一样,编译时会根据目标操作系统来选择对应的实现。
// log_reader.cc boolReader::ReadRecord(Slice* record, std::string* scratch){ // ------------ Start of Code Fragment A ------------- // 如果 last_record_offset_ 小于 initial_offset_ // 那么 initial_offset 大于 0,并且第一个 chunk 还没有被读取 // 所以直接跳转到第一个 chunk 开始的位置 if (last_record_offset_ < initial_offset_) { if (!SkipToInitialBlock()) { returnfalse; } }
// scratch 用于缓存一条日志的数据 scratch->clear(); record->clear(); bool in_fragmented_record = false; // Record offset of the logical record that we're reading // 0 is a dummy value to make compilers happy // 记录当前正在读的这条日志在文件中的 offset uint64_t prospective_record_offset = 0;
Slice fragment; // ------------ End of Code Fragment A -------------
// ------------ Start of Code Fragment B ------------- while (true) { constunsignedint record_type = ReadPhysicalRecord(&fragment);
// ReadPhysicalRecord may have only had an empty trailer remaining in its // internal buffer. Calculate the offset of the next physical record now // that it has returned, properly accounting for its header size. // physical_record_offset 就是当前读出来的这个 chunk 在文件中开始的位置 uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();
switch (record_type) { case kFullType: // 如果读到了 FullType 的 Chunk,那么就已经读到了一整条日志,直接返回 if (in_fragmented_record) { // Handle bug in earlier versions of log::Writer where // it could emit an empty kFirstType record at the tail end // of a block followed by a kFullType or kFirstType record // at the beginning of the next block. if (!scratch->empty()) { ReportCorruption(scratch->size(), "partial record without end(1)"); } } prospective_record_offset = physical_record_offset; scratch->clear(); *record = fragment; last_record_offset_ = prospective_record_offset; returntrue;
case kFirstType: // 如果读到了 firstType 的 Chunk,那么就要将 in_fragmented_record 置为 true // 然后继续往下读,直到读到了一条 Last 类型的 chunk if (in_fragmented_record) { // Handle bug in earlier versions of log::Writer where // it could emit an empty kFirstType record at the tail end // of a block followed by a kFullType or kFirstType record // at the beginning of the next block. if (!scratch->empty()) { ReportCorruption(scratch->size(), "partial record without end(2)"); } } prospective_record_offset = physical_record_offset; scratch->assign(fragment.data(), fragment.size()); in_fragmented_record = true; break;
case kMiddleType: // 如果读到了 Middle 类型的 Chunk,将内容添加进 scratch 中,然后接着往下读 // 直到读到了一条 Last 类型的 Chunk if (!in_fragmented_record) { ReportCorruption(fragment.size(), "missing start of fragmented record(1)"); } else { scratch->append(fragment.data(), fragment.size()); } break;
case kLastType: // 读到了一条 Last 类型的日志,将这条日志的内容以及前面缓存的日志一起拼接起来 // 形成一条完整的日志并且返回 if (!in_fragmented_record) { ReportCorruption(fragment.size(), "missing start of fragmented record(2)"); } else { scratch->append(fragment.data(), fragment.size()); *record = Slice(*scratch); last_record_offset_ = prospective_record_offset; returntrue; } break;
case kEof: if (in_fragmented_record) { // This can be caused by the writer dying immediately after // writing a physical record but before completing the next; don't// scratch 用于缓存一条日志的数据 // treat it as a corruption, just ignore the entire logical record. scratch->clear(); } returnfalse;
case kBadRecord: if (in_fragmented_record) { ReportCorruption(scratch->size(), "error in middle of record"); in_fragmented_record = false; scratch->clear(); } break;
default: { char buf[40]; std::snprintf(buf, sizeof(buf), "unknown record type %u", record_type); ReportCorruption( (fragment.size() + (in_fragmented_record ? scratch->size() : 0)), buf); in_fragmented_record = false; scratch->clear(); break; } } } // ------------ End of Code Fragment B ------------- returnfalse; }
这个函数主要分为两个部分。A 部分的代码主要是一些准备工作,例如在需要时调用 SkipToInitialBlock 跳转到起始位置、清空缓冲区等;B 部分代码的工作是不断地读出下一个 Chunk 的内容,并根据这个 Chunk 的类型来判断是否需要继续读下一个 Chunk(读到了 First 类型或者 Middle 类型的 Chunk 时需要继续读)。如果一条日志的内容读取完了(读到了 Full 类型或者 Last 类型的 Chunk),就返回一个 Slice 对象:对于 Full 类型,直接返回该 Chunk 的数据;对于 Last 类型,则返回 scratch 缓冲区中拼接好的数据。这个对象中的内容就是一条逻辑日志的完整内容。
// log_reader.cc // 从 buffer 中读取一个 Chunk,返回这个 chunk 的类型 // 如果当前 buffer 已经读完,则会从文件中读取下一个 Block 的内容到 buffer 中 // 如果文件的内容也被读完了,则返回 eof unsignedintReader::ReadPhysicalRecord(Slice* result){ while (true) { // ------------ Start of Code Fragment A ------------- // 如果 buffer 中剩余的数据大小小于一个 ChunkHeader 的大小 // 则说明剩下的都是 trailer,直接跳到下一个 Block if (buffer_.size() < kHeaderSize) { if (!eof_) { // Last read was a full read, so this is a trailer to skip buffer_.clear(); // 读取下一个 Block Status status = file_->Read(kBlockSize, &buffer_, backing_store_); end_of_buffer_offset_ += buffer_.size(); if (!status.ok()) { buffer_.clear(); ReportDrop(kBlockSize, status); eof_ = true; return kEof; } elseif (buffer_.size() < kBlockSize) { eof_ = true; } continue; } else { // Note that if buffer_ is non-empty, we have a truncated header at the // end of the file, which can be caused by the writer crashing in the // middle of writing the header. Instead of considering this an error, // just report EOF. buffer_.clear(); return kEof; } } // ------------ End of Code Fragment A -------------
// ------------ Start of Code Fragment B ------------- // Parse the header // 获取当前 Chunk 的 type 和 data length constchar* header = buffer_.data(); constuint32_t a = static_cast<uint32_t>(header[4]) & 0xff; constuint32_t b = static_cast<uint32_t>(header[5]) & 0xff; constunsignedint type = header[6]; constuint32_t length = a | (b << 8);
// 如果 Header 的大小加上数据部分的大小比 buffer 中数据的大小还大 // 说明数据的读取发生了异常,报告异常并返回 if (kHeaderSize + length > buffer_.size()) { size_t drop_size = buffer_.size(); buffer_.clear(); if (!eof_) { ReportCorruption(drop_size, "bad record length"); return kBadRecord; } // If the end of the file has been reached without reading |length| bytes // of payload, assume the writer died in the middle of writing the record. // Don't report a corruption. return kEof; }
if (type == kZeroType && length == 0) { // Skip zero length record without reporting any drops since // such records are produced by the mmap based writing code in // env_posix.cc that preallocates file regions. buffer_.clear(); return kBadRecord; }
// Check crc // 检查校验和,如果校验和不对,那么也报告异常 if (checksum_) { uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); uint32_t actual_crc = crc32c::Value(header + 6, 1 + length); if (actual_crc != expected_crc) { // Drop the rest of the buffer since "length" itself may have // been corrupted and if we trust it, we could find some // fragment of a real log record that just happens to look // like a valid log record. size_t drop_size = buffer_.size(); buffer_.clear(); ReportCorruption(drop_size, "checksum mismatch"); return kBadRecord; } } // ------------ End of Code Fragment B -------------
// ------------ Start of Code Fragment C ------------- // 读取成功,将当前 Block 的 offset 往前移动到下一条 Chunk 的开头 buffer_.remove_prefix(kHeaderSize + length);
// Skip physical record that started before initial_offset_ if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length < initial_offset_) { result->clear(); return kBadRecord; }
*result = Slice(header + kHeaderSize, length); return type; // ------------ End of Code Fragment C ------------- } }