首页ruby

消息队列之 active_job 结合 Sidekiq (一)

hfpp2012发布于1070 次阅读

1. 简介

sidekiq是用ruby实现的消息列队的gem,相关的gem还有resquedelayed_jobqueue_classic等。关于消息队列的解释可以查阅这篇文章PostgreSQL的listen/notify之消息队列(一),这里不再详述。每个gem的存储介质和原理都不太一样,比如queue_classic只用PostgreSQL来存,delayed_job可以用Mysql,PostgreSQL,Sqlite等,而sidekiq用redis。原理也不一样,比如queue_classic是基于PostgreSQL的listen/notify,sidekiq是基于redis的pub/sub模式。

有这么多的消息队列,每个都要用,都要去学,是一件很耗精力的事情。而且,这些gem的使用方法都差不多,很多使用上的概念很类似,比如指定队列,优先级,执行时间等都很相似。何不用一种接口或方法来统一呢,就像activerecord一样,假如要换数据库,那只是换个gem加一个配置文件而已,因为他们的使用方法都差不多。

只要把相同的使用方法抽出来,形成一个接口,而这些gem就成了适配器(adapter)。这个就像汽车的引擎或发动机一样,可以随时更换。

所以有了active_job,这个原本是一个gem,不过从4.2开始被合作到rails,成为rails的一部分了。

2. sidekiq

现在我们来单独使用sidekiq,也就是不用active_job的情况下。

2.1 安装

在Gemfile文件添加下面这行。

gem 'sidekiq', '~> 3.5.1'
# 因为依赖于redis
sudo apt-get install redis-server

由于消息队列是开了另一个独立于web的进程,在那个进程中,是在时刻接收进来的任务,然后排队处理,所以需要在controller或model把那个任务丢给消息队列的进程就好了。详情可参阅PostgreSQL的listen/notify之消息队列(一)的详述。

而controller或model是通过什么和那进程连接的呢?就是通过worker,一个worker是暴露在外面的类,可以在controller或model中全局调用。

首先来定义一个worker。

# app/workers/update_article_visit_count_worker.rb
class UpdateArticleVisitCountWorker
  include Sidekiq::Worker
  def perform(article_id)
    logger.info 'update article visit count begin'
    @article = Article.find(article_id)
    @article.visit_count += 1
    @article.save!(validate: false)
    logger.info 'update article visit count end'
  end
end

现在就可以把sidekiq的进程启动起来,准备测试。

bundle exec sidekiq

进入rails console中。

article = Article.find(:first)
UpdateArticleVisitCountWorker.perform_async(article.id)

如果没有报错且数据库被作了相应的改变,说明是有效果的。

这里有个要注意的地方,在上面的例子中,不传article整个对象,而是传article的id,是有原因的,主要是基于两点。

第一,无论article还是article_id是作为参数传给sidekiq进程的,你传对象不好处理的,那是ruby的对象,你传到另一个进程需要反序列化之类的,而传id,只是一个数字就好传多了,也好处理。

第二,worker中是放耗时的动作的,那个find可以比较耗时,这取决于数据库的记录,所以尽量将这种需要时间的动作放到worker中的perform。

整个sidekiq的使用很简单,就是建立一个worker的类,然后把逻辑写到perform方法,在需要的地方用perform_async调用就好了。

关于sidekiq的线上部署,如果你是使用mina的话,那就简单了,使用mina-sidekiq,也就几行代码搞定。

sidekiq默认会使用redis的默认ip地址,端口等,不过这些都是可以改变的。

sidekiq远不止这么简单,它是有强大的功能却易用,关于sidekiq的更加详细的信息,比如错误处理,日志记录,Batches,监控,高级选项等可以参考sidekiq-wiki。还有,sidekiq还有很多插件,你往github搜一下可能就可以找到一些。

3. active_job

把上面的例子改成用active_job来实现。

生成类似worker的文件。

rails generate job update_article_visit_count

生成了下面这个文件。

# app/jobs/update_article_visit_count_job.rb
class UpdateArticleVisitCountJob < ActiveJob::Base
  queue_as :default

  def perform(*args)
    # Do something later
  end
end

UpdateArticleVisitCountJob类继承自ActiveJob::Base

现在把上面的那段逻辑放进去。

# app/jobs/update_article_visit_count_job.rb
class UpdateArticleVisitCountJob < ActiveJob::Base
  queue_as :default

  def perform(article_id)
    logger.info 'update article visit count begin'
    @article = Article.find(article_id)
    @article.visit_count += 1
    @article.save!(validate: false)
    logger.info 'update article visit count end'
  end
end

指定用sidekiq作为adapter。

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_adapter = :sidekiq
  end
end

进入rails console中测试。

article = Article.first
UpdateArticleVisitCountJob.perform_later article.id

跟上面的调用也是类似。

这是基本的功能,active_job还支持指定队列,回调函数,异常捕获等。具体地参考官方文档

下一篇:消息队列之RabbitMQ结合sneakers(二)
完结。

本站文章均为原创内容,如需转载请注明出处,谢谢。

0 条回复
暂无回复~~
喜欢
友情提示
   官方 QQ 1 群 697272886(500/2000)
   官方 QQ 2 群 856141852

© Rails365 | 隐私条款 | 服务条款 | 粤ICP备15004902号 | 在线学员:12

Top