0%

如何看懂 nndeploy

最近这半年实在是闲,秉承着下班少玩手机的目的,7 月开始学 cuda,8 9 月学了 C++,10 月懈怠了一个月,11 月学了 cuda 进阶,12 月我来祸害 nndeploy 了。

一来是学完 C++ 后看下我能看懂的优秀的开源项目,一方面在之前学校训练模型感觉没意思,是看看 AI 的工程化。

个人背景

  1. CC++ 薄弱,上班才开始学,cmake 也是上班后学的,项目经验少,只能看懂简单一些的
  2. 上学那会儿会用 pytorch 训练模型,用过常见的 CV、NLP 模型。对大模型完全未知
  3. 计算机出身,对线程池、内存池、有向无环、多级流水不陌生
  4. 工作一年,会用 neonOpenCL 写算子。下班时间自学了 CUDA
  5. 对部署、推理框架完全未知,全凭兴趣,代码一点点看吧
  6. 由于涉及相当多的知识,会以超链接的形式给出,语法知识点不在解释
  7. 以我看完代码的体验而言,C++,多线程,数据结构,AI 算法都得了解,不然代码会看的很难受
  8. 模型推理是模型部署中的重点,所以会重点看一下,所以即使标题中有推理引擎的部分,但它也只是计算图中的一个节点。由于下班时间自学了 CUDA,所以推理引擎部分选用的是 tensorrt

从 main 函数开始

获取参数

说实话打开项目的时候,这么多文件夹我都没找到入口在哪。cmake 中生成可执行文件的命令为:add_executable,搜索这个关键字,定位到了是 demo 文件夹。以检测为例,打开 demo/detect/demo.cc 开始阅读。

看到 main() 函数的时候发现了未知的 gflagsvscode 中甚至无法跳转。一般而言是第三方库,打开网页搜索,果然……如果想使用这个库,可以看这里

那么 main() 函数里的这段代码,都是获取用户的输入,并创建对应的数据类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
gflags::ParseCommandLineNonHelpFlags(&argc, &argv, true);
if (demo::FLAGS_usage) {
demo::showUsage();
return -1;
}

// 检测模型的有向无环图graph名称,例如
// NNDEPLOY_YOLOV5/NNDEPLOY_YOLOV6/NNDEPLOY_YOLOV8
std::string name = demo::getName();
// 推理后端类型,例如:
// kInferenceTypeOpenVino / kInferenceTypeTensorRt / kInferenceTypeOnnxRuntime
base::InferenceType inference_type = demo::getInferenceType();
// 推理设备类型,例如:

// kDeviceTypeCodeX86:0/kDeviceTypeCodeCuda:0/...
base::DeviceType device_type = demo::getDeviceType();

// 模型类型,例如:
// kModelTypeOnnx/kModelTypeMnn/...
base::ModelType model_type = demo::getModelType();

// 模型是否是路径
bool is_path = demo::isPath();

// 模型路径或者模型字符串
std::vector<std::string> model_value = demo::getModelValue();

// input path
std::string input_path = demo::getInputPath();

// input path
base::CodecFlag codec_flag = demo::getCodecFlag();
// output path
std::string ouput_path = demo::getOutputPath();
// base::kParallelTypePipeline / base::kParallelTypeSequential
base::ParallelType pt = demo::getParallelType();

以下面的代码为例:

1
base::ParallelType pt = demo::getParallelType();

ParallelType 的定义为 :

1
2
3
4
5
6
enum ParallelType : int {
kParallelTypeNone = 0x0001,
kParallelTypeSequential = 0x0001 << 1,
kParallelTypeTask = 0x0001 << 2,
kParallelTypePipeline = 0x0001 << 3,
};

只能从单词的意思上看出,这个参数表示部署的任务是并行还是串行。

计算图

计算图创建

学过数据结构的话,对图都不陌生,用边把节点连接起来。

Edge 定义

之后就是定义图的边:

1
2
3
4
5
// 有向无环图graph的输入边packert
dag::Edge input("detect_in");

// 有向无环图graph的输出边packert
dag::Edge output("detect_out");

打开 Edge 这个类简单阅读一下,发现它继承自 NonCopyable

1
class NNDEPLOY_CC_API Edge : public base::NonCopyable

简而言之这个类被禁用了拷贝构造赋值,移动构造和赋值,我们给他留下一个数据不可被拷贝的印象就可以了。额外的,NNDEPLOY_CC_API 是项目中常见的宏定义,一般生成动态链接库会选择 release 模式。NNDEPLOY_CC_API 则会控制符号表是否对外可见,运行时出错时可以根据出错地址找到对应的符号,也就是哪个函数报错了。

Edge 的方法进行大致浏览,可以分为内存和节点位置索引相关:

  • 内存:可以操作 buffer、tensorparam
1
2
3
4
5
6
7
8
9
10
11
12
13
14
device::Buffer *create(device::Device *device, const device::BufferDesc &desc,
int index);
base::Status set(device::Buffer &buffer, int index);
device::Buffer *getBuffer(const Node *node);

base::Status set(device::Tensor *tensor, int index, bool is_external = true);
base::Status set(device::Tensor &tensor, int index);
device::Tensor *create(device::Device *device, const device::TensorDesc &desc,
int index);

base::Status set(base::Param *param, int index, bool is_external = true);
base::Status set(base::Param &param, int index);
base::Param *getParam(const Node *node);
base::Param *getGraphOutputParam();
  • 节点位置相关,目测是获取索引或者位置
1
2
3
4
5
6
int getIndex(const Node *node);
int getGraphOutputIndex();


int getPosition(const Node *node);
int getGraphOutputPosition();

至于更多内容,需要的时候再看。

Graph 定义

graph 类继承自 Node 类:

1
class NNDEPLOY_CC_API Graph : public Node

初步推测一个 graph 可以视为一个节点,被添加到其他 graph 中。简单浏览 Node 类的方法,发现它可以获取 Edge

1
2
std::vector<Edge *> getAllInput();
std::vector<Edge *> getAllOutput();

以及设置和获取一些运行时信息:

1
2
3
4
5
void setDebugFlag(bool flag);
bool getDebugFlag();

void setRunningFlag(bool flag);
bool isRunning();

setRunningFlag 为例,发现当打开 is_time_profile_ 选项后,当 ENABLE_NNDEPLOY_TIME_PROFILER 宏开启时,会通过 NNDEPLOY_TIME_POINT_START 宏定义去记录 node 的执行时间。额外的,NNDEPLOY_LOGE 日志函数,NNDEPLOY_RETURN_ON_NEQ 返回状态检查也是通过宏定义do-while(0) 的技巧实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void Node::setRunningFlag(bool flag) {
is_running_ = flag;
if (is_time_profile_) {
if (is_running_) {
NNDEPLOY_TIME_POINT_START(name_ + " run()");
} else {
NNDEPLOY_TIME_POINT_END(name_ + " run()");
}
}
if (is_debug_) {
if (is_running_) {
NNDEPLOY_LOGE("%s run start.\n", name_.c_str());
} else {
NNDEPLOY_LOGE("%s run end.\n", name_.c_str());
}
}
}

那么简单的推测:Edge 负责资源申请等管理,Node 负责调度资源运行。因为之前完全没接触过部署相关的项目,所以一边看代码一边猜测了。

至于 Graph 这个类,作者的注释很详细了,创建 NodeEdge,具体如何使用,继续往下看。

Graph 注册

1
dag::Graph *graph = new dag::Graph("demo", nullptr, &output);

这里就是创建了一个图,并且把之前创建的 Edge 添加了进去。不过迷惑一些的在后面:

1
2
3
4
5
6
7
8
// 创建检测模型有向无环图graph
dag::Graph *detect_graph =
    dag::createGraph(name, inference_type, device_type, &input, &output,
                      model_type, is_path, model_value);
if (detect_graph == nullptr) {
  NNDEPLOY_LOGE("detect_graph is nullptr");
  return -1;
}

createGraph 函数跳转进去,我看了十分钟寻思没看错呀,会直接返回空指针,报错退出。后面发现在 getGlobalGraphCreatorMap 中有两个变量是 static 的,莫非在其他地方这个函数被调用过了?

又浏览了下目标检测相关头文件 yolo.h,以及这个文件夹下 config.cmake 的写法:

1
2
3
4
5
6
7
8
9
10
11
12
file(GLOB_RECURSE SOURCE
"${ROOT_PATH}/demo/detect/*.h"
"${ROOT_PATH}/demo/detect/*.cc"
)
file(GLOB DEMO_SOURCE
"${ROOT_PATH}/demo/*.h"
"${ROOT_PATH}/demo/*.cc"
)
set(SOURCE ${SOURCE} ${DEMO_SOURCE})
# OBJECT
# BINARY
add_executable(${BINARY} ${SOURCE} ${OBJECT})

发现在 using namespace nndeploy 时,在 yolo.cc 中已经注册过了:

1
2
3
4
5
6
7
8
9
// 编程规范:g_ 开头的变量是全局变量
dag::TypeGraphRegister g_register_yolov5_graph(NNDEPLOY_YOLOV5,
createYoloV5Graph);

dag::TypeGraphRegister g_register_yolov6_graph(NNDEPLOY_YOLOV6,
createYoloV6Graph);

dag::TypeGraphRegister g_register_yolov8_graph(NNDEPLOY_YOLOV8,
createYoloV8Graph);

我也第一次见这种形式的代码,是通过注册全局变量的形式调用图创建函数。简化一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <map>

std::map<std::string, int> map;

class TypeGraphRegister {
public:
// explicit 不允许隐式类型转换
explicit TypeGraphRegister(const std::string &name, int v) {
map[name] = v;
}
};

namespace A{
TypeGraphRegister a{"a", 1};
};


int main() {
using namespace A;
std::cerr << map["a"] << std::endl;
return 0;
}

用全局变量是因为,无法在名称空间中进行变量赋值,也就是下面的代码是错误的:

1
2
3
4
5
6
7
8
9
10
11
12
#include <iostream>

namespace A {
int a = 1;
a += 4; // 错误
}

int main() {
using namespace A;
std::cerr << A::a << std::endl;
return 0;
}

创建目标检测图

yolov5 为例,会调用 createYoloV5Graph 函数,根据用户指定的 inference_type 推理类型,device_type 设备类型等信息创建目标检测的计算图。

先创建一个图:

1
dag::Graph *graph = new dag::Graph(name, input, output);

之后在输入边 input 边和推理边 infer_input 边直接增加节点 pre,完成颜色空间转换和 resize,印象中目标检测模型是要把输入的图像 resize 到固定的尺寸来。

1
2
3
4
5
6
7
8
9
dag::Node *pre = graph->createNode<preprocess::CvtColorResize>(
"preprocess", input, infer_input);
preprocess::CvtclorResizeParam *pre_param =
dynamic_cast<preprocess::CvtclorResizeParam *>(pre->getParam());
pre_param->src_pixel_type_ = base::kPixelTypeBGR;
pre_param->dst_pixel_type_ = base::kPixelTypeRGB;
pre_param->interp_type_ = base::kInterpTypeLinear;
pre_param->h_ = 640;
pre_param->w_ = 640;

推理输入边 infer_input 和推理输出边 infer_output 之间增加推理节点 infer 完成模型推理。同理,推理结束后增加 post 节点,完成目标检测中的 nms 抑制,置信度筛选等:

1
2
3
4
dag::Node *infer = graph->createInfer<infer::Infer>(
"infer", inference_type, infer_input, infer_output);
dag::Node *post =
graph->createNode<YoloPostProcess>("postprocess", infer_output, output);

对于 createNode<YoloPostProcess> 形式的调用,看一下 createNode 方法:

1
2
3
4
template <typename T, typename... Args,
typename std::enable_if<std::is_base_of<Node, T>{}, int>::type>
Node *Graph::createNode(const std::string &name, Edge *input, Edge *output,
Args &...args)

模板那里的写法是 SFINAE,有兴趣可以看下。

额外的,增加的 pre, infer, post 这些节点都继承自 Node 类,并实现了 run 方法。启动计算图时,通过 Node 基类去调用 noderun 方法,这样就可以执行计算图中的所有 node 节点。

目标检测图中的推理引擎

创建推理节点:

1
2
dag::Node *infer = graph->createInfer<model::Infer>(
"infer", inference_type, infer_input, infer_output);

首先是构造 Infer 这个类:

1
2
3
Infer(const std::string &name, base::InferenceType type,
std::initializer_list<dag::Edge *> inputs,
std::initializer_list<dag::Edge *> outputs);

在这个构造函数中,调用 Node 构造函数传入输入输出边外,还有创建推理引擎:inference::createInference(type);。推理引擎的创建和之前的全局注册一样,以 tensorrt 为例,会创建一个全局变量:

1
2
TypeInferenceRegister<TypeInferenceCreator<TensorRtInference>>
g_tensorrt_inference_register(base::kInferenceTypeTensorRt);

在创建 TensorRtInference 的时候,会调用 Inference 的构造函数创建参数:

1
2
3
4
Inference::Inference(base::InferenceType type) {
type_ = type;
inference_param_ = createInferenceParam(type);
}

参数创建对应的代码是:

1
2
3
4
5
6
TensorRtInferenceParam::TensorRtInferenceParam() : InferenceParam() {
model_type_ = base::kModelTypeOnnx;
device_type_.code_ = base::kDeviceTypeCodeCuda;
device_type_.device_id_ = 0;
gpu_tune_kernel_ = 1;
}

可以看到 tensorrt 运行在 cuda 的单卡上面。

之后将检测模型视为 node 添加到 graph 中:graph->addNode(detect_graph);

后面是创建解码节点和编码节点,createDecodeNodecreateGraph 在实现逻辑上是类似的,大概猜测是对输入的图像或者视频进行编码解码。两者均位于 codec 名称空间下,中间还有一个 drawbox 节点,会调用 opencv 画出图片中的检测框。

图这部分的流程如下图所示:

wrapper 相关

EdgeWrapper

在调用 createEdge 的时候,将每个 edge 封装成 edge_warpper,放到当前图的 edge_repository_ 里面。这里使用 new 申请 edge_wrapper ,不恰当的释放、程序异常退出没调用析构函数时,会有内存泄漏。

1
2
3
4
5
6
7
8
9
Edge *Graph::createEdge(const std::string &name) {
Edge *edge = new Edge(name);
EdgeWrapper *edge_wrapper = new EdgeWrapper();
edge_wrapper->is_external_ = false;
edge_wrapper->edge_ = edge;
edge_wrapper->name_ = name;
edge_repository_.emplace_back(edge_wrapper);
return edge;
}

EdgeWrapper 类的代码如下,producers_consumers_ 推测用于管理边的输入节点和输出节点。

1
2
3
4
5
6
7
8
class NNDEPLOY_CC_API EdgeWrapper {
public:
bool is_external_;
Edge *edge_;
std::string name_;
std::vector<NodeWrapper *> producers_;
std::vector<NodeWrapper *> consumers_;
};

NodeWrapper

addNodecreateNode 代码类似,需要有输入边和输出边这两个参数,因此相比 createEdge 麻烦一些,多了下面的内容:

1
2
3
4
5
6
7
8
9
10
11
12
EdgeWrapper *input_wrapper = findEdgeWrapper(edge_repository_, input);
if (input_wrapper == nullptr) {
input_wrapper = this->addEdge(input);
}
input_wrapper->consumers_.emplace_back(node_wrapper);
EdgeWrapper *output_wrapper = findEdgeWrapper(edge_repository_, output);
if (output_wrapper == nullptr) {
output_wrapper = this->addEdge(output);
}
output_wrapper->producers_.emplace_back(node_wrapper);

node_repository_.emplace_back(node_wrapper);

首先调用 findEdgeWrapper 找到输入边的 wrapper,如果边不在 graph 就添加进来。输入边的 consumers_ 需要添加这个 node;同理,对于输出边的 produces_ 也需要添加这个 node。不过需要注意的是,允许有多条边的 consumers_ 是同一个节点,允许一个节点是多条边的 produces_

NodeWrapper 代码如下:

1
2
3
4
5
6
7
8
9
class NNDEPLOY_CC_API NodeWrapper {
public:
bool is_external_;
Node *node_;
std::string name_;
std::vector<NodeWrapper *> predecessors_;
std::vector<NodeWrapper *> successors_;
base::NodeColorType color_ = base::kNodeColorWhite;
};

推测其中的 predecessors_successors_ 对应 EdgeWrapperconsumers_produces_

在看 createNode 代码的时候发现了未知代码 std::initializer_list学习了一下,粗浅理解为轻量的迭代同类型对象的类模板。

1
2
3
Node *Graph::createNode(const std::string &name,
std::initializer_list<Edge *> inputs,
std::initializer_list<Edge *> outputs, Args &...args)

计算图初始化

之后就是计算图的初始化、执行和释放。之前的代码难度还 OK,到了这里感觉代码难度飞升。调用 status = graph->init(); 时完成计算图的初始化,看一下初始化了哪些内容。

首先是 this->construct(); 函数检查 graphnode, edge 是否为空,并检查 edge_wrapper 的生产者和消费者是否为空。如果这些都是空的话,说明创建的计算图有问题。

Node 处理

而后是 Node 节点的处理,首先为 Node 设置基础的信息:运行方式,是否计时等。

1
2
3
4
node->setDebugFlag(is_debug_);
node->setTimeProfileFlag(is_time_profile_);
node->setParallelType(parallel_type_);
node->setInnerFlag(true);

而后来看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
std::vector<Edge *> inputs = node->getAllInput();
for (auto input : inputs) {
EdgeWrapper *input_wrapper = findEdgeWrapper(edge_repository_, input);
NNDEPLOY_CHECK_PARAM_NULL_RET_STATUS(input_wrapper,
"input_wrapper is null!");

for (auto producer : input_wrapper->producers_) {
insertUnique(node_wrapper->predecessors_, producer);
}
}

std::vector<Edge *> outputs = node->getAllOutput();
for (auto output : outputs) {
EdgeWrapper *output_wrapper = findEdgeWrapper(edge_repository_, output);
NNDEPLOY_CHECK_PARAM_NULL_RET_STATUS(output_wrapper,
"output_wrapper is null!");

for (auto consumer : output_wrapper->consumers_) {
insertUnique(node_wrapper->successors_, consumer);
}
}

首先调用 getAllInput 方法获取 Node 节点的输入,以 Infer 节点为例,对于 graph->createInfer<model::Infer> 这个 Infer 节点,调用 getAllInput 会调用 Node 类的方法得到输入 inputs_

1
std::vector<Edge *> Node::getAllInput() { return inputs_; }

inputs_ 是在创建子类时由子类的构造函数的参数决定的,看下 infer 类的构造函数:

1
2
3
4
Infer::Infer(const std::string &name, base::InferenceType type,
std::initializer_list<dag::Edge *> inputs,
std::initializer_list<dag::Edge *> outputs)
: dag::Node(name, inputs, outputs)

调用了父类 dag::Node 的构造函数:

1
2
3
4
5
6
7
8
Node::Node(const std::string &name, std::vector<Edge *> inputs,
std::vector<Edge *> outputs)
: name_(name) {
device_type_ = device::getDefaultHostDeviceType();
inputs_ = inputs;
outputs_ = outputs;
constructed_ = true;
}

也就是在父类的构造函数中,指定了 inputs_ 是输入边。获取节点的输入边后,获取边的 producers_。也就是在当前节点的 predecessors 中添加指向当前节点的节点。之后的处理同理,在当前节点的 successors 中添加当前节点指向的节点。说起来有点乱,看图吧:

对于黄色节点而言,蓝色节点是 predecessors,绿色节点是 successors

Edge 处理

处理节点之后开始处理边:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
for (auto edge_wrapper : edge_repository_) {
std::vector<Node *> producers;
for (auto producer : edge_wrapper->producers_) {
producers.emplace_back(producer->node_);
}
std::vector<Node *> consumers;
for (auto consumer : edge_wrapper->consumers_) {
consumers.emplace_back(consumer->node_);
}
base::Status status = edge_wrapper->edge_->setParallelType(parallel_type);
NNDEPLOY_RETURN_ON_NEQ(status, base::kStatusCodeOk,
"setParallelType failed!");
// 必须在abstract_edge管理该字段
status = edge_wrapper->edge_->increaseProducers(producers);
NNDEPLOY_RETURN_ON_NEQ(status, base::kStatusCodeOk,
"increaseProducers failed!");
status = edge_wrapper->edge_->increaseConsumers(consumers);
NNDEPLOY_RETURN_ON_NEQ(status, base::kStatusCodeOk,
"increaseConsumers failed!");
status = edge_wrapper->edge_->construct();
NNDEPLOY_RETURN_ON_NEQ(status, base::kStatusCodeOk,
"construct edge failed!");
}

这里有个疑问,std::vector<Node *> consumers 使用了指针,那么如果 abstract_edge 修改了 Node 的内容,edge_wrapper 跟踪的 Node 的内容也会被修改。会不会有影响?

setParallelType 时,创建了 abstract_edge。这里感觉不太合适,函数的用途是创建 abstract_edge,而函数名确实设置并行方式。我还以为和 node_wrapper 的处理方式一样只是设置并行方式,找了半天才找到 abstract_edge 的创建藏在 setParallelType 方法中。而后由 abstract_edge 管理边的生产者和消费者,并调用 abstract_edgeconstruct 方法。

来看一下 abstract_edge,这个边的创建形式和前面讲过的 createYoloV5Graph 一样,由 TypeEdgeRegister 注册,支持 FixedEdge(串行、任务并行)和 PipelineEdge(流水并行)。

PipelineEdgeconstruct 方法中,会将消费者添加到数据包中,用于任务并行,当数据一到位,立马执行。来看一下这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
std::list<PipelineDataPacket *> data_packets_;
// 每个消费者 消费 的数据包最新索引 与下面当前数据包的关系为该索引为其+1
std::map<Node *, int> to_consume_index_;
// 每个消费者 消费 的当前数据包
std::map<Node *, PipelineDataPacket *> consuming_dp_;

base::Status PipelineEdge::construct() {
consumers_size_ = consumers_.size();
for (auto iter : consumers_) {
if (to_consume_index_.find(iter) == to_consume_index_.end()) {
to_consume_index_.insert({iter, 0});
}
if (consuming_dp_.find(iter) == consuming_dp_.end()) {
consuming_dp_.insert({iter, nullptr});
}
}
return base::kStatusCodeOk;
}

执行器初始化

之后就是调用 status = this->executor();,根据用户传入的执行类型创建执行器 executor_,选择串行执行还是并行执行,并行执行又分为任务并行和流水线并行。这三个概念可以看项目的 README,里面有详细的解释:

  1. 串行:按照模型部署的有向无环图的拓扑排序,依次执行每个节点。

  2. 流水线并行:在处理多帧的场景下,基于有向无环图的模型部署方式,可将前处理 Node、推理 Node、后处理 Node 绑定三个不同的线程,每个线程又可绑定不同的硬件设备下,从而三个 Node 可流水线并行处理。在多模型以及多硬件设备的的复杂场景下,更加可以发挥流水线并行的优势,从而可显著提高整体吞吐量。

  3. 任务并行:在多模型以及多硬件设备的的复杂场景下,基于有向无环图的模型部署方式,可充分挖掘模型部署中的并行性,缩短单次算法全流程运行耗时

之后对执行器 executor_ 进行初始化:

1
status = executor_->init(edge_repository_, node_repository_);

接下来仔细看看这 3 个执行器吧。到目前为止,由 graph->init() 引发的代码还没看完。

SequentialExecutor 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
base::Status SequentialExecutor::init(
std::vector<EdgeWrapper *> &edge_repository,
std::vector<NodeWrapper *> &node_repository) {
base::Status status = topoSortDFS(node_repository, topo_sort_node_);
for (auto iter : topo_sort_node_) {
iter->node_->setInitializedFlag(false);
status = iter->node_->init();
if (status != base::kStatusCodeOk) {
NNDEPLOY_LOGE("Node %s init failed\n", iter->node_->getName().c_str());
return status;
}
iter->node_->setInitializedFlag(true);
}
edge_repository_ = edge_repository;
return status;
}

调用 topoSortDFS 函数进行了拓扑排序,而后对所有拓扑后的 Node 进行初始化 status = iter->node_->init();。比如 NodeInfer 节点,就调用 Infer 节点的初始化,完成推理引擎的初始化。如果推理引擎是 tensorrt,就会调用 base::Status TensorRtInference::init()

仔细看下 topoSortDFS 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
base::Status topoSortDFS(std::vector<NodeWrapper *> &node_repository,
std::vector<NodeWrapper *> &topo_sort_node) {
base::Status status = base::kStatusCodeOk;
std::vector<NodeWrapper *> start_nodes = findStartNodes(node_repository);
if (start_nodes.empty()) {
NNDEPLOY_LOGE("No start node found in graph");
return base::kStatusCodeErrorInvalidValue;
}
std::stack<NodeWrapper *> dst;
for (auto node_wrapper : start_nodes) {
if (node_wrapper->color_ == base::kNodeColorWhite) {
status = TopoSortDFSRecursive(node_wrapper, dst);
} else if (node_wrapper->color_ == base::kNodeColorGray) {
NNDEPLOY_LOGE("Cycle detected in graph");
status = base::kStatusCodeErrorInvalidValue;
} else {
continue;
}
}
while (!dst.empty()) {
topo_sort_node.emplace_back(dst.top());
dst.pop();
}

checkUnuseNode(node_repository);

return base::kStatusCodeOk;
}

看到了熟悉的数据结构和 leetcode 的味道,首先寻找根节点,没有 predecessors_ 的就是根节点。其中的 TopoSortDFSRecursive 递归方法是图染色算法,也是经典数据结构和 leetcode 题,将白色的节点染成黑色,如果染色器件重复对灰色点染色,就说明计算图存在环路,报错退出。将拓扑排序后的节点放到 topo_sort_node 中。

ParallelTaskExecutor 初始化

kParallelTypeSequential 相比,DFS 算法换成了 BFS 算法,这是因为 DFSBFS 算法得到的拓扑排序不同,后者适用于并行的情况,也就是由两个节点可以并行执行。将节点全部置回了白色,因为后面 run 的时候用来判断节点是否执行过,如果执行过,设置为黑色。

而且多了线程池的初始化,至于线程池,这个东西感觉没啥好讲的。网上很多线程池的代码,如果有兴趣,看看条件变量、互斥锁的用法,最多一天差不多能看完,我之前写过 C 版本的线程池,所以这里不在展开讲了。把他理解为一个任务执行器,可以同时执行很多任务并返回就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
base::Status ParallelTaskExecutor::init(
std::vector<EdgeWrapper*>& edge_repository,
std::vector<NodeWrapper*>& node_repository) {
// TODO:
// 计算图的最大并行度,决定线程的数量
thread_pool_ = new thread_pool::ThreadPool();
thread_pool_->init();
start_nodes_ = findStartNodes(node_repository);
base::Status status = topoSortBFS(node_repository, topo_sort_node_);
all_task_count_ = topo_sort_node_.size();
if (start_nodes_.empty()) {
NNDEPLOY_LOGE("No start node found in graph");
return base::kStatusCodeErrorInvalidValue;
}

for (auto iter : topo_sort_node_) {
iter->color_ = base::kNodeColorWhite;
iter->node_->setInitializedFlag(false);
status = iter->node_->init();
NNDEPLOY_RETURN_ON_NEQ(status, base::kStatusCodeOk, "node init failure");
iter->node_->setInitializedFlag(true);
}

edge_repository_ = edge_repository;
return status;
}

ParallelPipelineExecutor 初始化

除了初始化线程池外,还执行了 this->commitThreadPool();,直接提交任务开始执行。和串行、任务并行执行器最大的不同是:这个执行器在 init() 里运行,并没有 run 方法,所以准备放到后面执行器执行的时候在看了。

commitThreadPool 方法里最重要的是 updataInput,会调用绝对边的 update 方法,由于流水线并行的执行器只能用 PipelineEdge,看一下这个类的 update 方法:

推理引擎初始化

推理引擎也是一种 Node,会在执行器初始化 Node 的时候初始化推理引擎。不过这个 Node 的初始化相比之下比较重要,所以重点看一下。

1
2
3
4
5
6
7
8
9
10
11
base::Status Infer::init() {
base::Status status = base::kStatusCodeOk;
status = inference_->init();
NNDEPLOY_RETURN_ON_NEQ(status, base::kStatusCodeOk,
"abstract_inference init failed");
is_input_dynamic_ = inference_->isInputDynamic();
is_output_dynamic_ = inference_->isOutputDynamic();
can_op_input_ = inference_->canOpInput();
can_op_output_ = inference_->canOpOutput();
return status;
}

首先是调用 inference_ 的初始化,也就是调用 TensorRtInference::init() 方法。前面这一坨代码仿佛在初始化模型(我没用过任何推理引擎,智能猜代码啥意思了):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
TensorRtInferenceParam *tensorrt_inference_param =
dynamic_cast<TensorRtInferenceParam *>(inference_param_);
if (tensorrt_inference_param->is_path_) {
model_buffer = base::openFile(tensorrt_inference_param->model_value_[0]);
} else {
model_buffer = tensorrt_inference_param->model_value_[0];
}

if (tensorrt_inference_param->model_type_ == base::kModelTypeOnnx) {
status = initWithOnnxModel(model_buffer, tensorrt_inference_param);
NNDEPLOY_RETURN_ON_NEQ(status, base::kStatusCodeOk,
"initWithOnnxModel failed");
} else if (tensorrt_inference_param->model_type_ ==
base::kModelTypeTensorRt) {
status = initWithTensorRtModel(model_buffer, tensorrt_inference_param);
NNDEPLOY_RETURN_ON_NEQ(status, base::kStatusCodeOk,
"initWithTensorRtModel failed");
} else {
NNDEPLOY_LOGE("not support this model type(%d)!\n",
tensorrt_inference_param->model_type_);
return base::kStatusCodeErrorInferenceTensorRt;
}

之后是通过绑定,来获取模型输入、输出、中间缓存的绑定数,来准确的分配内存。这里可以通过名字找索引,也可以通过索引找名字。

1
2
3
4
5
for (auto i = 0; i < getNbBindings(); ++i) {
std::string name = std::string(getBindingName(i));
io_name_index_[name] = i;
io_index_name_[i] = name;
}

获取模型输入的名字和 shape

1
2
3
4
5
6
7
for (auto i = 0; i < getNbBindings(); ++i) {
if (bindingIsInput(i)) {
std::string name = std::string(getBindingName(i));
auto shape = TensorRtConvert::convertToShape(getBindingDimensions(i));
current_shape.insert({name, shape});
}
}

下面的代码我觉得 max_shape_ 为空,因为没看到在哪创建的,所以不会进入循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for (auto iter : tensorrt_inference_param->max_shape_) {
auto tmp = current_shape.find(iter.first);
if (tmp != current_shape.end()) {
auto &shape = current_shape[iter.first];
if (base::shapeEqual(iter.second, shape)) {
continue;
} else {
int idx = io_name_index_[iter.first];
nvinfer1::Dims dims = TensorRtConvert::convertFromShape(iter.second);
setBindingDimensions(idx, dims);
}
} else {
NNDEPLOY_LOGE("reshape failed, not found input tensor(%s)!\n",
iter.first.c_str());
return base::kStatusCodeErrorInferenceTensorRt;
}
}

之后是获取设备:

1
device::Device *device = device::getDevice(inference_param_->device_type_);

获取设备的时候会注册一个 CudaArchitecture,也就是一个管理 cuda 设备的类。后续的代码是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
for (auto i = 0; i < num_binds; ++i) {
std::string name = std::string(getBindingName(i));
base::IntVector shape =
TensorRtConvert::convertToShape(getBindingDimensions(i));
base::DataType data_type =
TensorRtConvert::convertToDataType(getBindingDataType(i));
base::DataFormat data_format =
TensorRtConvert::convertToDataFormat(getBindingFormat(i));

if (bindingIsInput(i)) {
device::TensorDesc desc;
desc.data_type_ = data_type;
desc.data_format_ = data_format;
desc.shape_ = shape;
device::Tensor *max_input_tensor = new device::Tensor(device, desc, name);
max_input_tensors_.insert({name, max_input_tensor});

device::Buffer *max_input_buffer = max_input_tensor->getBuffer();
device::Tensor *current_input_tensor =
new device::Tensor(desc, max_input_buffer, name);
input_tensors_.insert({name, current_input_tensor});

// bindings_[i] = max_input_buffer->getData();
} else {
device::TensorDesc desc;
desc.data_type_ = data_type;
desc.data_format_ = data_format;
desc.shape_ = shape;
device::Tensor *max_output_tensor =
new device::Tensor(device, desc, name);
max_output_tensors_.insert({name, max_output_tensor});

device::Buffer *max_output_buffer = max_output_tensor->getBuffer();
device::Tensor *current_output_tensor =
new device::Tensor(desc, max_output_buffer, name);
output_tensors_.insert({name, current_output_tensor});

// bindings_[i] = max_output_buffer->getData();
}
}
  1. 获取绑定的 shape,类型和格式
  2. 如果是输入,创建对应的 tensor,存入 input_tensors_max_input_tensors_,为什么存两次存疑
  3. 如果不是输入,就存到 max_output_tensors_output_tensors_

额外的,TensorDesc 用于描述内存中的数据,而 tensorbuffer_ 管理申请的 Buffer,具体看一下内存的申请,new device::Tensor(device, desc, name) 会调用:

1
2
3
4
5
6
7
8
Tensor::Tensor(Device *device, const TensorDesc &desc, const std::string &name,
const base::IntVector &config)
: name_(name), desc_(desc), is_external_(false) {
BufferDesc buffer_desc = device->toBufferDesc(desc, config);
void *ptr = device->allocate(buffer_desc);
buffer_ = new Buffer(device, buffer_desc, ptr, base::kMemoryTypeAllocate);
ref_count_ = new int(1);
}

device->toBufferDesc(desc, config) 用于获取内存的大小,device->allocate(buffer_desc); 会根据大小调用 cudaMalloc 申请内存:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void *CudaDevice::allocate(const BufferDesc &desc) {
void *data = nullptr;
cudaError_t status = cudaMalloc(&data, desc.size_[0]);
if (cudaSuccess != status) {
NNDEPLOY_LOGE("cuda alloc failed with size %lu for %p, status:%d\n",
desc.size_[0], data, status);
return nullptr;
}
if (data == nullptr) {
NNDEPLOY_LOGE("cuda alloc got nullptr\n");
return nullptr;
}
return data;
}

最后调用 new Buffer 使用 Buffer 这个类管理申请到的内存。

总结:获取输入输出的名字、尺寸、数据类型,并创建对应的内存 buffer

计算图执行

对应代码中的 graph->run(),具体也就是调用执行器的 run 方法:status = executor_->run();

kParallelTypeSequential 执行

暗自庆幸一下这是最简单的一个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
base::Status SequentialExecutor::run() {
base::Status status = base::kStatusCodeOk;
for (auto iter : topo_sort_node_) {
base::EdgeUpdateFlag edge_update_flag = iter->node_->updataInput();
if (edge_update_flag == base::kEdgeUpdateFlagComplete) {
iter->node_->setRunningFlag(true);
status = iter->node_->run();
NNDEPLOY_RETURN_ON_NEQ(status, base::kStatusCodeOk,
"node execute failed!\n");
iter->node_->setRunningFlag(false);
} else if (edge_update_flag == base::kEdgeUpdateFlagTerminate) {
;
} else {
NNDEPLOY_LOGE("Failed to node[%s] updataInput();\n",
iter->node_->getName().c_str());
return base::kStatusCodeErrorDag;
}
}
return status;
}

iter->node_->updataInput(); 会调用 abstact_edge_->update(node);,由于此时计算图还没有执行完毕,FixedEdge 会将节点设置为 kEdgeUpdateFlagComplete。之后就是调用 noderun 方法。对于 infer 节点,如果推理引擎是 tensorrt,就会调用 TensorRtInference::run() 方法。

实例说明,目标检测的节点运行与数据

解码节点运行

以目标检测的计算图为例,一共有 3 个节点:CvtColorResizeInferYoloPostProcess。还记得在 CvtColorResize 节点前面还有一个解码节点吗?以单张图像的目标检测为例,会读取图像并创建 mat,并将 mat 放到输出边中,完成数据的传递:

1
2
3
4
5
6
7
8
base::Status OpenCvImageDecodeNode::run() {
cv::Mat *mat = new cv::Mat(cv::imread(path_));
width_ = mat->cols;
height_ = mat->rows;
outputs_[0]->set(mat, index_, false);
index_++;
return base::kStatusCodeOk;
}

不知道为啥一定要用 outputs_[0],如果有多天输出边呢?其中的 set 方法对应:

1
2
3
4
5
6
7
8
9
10
11
12
base::Status DataPacket::set(void *anything, int index, bool is_external) {
base::Status status = base::kStatusCodeOk;
if (anything != anything_) {
destory();
}
is_external_ = is_external;
index_ = index;
flag_ = kFlagVoid;
written_ = true;
anything_ = anything;
return status;
}
CvtColorResize 运行

由于 CvtColorResize 的输入是解码节点的输出,所以可以直接在 CvtColorResizerun 方法中拿到解码节点的输出:

1
cv::Mat *src = inputs_[0]->getCvMat(this);

而后获取 host 端的设备:

1
device::Device *device = device::getDefaultHostDevice();

根据输入的参数创建描述 tensor 的描述符 desc

1
2
3
4
5
6
7
8
9
10
device::TensorDesc desc;
desc.data_type_ = tmp_param->data_type_;
desc.data_format_ = tmp_param->data_format_;
if (desc.data_format_ == base::kDataFormatNCHW) {
desc.shape_ = {1, getChannelByPixelType(tmp_param->dst_pixel_type_),
tmp_param->h_, tmp_param->w_};
} else {
desc.shape_ = {1, tmp_param->h_, tmp_param->w_,
getChannelByPixelType(tmp_param->dst_pixel_type_)};
}

根据内存信息创建 dst,也就是这条边的输出:

1
2
device::Tensor *dst =
outputs_[0]->create(device, desc, inputs_[0]->getIndex(this));

其中 getIndex 是获取当前节点的索引,由于解码节点在添加数据后进行了 index++,所以这里拿到的 index 实际为 1。至于其中的 create 方法就是创建这个节点的 tensor 输出,来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
device::Tensor *DataPacket::create(device::Device *device,
const device::TensorDesc &desc, int index,
const std::string &name) {
base::Status status = base::kStatusCodeOk;
device::Tensor *tensor = nullptr;
if (anything_ == nullptr) {
tensor = new device::Tensor(device, desc, name);
} else {
if (flag_ != kFlagTensor) {
destory();
tensor = new device::Tensor(device, desc, name);
} else {
tensor = (device::Tensor *)(anything_);
if (tensor->getDesc() != desc) {
destory();
tensor = new device::Tensor(device, desc, name);
}
}
}
is_external_ = false;
index_ = index;
flag_ = kFlagTensor;
written_ = false;
anything_ = (void *)(tensor);
return tensor;
}

anything 指向了实际的数据。而后对输入进行颜色空间转换和 resize 操作,这个好像是数字图像处理的部分,比如将 BGR 的图转换为 RGB 的图,并 resize 到固定尺寸。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
cv::Mat tmp_cvt;
if (tmp_param->src_pixel_type_ != tmp_param->dst_pixel_type_) {
base::CvtColorType cvt_type = base::calCvtColorType(
tmp_param->src_pixel_type_, tmp_param->dst_pixel_type_);
if (cvt_type == base::kCvtColorTypeNotSupport) {
NNDEPLOY_LOGE("cvtColor type not support");
return base::kStatusCodeErrorNotSupport;
}
int cv_cvt_type = OpenCvConvert::convertFromCvtColorType(cvt_type);
cv::cvtColor(*src, tmp_cvt, cv_cvt_type);
} else {
tmp_cvt = *src;
}

cv::Mat tmp_resize;
if (tmp_param->interp_type_ != base::kInterpTypeNotSupport) {
int interp_type =
OpenCvConvert::convertFromInterpType(tmp_param->interp_type_);
cv::resize(tmp_cvt, tmp_resize, cv::Size(w, h), 0.0, 0.0, interp_type);
} else {
tmp_resize = tmp_cvt;
}
OpenCvConvert::convertToTensor(tmp_resize, dst, tmp_param->normalize_,
tmp_param->scale_, tmp_param->mean_,
tmp_param->std_);

然后是 outputs_[0]->notifyWritten(dst);,通知 dst 数据准备好了。

infer 节点运行

获取所有输入的 tensorindex

1
2
3
4
5
6
7
8
9
10
11
12
13
for (auto input : inputs_) {
device::Tensor *tensor = input->getTensor(this);
tensors.emplace_back(tensor);
int index = input->getIndex(this);
indexs.emplace_back(index);
}
int index = indexs[0];
for (int i = 1; i < indexs.size(); i++) {
if (index != indexs[i]) {
NNDEPLOY_LOGE("index not equal");
return base::kStatusCodeErrorInvalidValue;
}
}

getTensor 对应的就是获取 data_packetanything

1
2
3
4
5
6
7
device::Tensor *DataPacket::getTensor() {
if (flag_ != kFlagTensor) {
return nullptr;
} else {
return (device::Tensor *)(anything_);
}
}

获取 index 也是数据包的 index,如果输入边的 index 不同,说明这个节点收到了错误的输入,需要报错退出。而后是为推理引擎设置输入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
for (auto tensor : tensors) {
inference_->setInputTensor(tensor->getName(), tensor);
}

base::Status Inference::setInputTensor(const std::string &name,
device::Tensor *input_tensor) {
base::Status status = base::kStatusCodeOk;

std::string new_name = "";
if (!name.empty()) {
new_name = name;
} else if (!input_tensor->getName().empty()) {
new_name = input_tensor->getName();
} else {
new_name = getInputName(0);
}

if (input_tensors_.count(new_name) > 0) {
if (input_tensor != input_tensors_[new_name]) {
external_input_tensors_[new_name] = input_tensor;
}
} else {
NNDEPLOY_LOGI("input_tensor name: %s not exist!\n", new_name.c_str());
}

return status;
}

可以看到如果这个 tensor 没有在最初的 input_tensors_ 中(初始化指定)时,视为外部的输入。在数据准备好后,推理引擎开始执行:

1
status = inference_->run();

最后就是将推理引擎的输出设置到边上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for (auto output : outputs_) {
std::string name = output->getName();
base::ParallelType parallel_type = output->getParallelType();
bool flag = parallel_type == base::kParallelTypePipeline;
device::Tensor *tensor =
inference_->getOutputTensorAfterRun(name, device_type_, flag);
if (tensor == nullptr) {
NNDEPLOY_LOGE("can't getOutputTensorAfterRun[%s].\n", name.c_str());
status = base::kStatusCodeErrorInvalidParam;
break;
}

output->set(tensor, index, false);
}
post 节点运行

YoloPostProcess 这个节点的 run 方法比较简单,就是对输出的 tensor 进行 nms 和阈值筛选处理,不在过多解释。

ParallelTaskExecutor 执行

以所有的根节点 start_nodes_ 为起点,并行的形式执行这个计算图。

1
2
3
4
for (auto iter : start_nodes_) {
process(iter);
}
wait();

这个 wait 方法是以条件变量的形式等待完成任务的节点数大于等于总任务数,所以等待完成任务的节点数是原子类型的模板类:std::atomic<int>

至于 process(iter) 方法,除了执行节点外,还有 afterNodeRun(node_wrapper) 方法,重点来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
completed_task_count_++;
node_wrapper->color_ = base::kNodeColorBlack;
for (auto successor : node_wrapper->successors_) {
bool all_pre_done = true;
for (auto iter : successor->predecessors_) {
all_pre_done &= (iter->color_ == base::kNodeColorBlack);
}
if (all_pre_done && successor->color_ == base::kNodeColorWhite) {
if (successor->predecessors_.size() <= 1) {
process(successor);
} else {
submitTaskSynchronized(successor);
}
}
}

首先是已完成的任务数 +1,而后将已完成节点的颜色设置为黑色。遍历这个节点的后继节点 successors_,对于后继节点而言,如果全部的前任节点都执行完毕,那么 all_pre_done 会为 true。此时进行判断:

  • 如果这个后继节点只有一个前任节点,那么调用 process 处理这个后继节点
  • 如果这个后继节点有多个前任节点,那么已加锁的形式调用 process 处理这个后继节点,防止多线程环境下 process 函数操作 node_wrapper 这个临界资源。

最后提交任务后,以

1
2
3
4
5
std::lock_guard<std::mutex> lock(main_lock_);

if (completed_task_count_ >= all_task_count_) {
cv_.notify_one();
}

的形式唤醒等待的主线程,也就是 cv_.notify_one() 那里。最终将节点的颜色改为白色。额外的一点点小知识

使用条件变量时,在检查条件之前加锁,并在等待之前释放锁。因为唤醒的线程需要重新检查条件是否成立(因为可能会发生虚假唤醒)。如果不加锁,唤醒的线程可能会在其他线程修改条件之前就继续执行,导致逻辑错误。

虚假唤醒。这是一种能保证执行效率的方法。假设此时有10个线程处于等待中,在收到一个唤醒信号后,操作系统尝试去唤醒所有的线程,这会打破发送信号与唤醒之间一对一的关系。所以此时只能唤醒一个线程,而其余九个线程处于等待阶段。为了更灵活的处理这种情况,所以无论条件是否满足,操作系统允许等待中的线程自己醒来,称为虚假唤醒。

ParallelPipelineExecutor 执行

为了更好的读懂流水线并行的代码,简单写了份模拟的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <iostream>           // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void run() {
std::unique_lock<std::mutex> lck(mtx);
ready = true;
cv.notify_all();
}

void func1(int id) {
std::unique_lock<std::mutex> lock(mtx);
while (!ready) {
cv.wait(lock);
}
std::cerr << "Node Run : thread id = " << id << "\n";
}

void commit() {
std::thread threads[10];
for (int i = 0; i < 10; i++) {
threads[i] = std::thread(func1, i); // 模拟 update 方法
}
run(); // node.run() 方法
for (auto& th : threads) {
th.join();
}
}

int main() {
commit(); // 模拟线程池提交任务
return 0;
}

一个线程等待『条件变量的条件成立』而挂起;另一个线程使『条件成立』。为了防止竞争,条件的检测是在互斥锁的保护下进行的,线程在改变条件状态前先要锁住互斥量。如果一个条件为假,则一个线程自动阻塞,该线程处于等待状态,并释放相关变量的互斥锁。如果另一个线程改变了条件,它将信号发送给关联的条件变量,唤醒一个或多个处于等待中的线程,使其重新获得互斥锁,重新评价条件。

如果能看懂上面代码的话,再来看流水线并行的代码。和前两个执行器不同的是,ParallelPipelineExecutor 执行器在 init 的时候直接将所有节点提交到线程池开始执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void ParallelPipelineExecutor::commitThreadPool() {
// NNDEPLOY_LOGE("ppe run Thread ID: %d.\n", std::this_thread::get_id());
for (auto iter : topo_sort_node_) {
auto func = [iter]() -> base::Status {
base::Status status = base::kStatusCodeOk;
while (true) {
base::EdgeUpdateFlag edge_update_flag = iter->node_->updataInput();
if (edge_update_flag == base::kEdgeUpdateFlagComplete) {
iter->node_->setRunningFlag(true);
status = iter->node_->run();
NNDEPLOY_RETURN_ON_NEQ(status, base::kStatusCodeOk,
"node execute failed!\n");
iter->node_->setRunningFlag(false);
} else if (edge_update_flag == base::kEdgeUpdateFlagTerminate) {
break;
} else {
NNDEPLOY_LOGE("Failed to node[%s] updataInput();\n",
iter->node_->getName().c_str());
status = base::kStatusCodeErrorDag;
break;
}
}
return status;
};
thread_pool_->commit(std::bind(func));
}
}

上面最重要的是 updataInput 方法,会调用所有输入边的 update 方法:

1
2
3
4
5
6
for (auto input : inputs_) {
flag = input->update(this);
if (flag != base::kEdgeUpdateFlagComplete) {
break;
}
}

由于 decode 节点没有输入边,所以会直接跳过这一环节执行 run 方法,会调用 PipeEdeLineset 方法写入数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
base::Status PipelineEdge::set(void *anything, int index, bool is_external) {
// 上锁
std::lock_guard<std::mutex> lock(mutex_);
PipelineDataPacket *dp = new PipelineDataPacket(consumers_size_);
NNDEPLOY_CHECK_PARAM_NULL_RET_STATUS(dp, "PipelineDataPacket is null.\n");

data_packets_.push_back(dp);
cv_.notify_all();
// set
base::Status status = dp->set(anything, index, is_external);
NNDEPLOY_RETURN_ON_NEQ(status, base::kStatusCodeOk,
"PipelineDataPacket set error.\n");
return status;
}

总感觉这里先放数据,在唤醒比较合适。

解码节点执行完毕后,继续循环执行 updataInput 节点,也就是调用在 PipeEdgeLineupdate 方法:

1
2
3
4
5
cv_.wait(lock, [this, tmp_node] {
return to_consume_index_[tmp_node] < data_packets_.size() ||
terminate_flag_; // 消费者需求的数据已存在,否则等待最新数据 ||
// 数据被消耗结束
});

由于 data_packets_ 中插入了数据,所以这个条件会满足继续向下执行。后续的代码我颅内 debug 了很长时间。

大概意思是:节点在一次执行后,将节点对应的索引自增。在第二次执行这个节点时,由于边管理的 data_packet 会有两条数据(前面的节点放进来的),所以这个节点需要找到对应的数据,删除不用的数据。

这个流水线并行实现了下图的效果:

如果是 yolo 对视频进行目标检测,就会有多帧的输入图像。每帧是一个 node 节点,也就是有多个输入节点,一个推理节点,多个输出节点。

  • 第一帧前处理,第一帧推理,第一帧后处理
  • 第二帧前处理,第二帧推理,第二帧后处理
  • 第 N 帧前处理,第 N 帧推理,第 N 帧后处理

这样就流水处理了起来。

计算图释放

也就是最后的 graph->deinit(),还是对应执行器的释放。

SequentialExecutor 释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
base::Status SequentialExecutor::deinit() {
base::Status status = base::kStatusCodeOk;
for (auto iter : edge_repository_) {
bool flag = iter->edge_->requestTerminate();
if (!flag) {
NNDEPLOY_LOGE("failed iter->edge_->requestTerminate()!\n");
return base::kStatusCodeErrorDag;
}
}
for (auto iter : topo_sort_node_) {
status = iter->node_->deinit();
NNDEPLOY_RETURN_ON_NEQ(status, base::kStatusCodeOk,
"failed iter->node_->deinit()");
iter->node_->setInitializedFlag(false);
}
return status;
}

代码很容易看懂,通过 requestTerminate 将边设置为计算完毕,执行所有的节点的释放操作。如果是 tensorrt 的推理节点,将执行推理引擎的释放。

ParallelTaskExecutor 释放

相比 SequentialExecutor,多了一步释放线程池。

ParallelPipelineExecutor 释放

ParallelTaskExecutor 释放。

在最后退出的时候,手动删除了图资源 delete graph,会调用图的析构函数,删除之前由 new 申请的 node wrapperedge wrapper

推理引擎释放

SequentialExecutorParallelTaskExecutorParallelPipelineExecutor 在释放的时候会释放节点,重点看下 infer 这个节点的释放,

1
2
3
4
5
6
base::Status Infer::deinit() {
base::Status status = base::kStatusCodeOk;
status = inference_->deinit();
NNDEPLOY_RETURN_ON_NEQ(status, base::kStatusCodeOk, "deinit failed");
return status;
}

也就是反初始化推理引擎,注意就是内存释放:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
base::Status TensorRtInference::deinit() {
base::Status status = base::kStatusCodeOk;
for (auto iter : input_tensors_) {
delete iter.second;
}
input_tensors_.clear();
for (auto iter : max_input_tensors_) {
delete iter.second;
}
max_input_tensors_.clear();
for (auto iter : output_tensors_) {
delete iter.second;
}
output_tensors_.clear();
for (auto iter : max_output_tensors_) {
delete iter.second;
}
max_output_tensors_.clear();
device::Device *device = device::getDevice(inference_param_->device_type_);
if (inner_forward_buffer_ != nullptr) {
device->deallocate(inner_forward_buffer_);
}
return status;
}

delete iter.second 会手动删除 tensor,也就是调用 tensor 类的析构函数,删除数据的 buffer 并清除引用计数。

总结

整个计算图的代码就梳理完了,对于 README 中提到的:上述模式的组合并行,我好像还不知道怎么组合。

目前对推理引擎完全未知,怎么写自己的高性能 op 还没了解到。

我想部署一个大模型!好像还差的很远。

  • DAG 的组织有点乱,而且有内存泄漏,内存这块并没有很好的管理
  • CvtColorResizerun 为什么两次 notify ?
  • CvtColorResize 用的是 inputs_[0]Infer 需要遍历所有的 inputs,很迷惑

需要实际编译运行一下看看了。

感谢上学期间打赏我的朋友们。赛博乞讨:我,秦始皇,打钱。

欢迎订阅我的文章