0%

工程开发经验总结三,以 sspaddmm 为例,再来谈一谈并行加速

今天来写什么呢?准备整理一些多线程的东西,虽然多线程相对很熟悉了,可每次工程中都会有新的收获。这次不谈多线程的理论问题,毕竟计算机专业都懂多线程的理论并写过相关程序,也了解其使用的背景。之前使用多线程的时候,总是看一看加速比和结果是否准确就完了,那么这次以多线程使用者和多线程设计者的角度出发,来谈一谈如何更好的使用和设计多线程。

问题背景

给定一个 2D 的稀疏矩阵,稀疏矩阵的采用 COO 格式的表示,如下图所示:

稀疏矩阵由三部分组成,包括 Indices, Values, Shapes。具体而言,这是一个 100 X 100 的二维稀疏矩阵,索引的第一行表示行,第二行表示列,行列对应的索引处为具体的数值,初次之前全部是 0。比如,sparse[1][0]=0.6,注意:其中的索引是可以重复的,比如有两个 [2,1]

现在的问题是,稀疏矩阵乘以稠密矩阵,既然是矩阵乘法,很容易想到并行。不过在此之前,还是简单说一下矩阵乘法的流程:

假设稀疏矩阵第 $r_1$ 行和第 $c_1$ 列的值为 $v_1$,假设稠密矩阵由 $C$ 列,那么乘法就是:将 $v_1$ 与稠密矩阵中第 $c_1$ 行对应的所有的列的值相乘,结果的索引是 $(r_1, 1), (r_1, 2), \cdots, (r_1, C)$。举个例子:

如上的例子中,稀疏矩阵乘以稠密矩阵,将单个系数矩阵元素相乘的结果,直接以 += 的形式放到对应的位置中,既然画图了,那么也就能看出来并行的点:

  1. 方案 1:乘以稠密矩阵要遍历其所有的列,所以在取列的时候并行,这样不会有冲突
  2. 方案 2:稀疏矩阵并行,以上图为例,直接拿那两个的稀疏矩阵的元素去乘以稠密矩阵的列。不过这里会有冲突,比如稀疏矩阵中有多个行相同的索引,比如上图的稀疏矩阵 [1,0], [1, 2] 这样的,在 += 的时候不加锁,多线程会出问题。

以下,mat1 表示稀疏矩阵,mat2 表示稠密矩阵,且 sspaddmm 函数要求返回稀疏矩阵而不是稠密矩阵,上面的图只是为了便于理解。此外,能使用的并行工具有限,这取决于所在的工程,这个没办法,有点戴着镣铐跳舞的意思,高中老师经常用戴着镣铐跳舞来 PUA 我们,即:你必须听话,在听话的范围内做到很好,不能越界。

并行 API

API 是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 对矩阵中每个元素自增 10

// 串行
for (int i = 0; i < 10000; i++) {
arr[i] += 10;
}

auto shared = [&](uint start, uint end) {
for (uint i = start; i < end; i++) {
arr[i] += 10
}
};
parallelfor(10000, 10000 / n_threads, shared);

其中 parallelfor 的第一个元素是任务总数,n_threads 是线程的数量,最后传入定义的匿名函数,在匿名函数中,默认从 0 开始,到任务总数结束,并划分对应的 startend 区间,比如 1 到 5000 为一组,5000 到 10000 为另一组。

串行版本

矩阵的串行乘法比较容易理解,写出串行程序来,也就知道了如何写并行程序。先遍历第一个矩阵,在遍历第二个矩阵,相乘即可:

1
2
3
4
5
6
7
8
9
10
11
int cnt{0};
for (int i = 0; i < mat1_nums; i++) {
int row1 = mat1_indices[i][0];
int col1 = mat1_indices[i][1];
for (int j = 0; j < mat2_col; j++) {
output_val[cnt] += mat1_val[i] * mat2_val[col1][j];
output_idx[cnt][0] = row1;
output_idx[cnt][1] = j;
cnt++;
}
}

使用方案 1

方案 1 的并行似乎很简单,可真的是这样吗?看上文的串行程序,我们发现做完一次乘法,就将结果直接写到 output 对应的地址中,写完一次,cnt++,为下一次写出数据做准备。那么问题来了,parallelfor 不提供锁,那么变量在没被保护的情况下的自增就会错误,如何解决呢?

既然读写会出现错误,而多线程的读是没有问题的,那么就想办法去掉写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int cnt{0};
unordered_map<int, unordered_map<int, int>> idx_map_cnt;
for (int i = 0; i < mat1_nums; i++) {
int row1 = mat1_indices[i][0];
for (int j = 0; j < mat2_col; j++) {
idx_map_cnt[row1][j] = cnt++;
}
}

for (int i = 0; i < mat1_nums; i++) {
int row1 = mat1_indices[i][0];
int col1 = mat1_indices[i][1];
auto shared = [&](uint start, uint end) {
for (int j = start; j < end; j++) {
int idx = idx_map_cnt[row1][j];
output_val[idx] += mat1_val[i] * mat2_val[col1][j];
output_idx[idx][0] = row1;
output_idx[idx][1] = j;
}
};
parallelfor(10000, 10000/n_threads, shared);
}

你看,在时间复杂度不变的情况下,借助一个额外的映射表就实现了并行,因为是针对 mat2 进行了并行,mat1 部分仍然是串行,也就是说,不会同时出现两个 row1 相等的情况,因此多线程中的 += 是安全的。

使用方案 2

如果明白了方案 1 的并行思路,那么问题又来了,假设 mat1 有 2000 个元素,mat2 的列也是 20,那么此时我们应该针对 mat1 进行并行,而不是 mat2。但是 mat1 并行会出乱子,因为使用多线程访问上述程序中的 row1,可能会用重复的结果,那么多线程中的 += 就是不安全的。如何解决呢?反而言之,多线程访问 mat1,如何保证每个线程拿到的 row1 是不一样的。我和师弟提出了两种思路:

  1. 先来看思路 1。设计一个数据结构,这个数据结构每次从 mat1_indices 中抽取出不重复的行,因为行是不同的,那么这样就可以并行了。举个例子,mat1 中的行是 [1, 2, 1, 2, 3, 4, 5, 5, 6, 7],那么我第一次选择 [1, 2, 3, 4, 5, 6, 7] 去并行,第二次选择 [1, 2, 5] 去运算,每次选择的元素不会有重复,这样多线程的 += 就安全了。可视化一下这样的结构:

来看如何实现这样的数据结构,也就是下文中的 unrepeated,因为映射的 key 是唯一的,所以使用 key 来记录不同的索引,选择的时候,也是选择其中的 key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
std::unordered_map<int, std::vector<int>> unrepeated;
for (int64_t i = 0; i < mat1_num; i++) {
int _row = mat1_indices[i][0];
int _col = mat1_indices[i][1];
unrepeated[_row].push_back(_col);
}

vector<vector<int>> res;
vector<int> tmp

while (unrepeated.size()) {
for (auto it = unrepeated.begin(); it != unrepeated.end(); it++) {
if (unrepeated[it->first].size() == 1) {
res.push_back({it->first, it->second.back()});
tmp.push_back(it->first);
} else {
res.push_back({it->first, it->second.back()});
it->second.pop_back();
}
}
for (auto i : tmp) {
unrepeated.erase(i);
}
tmp.clear();
int n = res.size();
auto shared = [&](uint start, uint end) {
for (int i = start; i < end; i++) {
int row1 = res[i][0];
int col1 = res[i][1];
for (int j = 0; j > mat2_col; j++) {
int idx = idx_map_cnt[row1][j];
output_val[idx] += mat1_val[i] * mat2_val[col1][j];
output_idx[idx][0] = row1;
output_idx[idx][1] = j;
}
}
};
parallelfor(res.size(), res.size()/n_threads, shared);
res.clear();
}

有没有发现问题?我猜一定没有,因为不会有人盯着一段非必须的程序仔细的看。还是直接说把,上述程序中的 mat1_val[i] 不是合法的,因为并行化以后,i 对应的是 res.size() 的某个元素,而不是 mat1_values 的第 i 个数值。怎么办呢?再加一个映射 co_map_idx 呗,把索引映射为数值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
std::unordered_map<int, std::unordered_map<int, std::vector<int>>> co_map_idx;
for (int64_t i = 0; i < mat1_vals_num; i++) {
int _row = mat1_indices[i][0];
int _col = mat1_indices[i][1];
unrepeated[_row].push_back(_col);
co_map_idx[_row][_col].push_back(mat1_val_addr[i]);
}

......

auto shared = [&](uint start, uint end) {
for (int i = start; i < end; i++) {
int row1 = res[i][0];
int col1 = res[i][1];
double val = co_map_idx[row_mat1][row_mat2].back();
co_map_idx[row_mat1][row_mat2].pop_back();
for (int j = 0; j > mat2_col; j++) {
int idx = idx_map_cnt[row1][j];
output_val[idx] += val * mat2_val[col1][j];
output_idx[idx][0] = row1;
output_idx[idx][1] = j;
}
}
};

但是根据实际运行的结果来看中,这是一种很烂的方案,每次都要选择不重复元素、清空和删除,这会带来不必要的开销,时间会很慢。那么再来看二种并行思路:我们已经有了 unrepeated 这样的数据结构,那么直接对 unrepeated 直接分组不是更好吗?

直接对 unrepeatedkey 进行分组计算,这样就不会出现重复的 key,不过为了拿到不同的 key,仍然需要一些预处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
std::vector<int> res;
for (auto it = unrepeated.begin(); it != unrepeated.end(); it++)
res.push_back(it->first)
auto multi = [&](uint32_t start, uint32_t end) {
for (uint32_t i = start; i < end; i++) {
// get val
auto row_mat1 = res[i];
for (auto row_mat2 : unrepeated[row_mat1]) {
double val = co_map_idx[row_mat1][row_mat2].back();
co_map_idx[row_mat1][row_mat2].pop_back();
for (uint32_t j = 0; j < mat2_col; j++) {
// get val
uint32_t idx = idx_map_cnt[row_mat1][j];
*(out_val_addr + idx) +=
val * mat2_val_addr[row_mat2 * mat2_col + j];
out_idx_addr[idx] = row_mat1;
out_idx_addr[idx + out_num] = j;
}
}
}
};

看,方法总比困难多,问题被完美解决。在这之后,我尝试了各种并行方案,以及双重并行,上述代码是性能最好的一个,但性能与其他库比仍然查了很多。为什么呢?来分析原因。当矩阵规模很小的时候会执行串行乘法,而串行乘法的效率会远远好于其他库,这就说明乘法的思路没问题,我觉得是并行 API 设计的不够好,它只支持一维的并行,且不支持额外的自定义,这就导致了这个接口的应用受限,甚至为了使用这个 API 还要执行很多其他的辅助。如果一定要在矩阵乘法中套两次并行,效率会极度低下。

如何设计多线程 API?

既然 parallelfor 这么难用,那如果我们是工程师,该如何设计这个接口呢?首先我们要知道,多线程的使用场景多种多样变化无穷,以计算为例,可以以行为单位进行并行,也可以以列为单位,甚至可以以数据块为单位。我想了很久,并没有想出一个很好的方案。也许,我要是能想出来,甲方也不至于提供 parallelfor 这样的 API 。

感谢上学期间打赏我的朋友们。赛博乞讨:我,秦始皇,打钱。

欢迎订阅我的文章