在现代的编程中,利用并行计算提升性能已经成为了一个重要的课题。尤其是在C#中,PLINQ(并行 LINQ(PLINQ)是 Language-Integrated 查询(LINQ) 模式的并行实现)提供了简单而强大的工具,让我们可以轻松实现数据的并行处理。今天,我们就一起探索如何通过PLINQ优化代码性能,如何避免常见的陷阱以及如何在适当的场景下使用它。

1. 基础使用:并行计算演示

首先,是如何使用PLINQ进行简单的并行计算

var seq = Enumerable.Range(1, 16);
// Parallel.ForEach演示
Parallel.ForEach(seq, Console.WriteLine);
PrintHeader("并行效果演示");
var pSeq = seq.AsParallel();
pSeq.ForAll(Console.WriteLine);
// output
1
2
5
16
3
4
6
7
8
9
10
11
12
13
14
15

在这段代码中,展示了使用Parallel.ForEach 以及ForAll方法,我们能够发现其输出是乱序的

因为并行计算的顺序无法预测,谁先执行完毕,谁就先输出,并不会保证顺序。

2. 使用WithDegreeOfParallelism控制并行度

为了更好地控制并行计算的性能,我们可以使用WithDegreeOfParallelism方法来限制并发数。这样可以更精确地控制并行任务的数量,从而避免过度并行化带来的性能开销。(默认与CPU线程数相同)

PrintHeader("foreach 演示");
foreach (var item in pSeq.WithDegreeOfParallelism(4))
{
    Console.WriteLine(item);
}

// output
1
5
9
13
2
6
10
14
3
7
11
15
4
8
12
16

运行以上代码,可以明显看到输出有4组的特征(如果不设置,并且你的cpu较为现代的话,这里可能会顺序显示所有结果,这是foreach的不同之处 )

3. 有序化并行计算结果

有时我们需要确保并行计算的结果是有序的。在PLINQ中,使用AsOrdered可以让并行计算保持输入数据的顺序:

PrintHeader("有序化效果演示");
foreach (var item in pSeq
    .AsOrdered()
    .WithDegreeOfParallelism(4))
{
    Console.WriteLine(item);
}

使用AsOrdered可以保证最终的输出顺序与输入顺序一致,但需要注意,这样做会引入一定的性能开销。

当你不需要接下来的计算有序时,可以再次使用.AsUnordered() 取消有序。

4. 隐式有序化:ToArray等方法

调用ToArray,ToList方法会隐式地对数据进行排序,并确保输出的结果是有序的:

PrintHeader("ToArray隐式有序化效果演示");
foreach (var item in pSeq
    .WithDegreeOfParallelism(4)
    .ToArray())
{
    Console.WriteLine(item);
}

5. 分区与线程ID演示

PLINQ能够根据CPU核心数智能地将任务分区,进行并行处理。我们可以通过输出每个元素所在的线程ID来查看每个分区的情况。

PrintHeader("分区线程ID演示");
var partitioned = pSeq
    .WithDegreeOfParallelism(4)
    .Select(s => (s, Thread.CurrentThread.ManagedThreadId));
foreach (var item in partitioned)
{
    Console.WriteLine(item);
}

// output

(1, 22)
(5, 18)
(9, 23)
(13, 20)
(2, 22)
(6, 18)
(10, 23)
(14, 20)
(3, 22)
(7, 18)
(11, 23)
(15, 20)
(4, 22)
(8, 18)
(12, 23)
(16, 20)

这段代码会显示每个元素所在的线程ID,尽管这不直接代表分区id(不同分区可能会用相同的线程) 但这可以帮助我们理解PLINQ是如何分配任务的。

6. 合并操作演示

PLINQ还支持通过Aggregate进行合并操作。在以下示例中,我们演示了如何使用Aggregate进行简单的合并,并在每次合并时输出中间结果。

PrintHeader("合并效果演示");
pSeq
    .WithDegreeOfParallelism(4)
    .Aggregate((acc, curr) =>
    {
        Console.WriteLine("{0} + {1}", acc, curr);
        return acc + curr;
    });
// output
13 + 14
27 + 15
42 + 16
1 + 2
3 + 3
6 + 4
5 + 6
11 + 7
18 + 8
9 + 10
19 + 11
30 + 12
10 + 26
36 + 42
78 + 58

如果需要更精确的控制,我们可以使用Aggregate的一个重载来分别定义合并操作和结果的处理方式。

PrintHeader("更具体的合并演示");
pSeq
    .WithDegreeOfParallelism(4)
    .Aggregate(
    seed: 0,
    (acc, curr) =>
    {
        Console.WriteLine("处理: {0} + {1} {2}", acc, curr, Thread.CurrentThread.ManagedThreadId);
        return acc + curr;
    },
    (total, subTotal) =>
    {
        Console.WriteLine("合并: {0} + {1} {2}", total, subTotal, Thread.CurrentThread.ManagedThreadId);
        return total + total;
    }, fin => fin
    );

// output

处理: 0 + 1 22
处理: 1 + 2 22
处理: 3 + 3 22
处理: 6 + 4 22
处理: 0 + 5 23
处理: 5 + 6 23
处理: 11 + 7 23
处理: 18 + 8 23
处理: 0 + 9 18
处理: 9 + 10 18
处理: 19 + 11 18
处理: 30 + 12 18
处理: 0 + 13 2
处理: 13 + 14 2
处理: 27 + 15 2
处理: 42 + 16 2
合并: 10 + 26 2
合并: 20 + 42 2
合并: 40 + 58 2

7. 性能测试:CPU密集型任务

在进行并行计算时,了解它在不同场景下的表现是很重要的。我们使用一个简单的CPU密集型任务进行性能对比,展示PLINQ与顺序执行的差异。

PrintHeader("7) 性能:CPU 密集型(示例:顺序 vs PLINQ)");
const int N = 20_000_00;
int dop = Math.Min(Environment.ProcessorCount, 4);
double seqSum = 0, parSum = 0;
TimeIt($"Sequential N={N}", () =>
{
    seqSum = Enumerable.Range(1, N).Sum(CpuWork);
}, repeat: 2);
TimeIt($"Parallel N={N} DOP={dop}", () =>
{
    parSum = Enumerable.Range(1, N)
        .AsParallel()
        .WithDegreeOfParallelism(dop)
        .Sum(CpuWork);
}, repeat: 2);

// output
==============================
7) 性能:CPU 密集型(示例:顺序 vs PLINQ)
==============================
Sequential N=2000000             1061 ms (x2)
Parallel N=2000000 DOP=4          302 ms (x2)

在进行大数据量的计算时,我们能够看到并行计算的优势。特别是当任务计算密集时,PLINQ可以大幅提升性能。

8. 小数据与并行计算的性能问题

对于小数据量,使用并行计算反而可能带来性能下降(切换线程等消耗)。我们通过以下代码进行对比,看看并行计算是否适用于小数据集:


PrintHeader("小数据演示");
var smallN = 100;
TimeIt($"Sequential N={smallN}", () =>
{
    seqSum = Enumerable.Range(1, smallN).Sum(CpuWork);
}, repeat: 500);
TimeIt($"Parallel N={smallN} DOP={dop}", () =>
{
    parSum = Enumerable.Range(1, smallN)
        .AsParallel()
        .WithDegreeOfParallelism(dop)
        .Sum(CpuWork);
}, repeat: 500);
// output
==============================
小数据演示
==============================
Sequential N=100                   14 ms (x500)
Parallel N=100 DOP=4               34 ms (x500)

对于较小的数据集,线程切换的开销可能会使得并行计算反而更慢。

本质上来说,当一个分区的任务足够复杂,超过切换线程等额外开销时,并行才变得有意义

9. I/O密集型任务:模拟与对比

I/O密集型任务的并行化效果并不如CPU密集型任务那样显著。以下是I/O操作的顺序执行与并行执行的对比:

PrintHeader("IO模拟演示");
var ioN = 100;
TimeIt($"Sequential IO N={ioN}", () =>
{
    foreach (var i in Enumerable.Range(1, ioN))
    {
        Thread.Sleep(100);
    }
}, repeat: 1);
TimeIt($"Parallel IO N={ioN} DOP={dop}", () =>
{
    Enumerable.Range(1, ioN)
        .AsParallel()
        .WithDegreeOfParallelism(dop)
        .ForAll(i =>
        {
            Thread.Sleep(100);
        });
}, repeat: 1);

// output

==============================
IO模拟演示
==============================
Sequential IO N=100             10901 ms (x1)
Parallel IO N=100 DOP=4          2718 ms (x1)
Async IO N=100                   1088 ms (x10)

在I/O密集型任务中,使用异步编程async/await往往能够更高效地利用线程资源。

10. 总结

通过上面的代码演示与性能测试,我们了解到:

对于计算密集型任务,PLINQ能够显著提高性能,尤其是在数据量较大的情况下。

对于I/O密集型任务,异步编程(async/await)更加合适,因为它能够有效地利用线程进行等待资源。

使用PLINQ时,需要小心分区与线程开销,避免在小数据或I/O密集型任务中滥用并行计算,同时也需要避免过度并行化。

学会了吗?学会了

Bilibili: @无聊的年

微信公众号: @scixing的炼丹房

视频点击下方查看原文

视频代码仓库: https://github.com/ssccinng/Parallel.Video

图片

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐