首页ruby

Sidekiq 实战进阶

hfpp2012发布于1219 次阅读

1. 存储机制

这一节我们来分析一下sidekiq更为进阶的内容,先从sidekiq的存储开始。

sidekiq是使用redis作为存储机制的。目前有一个job是下面这样的:

class UpdateArticleVisitCountJob < ActiveJob::Base
  queue_as :default

  def perform(article_id)
    Sidekiq.logger.info 'update article visit count begin'
    @article = Article.find(article_id)
    @article.visit_count += 1
    @article.save!(validate: false)
    Sidekiq.logger.info 'update article visit count end'
  end
end

先不开启sidekiq。先在网页上或console上激发这个job。

# rails console
article = Article.first
UpdateArticleVisitCountJob.perform_later article.id

再来查看redis的存储情况。

127.0.0.1:6379> keys sidekiq:*
1) "sidekiq:queue:default"
2) "sidekiq:queues"

分别来看一下这两个key存的是何值。

127.0.0.1:6379> type sidekiq:queue:default
list
127.0.0.1:6379> lrange sidekiq:queue:default 0 -1
1) "{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"UpdateArticleVisitCountJob\",\"queue\":\"default\",\"args\":[{\"job_class\":\"UpdateArticleVisitCountJob\",\"job_id\":\"d8c1e0c3-76bc-4bfe-b7a6-3ae816b7edb1\",\"queue_name\":\"default\",\"arguments\":[\"2015-10-27-redis-de-tu-xing-hua-gong-ju-si\"]}],\"retry\":true,\"jid\":\"373bc9016faa7bef4dc13280\",\"created_at\":1452411730.167636,\"enqueued_at\":1452411730.167695}"

127.0.0.1:6379> type sidekiq:queues
set
127.0.0.1:6379> smembers sidekiq:queues
1) "default"

由此可见,sidekiq:queues存的是队列的集合,因为只有一个队列,所以只有default,而sidekiq:queue:default存的是default这个队列的job,它是一个list,每次有新的job过来,就会push进这个list。这些job都有各自的class,arguments等参数,格式是一个json,这样就能找到执行的class和代码,加上传递过来的参数就可以执行了。

我把上面的job的queue从default换成myqueue

127.0.0.1:6379> smembers sidekiq:queues
1) "myqueue"
2) "default"

127.0.0.1:6379> keys sidekiq:*
1) "sidekiq:queues"
2) "sidekiq:queue:default"
3) "sidekiq:queue:myqueue"

在开启sidekiq进程之前,我们先在redis-cli打开monitor

127.0.0.1:6379> monitor
OK

再来开启sidekiq。运行bundle exec sidekiq即可。

会发现monitor中有类似下面这样的输出:

1452413290.988732 [0 127.0.0.1:53645] "brpop" "sidekiq:queue:default" "2"
1452413290.991246 [0 127.0.0.1:53646] "incrby" "sidekiq:stat:processed" "0"
1452413291.064356 [0 127.0.0.1:53646] "incrby" "sidekiq:stat:processed:2016-01-10" "0"
1452413291.074665 [0 127.0.0.1:53645] "brpop" "sidekiq:queue:default" "2"
...
1452413291.085721 [0 127.0.0.1:53646] "incrby" "sidekiq:stat:failed" "0"
1452413291.085772 [0 127.0.0.1:53646] "incrby" "sidekiq:stat:failed:2016-01-10" "0"
1452413291.085813 [0 127.0.0.1:53646] "del" "sidekiq:MacintoshdeMacBook-Air.local:21359:aff51da8b2a6:workers"
...
1452413291.108264 [0 127.0.0.1:53646] "sadd" "sidekiq:processes" "MacintoshdeMacBook-Air.local:21359:aff51da8b2a6"
1452413291.115825 [0 127.0.0.1:53666] "brpop" "sidekiq:queue:default" "2"
1452413291.117003 [0 127.0.0.1:53667] "brpop" "sidekiq:queue:default" "2"
1452413291.117066 [0 127.0.0.1:53646] "hmset" "sidekiq:MacintoshdeMacBook-Air.local:21359:aff51da8b2a6" "info" "{\"hostname\":\"MacintoshdeMacBook-Air.local\",\"started_at\":1452413290.988961,\"pid\":21359,\"tag\":\"rails365\",\"concurrency\":25,\"queues\":[\"default\"],\"labels\":[],\"identity\":\"MacintoshdeMacBook-Air.local:21359:aff51da8b2a6\"}" "busy" "1" "beat" "1452413291.1081119"
1452413291.118247 [0 127.0.0.1:53668] "brpop" "sidekiq:queue:default" "2"
1452413291.124678 [0 127.0.0.1:53646] "expire" "sidekiq:MacintoshdeMacBook-Air.local:21359:aff51da8b2a6" "60"
1452413291.124745 [0 127.0.0.1:53646] "rpop" "sidekiq:MacintoshdeMacBook-Air.local:21359:aff51da8b2a6-signals"

上面列出的只是一部分输出,从输出你也可以看到sidekiq会开启多个线程不断地执行brpop指令。

brpop是redis中一个阻塞式的pop指令,它能够弹出list的一个元素。

下面来看下redis中存了哪些内容。

1) "sidekiq:stat:failed:2016-01-10"
2) "sidekiq:MacintoshdeMacBook-Air.local:21359:aff51da8b2a6"
3) "sidekiq:stat:processed"
4) "sidekiq:queues"
5) "sidekiq:stat:processed:2016-01-10"
6) "sidekiq:processes"
7) "sidekiq:stat:failed"
8) "sidekiq:queue:myqueue"

也就是说,它会把处理过的,失败的job的一些信息都记录下来,这些信息也是sidekiq中的web ui利用的,它就是利用这些信息来查询一共处理了多少job,失败的有多少等等。

2. 配置文件

sidekiq是一个程序,它能够带参数来运行,下面是它的所有参数。

$ sidekiq --help
2016-01-10T08:21:42.317Z 22167 TID-ouj4fd3hk INFO: sidekiq [options]
    -c, --concurrency INT            processor threads to use
    -d, --daemon                     Daemonize process
    -e, --environment ENV            Application environment
    -g, --tag TAG                    Process tag for procline
    -i, --index INT                  unique process index on this machine
    -q, --queue QUEUE[,WEIGHT]       Queues to process with optional weights
    -r, --require [PATH|DIR]         Location of Rails application with workers or file to require
    -t, --timeout NUM                Shutdown timeout
    -v, --verbose                    Print more verbose output
    -C, --config PATH                path to YAML config file
    -L, --logfile PATH               path to writable logfile
    -P, --pidfile PATH               path to pidfile
    -V, --version                    Print version and exit
    -h, --help                       Show help

比如指定并发数,队列,超时时间等,还有作为进程一些必须的参数,例如指定pid文件,日志文件等。

然而这些参数也可以以配置文件的形式存在。

比如:

# config/sidekiq.yml
---
:concurrency: 5
:pidfile: tmp/pids/sidekiq.pid
:logfile: ./log/sidekiq.log
staging:
  :concurrency: 10
production:
  :concurrency: 20
:queues:
  - default
  - [myqueue, 2]

只要运行的时候指定好-c参数就行的,例如:sidekiq -c config/sidekiq.yml

3. 部署问题

sidekiq部署到线上环境,首先需要让sidekiq以deamon的形式运行,就是以后台程序的形式运行。

sidekiq有个参数-d可以做到这个。

还需要指定-e参数,一般是production

为了能让进程被友好地kill掉,还需要pidfile文件,也就是-P参数。

除此之外,还有查错使用的日志文件,也即-L参数。

不过,我们都会使用capistranomina来自动化部署我们的项目,它们对应使用的插件分别是下面两个:

具体的可以研究这两个插件的源码,来看它是如何启动,停止sidekiq进程的。

然而,官方并不推荐这种做法,原因是不能保证可用性,比如sidekiq挂掉,它不能自动重启。

所以官方的推荐作法是用systemd或者upstart,它们比较有可靠性。

不过我推荐monit来管理sidekiq

关于这几个官方都有类似的example:

4. 并发控制

上面的配置文件有一个参数concurrency就是来控制并发数的,这个并发数指的是线程的数量,也就是同一时刻能处理的并发量。

关于此点,必须来说说sidekiqdelayed_jobresque的并发机制的区别。

delayed_job是使用mysql,postgresql,mongodb等作为存储介质的,而sidekiqresque是使用速度更快的内存系统redis作为存储介质。delayed_jobresque是使用单线程的进程机制,也就是说,它们同一时刻能处理的job是一个。假如要实现并发,同时处理多个job,就得开多个进程,且它们都有这种支持。众所周知,进程开销是很大,也很占用内存。而sidekiq是使用线程机制的,它默认可以开25个线程,也就是25个并发,当然,它也可以开多个进程,如果要实现高并发,就可以开N个进程,每个进程又开N个线程。那并发数就是NxN的数量。

sidekiq是使用线程连接池来管理线程,所以不用太担心线程的开销问题。

还有一个问题就是参数concurrency的值可以等于数据库的线程连接池pool的数量,这个数据库的线程连接池pool的数量,默认是5,可以调成跟concurrency的值一样。

production:
  adapter: mysql2
  database: foo_production
  pool: 25

5. 异常捕获

在跑sidekiq的job的时候,也是有可能出现异常的,如果这时候我们要查看异常,只能去看log。不然的话就使用一些通知机制,比如slack,邮件等来主动通知我们。

还有一种方式就是把这个异常捕获起来存放到数据库,然后再去查看。

我是用了一张表来存放这些异常。

# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  config.error_handlers << Proc.new do |ex,ctx_hash|
    Admin::SidekiqException.create!(ex: ex, ctx_hash: ctx_hash)
  end
end

6. 其他

sidekiqresque等相比,它默认有个retry机制,这个可以确保job失败的时候去重新触发,job失败会抛出异常,抛出异常的原因可以有很多种,可能本来程序就有错,这种情况下可以等程序恢复正常之后,job能正常运行,确保不丢失原来的job。还有一种情况,就是可能这个job需要连接数据库,或其他网络,如果当时不通,job就会失败,可以不断地retry,等网络通了之后job就可以成功运行了。所以这是一个很好的机制。

sidekiq还可以指定retry的次数,也可以在job中关闭retry这个功能。

还可以以中间件的形式关闭掉这个功能。

# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.remove Sidekiq::Middleware::Server::RetryJobs
  end
end

如果用activejob来包装sidekiq,默认就是retry的,也不能指定retry的选项了。

sidekiq没有job的callback功能,而resque有,如果使用activejob,也有这个功能。

完结。

本站文章均为原创内容,如需转载请注明出处,谢谢。

0 条回复
暂无回复~~
喜欢
友情提示
   官方 QQ 群 697272886

© Rails365 | 隐私条款 | 服务条款 | 加盟本站 | 粤ICP备15004902号 | 在线学员:13

Top