本文共 4907 字,大约阅读时间需要 16 分钟。
Akka是Scala语言实现的一套基于Actor模型的异步通信框架,可用于构建高并发、分布式、可容错、事件驱动的基于JVM的应用,在Spark中曾被用于实现进程、节点间通信,在实际项目中协助我们成功搭建了满足业务需求的模型部署平台。
某国内大型连锁餐饮企业旗下拥有大量门店。餐厅门店的每日生产、订货、排班都依赖于每日客单量预估的合理性,其内部数据团队实现了一套预估模型,需要TalkingData帮助构建一个工程化平台以支撑模型的训练和部署,从而将模型真正地应用到实际生产环节中。
经过交流,我们发现在实际生产环境中,在各方面存在一些问题:
异步:所有门店的前日销售、业务等数据均由各自门店的店长负责整合上传。上传的开始时间、结束时间、数据的完整性等均不确定。而模型训练和预测均依赖这部分数据,这就意味这无法为模型训练和预测设置统一的开始入口。
高并发:除了一些特殊类型的门店,绝大多数门店的营业时间相对固定,从店长决定整理上传销售数据,到准备物料、排班准备次日营业,留给模型训练和模型预测回吐预测结果的时间大概为3小时。如果每个门店的预测指标有2至3项,那么需要有足够的调度能力在规定时间内完成大概2万次模型训练加预测流程。
容错:由于门店数量众多且情况各不相同,仍然有很多潜在的因素可能导致流程出错或失败。原则上,某次流程的失败不应该对其他流程造成任何影响,每个流程在平台层面应该成为互相独立的任务。
因此,我们需要一套轻量化的分布式服务框架,来实现满足上述需求的模型训练预测平台,并在一定程度上保证平台的可拓展性。结合此前团队内的技术积累,最终选择了Akka框架用于实现平台的内部通信。
一次完整的预测任务包括:训练数据准备→模型训练→模型结果导出→预测数据准备→预测结果导出,其中数据准备步骤在时间上不确定,模型相关步骤在执行结果上不确定,如果采用同步模型,将会产生大量的等待线程,占用浪费大量资源。在Actor模型中,每个Actor作为一个基本计算单元,回应接收到的消息,同时并行的:
上述操作没有顺序执行的假设,因此可以并行进行。发送者与已经发送的消息解耦,可以进行无需等待的异步通信。
Akka中的Actor本质上就是接收消息并采取行动处理消息的对象,是封装状态和行为的对象,它们唯一的通信方式是交换消息——把消息存放在接收方的邮箱里。Actor自然形成树形结构,这种结构的精髓在于任务被拆开、委托,直到任务小到可以被完整地处理。因此,我们将预测任务的各个步骤拆分抽象,并创建类型消息与步骤对应,将每个步骤交给线程级别的Actor执行处理,通过发送不同类型的消息来触发创建不同操作的Actor,让整个预测流程无需等待。
由于绝大多数门店的营业时间大致相同,平台在流量上会有明显的峰值和低谷,在低谷期间平台需要尽可能减少资源占有量,而在流量峰值来临时平台要能够及时响应,保证足够的可用性。
经过讨论,我们确定了采用Master-Worker模式的平台结构,Master负责接收与分配任务,Worker负责处理执行具体的模型任务。Master和Worker均为独立的ActorSystem,管理内部不不同操作逻辑的Actor,在空闲状态下占有资源很小。Actor为线程级别,同样仅占用极少量资源,生命周期由ActorSystem统一管理。少量请求时,Actor线程具有很高的复用率,请求并发高时,ActorSystem会创建大量的Actor线程用来承接请求,保证可用性。
每个预测任务的模型相关步骤均存在失败的可能性,此外,数据准备过程中的网络波动、内容校验出错等情况,都会导致当前预测任务的失败。对于失败的任务,我们希望能够尽可能记录错误信息,为重跑提供先决条件。
在Akka中,构建了父子Actor的树形监督结构,提供Actor的监督机制以保证容错性,把处理响应错误的责任交给出错对象以外的实体。父Actor创建子Actor来委托处理子任务,同时便会自动地监管它们。子Actor列表维护在父Actor的上下文中,父Actor可以访问它。
通过更进一步的拆分细化,我们将Worker端的Actor分为Prepare和Executor两种,Prepare为主要负责数据准备步骤,Executor负责模型相关步骤,统一由Worker端的父Actor管理,错误和异常均向上层抛出,由Worker端的父Actor记录并发送给的错误收集模块统一处理。
创建ActorSystem时,默认将在classpath中寻找application.conf、 application.json和application.properties,并自动加载:
val system=ActorSystem(\u0026quot;RsModelActorSystem\u0026quot;)val system=ActorSystem(\u0026quot;RsModelActorSystem\u0026quot;, ConfigFactory.load()) //同上
如果想要使用自己的配置文件,可以通过ConfigFactory来配置加载:
\t\tval system = ActorSystem(\u0026quot;UniversityMessageSystem\u0026quot;, ConfigFactory.load(\u0026quot;own-application.conf\u0026quot;)) \t\tval config = ConfigFactory.parseString( \t\ts\u0026quot;\u0026quot;\u0026quot; \t|akka.remote.netty.tcp.hostname = $host \t|akka.actor.provider = akka.remote.RemoteActorRefProvider \t|akka.remote.enabled-transport = akka.remote.netty.tcp \t|akka.remote.netty.tcp.port = 2445 \t\t\u0026quot;\u0026quot;\u0026quot;.stripMargin) \tval system = ActorSystem(\u0026quot;RsModelActorSystem\u0026quot;, \t\tconfig.withFallback(ConfigFactory.load())) //同上
ActorSystem的配置参数中有大量参数可以自定义,需要根据实际需要修改,例如在该项目中,后期单个算法任务对象大小超过了Akka remote默认包大小128000 bytes,需要修改参数 akka.remote.netty.tcp.maximum-frame-size
一个Actor包含了状态、行为、一个邮箱、子Actor和一个监管策略,所有这些封装在一个Actor引用里。Actor对象通常包含一些变量来反映其所处的可能状态,Akka-actor自身的轻量线程与系统的其他部分完全隔离,因此无须担心并发问题。每当一个消息被处理,它会与Actor的当前行为进行匹配。行为是一个函数,它定义了在某个时间点处理当前消息所要采取的动作,需要结合实际需求编写具体逻辑。Actor的邮箱是连接发送者与接收者的纽带,每个Actor有且仅有一个邮箱,所有的发来的消息都在邮箱里排队。可以有不同策略的邮箱实现供选择,缺省时为FIFO。
编写逻辑
在Actor类中,主要逻辑均在receive方法中实现,通过偏函数方法,执行并返回对应的逻辑:
\t\t//ActorLogging提供Actor内部的日志输出\t\tclass RsActor extends Actor with ActorLogging { \t\t\toverride def receive: Receive = { \t\t\tcase MapMessage(parameters) =\u0026gt; \t\t\t\tprintln(parameters.get(\u0026quot;code\u0026quot;))\t \t\t\tcase MapKeyMessage(parameters, key) =\u0026gt; \t\t\t\tprintln(parameters.get(key)) \t\t\tcase StringMessage(msg) =\u0026gt; \t\t\t\tprintln(msg.getBytes().length) \t\t\tcase o: Object =\u0026gt; \t\t\t\tprintln(o.getClass) \t\t\tcase _: AnyRef =\u0026gt; \t\t\t\tprintln(\u0026quot;233\u0026quot;) \t\t\t}\t\t}
生成一个可以接收消息的Actor实例主要有两个方法:
\t\t//生成一个基于本地类的Actor实例\t\tval rsActor = system.actorOf(Props[RsActor], \u0026quot;rsActor\u0026quot;)\t\t//生成一个基于远程地址的Actor实例\t\tval rmActor = \t\t\tsystem.actorSelection(\u0026quot;akka.tcp://RsModelAkkaSystem@192.168.1.9:2445/user/rsActor\u0026quot;)\t\t\t\t//\t使用!向对应的Actor实例发送消息\t\trsActor ! StringMessage(\u0026quot;test\u0026quot;)\t\trmActor ! MapMessage(Map(\u0026quot;code\u0026quot;-\u0026gt;\u0026quot;233\u0026quot;))
Akka中对传递的消息内容并没有太严格要求,可以是基本数据类型,也可以是支持序列化的对象:
\t\t//scala的case class便于简洁地创建消息类\t\tcase class StringMessage(msg: String) extends Serializable\t\tcase class MapMessage(parameters: Map[String, String]) extends Serializable\t\tcase class MapKeyMessage(parameters: Map[String, String], key: String) extends Serializable
Akka作为一款被广泛使用的开源工具,在实际项目中体现出了很多的优势,异步的消息驱动方式也给我们提供了一套新的思路和实现方法。
作者介绍:李天烨,TalkingData数据科学家。毕业于东北大学,任职于TalkingData数据科学团队,从事数据科学自动化相关工作。
转载地址:http://cafgx.baihongyu.com/