1. 配置文件
使用TOML
作为配置管理
导入配置文件
通过DecodeFile
读取某路径下的配置文件并解析到结构体实例Conf
中
定义结构体解析配置文件
对于
1 2 3 4 5 6
| type TomlConfig struct { FlowsvrAddr string `toml:"flowsvr_addr"` RedisLockAddr string `toml:"redis_lock_addr"` RedisLockPassword string `toml:"redis_lock_password"` }
|
2. redission分布式锁:go-redis框架
本项目中使用go-redis
库来实现分布式锁,通过SetNX
命令结合看门狗机制手动实现一个分布式锁的,SetNX
命令是一个原子性的操作,只有在key不存在时才会设置key的值。
参考
- Github go-redis
- Go Redis 快速入门
3. Task处理
Task处理完的返包用的是http.Client
,通过http.NewRequest
实现对服务端接口的调用
1
| req, err := http.NewRequest(method, reqUrl, reader)
|
3.1 注册Task
将Task注册到worker的map
数据结构中,以taskType
为键,task
为值。
1 2 3 4 5 6 7 8 9
| type TaskHandler struct { TaskType string NewProc func() TaskIntf }
func RegisterHandler(handler *TaskHandler) { taskHandlerMap[handler.TaskType] = handler }
|
3.2 Task任务需要实现的公用接口
task需要实现几个公用接口,封装在上述TaskHandler
结构体中。主要实现:
ContextLoad
:加载上下文
HandleProcess
:处理、执行任务
SetTask
:当前阶段任务完成,调用flowSvr接口更新当前任务状态
HandleFinish
:当前阶段任务完成执行的操作(更新任务状态)
HandleFinishError
:任务失败后执行后续的任务失败处理操作
Base
:反序列化任务信息到TaskBase
结构体
CreateTask
:创建任务(创建
HandleFailedMust
:处理失败的任务,任务状态重置为失败,结束任务
1 2 3 4 5 6 7 8 9 10 11
| type TaskIntf interface { ContextLoad() error HandleProcess() error SetTask() error HandleFinish() HandleFinishError() error Base() *TaskBase CreateTask() (string, error) HandleFailedMust() error }
|
3.3 Task序列化反序列化处理:json
使用go中自带的"encoding/json"
库来实现Task的序列化和反序列化。
3.3.1 Task序列化
序列化是将结构体转换为json字符串的过程,使用json.Marshal
函数实现。
1
| b, err := json.Marshal(body)
|
3.3.2 Task反序列化
反序列化是将json字符串转换为结构体的过程,使用json.Unmarshal
函数实现。当向接口发送请求获得response
后,需要将response
的body
部分反序列化为结构体。
1
| err = json.Unmarshal(respStr, respData)
|
参考
- golang解析json数据(Encoding/Json)
- golang json解析
4. 任务调度
4.1 任务调度:load任务配置
在worker启动时,先通过get_task_schedule_cfg_list
接口获取任务调度配置,然后根据配置的调度时间,定时执行任务。
开启一个goroutine,每隔20s
请求更新一次任务调度配置。
1 2 3 4 5 6 7 8 9 10 11 12
| once.Do(func() { if err := LoadCfg(); err != nil { msg := "load task cfg schedule err" + err.Error() martlog.Errorf(msg) fmt.Println(msg) os.Exit(1) } go func() { CycleReloadCfg() }() })
|
4.2 任务调度:定时拉取任务
在worker的主线程中阻塞,执行任务调度。根据任务配置表中的cfg.ScheduleInterval
时间开启定时器定时拉取一批任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| cfg, ok := scheduleCfgDic[p.TaskType] if !ok { martlog.Errorf("scheduleCfgDic %s, not have taskType %s", tools.GetFmtStr(scheduleCfgDic), p.TaskType) return } intervalTime := time.Second * time.Duration(cfg.ScheduleInterval) if cfg.ScheduleInterval == 0 { intervalTime = time.Second * DEFAULT_TIME_INTERVAL }
step := RandNum(501)
intervalTime += time.Duration(step) * time.Millisecond t := time.NewTimer(intervalTime) <-t.C
|
每到定时时间拉取一批任务时开启一个新的goroutine,并在新开启的goroutine中,假设拿到一批数量为n
的任务,遍历这n
个任务,每个任务开启一个goroutine执行。
4.3 任务调度:分布式锁
其中,在每次拉取一批新任务时先通过阻塞模式redis抢锁,通过 Redis 的LUA原子操作实现跨进程/跨机器的互斥访问。5s内抢不到锁则返回等待下一次拉取任务。持有锁的过期时间为3s。
其中redis的分布式锁连接地址在配置文件中配置。
1 2 3 4 5 6
| mutex := redislock.NewRedisLock(p.TaskType, lockClient, redislock.WithBlock(), redislock.WithWatchDogMode(), redislock.WithExpireSeconds(3)) if err := mutex.Lock(context.Background()); err != nil { martlog.Errorf("RedisLock lock err %s", err.Error()) return }
|
在redislock.NewRedisLock
处理完创建锁的操作后,会在redislock.Lock
中调用redislock.WatchDog
函数通过延时函数在抢占到分布式锁后开启看门狗机制,启动一个后台 goroutine 来定期续期锁的有效期。
redis加锁是通过SETNX
命令结合看门狗机制手动实现一个分布式锁的,SETNX
命令是一个原子性的操作,只有在key不存在时才会设置key的值,如果key已经存在,则不做任何操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
| func (r *RedisLock) Lock(ctx context.Context) (err error) { defer func() { if err != nil { return } r.watchDog(ctx) }()
err = r.tryLock(ctx) if err == nil { return nil }
if !r.isBlock { return err }
if !IsRetryableErr(err) { return err }
err = r.blockingLock(ctx) return }
func (r *RedisLock) tryLock(ctx context.Context) error { result, err := r.client.pool.SetNX(ctx, r.key, r.token, time.Duration(r.expireTimeSecond)*time.Second).Result() if err != nil { return err }
if !result { return ErrLockAcquiredByOthers }
return nil }
func (r *RedisLock) watchDog(ctx context.Context) { if !r.watchDogMode { return }
for !atomic.CompareAndSwapInt32(&r.runningDog, 0, 1) { }
ctx, r.stopDog = context.WithCancel(ctx) go func() { defer func() { atomic.StoreInt32(&r.runningDog, 0) }() r.runWatchDog(ctx) }() }
func (r *RedisLock) runWatchDog(ctx context.Context) { ticker := time.NewTicker(r.watchDogWorkStepTime) defer ticker.Stop()
for range ticker.C { select { case <-ctx.Done(): return default: }
_ = r.DelayExpire(ctx, r.expireTimeSecond) } }
func (r *RedisLock) DelayExpire(ctx context.Context, expireSeconds int64) error { result, err := r.client.pool.Eval(ctx, LuaCheckAndExpireDistributionLock, []string{r.key}, []interface{}{r.token, expireSeconds}).Bool() if err != nil { return nil }
if !result { return ErrDelayExpire }
return nil }
|
4.3 任务调度:任务执行
在每个任务的goroutine中,执行任务的过程中,需要先加载上下文ContextLoad
,然后执行任务HandleProcess
,任务执行完后,根据任务执行结果调用SetTask
接口更新任务状态。
当任务执行完HandleProcess
后需要重置任务阶段和任务状态,并更新task结构体中的schedule_log
(这里包括记录track-uuid时间戳、ErrMsg和cost任务执行时间)
最后再更新当前任务的排序时间order_time
:
- 任务执行成功:
order_time = modify_time - priority
- 任务执行失败:
order_time = modify_time + retry_interval
- 说明:根据重试时间延迟任务被调度时间
- 这里不能加上priority,否则无法保证重试时间间隔retry_interval内不会被调度