Golang任务队列并发处理问题解决方案

Golang任务队列并发处理问题解决方案

问题描述

在任务处理系统中,发现同一个任务会被多个worker并发处理,导致以下问题:

  1. 同一个任务在不同worker中重复处理
  2. 资源浪费和冲突,尤其是在Docker镜像构建过程中
  3. 并发写入和操作文件系统导致的错误

解决方案

实现完整的任务锁定机制,确保同一时间只有一个worker能处理特定的任务:

  1. 任务状态扩展:添加了新的”processing”状态,表示任务已被锁定但尚未正式开始处理

    // internal/task/types.go
    const (
        TaskStatusPending    TaskStatus = "pending"
        TaskStatusProcessing TaskStatus = "processing" // 正在被锁定处理中,但还未正式运行
        TaskStatusRunning    TaskStatus = "running"
        TaskStatusComplete   TaskStatus = "complete"
        TaskStatusFailed     TaskStatus = "failed"
        TaskStatusCancelled  TaskStatus = "cancelled"
        TaskStatusCompleted  TaskStatus = "completed"
    )
  2. 任务锁定接口:扩展TaskQueue接口,添加TryLock和Unlock方法

    // internal/task/queue.go
    type TaskQueue interface {
        // ...
        TryLock(ctx context.Context, id string) (bool, error)
        Unlock(ctx context.Context, id string) error
        // ...
    }
    // internal/task/queue.go
    func (q *MemoryTaskQueue) TryLock(ctx context.Context, id string) (bool, error) {
        q.mu.Lock()
        defer q.mu.Unlock()
        task, exists := q.tasks[id]
        if !exists {
            return false, ErrTaskNotFound
        }
        if task.GetStatus() != string(TaskStatusPending) {
            return false, nil
        }
        task.SetStatus(string(TaskStatusProcessing))
        q.tasks[id] = task
        return true, q.saveTask(task)
    }
    
    func (q *MemoryTaskQueue) Unlock(ctx context.Context, id string) error {
        q.mu.Lock()
        defer q.mu.Unlock()
        task, exists := q.tasks[id]
        if !exists {
            return ErrTaskNotFound
        }
        if task.GetStatus() != string(TaskStatusProcessing) {
            return nil
        }
        task.SetStatus(string(TaskStatusPending))
        q.tasks[id] = task
        return q.saveTask(task)
    }
  3. 任务查重机制:在创建任务前检查是否有相同任务正在处理

    // internal/task/queue.go
    func (q *MemoryTaskQueue) FindPending(ctx context.Context, name, tag string) ([]Task, error) {
        q.mu.RLock()
        defer q.mu.RUnlock()
        tasks := make([]Task, 0)
        for _, task := range q.tasks {
            status := task.GetStatus()
            if status == string(TaskStatusPending) || 
               status == string(TaskStatusProcessing) || 
               status == string(TaskStatusRunning) {
                if task.GetType() == string(TaskTypeBuildImage) {
                    if strings.Contains(task.GetName(), name) {
                        taskTag := task.GetTag()
                        if taskTag == tag || taskTag == "" {
                            tasks = append(tasks, task)
                        }
                    }
                }
            }
        }
        return tasks, nil
    }
    // internal/task/processor.go
    // 创建任务前查重
    existingTasks, err := p.queue.FindPending(ctx, req.Name, req.Tag)
    if err == nil && len(existingTasks) > 0 {
        // 返回已存在任务
        for _, t := range existingTasks {
            if buildTask, ok := t.(*BuildImageTaskV3); ok {
                return buildTask, nil
            }
        }
    }
  4. Worker流程优化:修改worker处理逻辑,添加锁定和解锁步骤

    // internal/task/processor.go
    func (p *TaskProcessor) worker(ctx context.Context, id int) {
        for {
            // ...
            task, err := p.queue.Dequeue(ctx)
            if task == nil { continue }
            locked, err := p.queue.TryLock(ctx, task.GetID())
            if !locked { continue }
            taskID := task.GetID()
            defer func() {
                unlockErr := p.queue.Unlock(ctx, taskID)
                // 错误处理
            }()
            p.processTask(ctx, task)
        }
    }

具体实现

  1. Task接口扩展

    • 添加SetStatus方法用于更新任务状态
    • 添加GetTag方法用于获取任务特定标签,便于任务查重
    // internal/task/types.go
    type Task interface {
        // ...
        SetStatus(status string)
        GetTag() string
    }
  2. TaskQueue接口扩展

    • TryLock:尝试锁定任务,成功返回true
    • Unlock:解锁任务
    • FindPending:查找具有相同名称和标签的未完成任务

    见上方接口定义和实现。

  3. Worker处理流程

    • 出队任务后先尝试锁定
    • 锁定成功才处理,失败则跳过
    • 使用defer确保总是解锁任务
    • 处理任务状态流转:pending -> processing -> running -> completed/failed
    // internal/task/processor.go
    func (p *TaskProcessor) processTask(ctx context.Context, task Task) error {
        // ...
        task.SetStatus(string(TaskStatusRunning))
        if err := p.queue.Update(ctx, task); err != nil { ... }
        // 任务处理逻辑
        // 失败时SetError,成功时SetResult
    }
  4. 任务创建机制优化

    • 创建任务前先查找是否有同名同标签的任务在处理
    • 有则返回已存在任务,避免创建重复任务
    • 提升任务使用效率

    见上方查重代码。

测试验证

该方案解决了任务被多次并发处理的问题,显著提升了系统稳定性和资源使用效率。测试表明:

  • 不再出现同一任务被多个worker处理的情况
  • 任务队列处理效率提高
  • Docker镜像构建过程更加稳定可靠

image