Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 944|回复: 0

在Java中使用协程Coroutine

[复制链接]

该用户从未签到

发表于 2011-9-13 20:20:58 | 显示全部楼层 |阅读模式
1.       背景知识
         现在的操作系统都是支持多任务的,多任务可通过多进程或多线程的方式去实现,进程和线程的对比就不在这里说了,在多任务的调度上操作系统采取抢占式和协作式两种方式,抢占式是指操作系统给每个任务一定的执行时间片,在到达这个时间片后如任务仍然未释放对CPU的占用,那么操作系统将强制释放,这是目前多数操作系统采取的方式;协作式是指操作系统按照任务的顺序来分配CPU,每个任务执行过程中除非其主动释放,否则将一直占据CPU,这种方式非常值得注意的是一旦有任务占据CPU不放,会导致其他任务饿死的现象,因此操作系统确实不太适合采用这种方式。         说完操作系统多任务的调度方式后,来看看通常程序是如何实现支持高并发的,一种就是典型的基于操作系统提供的多进程或多线程机制,每个任务占据一个进程或一个线程,当任务中有IO等待等动作时,则将进程或线程放入待调度队列中,这种方式是目前大多数程序采取的方式,这种方式的坏处在于如想支持高的并发量,就不得不创建很多的进程或线程,而进程和线程都是要消耗不少系统资源的,另外一方面,进程或线程创建太多后,操作系统需要花费很多的时间在进程或线程的切换上,切换动作需要做状态保持和恢复,这也会消耗掉很多的系统资源;另外一种方式则是每个任务不完全占据一个进程或线程,当任务执行过程中需要进行IO等待等动作时,任务则将其所占据的进程或线程释放,以便其他任务使用这个进程或线程,这种方式的好处在于可以减少所需要的原生的进程或线程数,并且由于操作系统不需要做进程或线程的切换,而是自行来实现任务的切换,其成本会较操作系统切换低,这种方式也就是本文的重点,Coroutine方式,又称协程方式,这种方式在目前的大多数语言中都有支持。         各种语言在实现Coroutine方式的支持时,多数都采用了Actor Model来实现,Actor Model简单来说就是每个任务就是一个ActorActor之间通过消息传递的方式来进行交互,而不采用共享的方式,Actor可以看做是一个轻量级的进程或线程,通常在一台4G内存的机器上,创建几十万个Actor是毫无问题的,Actor支持Continuations,即对于如下代码:         Actor                   act方法                            进行一些处理创建并执行另外一个Actor                            通过消息box阻塞获取另一个Actor执行的结果                            继续基于这个结果进行一些处理         在支持Continuations的情况下,可以做到消息box阻塞时并不是进程或线程级的阻塞,而只是Actor本身的阻塞,并且在阻塞时可将所占据的进程或线程释放给其他Actor使用,Actor Model实现最典型的就是erLang了。         对于java应用而言,传统方式下为了支持高并发,由于一个线程只能用于处理一个请求,即使是线程中其实有很多IO中断、锁等待也同样如此,因此通常的做法是通过启动很多的线程来支撑高并发,但当线程过多时,就造成了CPU需要消耗不少的时间在线程的切换上,从而出现瓶颈,按照上面对Coroutine的描述,Coroutine的方式理论上而言能够大幅度的提升Java应用所能支撑的并发量。2.       Java中使用Coroutine
         Java尚不能从语言层次上支持Coroutine,也许Java 7能够支持,目前已经有了一个测试性质的版本[1],在Sun JDK 7尚未正式发布的情况下如希望在Java中使用CoroutineScalaKilim是可以做的选择,来分别看下。         Scala是现在很火的语言之一,Twitter消息中间件基于Scala编写更是让Scala名声鹊起,除了在语法方面所做出的改进外,其中一个最突出的特色就是Scala ActorScala ActorScala用于实现Coroutine的方式,先来具体看看ScalaCoroutine支持实现的关键概念。l  ActorScala Actor可以看做是一个轻量级的Java Thread,其使用方式和Java Thread基本也一致,继承Actor,实现act方法,启动时也是调用start方法,但和Java Thread不同的是,Scala Actor可等待外部发送过来的消息,并进行相应的处理。l  Actor的消息发送机制发送消息到Actor的方式有异步、Future两种方式,异步即指发送后立即返回,继续后续流程,使用异步发送的方法为:actor ! MessageObject,其中消息对象可以为任何类型,并且Scala还支持一种称为case Object的对象,便于在收到消息时做pattern matchingFuture方式是指阻塞线程等待消息处理的结果,使用Future方式发送的方法为:actor !! MessageObject,在等待结果方面,Scala支持不限时等待,限时等待以及等待多个Future或个别Future完成,使用方法如下:val ft=actor !! MessageObject // Future方式发送消息val result=ft() // 不限时等待val results=awaitAll(500,ft1,ft2,ft3)  // 限时等待多个Future返回值val results=awaitEither(ft1,ft2) // 等待个别future完成接收消息方通过reply方法返回Future方式所等待的结果。l  Actor的消息接收机制当代码处于Actoract方法或Actor环境(例如为Actoract方法调用过来的代码)中时,可通过以下两种方式来接收外部发送给Actor的消息:一为receive方式,二为react方式,代码例子如下:receive{         case MessageObject(args) => doHandle(args)}react{         case MessageObject(args) => doHandle(args)}receivereact的差别在于receive需要阻塞当前Java线程,react则仅为阻塞当前Actor,但并不会阻塞Java线程,因此react模式更适合于充分发挥coroutine带来的原生线程数减少的好处,但react模式有个缺点是react不支持返回。receivereact都有限时接收的方式,方法为:receiveWithin(timeout)reactWithin(timeout),超时的消息通过case TIMEOUT的方式来接收。下面来看基于Scala Actor实现并发处理请求的一个简单例子。         class Processor extends Actor{                   def act(){                            loop{                                     react{                                               case command:String => doHandle(command)}}                   }                    def doHandle(command:String){                            // 业务逻辑处理}}当需要并发执行此Processor时,在处理时需要的仅为调用以下代码:val processor=new Processor()processor.startprocessor ! “Hello”         从以上说明来看,要在旧的应用中使用Scala还是会有一些成本,部署运行则非常简单,在Scala IDE Plugin编写了上面的scala代码后,即生成了java class文件,可直接在JVM中运行。Kilim是由剑桥的两位博士开发的一个用于在Java中使用Coroutine的框架,Kilim基于Java语法,先来看看Kilim中的关键概念。l  Task可以认为Task就是Actor,使用方式和Java Thread基本相同,只是继承的为Task,覆盖的为execute方法,启动也是调用taskstart方法。l  Task的消息发送机制Kilim中通过Mailbox对象来发送消息,Mailbox的基本原则为可以有多个消息发送者,但只能有一个消息接收者,发送的方式有同步发送、异步发送和阻塞线程方式的同步发送三种,同步发送是指保证一定能将消息放入发送队列中,如当前发送队列已满,则等待到可用为止,阻塞的为当前Task;异步发送则是尝试将消息放入发送队列一次,如失败,则返回false,成功则返回true,不会阻塞Task;阻塞线程方式的同步发送是指阻塞当前线程,并保证将消息发送给接收者,三种方式的使用方法如下:mailbox.put(messageObject); // 同步发送mailbox.putnb(messageObject); // 异步发送mailbox.putb(messageObject); // 阻塞线程方式发送l  Task的消息接收机制Kilim中通过Mailbox来接收消息,接收消息的方式有同步接收、异步接收以及阻塞线程方式的同步接收三种,同步接收是指阻塞当前Task,直到接收到消息才返回;异步接收是指立刻返回Mailbox中的消息,有就返回,没有则返回null;阻塞线程方式的同步接收是指阻塞当前线程,直到接收到消息才返回,使用方法如下:mailbox.get(); // 同步接收,传入long参数表示等待的超时时间,单位为毫秒mailbox.getnb(); // 异步接收,立刻返回mailbox.getb(); // 阻塞线程方式接收下面来看基于Kilim实现并发处理请求的一个简单例子。         public class Processor extends Task{                   private String command;                   public Processor(String command){                            this.command=command;}public void execute() throws Pausable,Exception{         // 业务逻辑处理}}在处理时,仅需调用以下代码:Task processor=new Processor(command);processor.start();从以上代码来看,Kilim对于Java人员而言学习门槛更低,但对于需要采用coroutine方式执行的代码在编译完毕后,还需要采用Kilimkilim.tools.Weaver类来对这些已编译出来的class文件做织入,运行时需要用织入后生成的class文件才行,织入的方法为:java kilim.tools.Weaver –d [织入后生成的class文件存放的目录] [需要织入的类文件所在的目录],目前尚没有Kilim IDE Plugin可用,因此weaver这个过程还是比较的麻烦。上面对ScalaKilim做了一个简单的介绍,在实际Java应用中使用Coroutine时,通常会出现以下几种典型的更复杂的使用场景,由于Actor模式本身就是异步的,因此其天然对异步场景支持的就非常好,更多的问题会出现在以下几个同步场景上,分别来看看基于ScalaKilim如何来实现。l  Actor同步调用Actor同步调用是经常会出现的使用场景,主要为Actor发送消息给其他的Actor处理,并等待结果才能继续。n  Scala对于这种情况,在Scala 2.7.7中,目前可采取的为以下两种方法:一种为通过Future方式发送消息来实现:class Processor(command:String) extends Actor{         def act(){                   val actor=new NetSenderActor()                   val ft=actor !! command                    println(ft())}}class NetSenderActor extends Actor{         def act(){                   case command:String => {                            reply(“received command:”+command)}}}第二种为通过receive的方式来实现:class Processor(command:String) extends Actor{         def act(){                   val actor=new NetSenderActor()                   actor ! command                    var senderResult=””receive{                            case result:String => {senderResult=result}}println(senderResult)}}class NetSenderActor extends Actor{         def act(){                   case command:String => {                            sender ! (“received command:”+command)}}}但这两种方式其实都不好,因为这两种方式都会造成当前Actor的线程阻塞,这也是因为目前Scala版本对continuations尚不支持的原因,Scala 2.8版本将提供continuations的支持,希望到时能有不需要阻塞Actor线程实现上述需求的方法。还有一种常见的场景是Actor调一段普通的Scala类,然后那个类中进行了一些处理,并调用了其他Actor,此时在该类中如需要等待Actor的返回结果,也可使用上面两种方法。n  KilimKilim中要实现Task之间的同步调用非常简单,代码如下:public class TaskA extends Task{         public void execute() throws Pausable,Exception{                   Mailbox<Object> result=new Mailbox<Object>();Task task=new TaskB(result);                   task.start();                   Object resultObject=result.get();                   System.out.println(resultObject);}}public class TaskB extends Task{         private Mailbox<Object> result;public TaskB(Mailbox<Object> result){                   this.result=result;}public void execute() throws Pausable,Exception{         result.put(“result from TaskB”);}}KilimMailbox.get并不会阻塞线程,因此这种方式是完全满足需求的。l  普通Java代码同步调用Actor由于已有的应用是普通的Java代码,经常会出现这样的场景,就是希望实现在这些Java代码中同步的调用Actor,并等待Actor的返回结果,但由于ScalaKilim都强调首先必须是在ActorTask的环境下才行,因此此场景更佳的方式应为Scala Actor(Kilim Task) à Java Code à Scala Actor(Kilim Task),这种场景在对已有的应用中会是最常出现的,来看看在ScalaKilim中如何应对这样的需求。n  Scala目前Scala中如希望在Java Code中调用Scala Actor,并等待其返回结果,暂时还没办法,做法只能改为从Java Code中去调一个ScalaObject,然后在这个Object中调用Actor,并借助上面提到的receivefuture的方法来获取返回值,最后将这个返回值返回Java Coden  Kilim目前Kilim中如希望实现上面的需求,其实非常简单,只需要在Java Code的方法上加上Throw Pausable,然后通过mailbox.get来等待Kilim Task返回的结果即可,在Kilim中只要调用栈上的每个方法都有Throw Pausable,就可在这些方法上做等待返回这类的同步操作。从上面这两个最常见的需求来看,无疑Kilim更符合需求,但要注意的是对于Kilim而言,如果出现Task à nonpausable method à pausable method这样的状况时,pausable method中如果想执行阻塞当前Task的操作,是无法做到的,只能改造成Task (mailbox上做等待,并传递mailbox给后续步骤) à nonpausable method (传递mailbox) à pausable method (将逻辑转为放入一个Task中,并将返回值放入传递过来的mailbox),这种状况在面对spring aop、反射调用等现象时就会出现了,目前kilim 0.6的版本尚未提供更透明的使用方法,不过据kilim作者提供的一个试用版本,其中已经有了对于反射调用的透明化的支持,暂时在目前只能采用上述方法,迁移成本相对较大,也许以后的kilim版本会考虑这样的场景,提供相应的方法来降低迁移的成本。3.       性能、所能支撑的并发量对比
在对ScalaKilim有了这些了解后,来具体看看采用ScalaKilim后与传统Java方式在性能、所能支撑的并发量上的对比。l  测试模型采用一个比较简单的模型进行测试,具体为有4个线程,这4个线程分别接收到了一定数量的请求,每个请求需要交给另外一个线程去执行,这个线程所做的动作为循环10次获取另外一个线程的执行结果,此执行线程所做的动作为循环1000次拼接一个字符串,然后返回。l  实现代码由于目前Scala版本对Continuation支持不够好,但上面的场景中又有此类需求,所以导致Scala版本的代码写的比较麻烦一些。实现代码以及可运行的环境请从此处下载:http://www.bluedavy.com/open/benchmark.zipl  结果对比测试机器为一台4核的linux机器。TPS的对比结果如下: Load的对比结果如下:
从上面的测试结果来看,在这个benchmark的场景中,基于KilimScala实现的Coroutine版本在随着请求数增长的情况下load的增长幅度都比纯粹的Java版本低很多,Kilim版本表现尤其突出,在TPS方面,由于目前Scala版本对Continuation支持的不好,因此在这个测试场景中有点吃亏,表现反而最差,经过上面的测试可以看到,基于Coroutine版本可以以同样的load或更低的load来支撑更高的TPS
到此为止,基本上对Java中使用Coroutine的相关知识做了一个介绍,总结而言,采用Coroutine方式可以很好的绕开需要启动太多线程来支撑高并发出现的瓶颈,提高Java应用所能支撑的并发量,但在开发模式上也会带来变化,并且需要特别注意不能造成线程被阻塞的现象,从开发易用和透明迁移现有Java应用两个角度而言目前Coroutine方式还有很多不足,但相信随着越来越多的人在Java中使用Coroutine,其易用性必然是能够得到提升的。

4.       参考资料
1.         http://en.wikipedia.org/wiki/Computer_multitasking
2.         http://en.wikipedia.org/wiki/Coroutine
3.         http://en.wikipedia.org/wiki/Actor_model
4.         http://en.wikipedia.org/wiki/Continuation
5.         http://lamp.epfl.ch/~phaller/doc/haller07coord.pdf
6.         http://www.scala-lang.org/sites/default/files/odersky/jmlc06.pdf
7.         http://www.malhar.net/sriram/kilim/kilim_ecoop08.pdf
8.         http://lamp.epfl.ch/~phaller/doc/ScalaActors.pdf

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

GMT+8, 2025-1-16 02:31 , Processed in 0.541276 second(s), 49 queries .

Powered by Discuz! X3.4

© 2001-2017 Comsenz Inc.

快速回复 返回顶部 返回列表