一、整体框架(异步任务框架)
1. 整体框架
在Asyncflow项目中,主要包括以下部分的设计:
- 任务调度器Worker
- 异步执行实际的任务
- 执行完毕后将任务status、stage更新提交到flowSvr
- 代理FlowSvr
- 接收任务请求
- 将任务请求转发给Worker
- 获取数据库中的任务
- 任务治理(定时执行任务位置表更新、定时轮询是否达到分表阈值500w(更新任务位置表)、间隔查看和处理卡死任务等)
- DB数据库设计
- 数据库表、字段、索引、优先级设计
- 上下文设计:content字段,框架worker和flower不需要理解上下文,只需要存储和传递,所以可以实现对各种类型任务的快速注册(新增任务类型typeX)
- 分表设计
- 数据库表、字段、索引、优先级设计
- Cache缓存设计
- 分布式锁设计
在主要实现流程中,代理做为worker、用户、数据库之间的桥梁,负责接收、分发任务以及管理任务状态转移。
由于worker是异步执行的,因此对于多个woker获取同一类型任务时,需要在worker与flowSvr之间加一层分布式锁,只有获得锁的worker才能向flowSvr请求任务。
具体流程如下:
2. 中间件
在整体框架中,主要用到的中间件为MySQL和Redis,根据需求可以引入消息队列
- MySQL作为持久化存储,用于存储任务的状态、优先级、任务类型等信息
- Redis作为信息缓存加速查询,以及实现分布式锁解决冲突问题
- 一类任务的多个worker需要一把分布式锁
- 而不同任务之间会打到不同flowsvr上,操作不同的表格,所以不同任务之间不需要加锁
3. 框架部署方式
当前部署方式为:
- 多flowSvr单MySQL单Redis:
- 多个flowSvr部署在单机上,共用一个MySQL数据库,通过分布式锁解决并发问题
- 路由通过Nginx实现负载均衡
- 部署在阿里云服务器上
- 多Worker
- worker处理任务的接口逻辑需要用户自己实现
- worker用于占据并执行任务、更新任务状态(任务提交)
当前阶段单机(1个server)中部署了3个flowSvr pod和3个worker,通过Nginx实现负载均衡,MySQL和Redis也是单机部署
4. 表结构设计
1)任务信息表t_video_task_1
以其中一种任务类型video
为例,其表名为t_video_task_1
,表结构如下:
字段名 | 类型 | 说明 |
---|---|---|
id | bigint | 自增ID,主键 |
user_id | varchar | 用户ID |
task_id | varchar | 任务ID,唯一约束 |
task_type | varchar | 任务类型 |
task_stage | varchar | 任务阶段 |
task_status | varchar | 任务状态:待执行1、执行中2、成功3、失败4 |
crt_retry_num | int | 当前已经重试次数 |
max_retry_num | int | 最大重试次数 |
max_retry_interval | int | 最大重试时间间隔:渐进式重试 |
schedule_log | varchar(4096) | 任务调度日志:json格式 |
priority | int | 任务优先级:默认为0 |
task_content | varchar(4096) | 任务内容:json格式 |
order_time | int | 任务调度时间:与优先级、当前失败后重试间隔有关 |
create_time | datetime | 创建时间 |
modify_time | datetime | 修改时间 |
id
和task_id
的区别id
是表内自增不对外暴露的task_id
可以是uuid,提供用户进行轮询查询任务状态以及做为唯一标识保证接口幂等的
task_type
- 这里是用于识别不同类任务的,比如音视频和北斗解算就是不同类型任务有不同type
- 而对于同一个任务的多阶段是用
task_stage
去区分的,不同类型任务不会出现在一张表格中的
task_stage
:如一个任务有三个阶段,则有task_1、task_2、task_3- 举例:北斗解算可以划分为采集数据过滤预处理、数据解算两个阶段
status
- 指的是worker对当前任务的执行状态,有4种
- 过程态会在不同任务阶段动态变化:1(等待worker调度)、2(正在被worker调度)
- 任务终态不会变化:3(任务失败)、4(任务成功)
- 指的是worker对当前任务的执行状态,有4种
max_retry_interval
:比如设置为10,重试时会从1开始,翻倍递增,直到10后一直不变,直到达到最大重试次数- 公式:
max_retry_interval = min(1<<crt_retry_num, max_retry_interval)
- 举例:[1, 2, 4, 8, 10, 10, 10, 10, 10, 10]
- 公式:
schedule_log
:json格式,包含本次任务追踪id、耗时、失败原因等信息task_content
- 一般是存json,所以flowsvr对语义是无感知的,worker方与用户需要统一里面的语义进行业务操作
- 只能存储4096字节,如果业务复杂的话应该开辟单独的表专门存储上下文,然后在task_content中值存一个类似content_id的标识用于查询对应的结果
order_time
:用于排序设置优先级的,与优先级、当前失败后重试间隔有关(实现重试的关键)
2)任务配置表t_schedule_cfg
属于配置表,用于存储各类任务的系统配置信息,包括单次拉取数、重试配置,表结构如下:
字段名 | 类型 | 说明 |
---|---|---|
task_type | varchar | 任务类型 |
schedule_limit | int | 单次拉取任务数 |
schedule_interval | int | 每次拉取任务间隔 |
max_retry_num | int | 最大重试次数 |
max_retry_interval | int | 最大重试时间间隔 |
create_time | datetime | 创建时间 |
modify_time | datetime | 修改时间 |
3)任务位置表t_schedule_pos
也属于一种配置信息,属于动态变化的配置,用于实现分表,分表有助于实现负载均衡
最主要的字段是schedule_begin_pos和schedule_end_pos,分别用于标记worker下次调度在哪个表拉取任务和user下次在哪个表插入任务,两个值一般跨度最大只有两个表,表结构如下:
字段名 | 类型 | 说明 |
---|---|---|
task_type | varchar | 任务类型 |
schedule_begin_pos | int | 下次调度在哪个表拉取任务 |
schedule_end_pos | int | 下次调度在哪个表插入任务 |
create_time | datetime | 创建时间 |
modify_time | datetime | 修改时间 |
schedule_begin_pos
和schedule_end_pos
的更新- 在flowSvr的任务治理模块中,会定时更新这两个信息
schedule_begin_pos
更新:定时全表扫描,当前表status
都为终态3执行成功
或4执行失败
时,更新pos++schedule_end_pos
更新:当前表count>500w时,更新pos++
schedule_begin_pos
和schedule_end_pos
的作用schedule_begin_pos
:- 拉取任务会根据
begin_pos
找到那张表 - 然后用sql语句按顺序过滤筛选一批未执行完所有阶段且处于待执行的任务
- 根据
order_time
选出top limit条任务 - 更新或查询任务时不需要获取任务位置表,因为task_id会自带这个任务的位置信息
- 拉取任务会根据
schedule_end_pos
:user下次在哪个表插入任务
5. 状态流转
6. 选用的网络通信协议
走HTTP实现通信
7. webserver依赖框架
- tinyWebServer
- Drogon框架(有完整的路由配置)
二、使用场景
Asyncflow项目主要用于解决异步任务调度问题,因此适用于需要多阶段任务、阶段间相互依赖且耗时的异步执行的场景,例如:
- 音视频
- 图片处理
- 数据处理…
三、同步和异步的区别
1)同步与异步的区别
同步任务是指任务执行完毕后,才会继续执行下一个任务。而异步任务是指任务执行过程中,可以继续执行下一个任务,只需要轮询处理结果即可,因此异步很适合在耗时的操作中使用。
相当于在同步任务中,user发起任务请求后会阻塞干等;而在异步任务中,user发起任务请求后会收到一个任务ID,然后可以继续做其他事情,等到任务执行完毕后再获取结果(通常是通过回调函数or轮询结果)。
本项目中同一个任务的多个阶段是同步执行的,需要按顺序,而不同任务之间是异步执行的,可以并行执行。
2)异步时对外提供的接口
异步需要暴露给生产者(用户)的接口通常有两个:
- 发起任务接口:用于生产者提交发起任务,返回任务ID
- 查询任务接口:用于查询任务状态,返回任务结果
3)为什么异步结果查询普遍用的是轮询而不是服务器主动通知?
因为轮询对于客户端来说更加灵活,可以根据自己的需求来决定轮询的频率,而服务器主动通知则需要客户端保持长连接(得使用WebSocket、HTTP2等支持主动push的协议,而轮询的话大多数协议都支持),增加了服务器的压力。
4)框架中体现异步的两个主要的点
- 对于用户而言的异步
- 提交任务后,直接交给Async执行,用户不需要等待任务执行完毕,轮询即可
- 对于Async框架而言的异步
- 对于Async框架来说,将任务分解后,worker拉取某个阶段的任务交由下游接口执行,此时的worker相对于下游接口也是用户,只需要异步等待下游接口返回结果就行(多阶段异步),此时框架就可以去做别的任务调度了
- 举例:比如下游业务是向腾讯云发出任务请求,那么其实worker主要要完成的就是向腾讯云调接口而不是Asyncflow实际执行业务逻辑,消费者worker的工作是实现调度,去查结果
四、Asycnflow与消息队列的区别
1. Asycnflow的特点
Asycnflow最主要的亮点是:异步、多阶段任务、任务调度框架化、轻量级
- 天生为异步任务设计(适合多阶段、耗时任务)
- 任务状态转移明确(任务状态:未开始、进行中、已完成)
- 任务优先级明确(任务优先级:高、中、低)
- 提供通用框架,注册各种任务类型,实现任务逻辑即可
- 非框架下,总是做重复代码,于是先做调研做一个框架节省代码开发时间,只需要做业务逻辑就行
- 核心是任务调度器,负责任务的调度和执行,因此相比消息队列(只是消息传递的组件),Asyncflow具备简化任务管理-调度的优势
功能设计上有以下特点:
- 两种重试机制:均匀重试和渐进式重试
- 滚表的分表方式
- 解耦多个排序因素:抽象
order_time
字段作为排序依据 - 接入GORM接入GORM库简化数据库操作,实现数据库连接池优化性能
2. 为什么用Asyncflow而不用消息队列呢?
1)消息队列和Asyncflow的区别与联系
消息队列与Asyncflow之间是纯队列和调度框架的区别,实际上Asyncflow也可以引入消息队列作为中间件提高性能:
- 在flowSvr中,由于拉取跟执行任务是耦合的,所以导致多个worker拉取任务时需要排队
- 如果引入消息队列,可以提前拉取多批任务放入消息队列中,worker从消息队列中拉取任务,这样可以提高并发度,减少worker之间的竞争、加快分布式锁的释放
- 缺点:引入中间件会加入维护成本和额外组件的开发学习成本,同时增加框架的复杂性,可能有过度设计的问题,所以要根据实际情况决定是否引入
2)消息队列和Asyncflow的区别
- 消息队列只是一个组件,需要单独维护和设计调度,且不是天生具备任务管理-任务状态查询的,需要单独开发和维护,其本质是进行消息传递,主要用于解耦(任务本身不依赖),异步(处理耗时任务)处理,削峰(抗住瞬间流量)填谷等场景;而Async的框架就是开发了完整的调度和任务状态更新管理功能,用户只需要注册任务、将任务传入即可,更倾向是一个任务调度器且强调多阶段的依赖性,主要用于任务的调度和执行
- 使用消息队列需要对各个事件驱动的任务阶段加入消息队列,不便于顺序执行流程,且中间服务挂了会导致丢失;而Async通过MySQL持久化存储,稳定性更好
因此,使用Async异步框架可以简化开发流程和维护成本,天生提供任务管理调度能力,且同样具备水平扩展的能力
3)为什么用消息队列而不用Asyncflow呢?
因为消息队列只是消息传递的组件,而Asyncflow是一个任务调度框架,提供了任务的管理、调度、状态更新等功能,更适合于任务的调度和执行。
虽然用Kafka也可以实现,但是也是要重新实现MySQL的接入和一些worker调度逻辑的实现
调研了一下,C++实现这种框架的其实是很少的,Go中有一个Machine框架,但是该框架的功能过与齐全,对于本项目来说属于过度设计,所以觉得仿照其他语言的框架实现一个轻量级Asyncflow,实现单任务多阶段串联
五、C++语言异步框架比较
C++语言的异步框架有很多,比如libuv、boost.asio等,这些框架都是事件驱动的,可以实现异步IO,多线程等功能。
1)libuv
libuv是一个跨平台的异步IO库,主要用于事件驱动的编程,它提供了事件循环、异步IO、定时器、线程池等功能,可以用于开发高性能的网络应用
它更适合于网络编程,异步IO等场景,适用于编写高性能的网络服务器和客户端,如WebSocket服务器、HTTP服务器等
2)boost.asio
boost.asio是一个C++标准库,提供了异步IO、网络编程、多线程等功能,可以用于开发高性能的网络应用,其适用性类似于libuv,但是更加底层,需要自己实现更多的功能
3)Asyncflow
本项目相较于libuv和boost.asio,提高更高级别的抽象层,是用户友好型,提供了任务调度、任务状态管理等功能,适用于多阶段任务、任务依赖等场景,更加适合于任务调度和执行,而不是仅仅网络编程
同时框架具有轻量级、依赖少易维护等优点