P0 - C++ Primer
环境
我这里用的 WSL ubuntu 24.04,装了一下 clang 18.1.3,以及 clangd 和 lldb。
clone BusTub 就不说了,直接快进到正餐。
vscode 装一下 code lldb 以及 cmake tools 你就可以用 GUI 来点击编译和 debug 了。
Project 描述
情景大概是:你要检查你的网站是否有一些 IP 在高频访问以及潜在的 DDoS 攻击。所以需要实时记录这些 IP 的访问频率,然而请求的流毕竟大,传统方法可能太慢,或者 memory-hungry。所以我们引入了 Count-Min Sketch.
CM Sketch 是一种概率性数据结构,近似来统计 stream 中的特定 item 的频率(使用 sublinear memory)。
维护一个二维的计数矩阵,使用 d-independently seeded 的哈希函数来定位。每次更新时增加某一行的一个 cell,查询时返回的是这些计数器的最小值。此外,CM sketch 是 mergeable 的,也就是说,两个 sketch 合并的结果与一个 sketch 统计两个合并的流结果相同。
这个算法基于以下三个参数:
-
width(w)- hash 矩阵的列数。每个 hash 都把一个 item 映射到[0, w-1]的索引。w 越大,smaller additive error。 -
depth(d)- 行数 / independent 的 hash 函数。d 越大,lower probability of a bad overestimate. -
hash family / seeds一种导出 d 个 pairwise-independent 哈希函数的方法(比如每一行都设置一个不同的哈希)
之后给了一个示例以及三个介绍相关算法的资源。看一下就能看懂具体的原理。
需要注意的是,**代码要线程安全。**此外最后一个测试还对性能有一些要求。
实现
任务只有一个,实现 CM Sketch 就可以。
首先需要一个二维矩阵来维护,考虑到他要求支持并发,所以建议单独抽出一个结构来实现。
先实现一个单线程的版本,也不考虑任何优化。
基础的矩阵构造就是构造二维 std::vector ,内部的元素我维护了 uint64_t。构造直接转发就可以。然后记得检查参数抛异常。记得让内部的矩阵满足 RAII,这样外面构造 CountMinSketch 时,检查 width depth 是否合法的时候抛异常才是安全的。
这几个函数的单线程版本实现都很简单,记得 width 是 列数,depth 是行数就行。
读的时候建议使用 std::vector::at() 来读取,记得处理异常。
之后可以先来实现一个 naive 的并发版本,一把大锁锁上去,先看看把所有的测试通过(除了最后一个性能测试)。
我对二维矩阵单独定义了一个类出来,主要需要在对他进行操作时上锁。
首先需要考虑用什么锁,std::mutex 配合 std::scoped_lock 应该不错,因为这里需要实现移动,而这要同时锁两个锁,自己来锁明显就是等着死锁。推荐 C++17 的 std::scoped_lock,它提供 RAII 封装、可以同时锁多个锁,而且保证不会死锁。
这个也很容易实现,之后就是要考虑降低锁的粒度。
我考虑采用一个方式,把矩阵内部的元素全部换成原子变量,构造和移动时再使用一把锁整体锁住两边。
不过 std::atomic 没有移动和拷贝构造,所以记得用 std::shared_ptr 包一下。
Debug 模式下最后 Speedup 是 4.04.5,Release 模式下是 3.53.8。
最后提交前记得 cmake 执行 format 和 clang-tidy。
代码放一部分吧,主要是维护的二维矩阵,声明就不放了,只放一些函数定义。
原子操作的内存序应该是对的,不知道评分系统是不是 x86,如果是 x86 的话有问题也测不出来,所以请自行参考评估。
/** this is NOT atomic opreation, so it must be locked from outside */
CounterMatrix::CounterMatrix(uint32_t width, uint32_t depth) {
matrix_ = std::vector<std::vector<std::shared_ptr<std::atomic<count_t>>>>{};
matrix_.resize(depth);
for (size_t row{}; row < depth; ++row) {
matrix_[row].resize(width);
for (size_t col{}; col < width; ++col) {
matrix_[row][col] = std::make_shared<std::atomic<count_t>>(0);
}
}
}
void CounterMatrix::Update(size_t i_row, size_t i_col, count_t count) {
matrix_.at(i_row).at(i_col)->fetch_add(count, std::memory_order_acq_rel);
}
auto CounterMatrix::Read(size_t i_row, size_t i_col) const -> count_t {
return matrix_.at(i_row).at(i_col)->load(std::memory_order_acquire);
}
auto CounterMatrix::BatchRead(const std::vector<size_t> &col_indexes) const -> std::vector<count_t> {
std::vector<count_t> counts;
counts.reserve(col_indexes.size());
for (size_t i{}; i < col_indexes.size(); ++i) {
try {
const auto count = this->Read(i, col_indexes[i]);
counts.push_back(count);
} catch (...) {
counts.push_back(0);
}
}
return counts;
}
void CounterMatrix::Clear() {
/** this is NOT atomic opreation, so it must be locked from outside*/
for (auto &row : matrix_) {
for (auto &elem : row) {
elem->store(0, std::memory_order_release);
}
}
}
提交
p0 的思路非常简单,提交之后一次直接 100 分,感觉非常有成就感。