Golang任务队列并发处理问题解决方案

Golang任务队列并发处理问题解决方案
JiuXia2025Golang任务队列并发处理问题解决方案
问题描述
在任务处理系统中,发现同一个任务会被多个worker并发处理,导致以下问题:
- 同一个任务在不同worker中重复处理
- 资源浪费和冲突,尤其是在Docker镜像构建过程中
- 并发写入和操作文件系统导致的错误
解决方案
实现完整的任务锁定机制,确保同一时间只有一个worker能处理特定的任务:
任务状态扩展:添加了新的”processing”状态,表示任务已被锁定但尚未正式开始处理
// internal/task/types.go const ( TaskStatusPending TaskStatus = "pending" TaskStatusProcessing TaskStatus = "processing" // 正在被锁定处理中,但还未正式运行 TaskStatusRunning TaskStatus = "running" TaskStatusComplete TaskStatus = "complete" TaskStatusFailed TaskStatus = "failed" TaskStatusCancelled TaskStatus = "cancelled" TaskStatusCompleted TaskStatus = "completed" )
任务锁定接口:扩展TaskQueue接口,添加TryLock和Unlock方法
// internal/task/queue.go type TaskQueue interface { // ... TryLock(ctx context.Context, id string) (bool, error) Unlock(ctx context.Context, id string) error // ... }
// internal/task/queue.go func (q *MemoryTaskQueue) TryLock(ctx context.Context, id string) (bool, error) { q.mu.Lock() defer q.mu.Unlock() task, exists := q.tasks[id] if !exists { return false, ErrTaskNotFound } if task.GetStatus() != string(TaskStatusPending) { return false, nil } task.SetStatus(string(TaskStatusProcessing)) q.tasks[id] = task return true, q.saveTask(task) } func (q *MemoryTaskQueue) Unlock(ctx context.Context, id string) error { q.mu.Lock() defer q.mu.Unlock() task, exists := q.tasks[id] if !exists { return ErrTaskNotFound } if task.GetStatus() != string(TaskStatusProcessing) { return nil } task.SetStatus(string(TaskStatusPending)) q.tasks[id] = task return q.saveTask(task) }
任务查重机制:在创建任务前检查是否有相同任务正在处理
// internal/task/queue.go func (q *MemoryTaskQueue) FindPending(ctx context.Context, name, tag string) ([]Task, error) { q.mu.RLock() defer q.mu.RUnlock() tasks := make([]Task, 0) for _, task := range q.tasks { status := task.GetStatus() if status == string(TaskStatusPending) || status == string(TaskStatusProcessing) || status == string(TaskStatusRunning) { if task.GetType() == string(TaskTypeBuildImage) { if strings.Contains(task.GetName(), name) { taskTag := task.GetTag() if taskTag == tag || taskTag == "" { tasks = append(tasks, task) } } } } } return tasks, nil }
// internal/task/processor.go // 创建任务前查重 existingTasks, err := p.queue.FindPending(ctx, req.Name, req.Tag) if err == nil && len(existingTasks) > 0 { // 返回已存在任务 for _, t := range existingTasks { if buildTask, ok := t.(*BuildImageTaskV3); ok { return buildTask, nil } } }
Worker流程优化:修改worker处理逻辑,添加锁定和解锁步骤
// internal/task/processor.go func (p *TaskProcessor) worker(ctx context.Context, id int) { for { // ... task, err := p.queue.Dequeue(ctx) if task == nil { continue } locked, err := p.queue.TryLock(ctx, task.GetID()) if !locked { continue } taskID := task.GetID() defer func() { unlockErr := p.queue.Unlock(ctx, taskID) // 错误处理 }() p.processTask(ctx, task) } }
具体实现
Task接口扩展:
- 添加SetStatus方法用于更新任务状态
- 添加GetTag方法用于获取任务特定标签,便于任务查重
// internal/task/types.go type Task interface { // ... SetStatus(status string) GetTag() string }
TaskQueue接口扩展:
- TryLock:尝试锁定任务,成功返回true
- Unlock:解锁任务
- FindPending:查找具有相同名称和标签的未完成任务
见上方接口定义和实现。
Worker处理流程:
- 出队任务后先尝试锁定
- 锁定成功才处理,失败则跳过
- 使用defer确保总是解锁任务
- 处理任务状态流转:pending -> processing -> running -> completed/failed
// internal/task/processor.go func (p *TaskProcessor) processTask(ctx context.Context, task Task) error { // ... task.SetStatus(string(TaskStatusRunning)) if err := p.queue.Update(ctx, task); err != nil { ... } // 任务处理逻辑 // 失败时SetError,成功时SetResult }
任务创建机制优化:
- 创建任务前先查找是否有同名同标签的任务在处理
- 有则返回已存在任务,避免创建重复任务
- 提升任务使用效率
见上方查重代码。
测试验证
该方案解决了任务被多次并发处理的问题,显著提升了系统稳定性和资源使用效率。测试表明:
- 不再出现同一任务被多个worker处理的情况
- 任务队列处理效率提高
- Docker镜像构建过程更加稳定可靠
评论
匿名评论隐私政策