Akka-模式

缓存消息 stashing message

缓存消息提供将消息暂时缓存,当正确的时机到来时,重新将消息放入邮箱并进行处理

如下例子,通过继承stash trait来实现相关功能。

当收到某条消息,但当前无法处理,直接调用stash方法,将消息进行缓存

当要处理缓存的消息时,直接调用unstash方法,所有消息再次进入mailbox

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// step1 - mix-in the Stash trait
class ResourceActor extends Actor with ActorLogging with Stash {
private var innerData: String = ""

override def receive: Receive = closed

def closed: Receive = {
case Open =>
log.info("Opening resource")
// step 3 - unstashAll when you switch the message handler
unstashAll()
context.become(open)
case message =>
log.info(s"Stashing $message because I can't handle it in the closed state")
// step 2 - stash away what you can't handle
stash()
}

def open: Receive = {
case Read =>
// do some actual computation
log.info(s"I have read $innerData")
case Write(data) =>
log.info(s"I am writing $data")
innerData = data
case Close =>
log.info("Closing resource")
unstashAll()
context.become(closed)
case message =>
log.info(s"Stashing $message because I can't handle it in the open state")
stash()
}
}

需要注意:

  • stash是有内存大小限制的,不能过大
  • mailbox也是有大小限制的,过大的unstash有可能使得mailbox消息过多,丢失消息
  • 不要将消息stash两次,会报错
  • stash trait应该在继承的最后(scala的加载机制)

    提问模式 Ask pattern

当发送消息后,希望得到actor处理的回复时,可以使用ack 模式。具体为用?替代!进行消息的发送。

ask 形式发送的消息会返回一个future类型的值,因此可以在此基础上使用onComplete等future等特性进行后续处理。这是服务间交互的常见场景

也可将返回值转发给其他的actor

1
future.mapTo[Option[String]].pipeTo(sender())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class AuthManager extends Actor with ActorLogging {
import AuthManager._

// step 2 - logistics
implicit val timeout: Timeout = Timeout(1 second)
implicit val executionContext: ExecutionContext = context.dispatcher

protected val authDb = context.actorOf(Props[KVActor])

override def receive: Receive = {
case RegisterUser(username, password) => authDb ! Write(username, password)
case Authenticate(username, password) => handleAuthentication(username, password)
}

def handleAuthentication(username: String, password: String) = {
val originalSender = sender()
// step 3 - ask the actor
val future = authDb ? Read(username)
// step 4 - handle the future for e.g. with onComplete
future.onComplete {
// step 5 most important
// NEVER CALL METHODS ON THE ACTOR INSTANCE OR ACCESS MUTABLE STATE IN ONCOMPLETE.
// avoid closing over the actor instance or mutable state
case Success(None) => originalSender ! AuthFailure(AUTH_FAILURE_NOT_FOUND)
case Success(Some(dbPassword)) =>
if (dbPassword == password) originalSender ! AuthSuccess
else originalSender ! AuthFailure(AUTH_FAILURE_PASSWORD_INCORRECT)
case Failure(_) => originalSender ! AuthFailure(AUTH_FAILURE_SYSTEM)
}
}
}

一定注意,在对返回值的处理逻辑中,仍然要遵守actor的原则,即用消息进行交互。不能直接调用方法或改变内部状态。


Akka-模式
http://www.bake-data.com/2024/07/21/Akka-模式/
Author
shuchen
Posted on
July 21, 2024
Licensed under