Posts List
  1. Client
    1. Enqueue
      1. Client Middleware
  2. Server
    1. Sidekiq::Scheduled
    2. Sidekiq::Processor
    3. Server Middleware
      1. RetryJobs
    4. Web
      1. Storing status information
      2. Reading status information
    5. delay
    6. sidekiq-limit_fetch plugin
      1. fetch strategy
    7. Shutting down Sidekiq
  3. Awesome

Notes from reading the Sidekiq source code.

sidekiq-4.1.1

Client

Enqueue

There are two kinds of queues:

  • SortedSet (sorted set): holds jobs to be executed later.
    Jobs created by perform_in / perform_at; the score is the time the job should run.
    {"class"=>"TestWorker", "args"=>[], "at"=>1458998746.806636, "retry"=>true, "queue"=>"default", "jid"=>"99993d7d1adf0e023c82dd1e", "created_at"=>1458998566.806747}
    Compared with the payload below, it carries an extra at field.

  • List: holds jobs to be executed right away.
    Jobs created by perform_async.
    {"class"=>"TestWorker", "args"=>["haha"], "retry"=>true, "queue"=>"default", "jid"=>"4dd735256dbd510e6dd76169", "created_at"=>1458998304.433952}

The enqueue code:

def atomic_push(conn, payloads)
  if payloads.first['at']
    conn.zadd('schedule'.freeze, payloads.map do |hash|
      at = hash.delete('at'.freeze).to_s
      [at, Sidekiq.dump_json(hash)]
    end)
  else
    q = payloads.first['queue']
    now = Time.now.to_f
    to_push = payloads.map do |entry|
      entry['enqueued_at'.freeze] = now
      Sidekiq.dump_json(entry)
    end
    conn.sadd('queues'.freeze, q)
    conn.lpush("queue:#{q}", to_push)
  end
end
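
A quick way to see what atomic_push wrote, sketched with the redis gem from a console (the key names come straight from the code above; the queue name default matches the payloads):

require 'redis'

redis = Redis.new
redis.smembers('queues')                            # => ["default"]
redis.lrange('queue:default', 0, -1)                # immediate jobs, as JSON strings
redis.zrange('schedule', 0, -1, with_scores: true)  # scheduled jobs with their run-at timestamps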

Client Middleware

Before the actual push happens, the client middleware chain runs first:

def invoke(*args)
  chain = retrieve.dup
  traverse_chain = lambda do
    if chain.empty?
      yield
    else
      chain.shift.call(*args, &traverse_chain)
    end
  end
  traverse_chain.call
end

chain = retrieve.dup copies the registered middleware instances.
Neatly, the chain is walked with a recursive lambda: each middleware is called with the rest of the chain as its block, and once the chain is empty, yield runs the block originally passed to invoke (the actual push).
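
A minimal custom client middleware showing how the chain is used (ClientLogger is made up for illustration; the four-argument call signature and chain.add are the standard Sidekiq 4.x client-middleware API, and a middleware that does not yield drops the job):

class ClientLogger
  def call(worker_class, job, queue, redis_pool)
    Sidekiq.logger.info "enqueueing #{worker_class} to #{queue}"
    yield   # hand control to the next middleware / the actual push
  end
end

Sidekiq.configure_client do |config|
  config.client_middleware do |chain|
    chain.add ClientLogger
  end
end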

Server

Sidekiq::Scheduled

Polls the SortedSet:

def start
  @thread ||= safe_thread("scheduler") do
    initial_wait

    while !@done
      enqueue
      wait
    end
    Sidekiq.logger.info("Scheduler exiting...")
  end
end

Due jobs (score <= now) are pulled out of the sorted set one at a time and pushed onto a List for execution:

while job = conn.zrangebyscore(sorted_set, '-inf'.freeze, now, :limit => [0, 1]).first do
  if conn.zrem(sorted_set, job)
    Sidekiq::Client.push(Sidekiq.load_json(job))
    Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
  end
end
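
A standalone sketch of the same pick-one-due-job pattern using the redis gem (not Sidekiq's code; key name and payload shape come from the sections above). The zrem check is what makes the loop safe when several Sidekiq processes poll the schedule set at once: only the process whose ZREM actually removed the member gets a truthy result.

require 'redis'

redis = Redis.new
now   = Time.now.to_f

while (job = redis.zrangebyscore('schedule', '-inf', now, limit: [0, 1]).first)
  next unless redis.zrem('schedule', job)   # another process won the race
  # Sidekiq re-pushes via Sidekiq::Client.push, which strips 'at' and
  # stamps 'enqueued_at' before LPUSHing onto queue:<name>
  puts "due job: #{job}"
end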

Sidekiq::Processor

Executes jobs: fetch one job from a queue, then process it:

def process_one
  @job = fetch
  process(@job) if @job
  @job = nil
end
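
Roughly what process(@job) boils down to once fetch has returned a unit of work (a simplified sketch, not the actual Processor code; middleware, error handling and stats are omitted):

require 'sidekiq'

def process_sketch(json)
  payload      = Sidekiq.load_json(json)            # the JSON string stored in Redis
  worker_class = Object.const_get(payload['class']) # e.g. TestWorker
  worker       = worker_class.new
  # the server middleware chain (RetryJobs, logging, ...) wraps this call
  worker.perform(*payload['args'])
end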

Server Middleware

RetryJobs

Catches errors raised while a job runs, and records the error and retry information on the job:

def call(worker, msg, queue)
  yield
rescue Sidekiq::Shutdown
  # ignore, will be pushed back onto queue during hard_shutdown
  raise
rescue Exception => e
  # ignore, will be pushed back onto queue during hard_shutdown
  raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)

  raise e unless msg['retry']
  attempt_retry(worker, msg, queue, e)
end

The job is then placed into the retry queue (a SortedSet):

Sidekiq.redis do |conn|
  conn.zadd('retry', retry_at.to_s, payload)
end
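
Per-worker retry behaviour is controlled with sidekiq_options (standard API; PaymentWorker and the numbers are only an example). retry_at is derived from the retry count with an exponential backoff, roughly count**4 + 15 seconds plus some jitter in this version:

class PaymentWorker
  include Sidekiq::Worker
  sidekiq_options retry: 5    # retry: false opts out of attempt_retry entirely

  def perform(order_id)
    # any Exception raised here is recorded on the job and retried
    raise ArgumentError, 'missing order id' if order_id.nil?
  end
end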

Web

Storing status information

In-process state lives in concurrent-ruby's Map (per-job status) and AtomicFixnum (processed and failed counts).
A dedicated thread reads whatever has accumulated in the Map and AtomicFixnum every 5 seconds and writes it to Redis:

while true
  heartbeat(k, data, json)
  sleep 5
end
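
The two concurrent-ruby primitives named above, in isolation (a sketch; the variable names are illustrative):

require 'concurrent'

in_progress = Concurrent::Map.new            # e.g. jid => what the thread is working on
failed      = Concurrent::AtomicFixnum.new   # lock-free counter

in_progress['99993d7d1adf0e023c82dd1e'] = { queue: 'default', payload: '...' }
failed.increment
failed.value   # => 1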

Reading status information

The Web UI that reads this state back is a Sinatra application.
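
The same data can also be read outside the Web UI through Sidekiq's public API (a sketch):

require 'sidekiq/api'

stats = Sidekiq::Stats.new
stats.processed    # counters written by the heartbeat
stats.failed
stats.enqueued

Sidekiq::Workers.new.each do |process_id, thread_id, work|
  puts "#{process_id}/#{thread_id} is running #{work['payload']}"
end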

delay

Defines a delay method on every class (usage sketch after the list):

  1. Defines sidekiq_delay and aliases it as delay:

     def sidekiq_delay(options={})
       Proxy.new(DelayedClass, self, options)
     end
     alias_method :delay, :sidekiq_delay

  2. Includes the extension into Module, so every class picks up the method:

     Module.__send__(:include, Sidekiq::Extensions::Klass) unless defined?(::Rails)

  3. Proxy defines a ghost method (method_missing) that captures the method name and arguments and enqueues them as a job.
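
Usage of the extension (delay and delay_for are the names added by Sidekiq::Extensions; the Report class is only an example):

class Report
  def self.generate(user_id)
    # heavy work
  end
end

Report.delay.generate(42)            # enqueued immediately, runs inside a Sidekiq worker
Report.delay_for(3600).generate(42)  # enqueued into the schedule SortedSet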

sidekiq-limit_fetch plugin

sidekiq-limit_fetch-3.1.0 overrides Sidekiq's fetch strategy.
It prepends a module to Sidekiq::Manager, overriding the initialize and start methods:

class Sidekiq::Manager
  module InitLimitFetch
    def initialize(options={})
      options[:fetch] = Sidekiq::LimitFetch
      super
    end

    def start
      Sidekiq::LimitFetch::Queues.start options
      Sidekiq::LimitFetch::Global::Monitor.start!
      super
    end
  end

  prepend InitLimitFetch
end
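
Why prepend rather than include: a prepended module sits in front of the class in the ancestor chain, so its methods run first and can still call super into the originals. A self-contained illustration (the names here are only for the demo):

class Manager
  def initialize(options = {})
    @options = options
  end
end

module InitLimitFetch
  def initialize(options = {})
    options[:fetch] = :limit_fetch   # inject the strategy before the original initialize runs
    super
  end
end

Manager.prepend(InitLimitFetch)
Manager.ancestors.first(2)                     # => [InitLimitFetch, Manager]
Manager.new.instance_variable_get(:@options)   # => {:fetch=>:limit_fetch}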

fetch strategy

limit example:
"#{PREFIX}:probed:#@name" (a List; @name is the queue name) stores the jobs currently being executed.
A Redis Lua script filters out the queues that have already reached their limit.

Shutting down Sidekiq

On receiving the INT signal from Ctrl+C, Sidekiq raises and then rescues Interrupt; from there it (see the sketch after this list):

  • stops fetching jobs from the List queues
  • stops polling the SortedSet
  • for jobs already running: waits at most the configured timeout; anything still unfinished is forcibly stopped and pushed back onto the queue
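
A simplified, self-contained sketch of that flow (not Sidekiq's exact code): Ctrl+C delivers INT, the handler raises Interrupt in the main thread, and the rescue performs the orderly stop with a hard deadline:

workers = 3.times.map { Thread.new { loop { sleep 1 } } }   # stand-ins for processor threads

trap('INT') { raise Interrupt }

begin
  sleep   # main thread idles while the workers run
rescue Interrupt
  deadline = Time.now + 8                                   # Sidekiq uses its -t/:timeout option here
  workers.each { |t| t.join([deadline - Time.now, 0].max) } # wait for jobs to finish
  workers.each { |t| t.kill if t.alive? }                   # hard shutdown; Sidekiq re-enqueues these jobs
end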

Awesome

x, Thread.current[:sidekiq_worker_set] = Thread.current[:sidekiq_worker_set], nil
(parallel assignment that reads the thread-local worker set and clears it in a single statement)

__send__ is not meant to be redefined (Ruby warns if you do), while send frequently is, which is why library code calls __send__ (example below).
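
A small demonstration of the send vs __send__ point (Mailbox is made up):

class Mailbox
  def send(message)      # overriding send is common and legal
    "queued #{message}"
  end

  def hello
    'hi'
  end
end

box = Mailbox.new
box.send(:hello)       # => "queued hello"  (our override, no method dispatch)
box.__send__(:hello)   # => "hi"            (still does real method dispatch)
# redefining __send__ itself makes Ruby print a warning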