最近这半年实在是闲,秉承着下班少玩手机的目的,7 月开始学 cuda
,8 9 月学了 C++
,10 月懈怠了一个月,11 月学了 cuda
进阶,12 月我来祸害 nndeploy
了。
一来是学完 C++
后看下我能看懂的优秀的开源项目,一方面在之前学校训练模型感觉没意思,是看看 AI
的工程化。
个人背景
C
和 C++
薄弱,上班才开始学,cmake
也是上班后学的,项目经验少,只能看懂简单一些的
上学那会儿会用 pytorch
训练模型,用过常见的 CV、NLP 模型。对大模型完全未知
计算机出身,对线程池、内存池、有向无环、多级流水不陌生
工作一年,会用 neon
、OpenCL
写算子。下班时间自学了 CUDA
对部署、推理框架完全未知,全凭兴趣,代码一点点看吧
由于涉及相当多的知识,会以超链接的形式给出,语法知识点不在解释
以我看完代码的体验而言,C++
,多线程,数据结构,AI 算法都得了解,不然代码会看的很难受
模型推理是模型部署中的重点,所以会重点看一下,所以即使标题中有推理引擎的部分,但它也只是计算图中的一个节点。由于下班时间自学了 CUDA
,所以推理引擎部分选用的是 tensorrt
。
从 main 函数开始 获取参数 说实话打开项目的时候,这么多文件夹我都没找到入口在哪。cmake
中生成可执行文件的命令为:add_executable
,搜索这个关键字,定位到了是 demo
文件夹。以检测为例,打开 demo/detect/demo.cc
开始阅读。
看到 main()
函数的时候发现了未知的 gflags
,vscode
中甚至无法跳转。一般而言是第三方库,打开网页搜索,果然……如果想使用这个库,可以看这里 。
那么 main()
函数里的这段代码,都是获取用户的输入,并创建对应的数据类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 gflags::ParseCommandLineNonHelpFlags (&argc, &argv, true ); if (demo::FLAGS_usage) { demo::showUsage (); return -1 ; } std::string name = demo::getName (); base::InferenceType inference_type = demo::getInferenceType (); base::DeviceType device_type = demo::getDeviceType (); base::ModelType model_type = demo::getModelType (); bool is_path = demo::isPath ();std::vector<std::string> model_value = demo::getModelValue (); std::string input_path = demo::getInputPath (); base::CodecFlag codec_flag = demo::getCodecFlag (); std::string ouput_path = demo::getOutputPath (); base::ParallelType pt = demo::getParallelType ();
以下面的代码为例:
1 base::ParallelType pt = demo::getParallelType ();
ParallelType
的定义为 :
1 2 3 4 5 6 enum ParallelType : int { kParallelTypeNone = 0x0001 , kParallelTypeSequential = 0x0001 << 1 , kParallelTypeTask = 0x0001 << 2 , kParallelTypePipeline = 0x0001 << 3 , };
只能从单词的意思上看出,这个参数表示部署的任务是并行还是串行。
计算图 计算图创建 学过数据结构的话,对图都不陌生,用边把节点连接起来。
Edge 定义 之后就是定义图的边:
1 2 3 4 5 dag::Edge input ("detect_in" ) ;dag::Edge output ("detect_out" ) ;
打开 Edge
这个类简单阅读一下,发现它继承自 NonCopyable
:
1 class NNDEPLOY_CC_API Edge : public base::NonCopyable
简而言之这个类被禁用了拷贝构造赋值,移动构造和赋值,我们给他留下一个数据不可被拷贝的印象就可以了。额外的,NNDEPLOY_CC_API
是项目中常见的宏定义,一般生成动态链接库会选择 release
模式。NNDEPLOY_CC_API
则会控制符号表是否对外可见,运行时出错时可以根据出错地址找到对应的符号,也就是哪个函数报错了。
对 Edge
的方法进行大致浏览,可以分为内存和节点位置索引相关:
内存:可以操作 buffer、tensor
和 param
1 2 3 4 5 6 7 8 9 10 11 12 13 14 device::Buffer *create (device::Device *device, const device::BufferDesc &desc, int index) ;base::Status set (device::Buffer &buffer, int index) ;device::Buffer *getBuffer (const Node *node) ;base::Status set (device::Tensor *tensor, int index, bool is_external = true ) ;base::Status set (device::Tensor &tensor, int index) ;device::Tensor *create (device::Device *device, const device::TensorDesc &desc, int index) ;base::Status set (base::Param *param, int index, bool is_external = true ) ;base::Status set (base::Param ¶m, int index) ;base::Param *getParam (const Node *node) ;base::Param *getGraphOutputParam () ;
1 2 3 4 5 6 int getIndex (const Node *node) ;int getGraphOutputIndex () ;int getPosition (const Node *node) ;int getGraphOutputPosition () ;
至于更多内容,需要的时候再看。
Graph 定义 graph
类继承自 Node
类:
1 class NNDEPLOY_CC_API Graph : public Node
初步推测一个 graph
可以视为一个节点,被添加到其他 graph
中。简单浏览 Node
类的方法,发现它可以获取 Edge
:
1 2 std::vector<Edge *> getAllInput () ;std::vector<Edge *> getAllOutput () ;
以及设置和获取一些运行时信息:
1 2 3 4 5 void setDebugFlag (bool flag) ;bool getDebugFlag () ;void setRunningFlag (bool flag) ;bool isRunning () ;
以 setRunningFlag
为例,发现当打开 is_time_profile_
选项后,当 ENABLE_NNDEPLOY_TIME_PROFILER
宏开启时,会通过 NNDEPLOY_TIME_POINT_START
宏定义 去记录 node
的执行时间。额外的,NNDEPLOY_LOGE
日志函数,NNDEPLOY_RETURN_ON_NEQ
返回状态检查也是通过宏定义 和 do-while(0)
的技巧实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void Node::setRunningFlag (bool flag) { is_running_ = flag; if (is_time_profile_) { if (is_running_) { NNDEPLOY_TIME_POINT_START (name_ + " run()" ); } else { NNDEPLOY_TIME_POINT_END (name_ + " run()" ); } } if (is_debug_) { if (is_running_) { NNDEPLOY_LOGE ("%s run start.\n" , name_.c_str ()); } else { NNDEPLOY_LOGE ("%s run end.\n" , name_.c_str ()); } } }
那么简单的推测:Edge
负责资源申请等管理,Node
负责调度资源运行。因为之前完全没接触过部署相关的项目,所以一边看代码一边猜测了。
至于 Graph
这个类,作者的注释很详细了,创建 Node
和 Edge
,具体如何使用,继续往下看。
Graph 注册 1 dag::Graph *graph = new dag::Graph ("demo" , nullptr , &output);
这里就是创建了一个图,并且把之前创建的 Edge
添加了进去。不过迷惑一些的在后面:
1 2 3 4 5 6 7 8 dag::Graph *detect_graph = dag::createGraph (name, inference_type, device_type, &input, &output, model_type, is_path, model_value); if (detect_graph == nullptr ) { NNDEPLOY_LOGE ("detect_graph is nullptr" ); return -1 ; }
createGraph
函数跳转进去,我看了十分钟寻思没看错呀,会直接返回空指针,报错退出。后面发现在 getGlobalGraphCreatorMap
中有两个变量是 static
的,莫非在其他地方这个函数被调用过了?
又浏览了下目标检测相关头文件 yolo.h
,以及这个文件夹下 config.cmake
的写法:
1 2 3 4 5 6 7 8 9 10 11 12 file (GLOB_RECURSE SOURCE "${ROOT_PATH}/demo/detect/*.h" "${ROOT_PATH}/demo/detect/*.cc" ) file (GLOB DEMO_SOURCE "${ROOT_PATH}/demo/*.h" "${ROOT_PATH}/demo/*.cc" ) set (SOURCE ${SOURCE} ${DEMO_SOURCE} )add_executable (${BINARY} ${SOURCE} ${OBJECT} )
发现在 using namespace nndeploy
时,在 yolo.cc
中已经注册过了:
1 2 3 4 5 6 7 8 9 dag::TypeGraphRegister g_register_yolov5_graph (NNDEPLOY_YOLOV5, createYoloV5Graph) ;dag::TypeGraphRegister g_register_yolov6_graph (NNDEPLOY_YOLOV6, createYoloV6Graph) ;dag::TypeGraphRegister g_register_yolov8_graph (NNDEPLOY_YOLOV8, createYoloV8Graph) ;
我也第一次见这种形式的代码,是通过注册全局变量的形式调用图创建函数。简化一下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #include <iostream> #include <map> std ::map <std ::string , int > map ;class TypeGraphRegister { public: explicit TypeGraphRegister (const std ::string &name, int v) { map [name] = v; } }; namespace A{ TypeGraphRegister a{"a" , 1 }; }; int main () { using namespace A; std ::cerr << map ["a" ] << std ::endl ; return 0 ; }
用全局变量是因为,无法在名称空间中进行变量赋值,也就是下面的代码是错误的:
1 2 3 4 5 6 7 8 9 10 11 12 #include <iostream> namespace A { int a = 1 ; a += 4 ; } int main () { using namespace A; std ::cerr << A::a << std ::endl ; return 0 ; }
创建目标检测图 以 yolov5
为例,会调用 createYoloV5Graph
函数,根据用户指定的 inference_type
推理类型,device_type
设备类型等信息创建目标检测的计算图。
先创建一个图:
1 dag::Graph *graph = new dag::Graph (name, input, output);
之后在输入边 input
边和推理边 infer_input
边直接增加节点 pre
,完成颜色空间转换和 resize
,印象中目标检测模型是要把输入的图像 resize
到固定的尺寸来。
1 2 3 4 5 6 7 8 9 dag::Node *pre = graph->createNode <preprocess::CvtColorResize>( "preprocess" , input, infer_input); preprocess::CvtclorResizeParam *pre_param = dynamic_cast <preprocess::CvtclorResizeParam *>(pre->getParam ()); pre_param->src_pixel_type_ = base::kPixelTypeBGR; pre_param->dst_pixel_type_ = base::kPixelTypeRGB; pre_param->interp_type_ = base::kInterpTypeLinear; pre_param->h_ = 640 ; pre_param->w_ = 640 ;
推理输入边 infer_input
和推理输出边 infer_output
之间增加推理节点 infer
完成模型推理。同理,推理结束后增加 post
节点,完成目标检测中的 nms
抑制 ,置信度筛选等:
1 2 3 4 dag::Node *infer = graph->createInfer <infer::Infer>( "infer" , inference_type, infer_input, infer_output); dag::Node *post = graph->createNode <YoloPostProcess>("postprocess" , infer_output, output);
对于 createNode<YoloPostProcess>
形式的调用,看一下 createNode
方法:
1 2 3 4 template <typename T, typename ... Args, typename std::enable_if<std::is_base_of<Node, T>{}, int >::type> Node *Graph::createNode (const std::string &name, Edge *input, Edge *output, Args &...args)
模板那里的写法是 SFINAE
,有兴趣可以看下。
额外的,增加的 pre, infer, post
这些节点都继承自 Node
类,并实现了 run
方法。启动计算图时,通过 Node
基类去调用 node
的 run
方法,这样就可以执行计算图中的所有 node
节点。
目标检测图中的推理引擎 创建推理节点:
1 2 dag::Node *infer = graph->createInfer <model::Infer>( "infer" , inference_type, infer_input, infer_output);
首先是构造 Infer
这个类:
1 2 3 Infer (const std::string &name, base::InferenceType type, std::initializer_list<dag::Edge *> inputs, std::initializer_list<dag::Edge *> outputs);
在这个构造函数中,调用 Node
构造函数传入输入输出边外,还有创建推理引擎:inference::createInference(type);
。推理引擎的创建和之前的全局注册一样,以 tensorrt
为例,会创建一个全局变量:
1 2 TypeInferenceRegister<TypeInferenceCreator<TensorRtInference>> g_tensorrt_inference_register (base::kInferenceTypeTensorRt);
在创建 TensorRtInference
的时候,会调用 Inference
的构造函数创建参数:
1 2 3 4 Inference::Inference (base::InferenceType type) { type_ = type; inference_param_ = createInferenceParam (type); }
参数创建对应的代码是:
1 2 3 4 5 6 TensorRtInferenceParam::TensorRtInferenceParam () : InferenceParam () { model_type_ = base::kModelTypeOnnx; device_type_.code_ = base::kDeviceTypeCodeCuda; device_type_.device_id_ = 0 ; gpu_tune_kernel_ = 1 ; }
可以看到 tensorrt
运行在 cuda
的单卡上面。
之后将检测模型视为 node
添加到 graph
中:graph->addNode(detect_graph);
。
后面是创建解码节点和编码节点,createDecodeNode
和 createGraph
在实现逻辑上是类似的,大概猜测是对输入的图像或者视频进行编码解码。两者均位于 codec
名称空间下,中间还有一个 drawbox
节点,会调用 opencv
画出图片中的检测框。
图这部分的流程如下图所示:
wrapper 相关 EdgeWrapper 在调用 createEdge
的时候,将每个 edge
封装成 edge_warpper
,放到当前图的 edge_repository_
里面。这里使用 new
申请 edge_wrapper
,不恰当的释放、程序异常退出没调用析构函数时,会有内存泄漏。
1 2 3 4 5 6 7 8 9 Edge *Graph::createEdge (const std::string &name) { Edge *edge = new Edge (name); EdgeWrapper *edge_wrapper = new EdgeWrapper (); edge_wrapper->is_external_ = false ; edge_wrapper->edge_ = edge; edge_wrapper->name_ = name; edge_repository_.emplace_back (edge_wrapper); return edge; }
而 EdgeWrapper
类的代码如下,producers_
和 consumers_
推测用于管理边的输入节点和输出节点。
1 2 3 4 5 6 7 8 class NNDEPLOY_CC_API EdgeWrapper { public : bool is_external_; Edge *edge_; std::string name_; std::vector<NodeWrapper *> producers_; std::vector<NodeWrapper *> consumers_; };
NodeWrapper addNode
和 createNode
代码类似,需要有输入边和输出边这两个参数,因此相比 createEdge
麻烦一些,多了下面的内容:
1 2 3 4 5 6 7 8 9 10 11 12 EdgeWrapper *input_wrapper = findEdgeWrapper (edge_repository_, input); if (input_wrapper == nullptr ) { input_wrapper = this ->addEdge (input); } input_wrapper->consumers_.emplace_back (node_wrapper); EdgeWrapper *output_wrapper = findEdgeWrapper (edge_repository_, output); if (output_wrapper == nullptr ) { output_wrapper = this ->addEdge (output); } output_wrapper->producers_.emplace_back (node_wrapper); node_repository_.emplace_back (node_wrapper);
首先调用 findEdgeWrapper
找到输入边的 wrapper
,如果边不在 graph
就添加进来。输入边的 consumers_
需要添加这个 node
;同理,对于输出边的 produces_
也需要添加这个 node
。不过需要注意的是,允许有多条边的 consumers_
是同一个节点,允许一个节点是多条边的 produces_
。
NodeWrapper
代码如下:
1 2 3 4 5 6 7 8 9 class NNDEPLOY_CC_API NodeWrapper { public : bool is_external_; Node *node_; std::string name_; std::vector<NodeWrapper *> predecessors_; std::vector<NodeWrapper *> successors_; base::NodeColorType color_ = base::kNodeColorWhite; };
推测其中的 predecessors_
和 successors_
对应 EdgeWrapper
的 consumers_
和 produces_
。
在看 createNode
代码的时候发现了未知代码 std::initializer_list
,学习 了一下,粗浅理解为轻量的迭代同类型对象的类模板。
1 2 3 Node *Graph::createNode (const std::string &name, std::initializer_list<Edge *> inputs, std::initializer_list<Edge *> outputs, Args &...args)
计算图初始化 之后就是计算图的初始化、执行和释放。之前的代码难度还 OK
,到了这里感觉代码难度飞升。调用 status = graph->init();
时完成计算图的初始化,看一下初始化了哪些内容。
首先是 this->construct();
函数检查 graph
的 node, edge
是否为空,并检查 edge_wrapper
的生产者和消费者是否为空。如果这些都是空的话,说明创建的计算图有问题。
Node 处理 而后是 Node
节点的处理,首先为 Node
设置基础的信息:运行方式,是否计时等。
1 2 3 4 node->setDebugFlag (is_debug_); node->setTimeProfileFlag (is_time_profile_); node->setParallelType (parallel_type_); node->setInnerFlag (true );
而后来看下面的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 std::vector<Edge *> inputs = node->getAllInput (); for (auto input : inputs) { EdgeWrapper *input_wrapper = findEdgeWrapper (edge_repository_, input); NNDEPLOY_CHECK_PARAM_NULL_RET_STATUS (input_wrapper, "input_wrapper is null!" ); for (auto producer : input_wrapper->producers_) { insertUnique (node_wrapper->predecessors_, producer); } } std::vector<Edge *> outputs = node->getAllOutput (); for (auto output : outputs) { EdgeWrapper *output_wrapper = findEdgeWrapper (edge_repository_, output); NNDEPLOY_CHECK_PARAM_NULL_RET_STATUS (output_wrapper, "output_wrapper is null!" ); for (auto consumer : output_wrapper->consumers_) { insertUnique (node_wrapper->successors_, consumer); } }
首先调用 getAllInput
方法获取 Node
节点的输入,以 Infer
节点为例,对于 graph->createInfer<model::Infer>
这个 Infer
节点,调用 getAllInput
会调用 Node
类的方法得到输入 inputs_
:
1 std::vector<Edge *> Node::getAllInput () { return inputs_; }
而 inputs_
是在创建子类时由子类的构造函数的参数决定的,看下 infer
类的构造函数:
1 2 3 4 Infer::Infer (const std::string &name, base::InferenceType type, std::initializer_list<dag::Edge *> inputs, std::initializer_list<dag::Edge *> outputs) : dag::Node (name, inputs, outputs)
调用了父类 dag::Node
的构造函数:
1 2 3 4 5 6 7 8 Node::Node (const std::string &name, std::vector<Edge *> inputs, std::vector<Edge *> outputs) : name_ (name) { device_type_ = device::getDefaultHostDeviceType (); inputs_ = inputs; outputs_ = outputs; constructed_ = true ; }
也就是在父类的构造函数中,指定了 inputs_
是输入边。获取节点的输入边后,获取边的 producers_
。也就是在当前节点的 predecessors
中添加指向当前节点的节点。之后的处理同理,在当前节点的 successors
中添加当前节点指向的节点。说起来有点乱,看图吧:
对于黄色节点而言,蓝色节点是 predecessors
,绿色节点是 successors
。
Edge 处理 处理节点之后开始处理边:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 for (auto edge_wrapper : edge_repository_) { std::vector<Node *> producers; for (auto producer : edge_wrapper->producers_) { producers.emplace_back (producer->node_); } std::vector<Node *> consumers; for (auto consumer : edge_wrapper->consumers_) { consumers.emplace_back (consumer->node_); } base::Status status = edge_wrapper->edge_->setParallelType (parallel_type); NNDEPLOY_RETURN_ON_NEQ (status, base::kStatusCodeOk, "setParallelType failed!" ); status = edge_wrapper->edge_->increaseProducers (producers); NNDEPLOY_RETURN_ON_NEQ (status, base::kStatusCodeOk, "increaseProducers failed!" ); status = edge_wrapper->edge_->increaseConsumers (consumers); NNDEPLOY_RETURN_ON_NEQ (status, base::kStatusCodeOk, "increaseConsumers failed!" ); status = edge_wrapper->edge_->construct (); NNDEPLOY_RETURN_ON_NEQ (status, base::kStatusCodeOk, "construct edge failed!" ); }
这里有个疑问,std::vector<Node *> consumers
使用了指针,那么如果 abstract_edge
修改了 Node
的内容,edge_wrapper
跟踪的 Node
的内容也会被修改。会不会有影响?
在 setParallelType
时,创建了 abstract_edge
。这里感觉不太合适,函数的用途是创建 abstract_edge
,而函数名确实设置并行方式。我还以为和 node_wrapper
的处理方式一样只是设置并行方式,找了半天才找到 abstract_edge
的创建藏在 setParallelType
方法中。而后由 abstract_edge
管理边的生产者和消费者,并调用 abstract_edge
的 construct
方法。
来看一下 abstract_edge
,这个边的创建形式和前面讲过的 createYoloV5Graph
一样,由 TypeEdgeRegister
注册,支持 FixedEdge
(串行、任务并行)和 PipelineEdge
(流水并行)。
在 PipelineEdge
的 construct
方法中,会将消费者添加到数据包中,用于任务并行,当数据一到位,立马执行。来看一下这个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 std::list<PipelineDataPacket *> data_packets_; std::map<Node *, int > to_consume_index_; std::map<Node *, PipelineDataPacket *> consuming_dp_; base::Status PipelineEdge::construct () { consumers_size_ = consumers_.size (); for (auto iter : consumers_) { if (to_consume_index_.find (iter) == to_consume_index_.end ()) { to_consume_index_.insert ({iter, 0 }); } if (consuming_dp_.find (iter) == consuming_dp_.end ()) { consuming_dp_.insert ({iter, nullptr }); } } return base::kStatusCodeOk; }
执行器初始化 之后就是调用 status = this->executor();
,根据用户传入的执行类型创建执行器 executor_
,选择串行执行还是并行执行,并行执行又分为任务并行和流水线并行。这三个概念可以看项目的 README
,里面有详细的解释:
串行:按照模型部署的有向无环图的拓扑排序,依次执行每个节点。
流水线并行:在处理多帧的场景下,基于有向无环图的模型部署方式,可将前处理 Node
、推理 Node
、后处理 Node
绑定三个不同的线程,每个线程又可绑定不同的硬件设备下,从而三个 Node
可流水线并行处理。在多模型以及多硬件设备的的复杂场景下,更加可以发挥流水线并行的优势,从而可显著提高整体吞吐量。
任务并行:在多模型以及多硬件设备的的复杂场景下,基于有向无环图的模型部署方式,可充分挖掘模型部署中的并行性,缩短单次算法全流程运行耗时
之后对执行器 executor_
进行初始化:
1 status = executor_->init (edge_repository_, node_repository_);
接下来仔细看看这 3 个执行器吧。到目前为止,由 graph->init()
引发的代码还没看完。
SequentialExecutor 初始化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 base::Status SequentialExecutor::init ( std::vector<EdgeWrapper *> &edge_repository, std::vector<NodeWrapper *> &node_repository) { base::Status status = topoSortDFS (node_repository, topo_sort_node_); for (auto iter : topo_sort_node_) { iter->node_->setInitializedFlag (false ); status = iter->node_->init (); if (status != base::kStatusCodeOk) { NNDEPLOY_LOGE ("Node %s init failed\n" , iter->node_->getName ().c_str ()); return status; } iter->node_->setInitializedFlag (true ); } edge_repository_ = edge_repository; return status; }
调用 topoSortDFS
函数进行了拓扑排序,而后对所有拓扑后的 Node
进行初始化 status = iter->node_->init();
。比如 Node
是 Infer
节点,就调用 Infer
节点的初始化,完成推理引擎的初始化。如果推理引擎是 tensorrt
,就会调用 base::Status TensorRtInference::init()
。
仔细看下 topoSortDFS
函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 base::Status topoSortDFS (std::vector<NodeWrapper *> &node_repository, std::vector<NodeWrapper *> &topo_sort_node) { base::Status status = base::kStatusCodeOk; std::vector<NodeWrapper *> start_nodes = findStartNodes (node_repository); if (start_nodes.empty ()) { NNDEPLOY_LOGE ("No start node found in graph" ); return base::kStatusCodeErrorInvalidValue; } std::stack<NodeWrapper *> dst; for (auto node_wrapper : start_nodes) { if (node_wrapper->color_ == base::kNodeColorWhite) { status = TopoSortDFSRecursive (node_wrapper, dst); } else if (node_wrapper->color_ == base::kNodeColorGray) { NNDEPLOY_LOGE ("Cycle detected in graph" ); status = base::kStatusCodeErrorInvalidValue; } else { continue ; } } while (!dst.empty ()) { topo_sort_node.emplace_back (dst.top ()); dst.pop (); } checkUnuseNode (node_repository); return base::kStatusCodeOk; }
看到了熟悉的数据结构和 leetcode
的味道,首先寻找根节点,没有 predecessors_
的就是根节点。其中的 TopoSortDFSRecursive
递归方法是图染色算法 ,也是经典数据结构和 leetcode
题,将白色的节点染成黑色,如果染色器件重复对灰色点染色,就说明计算图存在环路,报错退出。将拓扑排序后的节点放到 topo_sort_node
中。
ParallelTaskExecutor 初始化 和 kParallelTypeSequential
相比,DFS
算法换成了 BFS
算法,这是因为 DFS
和 BFS
算法得到的拓扑排序不同,后者适用于并行的情况,也就是由两个节点可以并行执行。将节点全部置回了白色,因为后面 run
的时候用来判断节点是否执行过,如果执行过,设置为黑色。
而且多了线程池的初始化,至于线程池,这个东西感觉没啥好讲的。网上很多线程池的代码,如果有兴趣,看看条件变量、互斥锁的用法,最多一天差不多能看完,我之前写过 C 版本的线程池,所以这里不在展开讲了。把他理解为一个任务执行器,可以同时执行很多任务并返回就可以了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 base::Status ParallelTaskExecutor::init ( std::vector<EdgeWrapper*>& edge_repository, std::vector<NodeWrapper*>& node_repository) { thread_pool_ = new thread_pool::ThreadPool (); thread_pool_->init (); start_nodes_ = findStartNodes (node_repository); base::Status status = topoSortBFS (node_repository, topo_sort_node_); all_task_count_ = topo_sort_node_.size (); if (start_nodes_.empty ()) { NNDEPLOY_LOGE ("No start node found in graph" ); return base::kStatusCodeErrorInvalidValue; } for (auto iter : topo_sort_node_) { iter->color_ = base::kNodeColorWhite; iter->node_->setInitializedFlag (false ); status = iter->node_->init (); NNDEPLOY_RETURN_ON_NEQ (status, base::kStatusCodeOk, "node init failure" ); iter->node_->setInitializedFlag (true ); } edge_repository_ = edge_repository; return status; }
ParallelPipelineExecutor 初始化 除了初始化线程池外,还执行了 this->commitThreadPool();
,直接提交任务开始执行。和串行、任务并行执行器最大的不同是:这个执行器在 init()
里运行,并没有 run
方法,所以准备放到后面执行器执行的时候在看了。
commitThreadPool
方法里最重要的是 updataInput
,会调用绝对边的 update
方法,由于流水线并行的执行器只能用 PipelineEdge
,看一下这个类的 update
方法:
推理引擎初始化 推理引擎也是一种 Node
,会在执行器初始化 Node
的时候初始化推理引擎。不过这个 Node
的初始化相比之下比较重要,所以重点看一下。
1 2 3 4 5 6 7 8 9 10 11 base::Status Infer::init () { base::Status status = base::kStatusCodeOk; status = inference_->init (); NNDEPLOY_RETURN_ON_NEQ (status, base::kStatusCodeOk, "abstract_inference init failed" ); is_input_dynamic_ = inference_->isInputDynamic (); is_output_dynamic_ = inference_->isOutputDynamic (); can_op_input_ = inference_->canOpInput (); can_op_output_ = inference_->canOpOutput (); return status; }
首先是调用 inference_
的初始化,也就是调用 TensorRtInference::init()
方法。前面这一坨代码仿佛在初始化模型(我没用过任何推理引擎,智能猜代码啥意思了):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 TensorRtInferenceParam *tensorrt_inference_param = dynamic_cast <TensorRtInferenceParam *>(inference_param_); if (tensorrt_inference_param->is_path_) { model_buffer = base::openFile (tensorrt_inference_param->model_value_[0 ]); } else { model_buffer = tensorrt_inference_param->model_value_[0 ]; } if (tensorrt_inference_param->model_type_ == base::kModelTypeOnnx) { status = initWithOnnxModel (model_buffer, tensorrt_inference_param); NNDEPLOY_RETURN_ON_NEQ (status, base::kStatusCodeOk, "initWithOnnxModel failed" ); } else if (tensorrt_inference_param->model_type_ == base::kModelTypeTensorRt) { status = initWithTensorRtModel (model_buffer, tensorrt_inference_param); NNDEPLOY_RETURN_ON_NEQ (status, base::kStatusCodeOk, "initWithTensorRtModel failed" ); } else { NNDEPLOY_LOGE ("not support this model type(%d)!\n" , tensorrt_inference_param->model_type_); return base::kStatusCodeErrorInferenceTensorRt; }
之后是通过绑定,来获取模型输入、输出、中间缓存的绑定数,来准确的分配内存。这里可以通过名字找索引,也可以通过索引找名字。
1 2 3 4 5 for (auto i = 0 ; i < getNbBindings (); ++i) { std::string name = std::string (getBindingName (i)); io_name_index_[name] = i; io_index_name_[i] = name; }
获取模型输入的名字和 shape
:
1 2 3 4 5 6 7 for (auto i = 0 ; i < getNbBindings (); ++i) { if (bindingIsInput (i)) { std::string name = std::string (getBindingName (i)); auto shape = TensorRtConvert::convertToShape (getBindingDimensions (i)); current_shape.insert ({name, shape}); } }
下面的代码我觉得 max_shape_
为空,因为没看到在哪创建的,所以不会进入循环:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 for (auto iter : tensorrt_inference_param->max_shape_) { auto tmp = current_shape.find (iter.first); if (tmp != current_shape.end ()) { auto &shape = current_shape[iter.first]; if (base::shapeEqual (iter.second, shape)) { continue ; } else { int idx = io_name_index_[iter.first]; nvinfer1::Dims dims = TensorRtConvert::convertFromShape (iter.second); setBindingDimensions (idx, dims); } } else { NNDEPLOY_LOGE ("reshape failed, not found input tensor(%s)!\n" , iter.first.c_str ()); return base::kStatusCodeErrorInferenceTensorRt; } }
之后是获取设备:
1 device::Device *device = device::getDevice (inference_param_->device_type_);
获取设备的时候会注册一个 CudaArchitecture
,也就是一个管理 cuda
设备的类。后续的代码是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 for (auto i = 0 ; i < num_binds; ++i) { std::string name = std::string (getBindingName (i)); base::IntVector shape = TensorRtConvert::convertToShape (getBindingDimensions (i)); base::DataType data_type = TensorRtConvert::convertToDataType (getBindingDataType (i)); base::DataFormat data_format = TensorRtConvert::convertToDataFormat (getBindingFormat (i)); if (bindingIsInput (i)) { device::TensorDesc desc; desc.data_type_ = data_type; desc.data_format_ = data_format; desc.shape_ = shape; device::Tensor *max_input_tensor = new device::Tensor (device, desc, name); max_input_tensors_.insert ({name, max_input_tensor}); device::Buffer *max_input_buffer = max_input_tensor->getBuffer (); device::Tensor *current_input_tensor = new device::Tensor (desc, max_input_buffer, name); input_tensors_.insert ({name, current_input_tensor}); } else { device::TensorDesc desc; desc.data_type_ = data_type; desc.data_format_ = data_format; desc.shape_ = shape; device::Tensor *max_output_tensor = new device::Tensor (device, desc, name); max_output_tensors_.insert ({name, max_output_tensor}); device::Buffer *max_output_buffer = max_output_tensor->getBuffer (); device::Tensor *current_output_tensor = new device::Tensor (desc, max_output_buffer, name); output_tensors_.insert ({name, current_output_tensor}); } }
获取绑定的 shape
,类型和格式
如果是输入,创建对应的 tensor
,存入 input_tensors_
和 max_input_tensors_
,为什么存两次存疑
如果不是输入,就存到 max_output_tensors_
和 output_tensors_
额外的,TensorDesc
用于描述内存中的数据,而 tensor
用 buffer_
管理申请的 Buffer
,具体看一下内存的申请,new device::Tensor(device, desc, name)
会调用:
1 2 3 4 5 6 7 8 Tensor::Tensor (Device *device, const TensorDesc &desc, const std::string &name, const base::IntVector &config) : name_ (name), desc_ (desc), is_external_ (false ) { BufferDesc buffer_desc = device->toBufferDesc (desc, config); void *ptr = device->allocate (buffer_desc); buffer_ = new Buffer (device, buffer_desc, ptr, base::kMemoryTypeAllocate); ref_count_ = new int (1 ); }
device->toBufferDesc(desc, config)
用于获取内存的大小,device->allocate(buffer_desc);
会根据大小调用 cudaMalloc
申请内存:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void *CudaDevice::allocate (const BufferDesc &desc) { void *data = nullptr ; cudaError_t status = cudaMalloc (&data, desc.size_[0 ]); if (cudaSuccess != status) { NNDEPLOY_LOGE ("cuda alloc failed with size %lu for %p, status:%d\n" , desc.size_[0 ], data, status); return nullptr ; } if (data == nullptr ) { NNDEPLOY_LOGE ("cuda alloc got nullptr\n" ); return nullptr ; } return data; }
最后调用 new Buffer
使用 Buffer
这个类管理申请到的内存。
总结:获取输入输出的名字、尺寸、数据类型,并创建对应的内存 buffer
。
计算图执行 对应代码中的 graph->run()
,具体也就是调用执行器的 run
方法:status = executor_->run();
。
kParallelTypeSequential 执行 暗自庆幸一下这是最简单的一个:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 base::Status SequentialExecutor::run () { base::Status status = base::kStatusCodeOk; for (auto iter : topo_sort_node_) { base::EdgeUpdateFlag edge_update_flag = iter->node_->updataInput (); if (edge_update_flag == base::kEdgeUpdateFlagComplete) { iter->node_->setRunningFlag (true ); status = iter->node_->run (); NNDEPLOY_RETURN_ON_NEQ (status, base::kStatusCodeOk, "node execute failed!\n" ); iter->node_->setRunningFlag (false ); } else if (edge_update_flag == base::kEdgeUpdateFlagTerminate) { ; } else { NNDEPLOY_LOGE ("Failed to node[%s] updataInput();\n" , iter->node_->getName ().c_str ()); return base::kStatusCodeErrorDag; } } return status; }
iter->node_->updataInput();
会调用 abstact_edge_->update(node);
,由于此时计算图还没有执行完毕,FixedEdge
会将节点设置为 kEdgeUpdateFlagComplete
。之后就是调用 node
的 run
方法。对于 infer
节点,如果推理引擎是 tensorrt
,就会调用 TensorRtInference::run()
方法。
实例说明,目标检测的节点运行与数据 解码节点运行 以目标检测的计算图为例,一共有 3 个节点:CvtColorResize
,Infer
和 YoloPostProcess
。还记得在 CvtColorResize
节点前面还有一个解码节点吗?以单张图像的目标检测为例,会读取图像并创建 mat
,并将 mat
放到输出边中,完成数据的传递:
1 2 3 4 5 6 7 8 base::Status OpenCvImageDecodeNode::run () { cv::Mat *mat = new cv::Mat (cv::imread (path_)); width_ = mat->cols; height_ = mat->rows; outputs_[0 ]->set (mat, index_, false ); index_++; return base::kStatusCodeOk; }
不知道为啥一定要用 outputs_[0]
,如果有多天输出边呢?其中的 set
方法对应:
1 2 3 4 5 6 7 8 9 10 11 12 base::Status DataPacket::set (void *anything, int index, bool is_external) { base::Status status = base::kStatusCodeOk; if (anything != anything_) { destory (); } is_external_ = is_external; index_ = index; flag_ = kFlagVoid; written_ = true ; anything_ = anything; return status; }
CvtColorResize 运行 由于 CvtColorResize
的输入是解码节点的输出,所以可以直接在 CvtColorResize
的 run
方法中拿到解码节点的输出:
1 cv::Mat *src = inputs_[0 ]->getCvMat (this );
而后获取 host
端的设备:
1 device::Device *device = device::getDefaultHostDevice ();
根据输入的参数创建描述 tensor
的描述符 desc
:
1 2 3 4 5 6 7 8 9 10 device::TensorDesc desc; desc.data_type_ = tmp_param->data_type_; desc.data_format_ = tmp_param->data_format_; if (desc.data_format_ == base::kDataFormatNCHW) { desc.shape_ = {1 , getChannelByPixelType (tmp_param->dst_pixel_type_), tmp_param->h_, tmp_param->w_}; } else { desc.shape_ = {1 , tmp_param->h_, tmp_param->w_, getChannelByPixelType (tmp_param->dst_pixel_type_)}; }
根据内存信息创建 dst
,也就是这条边的输出:
1 2 device::Tensor *dst = outputs_[0 ]->create (device, desc, inputs_[0 ]->getIndex (this ));
其中 getIndex
是获取当前节点的索引,由于解码节点在添加数据后进行了 index++
,所以这里拿到的 index
实际为 1。至于其中的 create
方法就是创建这个节点的 tensor
输出,来看一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 device::Tensor *DataPacket::create (device::Device *device, const device::TensorDesc &desc, int index, const std::string &name) { base::Status status = base::kStatusCodeOk; device::Tensor *tensor = nullptr ; if (anything_ == nullptr ) { tensor = new device::Tensor (device, desc, name); } else { if (flag_ != kFlagTensor) { destory (); tensor = new device::Tensor (device, desc, name); } else { tensor = (device::Tensor *)(anything_); if (tensor->getDesc () != desc) { destory (); tensor = new device::Tensor (device, desc, name); } } } is_external_ = false ; index_ = index; flag_ = kFlagTensor; written_ = false ; anything_ = (void *)(tensor); return tensor; }
anything
指向了实际的数据。而后对输入进行颜色空间转换和 resize
操作,这个好像是数字图像处理的部分,比如将 BGR
的图转换为 RGB
的图,并 resize
到固定尺寸。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 cv::Mat tmp_cvt; if (tmp_param->src_pixel_type_ != tmp_param->dst_pixel_type_) { base::CvtColorType cvt_type = base::calCvtColorType ( tmp_param->src_pixel_type_, tmp_param->dst_pixel_type_); if (cvt_type == base::kCvtColorTypeNotSupport) { NNDEPLOY_LOGE ("cvtColor type not support" ); return base::kStatusCodeErrorNotSupport; } int cv_cvt_type = OpenCvConvert::convertFromCvtColorType (cvt_type); cv::cvtColor (*src, tmp_cvt, cv_cvt_type); } else { tmp_cvt = *src; } cv::Mat tmp_resize; if (tmp_param->interp_type_ != base::kInterpTypeNotSupport) { int interp_type = OpenCvConvert::convertFromInterpType (tmp_param->interp_type_); cv::resize (tmp_cvt, tmp_resize, cv::Size (w, h), 0.0 , 0.0 , interp_type); } else { tmp_resize = tmp_cvt; } OpenCvConvert::convertToTensor (tmp_resize, dst, tmp_param->normalize_, tmp_param->scale_, tmp_param->mean_, tmp_param->std_);
然后是 outputs_[0]->notifyWritten(dst);
,通知 dst
数据准备好了。
infer 节点运行 获取所有输入的 tensor
和 index
:
1 2 3 4 5 6 7 8 9 10 11 12 13 for (auto input : inputs_) { device::Tensor *tensor = input->getTensor (this ); tensors.emplace_back (tensor); int index = input->getIndex (this ); indexs.emplace_back (index); } int index = indexs[0 ];for (int i = 1 ; i < indexs.size (); i++) { if (index != indexs[i]) { NNDEPLOY_LOGE ("index not equal" ); return base::kStatusCodeErrorInvalidValue; } }
getTensor
对应的就是获取 data_packet
的 anything
:
1 2 3 4 5 6 7 device::Tensor *DataPacket::getTensor () { if (flag_ != kFlagTensor) { return nullptr ; } else { return (device::Tensor *)(anything_); } }
获取 index
也是数据包的 index
,如果输入边的 index
不同,说明这个节点收到了错误的输入,需要报错退出。而后是为推理引擎设置输入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 for (auto tensor : tensors) { inference_->setInputTensor (tensor->getName (), tensor); } base::Status Inference::setInputTensor (const std::string &name, device::Tensor *input_tensor) { base::Status status = base::kStatusCodeOk; std::string new_name = "" ; if (!name.empty ()) { new_name = name; } else if (!input_tensor->getName ().empty ()) { new_name = input_tensor->getName (); } else { new_name = getInputName (0 ); } if (input_tensors_.count (new_name) > 0 ) { if (input_tensor != input_tensors_[new_name]) { external_input_tensors_[new_name] = input_tensor; } } else { NNDEPLOY_LOGI ("input_tensor name: %s not exist!\n" , new_name.c_str ()); } return status; }
可以看到如果这个 tensor
没有在最初的 input_tensors_
中(初始化指定)时,视为外部的输入。在数据准备好后,推理引擎开始执行:
1 status = inference_->run ();
最后就是将推理引擎的输出设置到边上:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 for (auto output : outputs_) { std::string name = output->getName (); base::ParallelType parallel_type = output->getParallelType (); bool flag = parallel_type == base::kParallelTypePipeline; device::Tensor *tensor = inference_->getOutputTensorAfterRun (name, device_type_, flag); if (tensor == nullptr ) { NNDEPLOY_LOGE ("can't getOutputTensorAfterRun[%s].\n" , name.c_str ()); status = base::kStatusCodeErrorInvalidParam; break ; } output->set (tensor, index, false ); }
post 节点运行 YoloPostProcess
这个节点的 run
方法比较简单,就是对输出的 tensor
进行 nms
和阈值筛选处理,不在过多解释。
ParallelTaskExecutor 执行 以所有的根节点 start_nodes_
为起点,并行的形式执行这个计算图。
1 2 3 4 for (auto iter : start_nodes_) { process (iter); } wait ();
这个 wait
方法是以条件变量的形式等待完成任务的节点数大于等于总任务数,所以等待完成任务的节点数是原子类型 的模板类:std::atomic<int>
。
至于 process(iter)
方法,除了执行节点外,还有 afterNodeRun(node_wrapper)
方法,重点来看一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 completed_task_count_++; node_wrapper->color_ = base::kNodeColorBlack; for (auto successor : node_wrapper->successors_) { bool all_pre_done = true ; for (auto iter : successor->predecessors_) { all_pre_done &= (iter->color_ == base::kNodeColorBlack); } if (all_pre_done && successor->color_ == base::kNodeColorWhite) { if (successor->predecessors_.size () <= 1 ) { process (successor); } else { submitTaskSynchronized (successor); } } }
首先是已完成的任务数 +1,而后将已完成节点的颜色设置为黑色。遍历这个节点的后继节点 successors_
,对于后继节点而言,如果全部的前任节点都执行完毕,那么 all_pre_done
会为 true
。此时进行判断:
如果这个后继节点只有一个前任节点,那么调用 process
处理这个后继节点
如果这个后继节点有多个前任节点,那么已加锁的形式调用 process
处理这个后继节点,防止多线程环境下 process
函数操作 node_wrapper
这个临界资源。
最后提交任务后,以
1 2 3 4 5 std::lock_guard<std::mutex> lock (main_lock_) ;if (completed_task_count_ >= all_task_count_) { cv_.notify_one (); }
的形式唤醒等待的主线程,也就是 cv_.notify_one()
那里。最终将节点的颜色改为白色。额外的一点点小知识 :
使用条件变量时,在检查条件之前加锁,并在等待之前释放锁。因为唤醒的线程需要重新检查条件是否成立(因为可能会发生虚假唤醒)。如果不加锁,唤醒的线程可能会在其他线程修改条件之前就继续执行,导致逻辑错误。
虚假唤醒。这是一种能保证执行效率的方法。假设此时有10个线程处于等待中,在收到一个唤醒信号后,操作系统尝试去唤醒所有的线程,这会打破发送信号与唤醒之间一对一的关系。所以此时只能唤醒一个线程,而其余九个线程处于等待阶段。为了更灵活的处理这种情况,所以无论条件是否满足,操作系统允许等待中的线程自己醒来,称为虚假唤醒。
ParallelPipelineExecutor 执行 为了更好的读懂流水线并行的代码,简单写了份模拟的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> std::mutex mtx; std::condition_variable cv; bool ready = false ;void run () { std::unique_lock<std::mutex> lck (mtx) ; ready = true ; cv.notify_all (); } void func1 (int id) { std::unique_lock<std::mutex> lock (mtx) ; while (!ready) { cv.wait (lock); } std::cerr << "Node Run : thread id = " << id << "\n" ; } void commit () { std::thread threads[10 ]; for (int i = 0 ; i < 10 ; i++) { threads[i] = std::thread (func1, i); } run (); for (auto & th : threads) { th.join (); } } int main () { commit (); return 0 ; }
一个线程等待『条件变量的条件成立』而挂起;另一个线程使『条件成立』。为了防止竞争,条件的检测是在互斥锁的保护下进行的,线程在改变条件状态前先要锁住互斥量。如果一个条件为假,则一个线程自动阻塞,该线程处于等待状态,并释放相关变量的互斥锁。如果另一个线程改变了条件,它将信号发送给关联的条件变量,唤醒一个或多个处于等待中的线程,使其重新获得互斥锁,重新评价条件。
如果能看懂上面代码的话,再来看流水线并行的代码。和前两个执行器不同的是,ParallelPipelineExecutor
执行器在 init
的时候直接将所有节点提交到线程池开始执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void ParallelPipelineExecutor::commitThreadPool () { for (auto iter : topo_sort_node_) { auto func = [iter]() -> base::Status { base::Status status = base::kStatusCodeOk; while (true ) { base::EdgeUpdateFlag edge_update_flag = iter->node_->updataInput (); if (edge_update_flag == base::kEdgeUpdateFlagComplete) { iter->node_->setRunningFlag (true ); status = iter->node_->run (); NNDEPLOY_RETURN_ON_NEQ (status, base::kStatusCodeOk, "node execute failed!\n" ); iter->node_->setRunningFlag (false ); } else if (edge_update_flag == base::kEdgeUpdateFlagTerminate) { break ; } else { NNDEPLOY_LOGE ("Failed to node[%s] updataInput();\n" , iter->node_->getName ().c_str ()); status = base::kStatusCodeErrorDag; break ; } } return status; }; thread_pool_->commit (std::bind (func)); } }
上面最重要的是 updataInput
方法,会调用所有输入边的 update
方法:
1 2 3 4 5 6 for (auto input : inputs_) { flag = input->update (this ); if (flag != base::kEdgeUpdateFlagComplete) { break ; } }
由于 decode
节点没有输入边,所以会直接跳过这一环节执行 run
方法,会调用 PipeEdeLine
的 set
方法写入数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 base::Status PipelineEdge::set (void *anything, int index, bool is_external) { std::lock_guard<std::mutex> lock (mutex_) ; PipelineDataPacket *dp = new PipelineDataPacket (consumers_size_); NNDEPLOY_CHECK_PARAM_NULL_RET_STATUS (dp, "PipelineDataPacket is null.\n" ); data_packets_.push_back (dp); cv_.notify_all (); base::Status status = dp->set (anything, index, is_external); NNDEPLOY_RETURN_ON_NEQ (status, base::kStatusCodeOk, "PipelineDataPacket set error.\n" ); return status; }
总感觉这里先放数据,在唤醒比较合适。
解码节点执行完毕后,继续循环执行 updataInput
节点,也就是调用在 PipeEdgeLine
的 update
方法:
1 2 3 4 5 cv_.wait (lock, [this , tmp_node] { return to_consume_index_[tmp_node] < data_packets_.size () || terminate_flag_; });
由于 data_packets_
中插入了数据,所以这个条件会满足继续向下执行。后续的代码我颅内 debug
了很长时间。
大概意思是:节点在一次执行后,将节点对应的索引自增。在第二次执行这个节点时,由于边管理的 data_packet
会有两条数据(前面的节点放进来的),所以这个节点需要找到对应的数据,删除不用的数据。
这个流水线并行实现了下图的效果:
如果是 yolo
对视频进行目标检测,就会有多帧的输入图像。每帧是一个 node
节点,也就是有多个输入节点,一个推理节点,多个输出节点。
第一帧前处理,第一帧推理,第一帧后处理
第二帧前处理,第二帧推理,第二帧后处理
…
第 N 帧前处理,第 N 帧推理,第 N 帧后处理
这样就流水处理了起来。
计算图释放 也就是最后的 graph->deinit()
,还是对应执行器的释放。
SequentialExecutor 释放 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 base::Status SequentialExecutor::deinit () { base::Status status = base::kStatusCodeOk; for (auto iter : edge_repository_) { bool flag = iter->edge_->requestTerminate (); if (!flag) { NNDEPLOY_LOGE ("failed iter->edge_->requestTerminate()!\n" ); return base::kStatusCodeErrorDag; } } for (auto iter : topo_sort_node_) { status = iter->node_->deinit (); NNDEPLOY_RETURN_ON_NEQ (status, base::kStatusCodeOk, "failed iter->node_->deinit()" ); iter->node_->setInitializedFlag (false ); } return status; }
代码很容易看懂,通过 requestTerminate
将边设置为计算完毕,执行所有的节点的释放操作。如果是 tensorrt
的推理节点,将执行推理引擎的释放。
ParallelTaskExecutor 释放 相比 SequentialExecutor
,多了一步释放线程池。
ParallelPipelineExecutor 释放 同 ParallelTaskExecutor
释放。
在最后退出的时候,手动删除了图资源 delete graph
,会调用图的析构函数,删除之前由 new
申请的 node wrapper
和 edge wrapper
。
推理引擎释放 SequentialExecutor
,ParallelTaskExecutor
,ParallelPipelineExecutor
在释放的时候会释放节点,重点看下 infer
这个节点的释放,
1 2 3 4 5 6 base::Status Infer::deinit () { base::Status status = base::kStatusCodeOk; status = inference_->deinit (); NNDEPLOY_RETURN_ON_NEQ (status, base::kStatusCodeOk, "deinit failed" ); return status; }
也就是反初始化推理引擎,注意就是内存释放:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 base::Status TensorRtInference::deinit () { base::Status status = base::kStatusCodeOk; for (auto iter : input_tensors_) { delete iter.second; } input_tensors_.clear (); for (auto iter : max_input_tensors_) { delete iter.second; } max_input_tensors_.clear (); for (auto iter : output_tensors_) { delete iter.second; } output_tensors_.clear (); for (auto iter : max_output_tensors_) { delete iter.second; } max_output_tensors_.clear (); device::Device *device = device::getDevice (inference_param_->device_type_); if (inner_forward_buffer_ != nullptr ) { device->deallocate (inner_forward_buffer_); } return status; }
delete iter.second
会手动删除 tensor
,也就是调用 tensor
类的析构函数,删除数据的 buffer
并清除引用计数。
总结 整个计算图的代码就梳理完了,对于 README
中提到的:上述模式的组合并行,我好像还不知道怎么组合。
目前对推理引擎完全未知,怎么写自己的高性能 op
还没了解到。
我想部署一个大模型!好像还差的很远。
DAG
的组织有点乱,而且有内存泄漏,内存这块并没有很好的管理
CvtColorResize
的 run
为什么两次 notify ?
CvtColorResize
用的是 inputs_[0]
,Infer
需要遍历所有的 inputs
,很迷惑
需要实际编译运行一下看看了。