AkiraZheng's Time.

Asyncflow项目2:整体设计思路

Word count: 3.7kReading time: 12 min
2024/07/15

一、整体框架(异步任务框架)

1. 整体框架

在Asyncflow项目中,主要包括以下部分的设计:

  • 任务调度器Worker
    • 异步执行实际的任务
    • 执行完毕后将任务status、stage更新提交到flowSvr
  • 代理FlowSvr
    • 接收任务请求
    • 将任务请求转发给Worker
    • 获取数据库中的任务
    • 任务治理(定时执行任务位置表更新、定时轮询是否达到分表阈值500w(更新任务位置表)、间隔查看和处理卡死任务等)
  • DB数据库设计
    • 数据库表、字段、索引、优先级设计
      • 上下文设计:content字段,框架worker和flower不需要理解上下文只需要存储和传递,所以可以实现对各种类型任务的快速注册(新增任务类型typeX)
    • 分表设计
  • Cache缓存设计
    • 分布式锁设计

在主要实现流程中,代理做为worker、用户、数据库之间的桥梁,负责接收、分发任务以及管理任务状态转移。

由于worker是异步执行的,因此对于多个woker获取同一类型任务时,需要在worker与flowSvr之间加一层分布式锁,只有获得锁的worker才能向flowSvr请求任务。

具体流程如下:

2. 中间件

在整体框架中,主要用到的中间件为MySQLRedis,根据需求可以引入消息队列

  • MySQL作为持久化存储,用于存储任务的状态优先级任务类型等信息
  • Redis作为信息缓存加速查询,以及实现分布式锁解决冲突问题
    • 一类任务的多个worker需要一把分布式锁
    • 而不同任务之间会打到不同flowsvr上,操作不同的表格,所以不同任务之间不需要加锁

3. 框架部署方式

当前部署方式为:

  • 多flowSvr单MySQL单Redis
    • 多个flowSvr部署在单机上,共用一个MySQL数据库,通过分布式锁解决并发问题
    • 路由通过Nginx实现负载均衡
    • 部署在阿里云服务器上
  • 多Worker
    • worker处理任务的接口逻辑需要用户自己实现
    • worker用于占据并执行任务、更新任务状态(任务提交)

当前阶段单机(1个server)中部署了3个flowSvr pod3个worker,通过Nginx实现负载均衡MySQLRedis也是单机部署

4. 表结构设计

1)任务信息表t_video_task_1

以其中一种任务类型video为例,其表名为t_video_task_1,表结构如下:

字段名 类型 说明
id bigint 自增ID,主键
user_id varchar 用户ID
task_id varchar 任务ID,唯一约束
task_type varchar 任务类型
task_stage varchar 任务阶段
task_status varchar 任务状态:待执行1、执行中2、成功3、失败4
crt_retry_num int 当前已经重试次数
max_retry_num int 最大重试次数
max_retry_interval int 最大重试时间间隔:渐进式重试
schedule_log varchar(4096) 任务调度日志:json格式
priority int 任务优先级:默认为0
task_content varchar(4096) 任务内容:json格式
order_time int 任务调度时间:与优先级、当前失败后重试间隔有关
create_time datetime 创建时间
modify_time datetime 修改时间
  • idtask_id的区别
    • id是表内自增不对外暴露的
    • task_id可以是uuid,提供用户进行轮询查询任务状态以及做为唯一标识保证接口幂等
  • task_type
    • 这里是用于识别不同类任务的,比如音视频和北斗解算就是不同类型任务有不同type
    • 而对于同一个任务多阶段是用task_stage去区分的,不同类型任务不会出现在一张表格中的
  • task_stage:如一个任务有三个阶段,则有task_1、task_2、task_3
    • 举例:北斗解算可以划分为采集数据过滤预处理数据解算两个阶段
  • status
    • 指的是worker对当前任务的执行状态,有4种
      • 过程态会在不同任务阶段动态变化:1(等待worker调度)、2(正在被worker调度)
      • 任务终态不会变化:3(任务失败)、4(任务成功)
  • max_retry_interval:比如设置为10,重试时会从1开始,翻倍递增,直到10后一直不变,直到达到最大重试次数
    • 公式:max_retry_interval = min(1<<crt_retry_num, max_retry_interval)
    • 举例:[1, 2, 4, 8, 10, 10, 10, 10, 10, 10]
  • schedule_log:json格式,包含本次任务追踪id、耗时、失败原因等信息
  • task_content
    • 一般是存json,所以flowsvr对语义是无感知的,worker方与用户需要统一里面的语义进行业务操作
    • 只能存储4096字节,如果业务复杂的话应该开辟单独的表专门存储上下文,然后在task_content中值存一个类似content_id的标识用于查询对应的结果
  • order_time:用于排序设置优先级的,与优先级、当前失败后重试间隔有关(实现重试的关键)

2)任务配置表t_schedule_cfg

属于配置表,用于存储各类任务的系统配置信息,包括单次拉取数重试配置,表结构如下:

字段名 类型 说明
task_type varchar 任务类型
schedule_limit int 单次拉取任务数
schedule_interval int 每次拉取任务间隔
max_retry_num int 最大重试次数
max_retry_interval int 最大重试时间间隔
create_time datetime 创建时间
modify_time datetime 修改时间

3)任务位置表t_schedule_pos

也属于一种配置信息,属于动态变化的配置,用于实现分表,分表有助于实现负载均衡

最主要的字段是schedule_begin_posschedule_end_pos,分别用于标记worker下次调度在哪个表拉取任务user下次在哪个表插入任务,两个值一般跨度最大只有两个表,表结构如下:

字段名 类型 说明
task_type varchar 任务类型
schedule_begin_pos int 下次调度在哪个表拉取任务
schedule_end_pos int 下次调度在哪个表插入任务
create_time datetime 创建时间
modify_time datetime 修改时间
  • schedule_begin_posschedule_end_pos的更新
    • 在flowSvr的任务治理模块中,会定时更新这两个信息
    • schedule_begin_pos更新:定时全表扫描,当前表status都为终态3执行成功4执行失败时,更新pos++
    • schedule_end_pos更新:当前表count>500w时,更新pos++
  • schedule_begin_posschedule_end_pos的作用
    • schedule_begin_pos
      • 拉取任务会根据begin_pos找到那张表
      • 然后用sql语句按顺序过滤筛选一批未执行完所有阶段且处于待执行的任务
      • 根据order_time选出top limit条任务
      • 更新或查询任务时不需要获取任务位置表,因为task_id会自带这个任务的位置信息
    • schedule_end_pos:user下次在哪个表插入任务

5. 状态流转

6. 选用的网络通信协议

HTTP实现通信

7. webserver依赖框架

  1. tinyWebServer
  2. Drogon框架(有完整的路由配置)

二、使用场景

Asyncflow项目主要用于解决异步任务调度问题,因此适用于需要多阶段任务阶段间相互依赖耗时的异步执行的场景,例如:

  • 音视频
  • 图片处理
  • 数据处理…

三、同步和异步的区别

1)同步与异步的区别

同步任务是指任务执行完毕后,才会继续执行下一个任务。而异步任务是指任务执行过程中,可以继续执行下一个任务,只需要轮询处理结果即可,因此异步很适合在耗时的操作中使用。

相当于在同步任务中,user发起任务请求后会阻塞干等;而在异步任务中,user发起任务请求后会收到一个任务ID,然后可以继续做其他事情,等到任务执行完毕后再获取结果(通常是通过回调函数or轮询结果)。

本项目中同一个任务的多个阶段是同步执行的,需要按顺序,而不同任务之间是异步执行的,可以并行执行。

2)异步时对外提供的接口

异步需要暴露给生产者(用户)的接口通常有两个:

  • 发起任务接口:用于生产者提交发起任务,返回任务ID
  • 查询任务接口:用于查询任务状态,返回任务结果

3)为什么异步结果查询普遍用的是轮询而不是服务器主动通知?

因为轮询对于客户端来说更加灵活,可以根据自己的需求来决定轮询的频率,而服务器主动通知则需要客户端保持长连接(得使用WebSocket、HTTP2等支持主动push的协议,而轮询的话大多数协议都支持),增加了服务器的压力。

4)框架中体现异步的两个主要的点

  • 对于用户而言的异步
    • 提交任务后,直接交给Async执行,用户不需要等待任务执行完毕,轮询即可
  • 对于Async框架而言的异步
    • 对于Async框架来说,将任务分解后,worker拉取某个阶段的任务交由下游接口执行,此时的worker相对于下游接口也是用户,只需要异步等待下游接口返回结果就行(多阶段异步),此时框架就可以去做别的任务调度了
    • 举例:比如下游业务是向腾讯云发出任务请求,那么其实worker主要要完成的就是向腾讯云调接口而不是Asyncflow实际执行业务逻辑,消费者worker的工作是实现调度,去查结果

四、Asycnflow与消息队列的区别

1. Asycnflow的特点

Asycnflow最主要的亮点是:异步多阶段任务任务调度框架化轻量级

  • 天生为异步任务设计(适合多阶段、耗时任务)
  • 任务状态转移明确(任务状态:未开始、进行中、已完成)
  • 任务优先级明确(任务优先级:高、中、低)
  • 提供通用框架,注册各种任务类型,实现任务逻辑即可
    • 非框架下,总是做重复代码,于是先做调研做一个框架节省代码开发时间,只需要做业务逻辑就行
  • 核心是任务调度器,负责任务的调度和执行,因此相比消息队列(只是消息传递的组件),Asyncflow具备简化任务管理-调度的优势

功能设计上有以下特点:

  • 两种重试机制:均匀重试渐进式重试
  • 滚表的分表方式
  • 解耦多个排序因素:抽象order_time字段作为排序依据
  • 接入GORM接入GORM库简化数据库操作,实现数据库连接池优化性能

2. 为什么用Asyncflow而不用消息队列呢?

1)消息队列和Asyncflow的区别与联系

消息队列与Asyncflow之间是纯队列和调度框架的区别,实际上Asyncflow也可以引入消息队列作为中间件提高性能:

  • 在flowSvr中,由于拉取跟执行任务是耦合的,所以导致多个worker拉取任务时需要排队
  • 如果引入消息队列,可以提前拉取多批任务放入消息队列中,worker从消息队列中拉取任务,这样可以提高并发度减少worker之间的竞争加快分布式锁的释放
  • 缺点:引入中间件会加入维护成本和额外组件的开发学习成本,同时增加框架的复杂性,可能有过度设计的问题,所以要根据实际情况决定是否引入

2)消息队列和Asyncflow的区别

  • 消息队列只是一个组件,需要单独维护和设计调度,且不是天生具备任务管理-任务状态查询的,需要单独开发和维护,其本质是进行消息传递,主要用于解耦(任务本身不依赖)异步(处理耗时任务)处理,削峰(抗住瞬间流量)填谷等场景;而Async的框架就是开发了完整的调度和任务状态更新管理功能,用户只需要注册任务、将任务传入即可,更倾向是一个任务调度器且强调多阶段的依赖性,主要用于任务的调度和执行
  • 使用消息队列需要对各个事件驱动的任务阶段加入消息队列,不便于顺序执行流程,且中间服务挂了会导致丢失;而Async通过MySQL持久化存储,稳定性更好

因此,使用Async异步框架可以简化开发流程和维护成本,天生提供任务管理调度能力,且同样具备水平扩展的能力

3)为什么用消息队列而不用Asyncflow呢?

因为消息队列只是消息传递的组件,而Asyncflow是一个任务调度框架,提供了任务的管理、调度、状态更新等功能,更适合于任务的调度和执行。

虽然用Kafka也可以实现,但是也是要重新实现MySQL的接入和一些worker调度逻辑的实现

调研了一下,C++实现这种框架的其实是很少的,Go中有一个Machine框架,但是该框架的功能过与齐全,对于本项目来说属于过度设计,所以觉得仿照其他语言的框架实现一个轻量级Asyncflow,实现单任务多阶段串联

五、C++语言异步框架比较

C++语言的异步框架有很多,比如libuvboost.asio等,这些框架都是事件驱动的,可以实现异步IO多线程等功能。

1)libuv

libuv是一个跨平台的异步IO库,主要用于事件驱动的编程,它提供了事件循环异步IO定时器线程池等功能,可以用于开发高性能的网络应用

它更适合于网络编程异步IO等场景,适用于编写高性能的网络服务器和客户端,如WebSocket服务器、HTTP服务器等

2)boost.asio

boost.asio是一个C++标准库,提供了异步IO网络编程多线程等功能,可以用于开发高性能的网络应用,其适用性类似于libuv,但是更加底层,需要自己实现更多的功能

3)Asyncflow

本项目相较于libuv和boost.asio,提高更高级别的抽象层,是用户友好型,提供了任务调度任务状态管理等功能,适用于多阶段任务任务依赖等场景,更加适合于任务调度执行,而不是仅仅网络编程

同时框架具有轻量级依赖少易维护等优点

CATALOG
  1. 一、整体框架(异步任务框架)
    1. 1. 整体框架
    2. 2. 中间件
    3. 3. 框架部署方式
    4. 4. 表结构设计
    5. 5. 状态流转
    6. 6. 选用的网络通信协议
    7. 7. webserver依赖框架
  2. 二、使用场景
  3. 三、同步和异步的区别
  4. 四、Asycnflow与消息队列的区别
    1. 1. Asycnflow的特点
    2. 2. 为什么用Asyncflow而不用消息队列呢?
  5. 五、C++语言异步框架比较