源栈培训:C#进阶-8:异步和并行(一)

更多
2019年06月16日 11点31分 作者:叶飞 修改

First of All:不要把异步和多线程混为一谈! —— 这是最大的误区。

图示:

概念区别:

  • 并行(parallel):多个任务真正的“同时”进行 ,只能在多处理器上实现
  • 并发(concurrency):多个任务在运行时间上有重叠(所以看起来像是在同时运行),通常在单处理器上实现。
  • 多线程(multiple-threads):一个进程上有多个线程同时运行。在某些语境下可以和“并发”混用,因为目前的操作系统都是使用多线程实现并发。
    1. 进程(process):重量级,操作系统分配资源(内存/CPU/IO),通常是一个运行中的程序
    2. 线程(thread):轻量级,共享进程资源

解释:为什么需要“假的”并行——并发?

  • 增加响应(responsive),常用于UI
  • 多核CPU提高吞吐量(throughput)。主要是源于CPU(尤其是多核/多处理器)和I/O操作在效率上的巨大差距。如果只能同步串行的话,CPU存在大量的“等待空闲”时间,不利于资源利用。

托管(managed)线程

Thread.Sleep()

  • primary:主(启动)线程
    Thread current = Thread.CurrentThread;
    
  • worker:工作线程
    Thread current = new Thread(new ThreadStart(Process));
    
            public static void Process()
            {
                Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}");
            }
显示线程相关信息:
            Console.WriteLine(Thread.GetDomain().FriendlyName);
            Console.WriteLine(current.ManagedThreadId);
            Console.WriteLine(current.Priority);
            Console.WriteLine(current.IsThreadPoolThread);
            Console.WriteLine(current.ThreadState);

上述线程都是前台(foreground)线程:主线程和new Thread()创造。

此外还有后台(background):在线程池中维护。

前后台线程的区别:如果前台线程终止,后台线程也会被结束。

可以改变线程的前后台状态:

            current.IsBackground = true;


线程方法:

  • Sleep()
  • Start()
  • 其他一些deprecated方法:Abort()、Suspend()、Resume()……


线程池(thread pool)

理解“池”的概念:

  • 池中存放多个线程
  • 一个线程使用完成后并不销毁,而是放回池中
  • 下次使用时直接从池中取出未使用的线程

一个进程只有一个线程池,池中的线程都是:

  • 后台线程
  • 使用默认的priority
  • 使用相同的栈大小
  • is in the multithreaded apartment (一个COM组件允许多个线程访问,COM组件内部具有同步线程的能力,参考:StackOverflow
  • 适用于“短期运行”的线程

可以设置线程池的最大/最小线程数

线程池中线程的调度依赖于:


TaskScheduler

使用队列:Global / Local 

防止有人偷懒(work stealing)

处理长运行(long-running)线程:会阻塞队列

Task inlining:当一个Task在等待的时候,它就可以在“等待”状态的线程上同步执行


Understanding the Costs of Async and Await


引入 线程池TaskScheduler 封装Thread之后,总体上来说,.NET鼓励我们使用:(为什么?参考:托管线程处理的最佳做法


基于任务(Task)的异步编程


Task可以理解为工作在未来某个时间一定会完成的承诺(promise)

注意:Task是比线程“更高层的抽象”,它是对任务(work)的封装,而不是线程(thread)的封装。因为:

  • 是否开启一个新线程,是由scheduler决定的
  • developer可以强制要求开启一个新的线程,但无法要求任务必须运行在当前线程

.NET为我们提供了两种Task:

  • Task:不返回值
  • Task<T>:返回一个T类型对象
            Action getup = () =>
            {
                Console.WriteLine($"Task-{Task.CurrentId}:起床啦!...." +
                    $"ThreadId:{Thread.CurrentThread.ManagedThreadId}");
                Console.WriteLine();
            };

            for (int i = 0; i < 10; i++)
            {
                Console.WriteLine($"第{i + 1}次:");
                Console.WriteLine();

                Task t1 = new Task(getup);

                t1.Start();     //new出来的task还需要显式调用Start()才开始运行
                Console.WriteLine("t1.Start()...");
                Show(t1);

                //t1.RunSynchronously();    //同步运行
                //Console.WriteLine("t1.RunSynchronously()...");

                //Task t1 = Task.Run(getup);    //更灵活
                //Console.WriteLine("Task.Run(getup)...");
                //Task t1 = Task.Factory.StartNew(getup);   //更多重载(控制)
                //Console.WriteLine("Task.Factory.StartNew(getup)...");

                //t1.Wait();      //确保t1任务完成
                //Console.WriteLine("t1.Wait()...");

                Console.WriteLine();
            }

推荐顺序:Task.Run() =>Task.Factory.StartNew() =>new Task()

理解区别:

  • t1.Wait();
  • t1.RunSynchronously();

对于Task<T>而言,区别在于:

            Func<long> getup = () =>    //使用Func而不是Action
            {
                Console.WriteLine($"Task-{Task.CurrentId}:起床啦!...." +
                    $"ThreadId:{Thread.CurrentThread.ManagedThreadId}");
                return DateTime.Now.Ticks;
            };
                Task<long> t1 = new Task<long>(getup);
                Console.WriteLine("t1.Result:" + t1.Result);    //使用Result属性获得返回值
                Console.WriteLine("t1.Status:" + t1.Status);

程序会保证(阻塞其他线程)总是能取到Result的值,和Wait()相似?

其他常用方法:



向Task<T>传递参数

  • 直接使用外部变量
                for (int i = 0; i < 10; i++)
                {
                    Console.WriteLine();Task<long> t1 = new Task<long>(() =>
                    {
                        Console.WriteLine($"第{i + 1}次:");
                        Console.WriteLine($"Task-{Task.CurrentId}:起床啦!...." +
                            $"ThreadId:{Thread.CurrentThread.ManagedThreadId}");
                        return DateTime.Now.Ticks;
                    });}
    

    这样做的问题是,由于异步调用,i 值会“飘忽不定”。更奇怪的是,还有可能出现 i = 10 的情形……

  • 在 new Task<long>() 时传入一个参数:
                    Task<long> task = new Task<long>((n) =>
                    {
                        Console.WriteLine($"第{n}次Task-{Task.CurrentId}:起床啦!...." +
                            $"ThreadId:{Thread.CurrentThread.ManagedThreadId}");
                        return DateTime.Now.Ticks;
                    },i+1);
    可以保证每次task收到恒定的传入值。

ContinueWith

确保一个线程在之前线程完成之后再开始运行。

                Task<DateTime> getup = Task<DateTime>.Run(() =>
                {
                    Thread.Sleep(100);
                    Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId} "+
                        $"Task-{Task.CurrentId}:起床啦!....");
                    return DateTime.Now;
                });

                getup.ContinueWith((x) =>
                {
                    Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId} " +
                        $"Task-{Task.CurrentId}:起床结束");
                    Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId} " +
                        $"Task-{Task.CurrentId}:刷牙洗脸....");
                    Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId} " +
                        $"Task-{Task.CurrentId}: {x.Result}");
                });


注意:

  • ContinueWith()会返回一个新的Task,
  • 该Task处于 WaitingForActivation 状态
  • 会在之前Task完成后自动运行,(??实际上是之前Task完成后再用“回调”的方式调用当前Task,所以两个Task总是处于相同的运行环境。


异常处理

当前线程无法捕获另外一个线程抛出的异常(演示:略)

除非碰到了Task.Wait() 方法:

  • 多个线程中的所有的异常被包裹在AggregateException中
  • 可以通过使用AggregateException异常实例中的:
    • InnerExceptions 依次获取所有线程上的异常,以及
                  Task[] getups = new Task[10];
                  for (int i = 0; i < 10; i++)
                  {
                      //Thread.Sleep(10);
                      getups[i] = Task.Run(() =>
                      {
                          throw new Exception(i.ToString());
                      });
                  }
      
                  try
                  {
                      Task.WaitAll(getups);
                  }
                  catch (AggregateException ae)
                  {
                      foreach (var item in ae.InnerExceptions)
                      {
                          Console.WriteLine(item.Message);
                      }
                  }
    • Handle() 方法处理异常
                      ae.Handle(x => {
                          Console.WriteLine(x.Message);
                          return true;
                      });

此外,也可以直接使用:

                //if (task.Status == TaskStatus.Faulted)
                if (task.IsFaulted)
                {
                    foreach (var item in task.Exception.InnerExceptions)
                    {
                        Console.WriteLine(item);
                    }
                }

注意在此之前,添加语句:

                while (!task.IsCompleted){}     //异常只会在task完成之后才会被收集


终止某个任务

只需要按部就班的书写如下代码即可:

            CancellationTokenSource source = new CancellationTokenSource();
            CancellationToken token = source.Token;

            Task<DateTime> getup = Task<DateTime>.Run(() =>     //getup已经Run()起来了
            {
                token.ThrowIfCancellationRequested();   //如果被cancel(),抛出异常
                Console.WriteLine($"Task-{Task.CurrentId}:起床啦!...." +
                    $"ThreadId:{Thread.CurrentThread.ManagedThreadId}");
                return DateTime.Now;
            }, token);      //传入token指令,确保cancel通知能够被侦听

            source.Cancel();   //使用source进行cancel

            try
            {
                getup.Wait();   //只有在Wait()时才能捕获异常
            }
            catch (AggregateException ae)
            {
                //使用Handle()方法进行处理
                ae.Handle((e) => 
                {
                    Console.WriteLine("canceled?");
                    return true;    //表示已经成功处理,不需要再抛出异常
                });
            } 
体会代码之美:
  • CancellationTokenSource类的封装

  • token的注入
除了抛出异常,还可以使用:
                    token.Register(() => { Console.WriteLine("registered cancel handler"); });
但是奇怪的是,需要在Cancel()之前加一句:
                Thread.Sleep(1);

想想是为什么?


.NET类库实例

File相关:File/Stream

网络相关:HttpClient

XML相关:XElement

(演示:async搜索,略

注意命名规范:添加Async后缀

想一想:如何使用这些方法返回的Task?有没有什么问题?(Status/Result/……


作业:

重构之前的验证码作业:

  1. 创建一个新的前台线程(Thread),在这个线程上运行生成随机字符串的代码
  2. 在一个任务(Task)中生成画布
  3. 使用生成的画布,用两个任务完成:
    • 在画布上添加干扰线条
    • 在画布上添加干扰点
  4. 将生成的验证码图片异步的存入文件
  5. 能捕获抛出的若干异常,并相应的处理
  6. 以上作业,需要在控制台输出线程和Task的Id,以演示异步并发的运行。

作业点评

  • 不要用大小写来区分方法
  • 理解字段的作用
  • 使用IList而不是List
  • 代码的书写习惯
  • 习惯:提交之前查看改动(对比)
    ---
  • 演示你最近的三次提交
作业点评
  • x,y => width/height
  • Factory 的构造函数不应该有参数(x,y)
  • 规划好类的“入口”“出口”(构造函数/访问修饰符)
  • 字段用于内部,属性用于外部
  • 一般在构造函数中给字段赋值,而不是直接赋值
源栈培训 C# 进阶 异步 多线程
赞: 0 踩: 0

打赏
已收到打赏的 帮帮币

你的 打赏 非常重要!
为了保证文章的质量,每一篇文章的发布,都已经消耗了作者 1 枚 帮帮币
没有“帮帮币”,作者无法发布新的文章。

全系列阅读
评论 / 0
叶飞的系列文章

源栈培训:ASP.NET全栈开发

飞哥的源栈培训:线上全程直播,免费收看;线下拎包入住,按周计费。本系列收录所有讲义(含视频录播地址)

编程那些事:菜鸟入门

大飞哥倾力之作,面向有意入行IT/开发/编程的初学者,欢迎任何形式的留言建议……

从包工头到程序猿

真实故事,讲述我在家装公司关门之后,如何转行成为一个程序猿的故事。(《折腾》第三卷)

《折腾》(卷一)青涩

时间段:从大学毕业到开始创业。离开青葱校园,涉世之初的那些往事……

《折腾》(卷二)风雨 之(1)工地

我一个完全的门外汉(无论装修还是管理),开始给黎叔装修房子。从踌躅满志,到四处碰壁;从一往直前,到左右为难……

《折腾》(卷二)风雨 之(2)胸怀

作为一个律师,接工程没签合同,被狠狠的坑了一把!年轻人暴烈的想要复仇,黎叔教他一个企业家的胸怀……

《折腾》(卷二)风雨 之(3)渠道

成立了公司,招聘了员工,开始大力的拓展业务,一个接一个的坑,摔倒了又爬起来……

《折腾》(卷二)风雨 之(4)视野

经历残酷现实的磨砺,终于明白:干啥事,都不能闭门造车,人要走出去,开阔视野……

未分类

系统自动生成的未分类系列

一锅大杂烩

从律师到包工头,从码农到写手,读书交友生活创业,各种零零碎碎,乱七八糟……

人人都是程序猿

计算机编程普及课程,视频:https://space.bilibili.com/55410301/#/channel/detail?cid=49491

全部
关键字



帮助

反馈