rails8: 添加 solid queue

🕐

一些 job/queue 会用到这个功能

01 新建 rails8 项目

新建 rails8 项目(已经配置了 ~/.dotfiles/.railsrc, 可以不需要 --skip-test 了)

rails new . --skip-test

02 初始化 Solid Queue(Rails 8 新项目默认已配置)

如果是升级到 Rails 8 的项目,需手动初始化:

# 生成 Solid Queue 相关迁移文件
rails generate solid_queue:install

03 编辑 database.yml

默认数据库是没有的,需要编辑这个文件 ./config/database.yml

# ... 省略
development:
  primary:
    <<: *default
    database: storage/development.sqlite3
  cache:
    <<: *default
    database: storage/development_cache.sqlite3
    migrations_paths: db/cache_migrate
  queue:
    <<: *default
    database: storage/development_queue.sqlite3
    migrations_paths: db/queue_migrate
  cable:
    <<: *default
    database: storage/development_cable.sqlite3
    migrations_paths: db/cable_migrate

# ... 省略

04 生成 migrate

执行迁移(创建任务队列相关表)

rails db:migrate

05 默认会启动 queue 进程

添加配置 Procfile.dev 

web: bin/rails server
jobs: bin/rails solid_queue:start

06 生成一个示例的 job

我这里用的是 定时任务

# 生成一个示例任务
rails generate job ArticlePublisherJob

示例实现

class ArticlePublisherJob < ApplicationJob
  queue_as :default

  # 执行文章发布任务
  def perform(article_id)
    article = Article.find(article_id)
    article.publish!
  rescue ActiveRecord::RecordNotFound
    Rails.logger.error "Article #{article_id} not found"
  end

  # 调度文章发布
  def self.schedule(article)
    if article.scheduled_at && article.scheduled_at > Time.current
      # 定时发布:在未来指定时间执行
      set(wait_until: article.scheduled_at).perform_later(article.id)
    elsif article.scheduled_at
      # 立即发布:时间已过
      article.publish!
    end
  end
end

07 在实际功能中调用 job

添加 job 的使用场景

class Api::ArticlesController < ApplicationController
  skip_before_action :verify_authenticity_token
  before_action :set_article, only: [:show, :update, :destroy]
  # ... 省略

  # POST /api/articles
  def create
    @article = Article.new(article_params)

    if @article.save
      ArticlePublisherJob.schedule(@article) if @article.scheduled_at
      render json: @article, status: :created
    else
      render json: { errors: @article.errors.full_messages }, status: :unprocessable_entity
    end
  end

  # ... 省略
end

08 原理解释

运行原理解释

  1. 当你运行 bin/rails solid_queue:start 时,确实启动了一个独立的 Ruby 进程。
    1. 这个进程内部包含一个调度器 (Dispatcher) 和多个工作者 (Workers)
    2. 它们都运行在一个无限循环 (loop) 中
  2. 核心机制:定时查询 (Polling) 进程不会被动等待推送(像 Redis 的 BLPOP 那样),而是主动出击:
    1. Dispatcher: 每隔配置的时间(例如 polling_interval: 1 秒),扫描数据库中“计划执行时间已到”但尚未分发的任务。
    2. Workers: 从特定的队列中“抢占”任务。
      SQL 逻辑大致类似于:SELECT * FROM solid_queue_jobs WHERE queue_name = 'default' AND status = 'ready' AND run_at <= NOW() LIMIT batch_size FOR UPDATE SKIP LOCKED。
      关键点: FOR UPDATE SKIP LOCKED 非常重要。它确保在多个 Worker 进程同时查询时,同一个任务不会被两个进程重复领取(利用数据库的行锁机制)。
  3. 执行完成会将对应的状态存放到对应的表里 - 执行与状态流转 (State Machine) 任务的生命周期完全通过数据库表中的字段状态来管理。solid_queue 主要涉及两张核心表(简化版):
    1. A. solid_queue_jobs (任务主表) 
    2. 这是你提到的“存放对应状态”的地方。一条记录的状态流转如下:
      1. ready: 任务刚入队 (perform_later),等待被取走。
      2. claimed: Worker 查到了任务,利用 SKIP LOCKED 锁住了它,并将状态改为 claimed,同时记录 claimed_by_id (哪个进程拿走的) 和 claimed_at
      3. 此时如果进程崩溃,其他进程可以通过超时检测发现这个任务“卡住”了,并将其重置回 ready 重试。
      4. running (可选/内部状态): 正在执行 perform 方法。
      5. completedfailed:
      6. 成功:状态更新为完成,记录 finished_at
      7. 失败:捕获异常。如果还有重试次数,更新 run_at 到未来时间并重置状态为 ready;如果耗尽重试次数,状态变为 failed
    3. B. solid_queue_failed_executions (失败记录表)
    4. 如果任务彻底失败(达到最大重试次数),详细的错误信息、堆栈跟踪会被存入这张表,方便后续排查(类似你之前关注的错误提示友好性)。
rails
solid
solid_queue
redis