Client
入队
队列分为2种
SortedSet(有序集合):用于存以后执行的任务
放perform_in
和perform_at
创建的任务,score 是任务执行时间
{"class"=>"TestWorker", "args"=>[], "at"=>1458998746.806636, "retry"=>true, "queue"=>"default", "jid"=>"99993d7d1adf0e023c82dd1e", "created_at"=>1458998566.806747}
相比于下面多个一个at
List(列表):用于马上执行的任务
放perform_async
创建的任务
{"class"=>"TestWorker", "args"=>["haha"], "retry"=>true, "queue"=>"default", "jid"=>"4dd735256dbd510e6dd76169", "created_at"=>1458998304.433952}
入队代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17def atomic_push(conn, payloads)
if payloads.first['at']
conn.zadd('schedule'.freeze, payloads.map do |hash|
at = hash.delete('at'.freeze).to_s
[at, Sidekiq.dump_json(hash)]
end)
else
q = payloads.first['queue']
now = Time.now.to_f
to_push = payloads.map do |entry|
entry['enqueued_at'.freeze] = now
Sidekiq.dump_json(entry)
end
conn.sadd('queues'.freeze, q)
conn.lpush("queue:#{q}", to_push)
end
end
Client Middleware
在执行入队操作时,会先执行中间件
1 | def invoke(*args) |
chain = retrieve.dup
获取中间件的实例
居然用block的形式,依次执行chain里面的中间件。
Server
Sidekiq::Scheduled
轮询SortedSet
1 | def start |
找出一个执行时间已到的任务,然后放入List
去执行
1 | while job = conn.zrangebyscore(sorted_set, '-inf'.freeze, now, :limit => [0, 1]).first do |
Sidekiq::Processor
执行任务
从队列找出一个job,执行
1 | def process_one |
Server Middleware
RetryJobs
捕获执行任务出的错误,在job中记录错误和retry信息
1 | def call(worker, msg, queue) |
然后放入retry(SortedSet
)队列中
1 | Sidekiq.redis do |conn| |
Web
存状态信息
用concurrent-ruby的Map
(job状态)和AtomicFixnum
(完成,失败数量)
创建一个线程,每隔5s的读取Map
和AtomicFixnum
中更新的数据,然后写入Redis
1 | while true |
读状态信息
delay
为所有class定义delay方法:
- 定义了
sidekiq_delay
,同名为delay
1 | def sidekiq_delay(options={}) |
把方法扩展到
Module
中,这样所有的类都拥有了这个方法1
Module.__send__(:include, Sidekiq::Extensions::Klass) unless defined?(::Rails)
Proxy定义了一个幽灵方法(method_missing),获取方法名称,然后入列
sidekiq-limit_fetch 插件
sidekiq-limit_fetch-3.1.0,复写sidekiq的fetch strategySidekiq::Manager
prepend了一个module,覆写了initialize
和start
方法
1 | class Sidekiq::Manager |
fetch strategy
limit例子:"#{PREFIX}:probed:#@name"
:(List,@name是队列名称) 存执行的job
写了一个redis的脚本筛去在limit限制外的队列
关闭sidekiq
接收Ctri + C
发出的INT信号,raise和rescueInterrupt
- 不再从List队列中拿任务
- 不再轮询
SortedSet
- 对于已经在执行的任务:最多等一个timeout的时间,如果还没结束,强制退出,重新入队
Awesome
x, Thread.current[:sidekiq_worker_set] = Thread.current[:sidekiq_worker_set], nil
__send__
不能被复写 send
可以被复写