rails技术栈(1) - sidekiq(1)

2018/12/25

Sidekiq

该文档涵盖了 Sidekiq gem的使用方式和源码解析.

阅读完该文档后,您将会了解到:

  • Sidekiq的基本使用.
  • 项目中Sidekiq和God之间的关系.
  • Sidekiq和AsyncTask.

Sidekiq简介

Simple, efficient background processing for Ruby.

Sidekiq安装

gem install sidekiq

依赖

sudo apt-get install redis-server

Sidekiq in Rails

实现方式

  • 生产者, 推入消息队列
  • 消费者, 从消息队列中取数据

sidekiq

实现细节

结构目录

-- app
  -- workers
    - a_worker.rb

实现#perfrom方法

# in init_deals_worker.rb
class AWorker
  include Sidekiq::Worker
  sidekiq_options queue: :a_worker, retry: 2, backtrace: true

  def perform(id)
    sleep(20)
    puts "-----dsgv587----- #{id}"
  end
end
sidekiq_options Explanation
queue use a named queue for this Worker, default ‘default’
retry enable the RetryJobs middleware for this Worker, default true
backtrace whether to save any error backtrace in the retry payload to display in web UI, can be true, false or an integer number, default false

塞入队列

AWorker.perform_async(10)
AWorker.perform_async(11)

sidekiq 配置文件

默认的配置文件位于config/sidekiq_default.yml

---
:logfile: ./log/sidekiq_default.log
:concurrency: 5
:development:
  :verbose: true
production:
  :concurrency: 10
:queues:
  - [aaa_worker, 6]
  - [bbb_sync, 4]
  - [default, 2]

NOTE:

  • logfile 日志文件
  • concurrency 并发数目, 注意: 该并发数目不能大于Rails的database.yml中配置的pool数目
  • development 开发环境下的配置参数
    • verbose: true 表示记录所有的log, warn部分的log也会记录进去, 请查看Rails的log级别
  • production 生产环境下的配置参数
  • queues [队列名称, 权重值]

启动sidekiq进程

bundle exec sidekiq -C config/sidekiq_default.yml -e development

Sidekiq和god

god

God is an easy to configure, easy to extend monitoring framework written in Ruby.

Usage

  • god用于监控进程, 当进程挂掉后, 可以对该进程进行重启
# in god/tao800_fire.god

# encoding: utf-8
# run with: RAILS_ENV=development bundle exec god -c god/tao800_fire.god -D
RAILS_ENV   = ENV['RAILS_ENV']  || 'production' unless defined?(RAILS_ENV)
RAILS_ROOT  = ENV['RAILS_ROOT'] || File.expand_path('../../', __FILE__) unless defined?(RAILS_ROOT)

all_queues = %w(
  cpc_deduct1
  cpc_deduct2
  cpc_deduct3
)

all_queues.each_with_index do |queue_name, idx|
  God.watch do |w|
    pid_file  = "/tmp/sidekiq-#{idx}.pid"
    start_cmd = "bundle exec sidekiq -e #{RAILS_ENV} -C config/sidekiq_#{queue_name}.yml -i #{idx} -d -P #{pid_file}" # 启动命令
    stop_cmd  = "bundle exec sidekiqctl stop #{pid_file} 10" # 停止的命令

    w.dir      = RAILS_ROOT
    w.group    = 'sidekiq' # 在god中注册的组
    w.name     = "sidekiq-#{queue_name}" # 在god中注册的名字, 需要唯一
    w.interval = 1.minute
    w.start    = start_cmd
    w.pid_file = pid_file
    w.stop     = stop_cmd
    w.behavior(:clean_pid_file)
  end

  ...
end

比较重要的几个参数

  • name 在god中注册的名字, 需要唯一
  • start_cmd 启动任务的命令
  • stop_cmd 关闭任务的命令
  • pid_file process id对应的文件

god和sidekiq的结合

all_queues = %w(
  cpc_deduct1
  cpc_deduct2
  cpc_deduct3
)

all_queues.each_with_index do |queue_name, idx|
  start_cmd = "bundle exec sidekiq -e #{RAILS_ENV} -C config/sidekiq_#{queue_name}.yml -i #{idx} -d -P #{pid_file}"
  stop_cmd  = "bundle exec sidekiqctl stop #{pid_file} 10"
  ...
end

NOTE: 这里有一个约定 在config/ 目录下有以 sidekiq_ 为前缀 的文件. 如

 config/
    sidekiq_deduct1.yml
    sidekiq_deduct2.yml
    sidekiq_sync.yml

而在god中的queue_names写的是 sidekiq_ 后面的名字, 如

queue_names = %w(
  deduct1
  deduct2
  sync
)

为什么要用god监控sidekiq

当sidekiq挂了的时候, god会根据 start_cmd, 启动sidekiq的命令

INFO: 现在kafaka也是用god进行监控

Sidekiq和AsyncTask

sidekiq的缺点

  1. 队列不能持久化
  2. 任务失败后, 无法获知任务的错误信息

TaskEvent

# task has many events
task = AsyncTask.new(
  params_array: ['1', '2', '3', '4'],
  worker: 'Siedekiq::AWorker'
)

task.run
 def run
   if save
     record_task_log(:sucess)

     params_array.each do |params|
       job_id = sidekiq_worker.perform_in(5, params)
       event  = events.build(
         job_id:        job_id,
         status:        :enqueue,
         params:        params.is_a?(Hash) ? params.to_json : params.to_s,
         added_message: '进入队列'
       )

       if event.save
         record_event_log(:success, event: event)
       else
         record_event_log(:fail, event: event)
       end
     end
   else
     record_task_log(:fail)
   end
 end

使用task_event, 需要在 sidekiq_options 添加 use_task_event: true

class AWorker

sidekiq_options \
  queue: :a_worker,
  backtrace: true,
  retry: 2,
  use_task_event: true
end


Show Disqus Comments