rails技术栈(1) - sidekiq(2)

2019/01/11

Sidekiq

该文档涵盖了 Sidekiq 源码解析, 包括异步任务入队、sidekiq启动以及异步任务执行.

阅读完该文档后,您将会了解到:

  • Sidekiq的基本使用.
  • 异步任务入队
  • sidekiq启动过程
  • 异步任务执行流程

这篇博客主要是借鉴大V以及sidekiq源码所作,错误之处还望指正。


全文总结

全文其实主要是三个部分:

  1. 将任务数据存入redis队列(用的有序集合)
  2. 从任务队列中取任务数据
  3. 执行任务,并发多线程 调用 自定义的 HardWorker 函数体。

概述

Sidekiq 是 Ruby 和 Rails 项目中常用的后台任务处理系统, 其本身提供的 API 十分简洁,源代码也非常易于阅读,是一个轻量级的异步处理组件;

Sidekiq简介

在具体分析介绍 Sidekiq 的实现原理之前,我们需要对整个组件的使用过程进行概述,保证我们对 Sidekiq 的结构有一个总体上的了解。

class HardWorker
  include Sidekiq::Worker
  def perform(name, count)
    # do something
  end
end

HardWorker.perform_async('bob', 5)

在这里,我们直接照搬 Sidekiq Wiki 中 Getting Started 部分的代码简单展示下它是如何使用的,当我们执行HardWorker.perform_async 方法时,Sidekiq 的 Worker 会将一个异步任务以 JSON 的形式将相关的信息加入 Redis 中并等待消费者对任务的拉取和处理。

Sidekiq 的消费者有三个部分组成,分别是 ManagerProcessorPoller;他们三者会相互协作共同完成对 Redis 中任务消费的过程。

需要注意的是,Sidekiq 中的 Sidekiq::Worker 并不是真正用于处理任务的 Worker,负责执行执行任务的类型其实是 Sidekiq::Processor;在文章中,当我们提到 Sidekiq Worker 时,其实说的是 Sidekiq::Processor,当我们使用了形如 Sidekiq::Worker 或者 Worker 的形式时,我们说的就是对应的类。

异步任务的入队

流程整理:

重点:从第(1)步到(4), 这个过程数据都没有存储到redis, 都是对即将要存入redis中的数据做处理。 从 Worker.perform_async 方法到 Client#push 方法整个过程都在对即将加入到 Redis 中队列的哈希进行操作,而不是已经存储了。

# item ('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])

-- HardWorker.perform_async          #自定义woker开始执行
  -- Worker.perform_async         #将相关信息以JSON的形式传入到redis(1)
    -- Woker#client_push          # 获取了上下文中的 Redis 池并将传入的item对象传入Redis 中(2)
      -- Client#push              # 对即将加入到 Redis 中队列的哈希进行操作, 添加 at 字段到字符串化; 调用normalize_item 方法中添加 jid 和 created_at 字段。(3)
        -- Client#normalize_item  # 添加jid 和 created_at 字段后,返回push方法 (4)
          -- Client#raw_push      # 所有添加异步任务(后续有perform_in和perform_at, 本质上与Worker.perform_async一样)的方法最终都调用了私有方法 Client#raw_push 以及 Client#atomic_push 向 Redis 中添加数据
          -- Client#atomic_push   # 使用的是redis的有序集合 (面试点) (5)

(1) Worker.perform_async

当我们对需要异步执行的任务调用类似 Worker.perform_async 的方法时,Sidekiq 其实并不会真正去创建一个 HardWorkerWorker 的对象,它实际上会调用 Worker.client_push 方法并将当前的 class(HardWorker) 和 args(传入参数,name和count) 参数传进去,也就是需要异步执行的类和参数:

def perform_async(*args)
  client_push('class'.freeze => self, 'args'.freeze => args)
end

(2) Woker#client_push

Worker.perform_async执行了 Worker.client_push 方法并传入了一个哈希,内容如图;在方法的实现中,它获取了上下文中的 Redis 池并将传入的 item 对象传入 Redis 中:

# item ('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])

def client_push(item)
  pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'.freeze] || Sidekiq.redis_pool
  item.keys.each do |key|
    item[key.to_s] = item.delete(key)
  end
  Sidekiq::Client.new(pool).push(item)
end

(3/4) Client#push 和 Client#normalize_item

Worker.perform_async 方法到 Client#push 方法整个过程都在对即将加入到 Redis 中队列的哈希进行操作.

normalize_item 添加 jid 和 created_at 字段。

def push(item)
  normed = normalize_item(item)
  payload = process_single(item['class'.freeze], normed)

  if payload
    raw_push([payload])
    payload['jid'.freeze]
  end
end

注意:从 Worker.perform_async 方法到 Client#push 方法整个过程都在对即将加入到 Redis 中队列的哈希进行操作,就是以上的过程都是在数据操作,而数据还未存储到redis,第(5)步才是存储执行。

(5) Client#raw_push 和 Client#atomic_push

私有方法 Client#raw_push 以及 Client#atomic_push 向 Redis 中添加数据,两种不同的情况,

    1. 当异步任务需要在未来的某一时间点进行安排时,它会加入 Redis 的一个有序集合: (if代码块)

在这个有序集合中,Sidekiq 理所应当地将 schedule 作为权重,而其他的全部字段都以 JSON 的格式作为负载传入;

def atomc_push(conn, payloads)
  if payloads.first['at'.freeze]
    conn.zadd('schedule'.freeze, payloads.map do |hash|
                at = hash.delete('at'.freeze).to_s
                [at, Sidekiq.dump_json(hash)]
              end)
  else
    # ...
  end
end
    1. 立即执行的异步任务时(else代码块)

除了设置当前任务的入队时间 enqueued_at 之外,Sidekiq 将队列加入到一个大队列 queues 的集合中,并且将负载直接推到 “queue:#{q}” 数组中等待消费者的拉取

def atomc_push(conn, payloads)
  if payloads.first['at'.freeze]
  # ...
  else
    q = payloads.first['queue'.freeze]
    now = Time.now.to_f
    to_push = payloads.map do |entry|
      entry['enqueued_at'.freeze] = now
      Sidekiq.dump_json(entry)
    end
    conn.sadd('queues'.freeze, q)
    conn.lpush("queue:#{q}", to_push)
  end
end

以上过程整理如下图:

定时任务

除了 Worker.perform_async 之外,Worker 还提供了另外一对用于在一段时间之后或者某个时间点执行相应任务的方法 Worker.perform_atWorker.perform_in

def perform_in(interval, *args)
  int = interval.to_f
  now = Time.now.to_f
  ts = (int < 1_000_000_000 ? now + int : int)
  item = { 'class'.freeze => self, 'args'.freeze => args, 'at'.freeze => ts }
  item.delete('at'.freeze) if ts <= now
  client_push(item)
end
alias_method :perform_at, :perform_in

为了使用同一个接口支持两种不同的安排方式(时间点和多久之后),方法内部对传入的 internal 进行了判断,当 interval.to_f < 1_000_000_000 时就会在一段时间之后执行任务,否则就会以时间点的方式执行任务,虽然 Worker.perform_at 和 Worker.perform_in 是完全相同的方法,不过我们在使用时还是尽量遵循方法的语义选择两者中更符合逻辑的方法。

1_000_000_000 如果是时间点,时间戳的值应该都是大于这个值

数据在redis中的存储结构

无论是立即执行还是需要安排的异步任务都会进入 Redis 的队列中,但是它们之间还是有一些区别的,

  1. Worker.perform_in/at 会将任务以 [at, args] 的形式加入到 schedules 有序集中,
  2. Worker.perform_async 将负载加入到指定的队列,并向整个 Sidekiq 的队列集合 queues 中添加该队列。

所有的 payload 中都包含了一个异步任务需要执行的全部信息,包括该任务的执行的队列 queue、异步队列的类 class、参数 args 以及 sidekiq_options 中的全部参数。

除了上述参数,一个异步任务还包含诸如 created_at、enqueued_at 等信息,也有一个通过 SecureRandom.hex(12) 生成的任务唯一标识符 jid。

Sidekiq 的启动过程

对于 Sidekiq 印象最深刻的就是它在命令行启动的时候输出的一个字符画,我们能在 cli.rb 的 Cli.banner 方法中找到这个字符画:

    m,
         `$b
    .ss,  $$:         .,d$
    `$$P,d$P'    .,md$P"'
     ,$$$$$bmmd$$$P^'
   .d$$$$$$$$$$P'
   $$^' `"^$$$'       ____  _     _      _    _
   $:     ,$$:       / ___|(_) __| | ___| | _(_) __ _
   `b     :$$        \___ \| |/ _` |/ _ \ |/ / |/ _` |
          $$:         ___) | | (_| |  __/   <| | (_| |
          $$         |____/|_|\__,_|\___|_|\_\_|\__, |
        .d$$                                       |_|

这一节也将介绍 Sidekiq 的启动过程,在 bin 文件夹中的 sidekiq 文件包含的内容就是在命令行执行 sidekiq 时执行的代码:

begin
  cli = Sidekiq::CLI.instance
  cli.parse
  cli.run
rescue => e
  # ...
end

这里的代码就是创建了一个 CLI 对象,执行 CLI#parse 方法对参数进行解析,最后调用 CLI#run 方法:

def run
  print_banner

  self_read, self_write = IO.pipe
  # ...

  launcher = Sidekiq::Launcher.new(options)
  begin
    launcher.run
    while readable_io = IO.select([self_read])
      signal = readable_io.first[0].gets.strip
      handle_signal(signal)
    end
  rescue Interrupt
    launcher.stop
  end
end

CLI#run 在执行最开始就会打印 banner,也就是我们在每次启动 Sidekiq 时看到的字符画,而在之后会执行 Launcher#run 运行用于处理异步任务的 Processor 等对象。

每一个 Launcher 都会启动一个 Manager 对象和一个 Poller,其中 Manager 同时管理了多个 Processor 对象,这些不同的类之间有着如上图所示的关系。

def run
  @thread = safe_thread("heartbeat", &method(:start_heartbeat))
  @poller.start
  @manager.start
end

Manager 会在初始化时根据传入的 concurrency 的值创建对应数量的 Processor,默认的并行数量为 25;当执行 Manager#start 时,就会启动对应数量的线程和处理器开始对任务进行处理:

class Manager
  def start
    @workers.each do |x|
      x.start
    end
  end
end

class Processor
  def start
    @thread ||= safe_thread("processor", &method(:run))
  end
end

从 Launcher 的启动到现在只是一个调用 initialize 和 start 方法的过程,再加上 Sidekiq 源代码非常简单,所以阅读起没有丝毫的难度,也就不做太多的解释了。

并行模型

当处理器开始执行 Processor#run 方法时,就开始对所有的任务进行处理了;从总体来看,Sidekiq 使用了多线程的模型对任务进行处理,每一个 Processor 都是使用了 safe_thread 方法在一个新的线程里面运行的:

def safe_thread(name, &block)
  Thread.new do
    Thread.current['sidekiq_label'.freeze] = name
    watchdog(name, &block)
  end
end

在使用 Sidekiq 时,我们也会在不同的机器上开启多个 Sidekiq Worker,也就是说 Sidekiq 可以以多进程、多线程的方式运行,同时处理大量的异步任务。

到目前为止,我们已经分析了异步任务的入队以及 Sidekiq Worker 的启动过程了,接下来即将分析 Sidekiq 对异步任务的处理过程。

异步任务的处理

『主题』的订阅

作为一个 Sidekiq Worker 进程,它在启动时就会决定选择订阅哪些『主题』去执行,比如当我们使用下面的命令时:

sidekiq -q critical,2 -q default

CLI#parse 方法会对传入的 -q 参数进行解析,但是当执行 sidekiq 命令却没有传入队列参数时,Sidekiq 只会订阅 default 队列中的任务:

def parse(args=ARGV)
  # ...
  validate!
  # ...
end

def validate!
  options[:queues] << 'default' if options[:queues].empty?
end

同时,默认情况下的队列的优先级都为 1,高优先级的队列在当前的任务中可以得到更多的执行机会,实现的方法是通过增加同一个 queues 集合中高优先级队列的数量,我们可以在 CLI#parse_queue 中找到实现这一功能的代码:

def parse_queue(opts, q, weight=nil)
  [weight.to_i, 1].max.times do
    (opts[:queues] ||= []) << q
  end
  opts[:strict] = false if weight.to_i > 0
end

异步任务的处理

从异步任务的入队一节中,我们可以清楚地看到使用 #perform_async 和 #perform_in 两种方法创建的数据结构 payload 最终以不同的方式进入了 Redis 中,所以在这里我们将异步任务的处理分为定时任务和『立即』任务两个部分,分别对它们不同的处理方式进行分析。

1.定时任务

Sidekiq 使用 Scheduled::Poller 对 Redis 中 schedules 有序集合中的负载进行处理,其中包括 retry 和 schedule 两个有序集合中的内容。

在 Poller 被 Scheduled::Poller 启动时会调用 #start 方法开始对上述两个有序集合轮训,retry 中包含了所有重试的任务,而 schedule 就是被安排到指定时间执行的定时任务了:

def start
  @thread ||= safe_thread("scheduler") do
    initial_wait
    while !@done
      enqueue
      wait
    end
  end
end

Scheduled::Poller#start 方法内部执行了一个 while 循环,在循环内部也只包含入队和等待两个操作,用于入队的方法最终调用了 Scheduled::Poll::Enq#enqueue_jobs 方法:

def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
  Sidekiq.redis do |conn|
    sorted_sets.each do |sorted_set|
      while job = conn.zrangebyscore(sorted_set, '-inf'.freeze, now, :limit => [0, 1]).first do
        if conn.zrem(sorted_set, job)
          Sidekiq::Client.push(Sidekiq.load_json(job))
        end
      end
    end
  end
end

传入的 SETS 其实就是 retry 和 schedule 构成的数组,在上述方法中,Sidekiq 通过一个 Redis#zrangebyscore 和 Redis#zrem 将集合中小于当前时间的任务全部加到立即任务中,最终调用是在前面已经提到过的 Client#push 方法将任务推到指定的队列中。

由于 Scheduled::Poller 并不是不停地对 Redis 中的数据进行处理的,因为当前进程一直都在执行 Poller#enqueue 其实是一个非常低效的方式,所以 Sidekiq 会在每次执行 Poller#enqueue 之后,执行 Poller#wait 方法,随机等待一段时间:

def wait
  @sleeper.pop(random_poll_interval)
  # ...
end

def random_poll_interval
  poll_interval_average * rand + poll_interval_average.to_f / 2
end

随机等待时间的范围在 [0.5 * poll_interval_average, 1.5 * poll_interval_average] 之间;通过随机的方式,Sidekiq 可以避免在多个线程处理任务时,短时间内 Redis 接受大量的请求发生延迟等问题,能够保证从长期来看 Redis 接受的请求数是平均的;同时因为 Scheduled::Poller 使用了 #enqueue 加 #wait 对 Redis 中的数据进行消费,所以没有办法保证任务会在指定的时间点执行,执行的时间一定比安排的时间要晚,这也是我们在使用 Sidekiq 时需要注意的。

随机等待的时间其实不止与 poll_interval_average 有关,在默认情况下,它是当前进程数的 15 倍,在有 30 个 Sidekiq 线程时,每个线程会每隔 225 ~ 675s 的时间请求一次。

2.执行任务

定时任务是由 Scheduled::Poller 进行处理的,将其中需要执行的异步任务加入到指定的队列中,而这些任务最终都会在 Processor#run 真正被执行:

def run
  begin
    while !@done
      process_one
    end
    @mgr.processor_stopped(self)
  rescue Exception => ex
    # ...
  end
end

当处理结束或者发生异常时会调用 Manager#processor_stopped 或者 Manager#processor_died 方法对 Processor 进行处理;在处理任务时其实也分为两个部分,也就是 #fetch 和 #process 两个方法:

def process_one
  @job = fetch
  process(@job) if @job
  @job = nil
end

我们先来看一下整个方法的调用栈,任务的获取从 Processor#process_one 一路调用下来,直到 BasicFetch#retrive_work 返回了 UnitOfWork 对象,返回的对象会经过分发最后执行对应类的 #perform 传入参数真正运行该任务:

Processor#process_one
├── Processor#fetch
   └── Processor#get_one
       └── BasicFetch#retrive_work
           ├── Redis#brpop
           └── UnitOfWork#new
└── Processor#process
    ├── Processor#dispatch
    ├── Processor#execute_job
    └── Worker#perform

对于任务的获取,我们需要关注的就是 BasicFetch#retrive_work 方法,他会从 Redis 中相应队列的有序数组中 Redis#brpop 出一个任务,然后封装成 UnitOfWork 对象后返回。

def retrieve_work
  work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
  UnitOfWork.new(*work) if work
end

#queues_cmd 这个实例方法其实就用到了在主题的订阅一节中的 queues 参数,该参数会在 Processor 初始化是创建一个 BasicFetch 策略对象,最终在 BasicFetch#queues_cmd 方法调用时返回一个类似下面的数组:

queue:high
queue:high
queue:high
queue:low
queue:low
queue:default

这样就可以实现了队列的优先级这一个功能了,返回的 UnitOfWork 其实是一个通过 Struct.new 创建的结构体,它会在 Processor#process 方法中作为资源被处理:

def process(work)
  jobstr = work.job
  queue = work.queue_name

  begin
    # ...

    job_hash = Sidekiq.load_json(jobstr)
    dispatch(job_hash, queue) do |worker|
      Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
        execute_job(worker, cloned(job_hash['args'.freeze]))
      end
    end
  rescue Exception => ex
    # ...
  end
end

该方法对任务的执行其实总共有四个步骤:

将 Redis 中存储的字符串加载为 JSON; 执行 Processor#dispatch 方法并在内部提供方法重试等功能,同时也实例化一个 Sidekiq::Worker 对象; 依次执行服务端的中间件,可能会对参数进行更新; 调用 Processor#execute_job 方法执行任务; 而最后调用的时用于执行任务的方法 Processor#execute_job,它的实现也是到目前为止最为简单的方法之一了:

def execute_job(worker, cloned_args)
  worker.perform(*cloned_args)
end

该方法在线程中执行了客户端创建的 Worker 类的实例方法 #perform 并传入了经过两侧中间件处理后的参数。

参考文章:


Show Disqus Comments

Post Directory