Akka基础-actors

Actor model

是一种处理并发问题的概念模型,定义了系统组件的行为和交互规则。

在传统的并发编程中,处理对象时:

  • 将其状态存储为数据

  • 调用其方法进行各种操作
    在actor model中:

  • 将所有对象视为actor

  • 将其状态进行存储

  • 通过异步发送消息来进行交互,不直接调用方法
    actor model的核心是单线程和异步,通过消息的形式,将所有交互都变成了异步单线程。解决了线程冲突问题,同时解决了加锁带来的性能损耗。

用一个通俗的例子说明actor模型:

相互发短信或者邮件其实都是actor model。每个人都是actor,在收到他人的信息后,由本人自己决定处理信息的顺序,并不会立即响应。而作为发送方,也不会在发送完消息后不做任何事情一直等待回复。

Akka 定义

akka是一个实现了actor model的框架,基于scala编写,提供java和scala两套api,用于解决分布式系统下的并发交互问题。

官网:https://akka.io/

Akka 具体使用

akka编程遵循以下的大逻辑

  • 首先创建一个actorsystem,负责实例化各种actor,并负责最初向actor发送消息
  • 之后定义若干个actor来实现业务逻辑,这些actor类都需要继承Actor trait
  • 在每一个actor中,需要实现receive方法,用于处理接收到的消息,这个方法是一个partial function,模式匹配各种不同的消息
  • 消息是actor之间传递用的,可以是任何类型
  • 发送消息的动作用!完成

    actors

基本的写法如下:

  • 定义actorSystem
  • 定义actor
  • 实例化一个actor并向其发送消息
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    val actorSystem = ActorSystem("firstActorSystem")

    class Person(name: String) extends Actor {
    override def receive: Receive = {
    case "hi" => println(s"Hi, my name is $name")
    case _ =>
    }
    }

    val person = actorSystem.actorOf(Person.props("Bob"))
    person ! "hi"

下面的例子体现actor的如下特点:

  • actor的receive方法可以接受各种类型的消息,只要有对应的模式匹配就行
  • 注意sender()的用法,sender()表示这个消息的来源,也是一个actor,所以可以直接向sender() 发送消息(line 3)
  • 类似的,self表示actor自己,可以用self向自己发送消息(line8)
  • forward功能保留最初的发送方,当前发送方只是做传递,会将最初的发送方一直传递下去
  • 最初的sender是默认的noSender,比如从actorsystem向actor发消息是,这个消息都会标记来自noSender
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    class SimpleActor extends Actor {
    override def receive: Receive = {
    case "Hi!" => sender() ! "Hello, there!" // replying to a message
    case message: String => println(s"[$self] I have received $message")
    case number: Int => println(s"[simple actor] I have received a NUMBER: $number")
    case SpecialMessage(contents) => println(s"[simple actor] I have received something SPECIAL: $contents")
    case SendMessageToYourself(content) =>
    self ! content
    case SayHiTo(ref) => ref ! "Hi!" // alice is being passed as the sender
    case WirelessPhoneMessage(content, ref) => ref forward (content + "s") // keep the original sender of the WPM
    }
    }

messages

消息可以是任何类型,列举几种常用的

  • 基本数据类型,int,string等

  • case class:定义的结构化的消息,带不同的参数

  • case object:如果这个消息只是为了触发特定操作,只是一个信号,用case object
    消息的限制如下:

  • 消息必须可以序列化

  • 消息必须是不可变的

    behaviors

行为就是指actor受到对应的消息后采取的行动,在receive这个函数中,在对应的模式匹配中实现。

Akka运行原理

akka构造一个actor共享的线程池,少数几个线程(100)就可以处理大量的actor(1000000)。每一个actor其实就是一个数据结构,包含两个部分:1. 定义的事件处理逻辑 2. 一个邮箱,用于接收消息。

akka会进行调度,让这些线程去执行一个个actor,对于每个actor来说,没有线程执行它时,不会有任何变化。当有现成执行它时,按顺序从邮箱中取出消息,然后执行对应的处理逻辑。

akka做出如下保证:

  • 同一时间一个actor只会有一个线程在处理
  • 对于消息的发送,是至多一次的
  • 对于消息的发送方和接收方,接收到的顺序一定和发送的顺序一致(可能会丢,但不会乱序)

    Akka并发处理

首先看下面这个银行actor的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class BankAccount extends Actor {
import BankAccount._
var funds = 0
override def receive: Receive = {
case Deposit(amount) =>
if (amount < 0) sender() ! TransactionFailure("invalid deposit amount")
else {
funds += amount
sender() ! TransactionSuccess(s"successfully deposited $amount")
}
case Withdraw(amount) =>
if (amount < 0) sender() ! TransactionFailure("invalid withdraw amount")
else if (amount > funds) sender() ! TransactionFailure("insufficient funds")
else {
funds -= amount
sender() ! TransactionSuccess(s"successfully withdrew $amount")
}
case Statement => sender() ! s"Your balance is $funds"
}
}

上述银行actor接受三种类型的消息,存款,取款和查询。具体的实现方法是重点,通过一个变量funds来维护账户的余额。这样会有一些问题:

  • 是否线程安全?(根据上面akka运行的原理可以知道,是线程安全的)
  • 是否会出错?(如果只是actor内部使用变量是可以的,但是如果变量值作为输出传给了其他actor会出现问题)
    总之:在actor中使用变量意味着actor是有状态的,这其实不好,不论从可读性,传递引发错误还是扩展性的角度,都不应该使用变量实现这种有状态的actor

最佳方法是将actor都转换成无状态的,具体做法是通过行为转换,将维护的变量变成递归的参数来执行。看下面这个状态转换的例子:

Akka 行为转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class FussyKid extends Actor {
import FussyKid._
import Mom._
// internal state of the kid
var state = HAPPY
override def receive: Receive = {
case Food(VEGETABLE) => state = SAD
case Food(CHOCOLATE) => state = HAPPY
case Ask(_) =>
if (state == HAPPY) sender() ! KidAccept
else sender() ! KidReject
}
}
class StatelessFussyKid extends Actor {
import FussyKid._
import Mom._
override def receive: Receive = happyReceive
def happyReceive: Receive = {
case Food(VEGETABLE) => context.become(sadReceive, false) // change my receive handler to sadReceive
case Food(CHOCOLATE) =>
case Ask(_) => sender() ! KidAccept
}
def sadReceive: Receive = {
case Food(VEGETABLE) => context.become(sadReceive, false)
case Food(CHOCOLATE) => context.unbecome()
case Ask(_) => sender() ! KidReject
}
}

FussyKid是有状态的,变量state维护其状态
StatelessFussyKid是无状态的,核心是利用context.become方法,转换handler,使得对同一条消息根据状态采取不同回复。

进一步说明,context.become方法接收两个参数,第一个是Receive类型的handler,表示将这个消息转给谁来处理,第二个参数是boolean,表示是否替换当前的handler。下面解释第二个参数。

第二个参数默认为true,上面19行和24行传入了false。为true意味着新的handler直接替换了旧的,永远只有一个handler。为false则会将所有的handler维护成一个栈,每次切换,都会在顶放入一个新handler,每个消息都由顶的handler处理。因此,需要搭配unbecome方法进行出栈操作。回到这个例子,这个参数会直接导致整个逻辑的不同,简单讲,当用栈维护时(false),消息需要逐个抵消(小孩收到几个菜就需要几个巧克力才能开心)。如果直接替换(true)则消息没有累计的效果(不管吃了几个菜,一个巧克力就开心)。

还可以额外加入参数,实现上面银行例子的无状态版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
class BankAccount extends Actor {
import Counter._
override def receive: Receive = countReceive(0)
def countReceive(currentCount: Int): Receive = {
case Deposit(amount) =>
if (amount < 0) sender() ! TransactionFailure("invalid deposit amount")
else context.become(countReceive(currentCount + amount))
case Withdraw(amount) =>
if (amount < 0) sender() ! TransactionFailure("invalid withdraw amount")
else if (amount > currentCount) sender() ! TransactionFailure("insufficient funds")
else context.become(countReceive(currentCount - amount))
case Statement => sender() ! s"Your balance is $funds"
}

Actor 和子actor

在actor的行为中可以创建其他actor,这样创建出来的actor 称为这个actor的子actor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Parent extends Actor {
import Parent._
override def receive: Receive = {
case CreateChild(name) =>
println(s"${self.path} creating child")
// create a new actor right HERE
val childRef = context.actorOf(Props[Child], name)
context.become(withChild(childRef))
}
def withChild(childRef: ActorRef): Receive = {
case TellChild(message) => childRef forward message
}
}
class Child extends Actor {
override def receive: Receive = {
case message => println(s"${self.path} I got: $message"):
}
}
  • context.actorOf 用于创建子actor
  • 每一个actor都会有一个唯一的路径,可以通过self.path来获取
  • 可以看到,子actor的路径就是父actor路径的下一级
  • 由此可得,actor之间存在着树状的结构,akka定义了三种守卫actor来做为最上层的actor
  • /system:系统级别的actor最高路径,用于实现akka系统的基础功能的actor归于此下,如日志actor
  • /user:用户创建的actor的最高路径,用户创建的actor都在其下
  • /:最高的root,所有actor都在其下
    根据上述路径,可以找到唯一一个actor
1
2
val childSelection = system.actorSelection("/user/parent/child2")
childSelection ! "I found you!"

通过actorSelection方法找到一个actor,可以直接使用,如果路径不存在,则发送给dead letter
特别注意:永远不要将可变的actor状态或者“this”引用传递给子actor!!!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class NaiveBankAccount extends Actor {
import CreditCard._
import NaiveBankAccount._
var amount = 0
override def receive: Receive = {
case InitializeAccount =>
val creditCardRef = context.actorOf(Props[CreditCard], "card")
creditCardRef ! AttachToAccount(this) // !!
case Deposit(funds) => deposit(funds)
case Withdraw(funds) => withdraw(funds)
}
def deposit(funds: Int) = {
println(s"${self.path} depositing $funds on top of $amount")
amount += funds
}
def withdraw(funds: Int) = {
println(s"${self.path} withdrawing $funds from $amount")
amount -= funds
}
}

class CreditCard extends Actor {
override def receive: Receive = {
case AttachToAccount(account) => context.become(attachedTo(account))
}
def attachedTo(account: NaiveBankAccount): Receive = {
case CheckStatus =>
println(s"${self.path} your messasge has been processed.")
// benign
account.withdraw(1) // because I can
}
}

注意第8行,将this(自身的引用)传递给了子actor。
这样做的后果是打破了akka整体通过消息传递进行调用的机制,子actor可以直接调用父actor的方法,或者直接改变父actor的内部变量值了。

完全不符合actor model的设计,会产生实际问题且极难debug

actor和子actor的一个作用就是并行计算(甚至可以做分布式计算),子actor作为executor,父actor作为driver。

Actor日志

通过继承actorlogging类,来实现日志能力

1
2
3
4
5
6
class ActorWithLogging extends Actor with ActorLogging {
override def receive: Receive = {
case (a, b) => log.info("Two things: {} and {}", a, b) // Two things: 2 and 3
case message => log.info(message.toString)
}
}

Akka配置

默认的配置文件位置为src/resources/application.conf

其中通过下面的形式定义配置:

1
2
3
name1 {
name2 = aaa
}

通过ConfigFactory.load().getConfig(“name1.name2”)的方式获取到配置

1
2
3
4
val specialConfig = ConfigFactory.load().getConfig("name1.name2")
val specialConfigSystem = ActorSystem("SpecialConfigDemo", specialConfig)
val specialConfigActor = specialConfigSystem.actorOf(Props[SimpleLoggingActor])
specialConfigActor ! "Remember me, I am special"

Reference

https://www.udemy.com/course/akka-essentials/?couponCode=ST21MT60724

chapter3


Akka基础-actors
http://www.bake-data.com/2024/07/03/Akka基础-actors/
Author
shuchen
Posted on
July 3, 2024
Licensed under