0%

搜推广的工程化——有限内存下的单机大数据处理

22 年校招面字节的时候,面试官随口问了个问题:有几亿条数据存储在文件中,我该怎么排序?虽然在很多高频面经上看过这个问题,但完全没有在意,心想:正常排序呗还能怎么做,加载不进内存就分块处理。可具体怎么分块并没有仔细思考过。看似是一个算法题,但这完全是一个工程题。

今年遇到了类似的问题,有两组文件,第一组叫 bench,有 32 个文件,这些文件大约 1T 左右。每个文件里存储着搜推广的稀疏数据,id 和 embedding,每一个 id 对应一个 embedding 数据。第二组文件叫 test,有 8 个文件,这些文件同样约 1T 左右,同样存储了 id 和 embedding。这些 id 和对应的 embedding 完全乱序分布在文件中,毫无规律。

目标:遍历 bench 中的每个 id 和对应的 embedding,在 test 中查找对应的 id 和 embedding,判断是否相等。如果 id 不存在或者 embedding 不相等,打印出错的 id。

问题背景

如果使用 for 循环去处理,单个文件大约几十 GB,这里可以使用 yield 等关键字来避免巨大的内存开销。但时间复杂度是 $O(n^2)$,这将会是一个巨大的工作量:

1
2
3
4
5
6
7
8
for bench_file in range(bench_files):            # 遍历 1T
bench_data = yield open(benc_file)
for (id, embedding) in bench_data:
for test_file in range(test_files): # 遍历 1T
test_data = yield open(test_file)
for (id_, embedding_) in test_file:
if id == id_:
......

那么如何快速的处理呢?

暴力破解

先来回归数据,id 是 int64 类型(8个字节)的正数,如果只有 id 数据,即使 50 亿条的 id 数据也只需要 38GB 的数据,对于现在的开发机器,这个数据很容易被加载到内存。然后不管对 bench 和 test 的 id 进行排序($O(\log N)$)、对比,或者是时间复杂度更低的哈希($O(N)$),全过程都可以在内存中操作。

可是现在还要存储 id 对应的 embedding,并对比 embedding 是否一致,机器的内存明显已经不够用了。但我们可以不存储 embedding,转而存储 embedding 的文件名和 offset:

1
data[id] = (file_name, offset)

先按这种方式存储 test 文件的 id、file_name 和 offset 到字典汇总。在遍历 bench 文件,得到 id 和 embedding 后,根据 id 去桶中查找 test 文件的 file_name 和 offset,读到 test 的 embedding,和 bench 的 embedding 做对比即可。

1
2
3
4
5
test_file, offset = data[bench_id]
f = open(test_file, "rb")
f.seek(offset)
test_emb = f.read(embedding_length)
np.all(bench_emb == test_emb)

这些操作在一个服务器上不难执行,因为服务器的内存通常在几百 GB 左右,足够存储这些数据。事实上,当数据量很小时,这代码完全没问题,强大的硬件可以掩盖代码的性能问题。

海量数据

但是在大厂中,数据很容易能到海量级别,任何内存、io 的劣势都会被放大无数倍,导致生产事故。如果内存不够呢?没有这么大内存该怎么办呢?这种场景下最强的是桶(bucket)方法,针对 test 这组文件,我们建立 10000 个桶。

1
bucket_files = [open(file, "wb") for f"{file}.bin" in range(0, 10000)]

并根据 id 模 10000 的结果,得到 idx,这个 idx 就是要实际写入的桶。

1
2
def my_hash(id):
return idx = id % 10000

那要在桶里写入什么数据呢?这里有两种参考方案,一种是直接把 id 和 emebdding 写入桶;一种是写入id、embedding 对应的 file 以及 embedding 在 file 中的偏移 offset。

  • 第一种方案的优点是对比 id 的时候能直接取出 embedding,进行对比;缺点是浪费磁盘的存储空间
  • 第二种方案的优点是节省存储空间;缺点是每次对比 id 对应的 embedding 时,需要打开 file,并 seek offset,读取 embedding

本文采用了第二种方案,因为服务器上的带宽很大,可以快速的读取数据,而且可以利用多线程技术来掩盖 io 的时间。

1
2
idx = my_hash(id)
bucket_files[idx].write(struck.pack("qqq", (id, file, offset)))

之后,对每个桶文件进行排序:

1
2
3
data = np.fromfile(file, dtype=np.int64).reshape(-1, 3)
data = data[data[:, 0].argsort()]
data.to_file()

那么在最后的 id 查找与 embedding 对比阶段,就会方便很多。遍历 bench 这组文件,得到一个 id 和 embedding 后,根据 my_hash 函数计算桶的索引,去桶中进行二分查找得到 test 文件的 embedding 存储在哪里,读取并做对比:

1
2
3
4
5
6
7
8
idx = my_hash(bench_id)
data = open(bucket_fils[idx])
id_search = np.searchsort(id, data[:0])
test_file, offset = data[id_search, 1], data[id_search, 2]

f = open(test_file)
f.seek(offset)
np.all(f.read(embedding_length), embedding)

由于有 10000 个文件,每个文件都足够小,在小内存场景下也可以进行读取、二分查找这些操作,这个过程也可以进行多线程加速。这也是单机处理大数据的老底子。

但实际在大厂里,还可以写分布式代码,用多机的环境来处理这个问题,背后的原理和刚才说的差不多。假如现在有 32 个机器,那么把 id 划分到 32 个机器上,每个机器完成建桶、排序、查找的任务即可。

比如 id 为 128964897,128964897 % 32 为 1,128964897 % 10000 = 4897,那么这个 id 放到 1 号机器的第 4897 个桶中。

由于这里处理的是 id,可以假设这些 id 为均匀分布的;当 id 分布不均时,会出现数据倾斜问题,此时需要修改我们的 my_hash 函数,不能是简单的 % 10000,需要根据实际的业务而定。这也解释了一个道理,数据结构、算法都是为业务服务的,业务在变,数据结构、算法也需要适当的修改。

总结

回到开始的问题,字节面试官期望的不是我说出归并排序、快速排序的原理和 $O(\log (N))$ 的时间复杂度,而是:

  • 机器的配置是多少?
  • 带宽是否足够大?
  • 有没有数据倾斜?
  • 在哪里可以做分布式,在哪里可以多线程加速

所以大数据的背后,更多的是对计算机体系的考量,上层算法和数据结构的设计,到中间的分布式和数据均衡,到第底层内存、io等硬件特性,需要打通所有的关卡,了解底层的原理并用到极致。

感谢上学期间打赏我的朋友们。赛博乞讨:我,秦始皇,打钱。

欢迎订阅我的文章