elasticsearchのindex更新を非同期に行う(tire + resque)
昨日の記事でtireを用いて全文検索エンジンのelasticsearchを操作する方法をまとめました。
前回はデータ追加、削除時に即indexの更新、削除を行っていましたが、ジョブキューサーバに処理を投げてindex更新を非同期に行えるようにしてみます。
ジョブキューには 先日ご紹介したresqueを使います。
初期設定
- Gemfile
#Gemfile
gem 'resque'
gem 'daemon-spawn', :require => 'daemon_spawn'
- bundle install
./bin/bundle install
Topicモデル更新
昨日の記事で作成したTopicモデルのindex_updateメソッドとindex_removeメソッドを修正します。
- before
# app/models/topic.rb
# save後にindexを更新
def index_update
self.index.store self
end
# destroy後にindexから削除
def index_remove
self.index.remove self
end
- after
resqueに処理を投げるようにします
#app/models/topic.rb
# save後にindexを更新
def index_update
Resque.enqueue(IndexUpdater, :update, self.id)
end
# destroy後にindexから削除
def index_remove
Resque.enqueue(IndexUpdater, :delete, self.id)
end
Worker作成
- workersディレクトリ作成
mkdir app/workers
- index_updater追加
# app/workers/index_updater.rb
class IndexUpdater
@queue = :defaul
def self.perform(action_type, id)
case action_type.to_sym
when :update
# :updateの場合はindex更新
Topic.find(id).tire.update_index
when :delete
# :deleteの場合はindexから該当Topicを削除
Topic.index.remove Topic.find(id)
end
end
end
- resque_worker用のdaemonスクリプト作成
#bin/resque_worker
#!/usr/bin/env ruby
require File.expand_path('../../config/application', __FILE__)
Rails.application.require_environment!
class ResqueWorkerDaemon < DaemonSpawn::Base
def start(args)
@worker = Resque::Worker.new('default')
@worker.verbose = true
@worker.work
end
def stop
end
end
ResqueWorkerDaemon.spawn!({
:processes => 1,
:working_dir => Rails.root,
:pid_file => File.join(Rails.root, 'tmp', 'pids', 'resque_worker.pid'),
:log_file => File.join(Rails.root, 'log', 'resque_worker.log'),
:sync_log => true,
:singleton => true,
:signal => 'QUIT'
})
- 実行権限追加
chmod 755 ./bin/resque_worker.rb
- daemon起動
./bin/resque_worker.rb start
確認
別ターミナルでtail -f log/resque_worker.log でログを眺めておく
rails consoleからTopicを追加
./bin/rails c
irb(main):001:0> Topic.create(title: 'Nodeでhello world', body: 'おはようございます。サンプルです')
irb(main):002:0> Topic.create(title: 'Nodeでhello world', body: 'こんにちわ。サンプルです')
- ログにindex更新の結果が残る
# log/resque_worker.log
*** Found job on default
*** got: (Job{default} | IndexUpdater | ["update", 8])
*** resque-1.25.1: Processing default since 1386161763 [IndexUpdater]
*** Running before_fork hooks with [(Job{default} | IndexUpdater | ["update", 8])]
*** resque-1.25.1: Forked 42946 at 1386161763
*** Running after_fork hooks with [(Job{default} | IndexUpdater | ["update", 8])]
*** done: (Job{default} | IndexUpdater | ["update", 8])
- ブラウザから「node」で検索すると絞り込まれた結果が取得できます
http://localhost:3000/?keyword=node
{
"paging": {
"current_page": 1,
"per_page": 3,
"total_pages": 1,
"total": 2
},
"topics": [
{
"updated_at": "2013-12-04T12:56:00.161Z",
"created_at": "2013-12-04T12:56:00.161Z",
"body": "こんにちわ。サンプルです",
"title": "Nodeでhello world",
"id": 8
},
{
"updated_at": "2013-12-04T12:55:01.113Z",
"created_at": "2013-12-04T12:55:01.113Z",
"body": "おはようございます。サンプルです",
"title": "Nodeでhello world",
"id": 7
}
]
}
まとめ
resqueを使ってelasticsearchのindex更新を非同期に行うことができました。
サンプルコードはこちらに置いておきます: hilotter / tire_sample