Akka-基础设施

定时器和执行计划 scheduler and timer

定时器和执行计划提供定期,定时执行代码逻辑的功能

执行计划scheduler用于指定执行时间和执行逻辑,代码如下:

通过system.scheduler.scheduleOnce(1 second)来指定内部逻辑在指定时间后执行。

通过system.scheduler.schedule(1 second, 2 seconds)来指定经过1s延迟之后,每2s执行一次

通过schedule.cancel()来取消执行计划

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class SelfClosingActor extends Actor with ActorLogging {
var schedule = createTimeoutWindow()

def createTimeoutWindow(): Cancellable = {
context.system.scheduler.scheduleOnce(1 second) {
self ! "timeout"
}
}

override def receive: Receive = {
case "timeout" =>
log.info("Stopping myself")
context.stop(self)
case message =>
log.info(s"Received $message, staying alive")
schedule.cancel()
schedule = createTimeoutWindow()
}
}

val routine: Cancellable = system.scheduler.schedule(1 second, 2 seconds) {
simpleActor ! "heartbeat"
}

计时器timer比scheduler简单,计时器支持指定一个时间,时间到时会向自身actor发送一条消息

line6 启动一个计时器,发送start消息给自己

start消息处理时会将同一个计时器改成周期执行,发reminder消息给自己

如果收到了来自外部的stop消息,则停止计时器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
case object TimerKey
  case object Start
  case object Reminder
  case object Stop
  class TimerBasedHeartbeatActor extends Actor with ActorLogging with Timers {
    timers.startSingleTimer(TimerKey, Start, 500 millis)

    override def receive: Receive = {
      case Start =>
        log.info("Bootstrapping")
        timers.startPeriodicTimer(TimerKey, Reminder, 1 second)
      case Reminder =>
        log.info("I am alive")
      case Stop =>
        log.warning("Stopping!")
        timers.cancel(TimerKey)
        context.stop(self)
    }
  }

路由 router

router提供消息路由的功能,按照一定的规则向多个actor发送消息。典型的应用场景是主从场景

如下例子,master创建了5个slave actor(line4),通过创建Router(line11)并指定策略(RoundRobinRoutingLogic),之后所有和slave的交互都通过router进行(line16,line19,line22)即master根据收到的消息决定如何调动slaves。

下面这种创建router的方式在生产实际中很少使用,只做演示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Master extends Actor {
// step 1 - create routees
// 5 actor routees based off Slave actors
private val slaves = for(i <- 1 to 5) yield {
val slave = context.actorOf(Props[Slave], s"slave_$i")
context.watch(slave)
ActorRefRoutee(slave)
}

// step 2 - define router
private val router = Router(RoundRobinRoutingLogic(), slaves)

override def receive: Receive = {
// step 4 - handle the termination/lifecycle of the routees
case Terminated(ref) =>
router.removeRoutee(ref)
val newSlave = context.actorOf(Props[Slave])
context.watch(newSlave)
router.addRoutee(newSlave)
// step 3 - route the messages
case message =>
router.route(message, sender())
}
}

如下例子,生产实践中一般直接创建pool routers,即创建子actors就是池化的,并且直接指定路由策略

1
val poolMaster = system.actorOf(RoundRobinPool(5).props(Props[Slave]), "simplePoolMaster")

还可以通过配置来创建池化的子actor和router策略

1
val poolMaster2 = system.actorOf(FromConfig.props(Props[Slave]), "poolMaster2")

第三种实践常用的方式是group routers,这种用法不再要求是父子actor,可以将任意个actor绑定成一组,在其上应用router策略,对其进行消息分发

只要传入对应的paths即可,这样做则不需要在actor中先创建一堆slave actor

1
val groupMaster = system.actorOf(RoundRobinGroup(slavePaths).props())

同理可以用配置的方式创建:

1
val groupMaster2 = system.actorOf(FromConfig.props(), "groupMaster2")

分发者 dispatcher

分发者负责akka中具体的消息发送和处理工作的实现。可以通过自定义的方式来控制如何发送和处理消息。

如下例子,通过实现 ExecutionContext,从配置中找到 my-dispatcher的配置,来指定

1
2
3
4
5
6
7
8
9
10
11
12
13
class DBActor extends Actor with ActorLogging {
// solution #1
implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("my-dispatcher")
// solution #2 - use Router

override def receive: Receive = {
case message => Future {
// wait on a resource
Thread.sleep(5000)
log.info(s"Success: $message")
}
}
}

配置如下:
首先指定类型,常用dispartcher

pinnedDispatcher:会强制将处理线程和当前actor一一对应,即不存在一个线程处理多个actor的情况

callingThreadDispatcher:保证和一个actor的所有交互发生在同一个线程上(简单理解为单线程处理全链路,保证了时序性)测试中常用

1
2
3
4
5
6
7
8
9
# dispatchers demo
my-dispatcher {
type = Dispatcher # PinnedDispatcher, CallingThreadDispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 1
}
throughput = 30
}

邮箱 mailboxes

邮箱用于控制消息的存储方式,可以通过自定义的方式来控制存入的消息保存特定格式,比如带一个优先级属性

1
2
3
4
5
6
7
8
9
class SupportTicketPriorityMailbox(settings: ActorSystem.Settings, config: Config)
extends UnboundedPriorityMailbox(
PriorityGenerator {
case message: String if message.startsWith("[P0]") => 0
case message: String if message.startsWith("[P1]") => 1
case message: String if message.startsWith("[P2]") => 2
case message: String if message.startsWith("[P3]") => 3
case _ => 4
})

实现之后,需要在配置信息中进行注册,以便后续使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# mailboxes demo
mailboxesDemo {
support-ticket-dispatcher {
mailbox-type = "part5infra.Mailboxes$SupportTicketPriorityMailbox"
}

control-mailbox {
mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
}

akka.actor.deployment {
/altControlAwareActor {
mailbox = control-mailbox
}
}
}

使用时,需要通过dispatcher的方式绑定需要用的mailbox

1
val supportTicketLogger = system.actorOf(Props[SimpleActor].withDispatcher("support-ticket-dispatcher"))

至此,我们定义了一个能够表示消息优先级的mailbox,同时用一个绑定该邮箱的dispatcher来处理消息,会按照消息优先级来处理。
另外一种改变消息顺序的方式是定义特殊的控制消息

1
case object ManagementTicket extends ControlMessage

继承了controlMessage的消息会被最优先处理
这里使用的mailbox是akka实现好的 UnboundedControlAwareMailbox


Akka-基础设施
http://www.bake-data.com/2024/07/19/Akka-基础设施/
Author
shuchen
Posted on
July 19, 2024
Licensed under