定时器和执行计划 scheduler and timer
定时器和执行计划提供定期,定时执行代码逻辑的功能
执行计划scheduler用于指定执行时间和执行逻辑,代码如下:
通过system.scheduler.scheduleOnce(1 second)来指定内部逻辑在指定时间后执行。
通过system.scheduler.schedule(1 second, 2 seconds)来指定经过1s延迟之后,每2s执行一次
通过schedule.cancel()来取消执行计划
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| class SelfClosingActor extends Actor with ActorLogging { var schedule = createTimeoutWindow()
def createTimeoutWindow(): Cancellable = { context.system.scheduler.scheduleOnce(1 second) { self ! "timeout" } }
override def receive: Receive = { case "timeout" => log.info("Stopping myself") context.stop(self) case message => log.info(s"Received $message, staying alive") schedule.cancel() schedule = createTimeoutWindow() } }
val routine: Cancellable = system.scheduler.schedule(1 second, 2 seconds) { simpleActor ! "heartbeat" }
|
计时器timer比scheduler简单,计时器支持指定一个时间,时间到时会向自身actor发送一条消息
line6 启动一个计时器,发送start消息给自己
start消息处理时会将同一个计时器改成周期执行,发reminder消息给自己
如果收到了来自外部的stop消息,则停止计时器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| case object TimerKey case object Start case object Reminder case object Stop class TimerBasedHeartbeatActor extends Actor with ActorLogging with Timers { timers.startSingleTimer(TimerKey, Start, 500 millis)
override def receive: Receive = { case Start => log.info("Bootstrapping") timers.startPeriodicTimer(TimerKey, Reminder, 1 second) case Reminder => log.info("I am alive") case Stop => log.warning("Stopping!") timers.cancel(TimerKey) context.stop(self) } }
|
路由 router
router提供消息路由的功能,按照一定的规则向多个actor发送消息。典型的应用场景是主从场景
如下例子,master创建了5个slave actor(line4),通过创建Router(line11)并指定策略(RoundRobinRoutingLogic),之后所有和slave的交互都通过router进行(line16,line19,line22)即master根据收到的消息决定如何调动slaves。
下面这种创建router的方式在生产实际中很少使用,只做演示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| class Master extends Actor { private val slaves = for(i <- 1 to 5) yield { val slave = context.actorOf(Props[Slave], s"slave_$i") context.watch(slave) ActorRefRoutee(slave) }
private val router = Router(RoundRobinRoutingLogic(), slaves)
override def receive: Receive = { case Terminated(ref) => router.removeRoutee(ref) val newSlave = context.actorOf(Props[Slave]) context.watch(newSlave) router.addRoutee(newSlave) case message => router.route(message, sender()) } }
|
如下例子,生产实践中一般直接创建pool routers,即创建子actors就是池化的,并且直接指定路由策略
1
| val poolMaster = system.actorOf(RoundRobinPool(5).props(Props[Slave]), "simplePoolMaster")
|
还可以通过配置来创建池化的子actor和router策略
1
| val poolMaster2 = system.actorOf(FromConfig.props(Props[Slave]), "poolMaster2")
|
第三种实践常用的方式是group routers,这种用法不再要求是父子actor,可以将任意个actor绑定成一组,在其上应用router策略,对其进行消息分发
只要传入对应的paths即可,这样做则不需要在actor中先创建一堆slave actor
1
| val groupMaster = system.actorOf(RoundRobinGroup(slavePaths).props())
|
同理可以用配置的方式创建:
1
| val groupMaster2 = system.actorOf(FromConfig.props(), "groupMaster2")
|
分发者 dispatcher
分发者负责akka中具体的消息发送和处理工作的实现。可以通过自定义的方式来控制如何发送和处理消息。
如下例子,通过实现 ExecutionContext,从配置中找到 my-dispatcher的配置,来指定
1 2 3 4 5 6 7 8 9 10 11 12 13
| class DBActor extends Actor with ActorLogging { implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("my-dispatcher")
override def receive: Receive = { case message => Future { Thread.sleep(5000) log.info(s"Success: $message") } } }
|
配置如下:
首先指定类型,常用dispartcher
pinnedDispatcher:会强制将处理线程和当前actor一一对应,即不存在一个线程处理多个actor的情况
callingThreadDispatcher:保证和一个actor的所有交互发生在同一个线程上(简单理解为单线程处理全链路,保证了时序性)测试中常用
1 2 3 4 5 6 7 8 9
| # dispatchers demo my-dispatcher { type = Dispatcher # PinnedDispatcher, CallingThreadDispatcher executor = "thread-pool-executor" thread-pool-executor { fixed-pool-size = 1 } throughput = 30 }
|
邮箱 mailboxes
邮箱用于控制消息的存储方式,可以通过自定义的方式来控制存入的消息保存特定格式,比如带一个优先级属性
1 2 3 4 5 6 7 8 9
| class SupportTicketPriorityMailbox(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox( PriorityGenerator { case message: String if message.startsWith("[P0]") => 0 case message: String if message.startsWith("[P1]") => 1 case message: String if message.startsWith("[P2]") => 2 case message: String if message.startsWith("[P3]") => 3 case _ => 4 })
|
实现之后,需要在配置信息中进行注册,以便后续使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| # mailboxes demo mailboxesDemo { support-ticket-dispatcher { mailbox-type = "part5infra.Mailboxes$SupportTicketPriorityMailbox" }
control-mailbox { mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox" }
akka.actor.deployment { /altControlAwareActor { mailbox = control-mailbox } } }
|
使用时,需要通过dispatcher的方式绑定需要用的mailbox
1
| val supportTicketLogger = system.actorOf(Props[SimpleActor].withDispatcher("support-ticket-dispatcher"))
|
至此,我们定义了一个能够表示消息优先级的mailbox,同时用一个绑定该邮箱的dispatcher来处理消息,会按照消息优先级来处理。
另外一种改变消息顺序的方式是定义特殊的控制消息
1
| case object ManagementTicket extends ControlMessage
|
继承了controlMessage的消息会被最优先处理
这里使用的mailbox是akka实现好的 UnboundedControlAwareMailbox