rails8: 添加 solid queue
🕐
一些 job/queue 会用到这个功能
01 新建 rails8 项目
新建 rails8 项目(已经配置了 ~/.dotfiles/.railsrc, 可以不需要 --skip-test 了)
rails new . --skip-test02 初始化 Solid Queue(Rails 8 新项目默认已配置)
如果是升级到 Rails 8 的项目,需手动初始化:
# 生成 Solid Queue 相关迁移文件
rails generate solid_queue:install03 编辑 database.yml
默认数据库是没有的,需要编辑这个文件 ./config/database.yml
# ... 省略
development:
primary:
<<: *default
database: storage/development.sqlite3
cache:
<<: *default
database: storage/development_cache.sqlite3
migrations_paths: db/cache_migrate
queue:
<<: *default
database: storage/development_queue.sqlite3
migrations_paths: db/queue_migrate
cable:
<<: *default
database: storage/development_cable.sqlite3
migrations_paths: db/cable_migrate
# ... 省略04 生成 migrate
执行迁移(创建任务队列相关表)

rails db:migrate05 默认会启动 queue 进程
添加配置 Procfile.dev
web: bin/rails server
jobs: bin/rails solid_queue:start06 生成一个示例的 job
我这里用的是 定时任务
# 生成一个示例任务
rails generate job ArticlePublisherJob示例实现
class ArticlePublisherJob < ApplicationJob
queue_as :default
# 执行文章发布任务
def perform(article_id)
article = Article.find(article_id)
article.publish!
rescue ActiveRecord::RecordNotFound
Rails.logger.error "Article #{article_id} not found"
end
# 调度文章发布
def self.schedule(article)
if article.scheduled_at && article.scheduled_at > Time.current
# 定时发布:在未来指定时间执行
set(wait_until: article.scheduled_at).perform_later(article.id)
elsif article.scheduled_at
# 立即发布:时间已过
article.publish!
end
end
end07 在实际功能中调用 job
添加 job 的使用场景
class Api::ArticlesController < ApplicationController
skip_before_action :verify_authenticity_token
before_action :set_article, only: [:show, :update, :destroy]
# ... 省略
# POST /api/articles
def create
@article = Article.new(article_params)
if @article.save
ArticlePublisherJob.schedule(@article) if @article.scheduled_at
render json: @article, status: :created
else
render json: { errors: @article.errors.full_messages }, status: :unprocessable_entity
end
end
# ... 省略
end08 原理解释
运行原理解释
- 当你运行
bin/rails solid_queue:start时,确实启动了一个独立的 Ruby 进程。- 这个进程内部包含一个调度器 (Dispatcher) 和多个工作者 (Workers)。
- 它们都运行在一个无限循环 (
loop) 中
- 核心机制:定时查询 (Polling) 进程不会被动等待推送(像 Redis 的 BLPOP 那样),而是主动出击:
- Dispatcher: 每隔配置的时间(例如 polling_interval: 1 秒),扫描数据库中“计划执行时间已到”但尚未分发的任务。
- Workers: 从特定的队列中“抢占”任务。
SQL 逻辑大致类似于:SELECT * FROM solid_queue_jobs WHERE queue_name = 'default' AND status = 'ready' AND run_at <= NOW() LIMIT batch_size FOR UPDATE SKIP LOCKED。
关键点: FOR UPDATE SKIP LOCKED 非常重要。它确保在多个 Worker 进程同时查询时,同一个任务不会被两个进程重复领取(利用数据库的行锁机制)。
- 执行完成会将对应的状态存放到对应的表里 - 执行与状态流转 (State Machine) 任务的生命周期完全通过数据库表中的字段状态来管理。
solid_queue主要涉及两张核心表(简化版):- A.
solid_queue_jobs(任务主表) - 这是你提到的“存放对应状态”的地方。一条记录的状态流转如下:
ready: 任务刚入队 (perform_later),等待被取走。claimed: Worker 查到了任务,利用SKIP LOCKED锁住了它,并将状态改为claimed,同时记录claimed_by_id(哪个进程拿走的) 和claimed_at。- 此时如果进程崩溃,其他进程可以通过超时检测发现这个任务“卡住”了,并将其重置回
ready重试。 running(可选/内部状态): 正在执行perform方法。completed或failed:- 成功:状态更新为完成,记录
finished_at。 - 失败:捕获异常。如果还有重试次数,更新
run_at到未来时间并重置状态为ready;如果耗尽重试次数,状态变为failed。
- B.
solid_queue_failed_executions(失败记录表) - 如果任务彻底失败(达到最大重试次数),详细的错误信息、堆栈跟踪会被存入这张表,方便后续排查(类似你之前关注的错误提示友好性)。
- A.
rails
solid
solid_queue
redis