在有关十二种过滤元素的博客文章中,我只提到了代码的串行版本。但是如何利用并发呢?也许我们可以抛出更多线程和异步任务并更快地完成复制?
例如,我的机器上有6个内核,因此很高兴看到,例如顺序复制速度提高了5倍?
在C ++ 17中,我们有并行算法,因此让我们尝试使用std :: execution :: par调用std :: copy_if。
如果在MSVC库中转到std :: copy_if的实现,则并行版本可以看到以下内容:
// VS 2019 16.8 //目前尚未并行化,预计将来的发行版中并行性将是可行的
免责声明:这些只是我的实验(主要是为了学习一些东西);如果您想在项目中使用它,请进行测量,测量和测量:)
基本而言,C ++ 17的并行算法很容易实现。只需传递一个std :: execution :: par,您就完成了!例如:
该代码将调用一堆线程(可能利用一些现有的线程池),并将在多个线程上批量启动较小的任务。
我们应该记住,这种调用总是比顺序调用产生更多的工作!以及准备,设置批处理,启动线程池,同步的成本-这增加了整个处理的可见成本。
理想情况下,并行运行事物最适合许多对象,也适用于单独执行小任务的情况。一个完美的例子:
C ++ 17并行算法的惊人性能,可能吗? -C ++故事在文章中,我展示了一些使用Fresnel和3D Vector的“真实”用例,并且加速几乎与系统中内核的数量成线性关系。
您会看到一些加速(当您有大量对象时),但是它与核的数量不是线性的。
这是因为sort需要重新整理容器中的内容,并且为了安全地进行操作,该算法必须执行一些同步操作,以便其他线程看到正确的结果。
对于我们的测试(除了简单的调试输出),我将使用以下代码。
const size_t VEC_SIZE = argc> 1个atoi(argv [1]):10; std :: vector< std :: pair< double,double> testVec(VEC_SIZE); std :: ranges :: generate(testVec.begin(),testVec.end(),[]()可变{return std :: pair {GenRandom(-10.0,10.0),GenRandom(-10.0,10.0)};} ); auto test = [](auto& elem){auto sn = sin(elem.first)* cos(elem.second + 10.0);返回sn> 0.0;};
总的来说,我想比elem%2 == 0拥有更多的计算能力。此外,每个元素都是16字节,因此对象也不是很小。
我们必须在所有元素上运行谓词函数-在大多数情况下,它不依赖于其他元素,因此最好在许多线程上执行
但是我们必须将匹配的元素放入新容器中。这是一个可变的步骤,需要线程之间进行一些同步。
首先,最好采用蛮力方法并从中学习:
模板<类型名称T,类型名称Pred>自动FilterCopyIfParNaive(const std :: vector< vec,Pred p){std :: vector< T出去; std :: mutex mut; std :: for_each(std :: execution :: par,begin(vec),end(vec),[& out,& mut,p](auto&& elem){if(p(elem)){ std :: unique_lock lock(mut); out.push_back(elem);}});返回;}
由于std :: for_each和std :: execution :: par,我们并行运行所有步骤,但是当我们要将元素放入输出容器时,我们需要进行同步。
可以注意到,要保护修改容器状态的所有操作。
// 4核心/ 8线索屏蔽VEC尺寸:100000Transform仅SEQ:2.5878 MS,RET:100000Transform仅适用:1.3734 MS,RET:100000FilterCopyIF:5.3675 MS,RET:50203FopyIFParnaive:9.1836 MS,RET:50203
// 6核心/ 12线程屏蔽VEC大小:100000Transform仅SEQ:2.223 MS,RET:100000Transform仅适用:0.5507 MS,RET:100000FilterCopyIF:3.851 MS,RET:50203FILterCopyIFParnaive:10.1295 MS,RET:50203
对于比较,我还包括仅转换SEQ并仅转换仅PAR,这只是一个简单的变换在集合中运行:
STD :: Vector< uint8_t>缓冲区(testvec.size()); rundandmeasure("只转换SEQ" [& testvec,& buffer,& test](){std :: transform(begin(testvec),结束(testvec),开始(缓冲区),测试); return buffer.size();}); rundandmeasure("只转换为" [& testvec,& buffer,& test]( ){std :: transform(std ::执行:: par,begin(testvec),结束(testvec),begin(缓冲区),测试); return buffer.size();});
请注意,缓冲区是在变换Lambda之外创建的,因此我们不支付其初始化的价格。看看它是如何用许多核心划衡的。
在所有输入元素上运行std :: transform以计算谓词函数,将布尔结果存储在临时容器中。
然后我们需要计算匹配元素的最终位置 - 这可以通过调用std :: exlusive_scan来完成
模板< typeName t,typename pred>自动筛选copyifparcompose(const std :: vertent< t>& vec,pred p){std :: vector< uint8_t>缓冲区(vec.size()); STD :: Vector< uint32_t> idx(vec.size()); std :: transform(std ::执行:: par,begin(vec),结束(vec),begin(缓冲区),[& p](const t& elem){return p(elem);}); std :: fellusive_scan(std ::执行:: par,begin(beaffer),结束(缓冲区),begin(idx),0); std :: vector< t> out(idx.back()+ 1); std :: vector< size_t>索引(vec.size()); std :: iota(indexes.begin(),indexes.end(),0); std :: for_each(std ::执行:: par,begine(索引),结束(索引),[& beaffer,& vec;& idx,& out](size_t i){if(buffer [i ])出局[Idx [i]] = VEC [i];});返回;}
输入:0,1,1,0,1,1,0,1,1,1,1,1,0,1,1,1,1,1,1,1,1,2, 2,3,4,4,5,6out:1,2,4,5,7,8,9
所以...是的,它有效,在某些情况下,它将比顺序版本更快。
让我们有一个测试运行。这可以比顺序版本快吗?
// 4核心/ 8个线索博音号VEC大小:100000Transform仅SEQ:2.5878 MS,RET:100000Transform仅适用:1.3734 MS,RET:100000FilterCopyIF:5.3675 MS,RET:50203FopyIFParCAPE:9.1836 MS,RET:3.03 MS,RET:50203FILTERCOPYIFPARCOMPOSEQ :2.3454 MS,RET:50203FilterCopyIFPartransformPush:2.5735 MS,RET:50203
// 6核心/ 12线索屏蔽VEC大小:100000Transform仅SEQ:2.3379 MS,RET:100000Transform仅适用:0.5979 MS,RET:100000FilterCopyIF:3.675 MS,RET:50203毫米,RET:50203毫米,RET:50203FILTERCOPYIFPARCOMPOSEQ :1.0754 MS,RET:50203FilterCopyIFPARTRANSFORMPUSH:2.0103 MS,RET:50203
FilterCopyIfParComposeSeq-是FilterCopyIfParCompose的一个版本,带有一个简单的循环来复制结果:
FilterCopyIfParTransformPush是另一个变体,其中只有std :: transform可以并行运行,然后我们使用常规的push_back。
模板< typename T,typename Pred>自动FilterCopyIfParTransformPush(const std :: vector< T>&vec,Pred p){std :: vector< uint8_t>缓冲区(vec.size()); std :: transform(std :: execution :: par,begin(vec),end(vec),begin(buffer),[& p](const T& elem){return p(elem);}); std :: vector< T>出去; for(size_t i = 0; i< vec.size(); ++ i)if(buffer [i])out.push_back(vec [i]);返回;}
但是我们可以看到该版本比顺序版本快2倍! (4核),而6核则快3倍!因此,这是一种很有前途的方法。
这次,我们将工作分成较小的块,然后分别调用copy_if:
模板< typename T,typename Pred>自动FilterCopyIfParChunks(const std :: vector< T>& vec,Pred p){const auto chunks = std :: thread :: hardware_concurrency(); const auto chunkLen = vec.size()/块; std :: vector< size_t>指数(块); std :: iota(indexes.begin(),index.end(),0); std :: vector< std :: vector< T>复制的块(块); std :: for_each(std :: execution :: par,begin(indexes),end(indexes),[&](size_t i){auto startIt = std :: next(std :: begin(vec),i * chunkLen); auto endIt = std :: next(startIt,chunkLen); std :: copy_if(startIt,endIt,std :: back_inserter(copiedChunks [i]),p);}); std :: vector< T>出去;对于(const auto& part:copyedChunks)out.insert(out.end(),part.begin(),part.end()); if(vec.size()%chunks!= 0){auto startIt = std :: next(std :: begin(vec),chunks * chunkLen); std :: copy_if(startIt,end(vec),std :: back_inserter(out),p); }返回;}
// 4核/ 8线程基准vec大小:100000仅转换seq:2.5878 ms,ret:100000仅转换par:1.3734 ms,ret:100000FilterCopyIf:5.3675 ms,ret:50203FilterCopyIfParNaive:9.1836 ms,ret:50203FilterCopyIfParCompose:3.03 ms,ret:50203eqFilterCopyIfComp :2.3454 ms,ret:50203FilterCopyIfParTransformPush:2.5735 ms,ret:50203FilterCopyIfParChunks:2.9718 ms,ret:50203
// 6核/ 12线程基准vec大小:100000仅转换seq:2.3379 ms,ret:100000仅转换par:0.5979 ms,ret:100000FilterCopyIf:3.675 ms,ret:50203FilterCopyIfParNaive:10.0073 ms,ret:50203FilterCopyIfParCompose:1.2936 ms,ret:50203FilterCopyIfParComp :1.0754 ms,ret:50203FilterCopyIfParTransformPush:2.0103 ms,ret:50203FilterCopyIfParChunks:2.0974 ms,ret:50203
此版本的实现较为简单,但速度不高。不过,它比顺序版本要快。
先前的版本很有前途,但是我们还有另一种方式来划分任务。与其依赖std :: execution :: par,我们可以踢几个std :: future对象,然后等待它们完成。
我在名为“ C ++高性能”的书中发现了类似的想法免责声明:我没有这本书,但其Github Repo似乎已经公开可用:Cpp-High-Performance / copy_if_split_into_two_parts.cpp
模板< typename T,typename Pred>自动FilterCopyIfParChunksFuture(const std :: vector< T>&vec,Pred p){const auto chunks = std :: thread :: hardware_concurrency(); const auto chunkLen = vec.size()/块; std :: vector< std :: future< std :: vector< T>>任务(块); for(size_t i = 0; i< chunks; ++ i){auto startIt = std :: next(std :: begin(vec),i * chunkLen);自动endIt = std :: next(startIt,chunkLen);任务[i] = std :: async(std :: launch :: async,[=,& p] {std :: vector< T> chunkOut; std :: copy_if(startIt,endIt,std :: back_inserter(chunkOut ),p); return chunkOut;}); } std :: vector< T>出去;对于(auto& ft:任务){auto part = ft.get(); out.insert(out.end(),part.begin(),part.end()); } //其余部分:if(vec.size()%chunks!= 0){auto startIt = std :: next(std :: begin(vec),chunks * chunkLen); std :: copy_if(startIt,end(vec),std :: back_inserter(out),p); }返回;}
// 4核/ 8线程基准vec大小:100000仅转换seq:2.5878 ms,ret:100000仅转换par:1.3734 ms,ret:100000FilterCopyIf:5.3675 ms,ret:50203FilterCopyIfParNaive:9.1836 ms,ret:50203FilterCopyIfParCompose:3.03 ms,ret:50203eqFilterCopyIfComp :2.3454 ms,ret:50203FilterCopyIfParTransformPush:2.5735 ms,ret:50203FilterCopyIfParChunks:2.9718 ms,ret:50203FilterCopyIfParChunksFuture:2.5091 ms,ret:50203
// 6核/ 12线程基准vec大小:100000仅转换seq:2.3379 ms,ret:100000仅转换par:0.5979 ms,ret:100000FilterCopyIf:3.675 ms,ret:50203FilterCopyIfParNaive:10.0073 ms,ret:50203FilterCopyIfParCompose:1.2936 ms,ret:50203FilterCopyIfParComp :1.0754 ms,ret:50203FilterCopyIfParTransformPush:2.0103 ms,ret:50203FilterCopyIfParChunks:2.0974 ms,ret:50203FilterCopyIfParChunksFuture:1.9456 ms,ret:50203
// 4 CoreS / 8 ThreadsBenchmark Vec大小:1000000Transform仅SEQ:24.7069 MS,RET:1000000Transform仅适用:5.9799 MS,RET:1000000FilterCopyIF:45.2647 MS,RET:499950FilterCopyIFParnaive:499950FilterCopyIFParCompose:17.1237 MS,RET:499950FilterCopyIFPARCOMPOSEQ :16.7736 MS,RET:499950FILTERCOPYIFPARTRANSFORMPUSH:21.2285 MS,RET:499950FILTERCOPYIFPARCHUSKS:22.1941 MS,RET:499950FILTERCOPYIFPARCHUNKSFUTURE:22.4486 MS,RET:499950
// 6核心/ 12线索屏蔽VEC尺寸:1000000Transform仅限SEQ:24.7731 MS,RET:1000000Transform仅适用:2.8692 MS,RET:1000000FilterCopyIF:35.6397 MS,RET:499950 //基础线FilterCopyIFParnaive:499950FilterCopyIFParCompose:99953 MS ,RET:499950FilterCopyIFPARCOMPOSEQ:9.9909 MS,RET:499950FILTERCOPYIFPARTRANSFORMPUSH:13.9003 MS,RET:499950FILTERCOPYIFPARCHUNKS:499950FILTERCOPYIFPARCHUNKSFUTURE:12.6284 MS,RET:499950
正如您所看到的,我们可以更快地使代码更快,但仍然,您需要大量的元素来处理(我猜50K ...... 100k至少),以及您必须支付额外设置甚至内存的价格。
像往常一样,这取决于您的环境和要求。但是,如果您使用多个线程,它可能是明智的,依赖顺序拷贝_IF并保持其他线程忙。例如,您可以启动一些小的“副本”线程,在等待副本完成时同时执行其他工作。这里有各种各样的场景和方法。
本文旨在不创建最好的并行算法,而是实验并学习一些东西。我希望这里提出的想法为您提供一些提示。
我完全依赖于基于标准库的实心多线程框架。尽管如此,使用英特尔TBB或其他高度复杂的计算框架时,还有更多选项。
如您所见,我敦促动态创建输出矢量。这会产生一些额外的开销,所以在解决方案中,您可能会限制为此。为什么不分配与输入容器相同的大小?也许可以在解决方案中工作?也许我们可以在以后推载载体?或者可能利用一些智能分配者?
我还应该提到,在最近的超负荷杂志ACCU中有一篇文章:危害C ++并行算法的盲目的案例,所以你也可以看看它。 文章列出了您可能希望在跳跃和放置std :: execution :: dellower之前考虑的五个问题。 我展示了4种不同的技术,但也许你有更多的东西? 你会建议什么? 我在MSVC上测试了我的代码,但在GCC中,通过Intel TBB获得并行算法。 你试过他们吗? 很高兴看到该编译器/库的结果。 如果您' ve' vere' vere'对现代c ++感兴趣! 了解最近C ++标准的所有主要功能! 检查一下: