P0 - C++ Primer

环境

我这里用的 WSL ubuntu 24.04,装了一下 clang 18.1.3,以及 clangd 和 lldb。

clone BusTub 就不说了,直接快进到正餐。

vscode 装一下 code lldb 以及 cmake tools 你就可以用 GUI 来点击编译和 debug 了。

Project 描述

情景大概是:你要检查你的网站是否有一些 IP 在高频访问以及潜在的 DDoS 攻击。所以需要实时记录这些 IP 的访问频率,然而请求的流毕竟大,传统方法可能太慢,或者 memory-hungry。所以我们引入了 Count-Min Sketch.

CM Sketch 是一种概率性数据结构,近似来统计 stream 中的特定 item 的频率(使用 sublinear memory)。

维护一个二维的计数矩阵,使用 d-independently seeded 的哈希函数来定位。每次更新时增加某一行的一个 cell,查询时返回的是这些计数器的最小值。此外,CM sketch 是 mergeable 的,也就是说,两个 sketch 合并的结果与一个 sketch 统计两个合并的流结果相同。

这个算法基于以下三个参数:

  • width(w) - hash 矩阵的列数。每个 hash 都把一个 item 映射到 [0, w-1] 的索引。w 越大,smaller additive error。

  • depth(d) - 行数 / independent 的 hash 函数。d 越大,lower probability of a bad overestimate.

  • hash family / seeds 一种导出 d 个 pairwise-independent 哈希函数的方法(比如每一行都设置一个不同的哈希)

之后给了一个示例以及三个介绍相关算法的资源。看一下就能看懂具体的原理。

需要注意的是,**代码要线程安全。**此外最后一个测试还对性能有一些要求。

实现

任务只有一个,实现 CM Sketch 就可以。

首先需要一个二维矩阵来维护,考虑到他要求支持并发,所以建议单独抽出一个结构来实现。

先实现一个单线程的版本,也不考虑任何优化。

基础的矩阵构造就是构造二维 std::vector ,内部的元素我维护了 uint64_t。构造直接转发就可以。然后记得检查参数抛异常。记得让内部的矩阵满足 RAII,这样外面构造 CountMinSketch 时,检查 width depth 是否合法的时候抛异常才是安全的。

这几个函数的单线程版本实现都很简单,记得 width 是 列数,depth 是行数就行。

读的时候建议使用 std::vector::at() 来读取,记得处理异常。

之后可以先来实现一个 naive 的并发版本,一把大锁锁上去,先看看把所有的测试通过(除了最后一个性能测试)。

我对二维矩阵单独定义了一个类出来,主要需要在对他进行操作时上锁。

首先需要考虑用什么锁,std::mutex 配合 std::scoped_lock 应该不错,因为这里需要实现移动,而这要同时锁两个锁,自己来锁明显就是等着死锁。推荐 C++17 的 std::scoped_lock,它提供 RAII 封装、可以同时锁多个锁,而且保证不会死锁。

这个也很容易实现,之后就是要考虑降低锁的粒度。

我考虑采用一个方式,把矩阵内部的元素全部换成原子变量,构造和移动时再使用一把锁整体锁住两边。

不过 std::atomic 没有移动和拷贝构造,所以记得用 std::shared_ptr 包一下。

Debug 模式下最后 Speedup 是 4.04.5,Release 模式下是 3.53.8。

最后提交前记得 cmake 执行 format 和 clang-tidy。

代码放一部分吧,主要是维护的二维矩阵,声明就不放了,只放一些函数定义。

原子操作的内存序应该是对的,不知道评分系统是不是 x86,如果是 x86 的话有问题也测不出来,所以请自行参考评估。

/** this is NOT atomic opreation, so it must be locked from outside */
CounterMatrix::CounterMatrix(uint32_t width, uint32_t depth) {
  matrix_ = std::vector<std::vector<std::shared_ptr<std::atomic<count_t>>>>{};
  matrix_.resize(depth);

  for (size_t row{}; row < depth; ++row) {
    matrix_[row].resize(width);
    for (size_t col{}; col < width; ++col) {
      matrix_[row][col] = std::make_shared<std::atomic<count_t>>(0);
    }
  }
}

void CounterMatrix::Update(size_t i_row, size_t i_col, count_t count) {
  matrix_.at(i_row).at(i_col)->fetch_add(count, std::memory_order_acq_rel);
}

auto CounterMatrix::Read(size_t i_row, size_t i_col) const -> count_t {
  return matrix_.at(i_row).at(i_col)->load(std::memory_order_acquire);
}

auto CounterMatrix::BatchRead(const std::vector<size_t> &col_indexes) const -> std::vector<count_t> {
  std::vector<count_t> counts;
  counts.reserve(col_indexes.size());

  for (size_t i{}; i < col_indexes.size(); ++i) {
    try {
      const auto count = this->Read(i, col_indexes[i]);
      counts.push_back(count);
    } catch (...) {
      counts.push_back(0);
    }
  }

  return counts;
}

void CounterMatrix::Clear() {
  /** this is NOT atomic opreation, so it must be locked from outside*/
  for (auto &row : matrix_) {
    for (auto &elem : row) {
      elem->store(0, std::memory_order_release);
    }
  }
}

提交

p0 的思路非常简单,提交之后一次直接 100 分,感觉非常有成就感。