AkiraZheng's Time.

Asyncflow项目5:flowsvr开发

Word count: 2.9kReading time: 12 min
2024/09/14

1. 配置文件

使用TOML作为配置管理


导入配置文件

通过DecodeFile读取某路径下的配置文件并解析到结构体实例Conf


定义结构体解析配置文件

对于

1
2
3
4
type commonConfig struct {
Port int `toml:"port"`
OpenTLS bool `toml:"open_tls"`
}
  • toml:”port”:是结构体字段的标签(tag),它告诉 Go 中的TOML库应该将文件中的 port 字段映射到该结构体中的 Port变量中。

TOML库添加到go.mod文件中

  • 使用go mod init初始化生成go.mod 文件

    1
    2
    3
    4
    set GO111MODULE=on
    export GOPROXY=https://goproxy.cn
    go mod init [name]//在项目目录下执行,其中name为项目文件夹名
    eg: go mod init Asyncflow-dev-raw
  • 使用go get下载TOML

    1
    go get github.com/BurntSushi/toml

参考

  1. Golang toml完全解析示例
  2. toml-go客户端
  3. 3分钟教你go语言如何使用go module下载指定版本的golang库依赖管理

2. 项目资源

2.1. mysql数据库资源框架:gorm

  • 数据库连接池:选择gorm库

2.1.1.gorm框架的字段映射

gorm中映射字段有显式映射隐式映射两种,隐式映射是通过字段名和类型自动映射,显式映射是通过tag标签映射,如:

1
2
3
4
5
6
7
8
type TaskPos struct {
Id uint64 `gorm:"column:id"`
TaskType string `gorm:"column:task_type"`
ScheduleBeginPos int `gorm:"column:schedule_begin_pos"`
ScheduleEndPos int `gorm:"column:schedule_end_pos"`
CreateTime *time.Time
ModifyTime *time.Time
}

2.3 高性能Web框架:gin

2.3.1. gin框架的初始化

  • 创建Web服务
  • 路由:定义路由组/v1
  • 路由中的POST请求:
    • 注册任务/register_task
    • 创建任务/create_task
    • 占据任务/hold_task
    • 执行完更新任务/set_task
  • 路由中的GET请求:
    • 查询任务/get_task
    • 过滤获取一批任务/get_task_list
    • 获取任务配置表/get_task_schedule_cfg_list
    • 获取某个type任务的数量/get_task_counts_by_type
    • Ping测试/ping
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      // RegisterRouter 注册路由
      func RegisterRouter(router *gin.Engine) {
      v1 := router.Group("/v1")
      {
      // 注册任务
      v1.POST("/register_task", task.RegisterTask)
      // 创建任务接口,前面是路径,后面是执行的函数,跳进去
      v1.POST("/create_task", task.CreateTask)
      // 占据任务
      v1.POST("/hold_tasks", task.HoldTasks)
      // 更新任务
      v1.POST("/set_task", task.SetTask)
      // 查询任务(请求参数是 TaskId)
      v1.GET("/get_task", task.GetTask)
      // 获取任务列表(请求参数是 taskType Limit Status)
      v1.GET("/get_task_list", task.GetTaskList)
      // 获取任务配置信息列表
      v1.GET("/get_task_schedule_cfg_list", task.GetTaskScheduleCfgList)
      // 通过taskType获取任务所有记录数量
      v1.GET("/get_task_counts_by_type", task.GetTaskCountsByType)
      v1.GET("ping", func(c *gin.Context) {
      c.JSON(200, gin.H{
      "message": "pong",
      })
      })
      }
      }
  • 启动web server:router.Run(Port)
    • 启动后主协程会阻塞在这里,等待接收请求

2.3.2. gin框架的请求接收

  • c *gin.Context:请求上下文

  • c.ShouldBind(&task):将请求的json数据自动绑定到结构体task

    • 其中结构体task需要定义json标签,如:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      // TaskData 任务调度数据
      type TaskData struct {
      UserId string `json:"user_id"`
      TaskId string `json:"task_id"`
      TaskType string `json:"task_type"`
      TaskStage string `json:"task_stage"`
      Status int `json:"status"`
      Priority *int `json:"priority"`
      CrtRetryNum int `json:"crt_retry_num"`
      MaxRetryNum int `json:"max_retry_num"`
      MaxRetryInterval int `json:"max_retry_interval"`
      ScheduleLog string `json:"schedule_log"`
      TaskContext string `json:"context"`
      OrderTime int64 `json:"order_time"`
      CreateTime time.Time `json:"create_time"`
      ModifyTime time.Time `json:"modify_time"`
      }

2.3 redis缓存资源:goredis

使用goredis库中的redis连接池,用在创建任务时增加缓存任务信息、以及使用redission库实现分布式锁

2.3.1. redis缓存

redis缓存设置的过期时间以天为单位,在config.toml中设置具体的过期天数

redis缓存主要用于客户端轮询查询任务信息表中某个TaskId对应的任务信息,以及任务配置表中的任务配置信息

2.3.2. 分布式锁

使用setnx还是redission库实现分布式锁?

如果使用setnx,当锁过期无法续期,这样可能导致锁被释放,但是任务还没有执行完,从而导致任务重复执行

因此使用redission库实现分布式锁,通过看门狗机制实现锁的自动续期

2.4 性能分析:pprof

项目中使用pprof进行性能分析,可以通过localhost:26688/debug/pprof/查看性能分析结果

  • main函数中添加pprof的启动代码

    1
    2
    3
     go func() {
    http.ListenAndServe("0.0.0.0:26688", nil)
    }()
  • net/http/pprof包中提供了pprof的几种路由

    • goroutine:查看当前程序中的goroutine数
    • heap:查看堆内存的分配情况
    • threadcreate:查看线程的创建情况
    • block:查看阻塞事件的记录
    • cmdline:查看当前程序的命令行参数
    • profile:查看CPU的profile信息
    • trace:查看当前程序的trace信息
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // ppfof包中提供了几种路由
    func init() {
    prefix := ""
    if godebug.New("httpmuxgo121").Value() != "1" {
    prefix = "GET "
    }
    http.HandleFunc(prefix+"/debug/pprof/", Index)
    http.HandleFunc(prefix+"/debug/pprof/cmdline", Cmdline)
    http.HandleFunc(prefix+"/debug/pprof/profile", Profile)
    http.HandleFunc(prefix+"/debug/pprof/symbol", Symbol)
    http.HandleFunc(prefix+"/debug/pprof/trace", Trace)
    }

2.5. 日志框架:seelog

用XML格式统一化日志输出的格式,方便后续日志的查看和分析,存储路径为"../log/web.log"

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<seelog minlevel="trace">
<outputs formatid="fmt_info">
<filter levels="trace,debug,info,warn,error,critical">
<rollingfile formatid="fmt_info" type="size" filename="../log/web.log" maxsize="104857600" maxrolls="10"/>
</filter>
<filter levels="error,critical">
<rollingfile formatid="fmt_err" type="size" filename="../log/error/web_error.log" ` +
`maxsize="10485760" maxrolls="100"/>
</filter>
</outputs>
<formats>
<format id="fmt_info" format="%Date(2006-01-02 15:04:05.999):::%Msg%n" />
<format id="fmt_err" format="%Date(2006-01-02 15:04:05.999):::%Msg%n" />
</formats>
</seelog>

参考

  1. GORM 指南:中文官方文档-数据库操
  2. gin框架
  3. golang框架-web框架之gin
  4. 一文搞懂gin框架httprouter路由实现原理
  5. 超全的Go Http路由框架性能比较
  6. Github go-redis
  7. Go Redis 快速入门
  8. Go 日志框架:seelog github
  9. seelog使用

3. 路由对应的任务调度方法:task

接入Handler库,定义所有接口均需处理的两个接口方法:

  • HandleInput:检查输入参数是否合法
  • HandleProcess:参数合法后,HandleProcess处理业务逻辑
1
2
3
4
5
// HandlerIntf handler接口
type HandlerIntf interface {
HandleInput() error
HandleProcess() error
}

3.1. 创建任务create_task

CreateTaskHandler实现HandlerIntf的两个接口,并作为Run方法的参数传入,其中Run方法统一执行自定义的HandleInputHandleProcess

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Package handler 用于接口逻辑处理
package handler

//Run 执行函数
func Run(handler HandlerIntf) error {
err := handler.HandleInput()
if err != nil {
return err
}
err = handler.HandleProcess()
return err
}

// HandlerIntf handler接口
type HandlerIntf interface {
HandleInput() error
HandleProcess() error
}

1)HandleInput

c.ShouldBind(&hd.Req):将请求的json数据自动绑定到结构体task中,然后检查请求参数是否合法

  • 检查hd.Req中的TaskType是否为空
  • 检查hd.Req中的Priority是否为空
  • 检查hd.Req中的Priority值在[0, db.MAX_PRIORIT]之间

2)HandleProcess

其中数据库操作使用gorm库,db.Create方法创建新纪录、db.Save方法更新某一条记录(其中根据表的主键更新)

gorm库的数据库操作方法通过struct中的各个字段隐式映射按顺序对应数据库表中的字段,实现了自动映射,如位置配置表的:

1
2
3
4
5
6
7
8
type TaskPos struct {
Id uint64
TaskType string
ScheduleBeginPos int
ScheduleEndPos int
CreateTime *time.Time
ModifyTime *time.Time
}
  • 获取分表中位置信息表的信息:找到插入的位置endPos
  • 获取任务配置表中的该任务的配置信息:用于填充插入的一条新任务的基础配置信息
  • 创建任务信息表:插入一条新任务的信息
  • 填充p.Resp的回包信息(含唯一的TaskId
  • 增加该条任务完整信息的redis缓存:string类型,设置过期时间24小时

其中createTask需要通过uuid生成唯一索引TaskId,并拼接上表名方便滚表方式下随时查询

uuid使用"github.com/google/uuid"库,通过uuid.New()生成唯一索引

1
2
3
4
5
// GenTaskId 生成对应taskId
func (p *Task) GenTaskId(taskType, pos string) string {
taskType = strings.Replace(taskType, "_", "-", -1)
return fmt.Sprintf("%+v_%s_%s", uuid.New(), taskType, pos)
}

3.2. 注册任务register_task

1)HandleInput

  • 检查hd.Req中的TaskType是否为空

2)HandleProcess

  • 创建对应的任务信息表t_taskType_1
  • 位置信息表插入一条(beginPos, endPos)为(1, 1)的记录
  • 任务配置表插入一条任务配置信息

3.3. 占据任务hold_task

1)HandleInput

  • 检查hd.Req中的TaskType是否为空:根据任务类型拉取一批任务

2)HandleProcess

  • 修正batch的limit值
  • 位置信息表中获取当前任务类型的beginPos
  • 获取一批任务类型为待执行的任务
    • 任务拉取约束为:order_time < NOWstatus,并按照order_time升序排列
  • 将拉取到的一批任务的status更新为TASK_STATUS_PROCESSING并逐个装进hd.Resp
  • 更新数据库
    • 更新当前一批任务的statusTASK_STATUS_PROCESSING
    • 并更新modify_time

3.4. 执行完更新任务set_task

1)HandleInput

  • 检查hd.Req中的TaskId是否为空:更新一条TaskId对应的任务
  • 检查hd.Req中的Priority的数值

2)HandleProcess

  • 更新任务信息表中的TaskId对应的任务的statusTASK_STATUS_FINISHED
    1
    2
    3
    //只对id为p.TaskId 且 该任务状态不为成功和失败的任务进行更新,更新内容为p中的内容
    err := db.Table(tableName).Where("task_id = ?", p.TaskId).
    Where("status <> ? and status <> ?", TASK_STATUS_SUCCESS, TASK_STATUS_FAILED).Updates(p).Error
  • 清除redis缓存中的该条任务信息

3.5. 查询任务get_task

GET方法需要从Request中获取请求参数,这里同样通过c.ShouldBind(&hd.Req)将请求的json数据自动绑定到结构体task

1)HandleInput

  • 检查hd.Req中的TaskId是否为空:查询一条TaskId对应的任务

2)HandleProcess

  • redis缓存中查询是否含该TaskId的任务信息
  • 若有则直接返回,否则从任务信息表查找数据库
  • 若数据库中有该任务信息,则将该任务信息重新写入redis缓存中,方便下一次查询
  • 装包:将查询到的任务信息装进hd.Resp的响应信息中

4. 任务治理模块设计

任务治理模块主要进行分表处理卡死任务处理两个功能

任务管理通过go routine开启三个协程,协程在for死循环中不断执行,每个协程维护一个定时器定时处理一项特定任务:

4.1. 卡死任务处理

开启定时器:定为10s检查一次,当任务过期时,相当于用掉一次超时重试机会,所以重置任务状态时还需要更新超时重试次数

1
2
3
t := time.NewTimer(time.Duration(config.Conf.Task.LongProcessInterval) * time.Second)
// <-t.C:阻塞等待定时器到期
<-t.C

4.2. 分表处理:新增表end_pos

开启定时器:定为30s检查一次是否需要分表,每次统计任务位置表endPos表的记录数,若超过分表阈值,则进行分表操作

1
2
t := time.NewTimer(time.Duration(config.Conf.Task.SplitInterval) * time.Second)
<-t.C

4.3. 分表处理:更新拉取任务的表begin_pos

开启定时器:定为10s检查一次表任务是否已经全部执行完(通过比较begin_pos表中所有状态为成功失败的任务数和end_pos表中的总任务数,如果相等则滚表到下一张表)

1
2
t := time.NewTimer(time.Duration(config.Conf.Task.MoveInterval) * time.Second)
<-t.C

GO学习笔记

1. Goland全局搜索

  • 按两次Shift:全局搜索
  • Ctrl+Shift+F:全局搜索

2. 依赖包含关系

  • 在调用其他包的方法时,要保证该方法是公开的,即首字母大写
  • go的init函数是在包被导入时自动执行的函数,可以用来初始化包
CATALOG
  1. 1. 配置文件
  2. 2. 项目资源
    1. 2.1. mysql数据库资源框架:gorm
      1. 2.1.1.gorm框架的字段映射
    2. 2.3 高性能Web框架:gin
      1. 2.3.1. gin框架的初始化
      2. 2.3.2. gin框架的请求接收
    3. 2.3 redis缓存资源:goredis
      1. 2.3.1. redis缓存
      2. 2.3.2. 分布式锁
    4. 2.4 性能分析:pprof
    5. 2.5. 日志框架:seelog
  3. 3. 路由对应的任务调度方法:task
    1. 3.1. 创建任务create_task
      1. 1)HandleInput
      2. 2)HandleProcess
    2. 3.2. 注册任务register_task
      1. 1)HandleInput
      2. 2)HandleProcess
    3. 3.3. 占据任务hold_task
      1. 1)HandleInput
      2. 2)HandleProcess
    4. 3.4. 执行完更新任务set_task
      1. 1)HandleInput
      2. 2)HandleProcess
    5. 3.5. 查询任务get_task
      1. 1)HandleInput
      2. 2)HandleProcess
  4. 4. 任务治理模块设计
    1. 4.1. 卡死任务处理
    2. 4.2. 分表处理:新增表end_pos
    3. 4.3. 分表处理:更新拉取任务的表begin_pos
  5. GO学习笔记
    1. 1. Goland全局搜索
    2. 2. 依赖包含关系