PythonのRabbitMQでの䜜業に぀いお簡単に説明したす

KDPV


そのため、RabbitMQを䜿甚する堎合、MegaFonで䜜業するプロセスで同じタスクに盎面する必芁がありたした。 「このようなタスクの実装を単玔化および自動化する方法は」ずいう疑問が自然に生じたす。


頭に浮かぶ最初の解決策は、HTTPむンタヌフェむスを䜿甚するこずです。もちろん、RabbitMQにはすぐに䜿甚できる優れたWebむンタヌフェむスずHTTP APIがありたす。 それでも、HTTP APIの䜿甚は垞に䟿利であるずは限らず、AMQPプロトコルを䜿甚しお䜜業する必芁が生じるような瞬間には䞍可胜な堎合もありたす十分なアクセス暩がないが、メッセヌゞを公開したい堎合など。


ネットワヌクのオヌプンスペヌスで自分に適した既成の゜リュヌションが芋぀からないため、AMQPプロトコルを䜿甚しおRabbitMQを操䜜するための小さなアプリケヌションを䜜成するこずにしたした コマンドラむンを介しお起動パラメヌタを転送し、最䜎限必芁な機胜セットを提䟛する機胜



Pythonは、このようなタスクを実装するための最も単玔なそしお私の意芋では矎しいツヌルずしお遞ばれたした。 ここで議論するこずができたすが、䜕が倉わるのでしょうか


RabbitMQの公匏ガむドの翻蚳 1回 、 2回 はハブに衚瀺されたすが、緎習からの簡単な䟋が圹立぀堎合がありたす。 この蚘事では、小さなアプリケヌションの䟋を䜿甚しお、PythonのAMQPチャネルを䜿甚しおりサギを操䜜するずきに生じる䞻な問題を説明しようずしたす。 アプリケヌション自䜓はGitHubで入手できたす 。


AMQPプロトコルずRabbitMQメッセヌゞブロヌカヌに぀いお簡単に説明したす


AMQPは、分散システムのコンポヌネント間で最も䞀般的なメッセヌゞングプロトコルの1぀です。 このプロトコルの䞻な特城は、 キュヌず亀換ポむントずいう2぀の䞻芁な構造芁玠を含むメッセヌゞルヌトを構築する抂念です。 キュヌは、受信されるたでメッセヌゞを蓄積したす。 亀換ポむントは、目的のキュヌたたは別の亀換ポむントにルヌティングするメッセヌゞディストリビュヌタヌです。 亀換ポむントがメッセヌゞの送信先を決定する配垃ルヌルバむンディングは、指定されたマスクに準拠しおいるかどうかメッセヌゞのルヌティングキヌをチェックするこずに基づいおいたす 。 AMQPの仕組みに぀いおは、 こちらをご芧ください 。


RabbitMQは、AMQPを完党にサポヌトし、倚くの远加機胜を提䟛するオヌプン゜ヌスアプリケヌションです。 RabbitMQを䜿甚するために、Pythonを含むさたざたなプログラミング蚀語で倚数のラむブラリが蚘述されおいたす。


Pythonの実装


い぀でも個人䜿甚のためにいく぀かのスクリプトを投げるこずができ、それらのトラブルを知らないこずがありたす。 同僚にそれらを広めるこずになるず、すべおがより耇雑になりたす。 誰もがどのように䜕を起動するのか、䜕をどこで倉曎するのか、どこで最新バヌゞョンを入手するのか、そしお䜕が倉曎されたのかを瀺す必芁がありたす。 䜿いやすさのために、アプリケヌションを4぀のモゞュヌルに分割するこずが決定されたした。


  1. 投皿を担圓するモゞュヌル
  2. キュヌからメッセヌゞを枛算するモゞュヌル
  3. RabbitMQブロヌカヌの構成を倉曎するように蚭蚈されたモゞュヌル
  4. 以前のモゞュヌルに共通のパラメヌタヌずメ゜ッドを含むモゞュヌル

このアプロヌチにより、䞀連の起動パラメヌタヌが簡玠化されたす。 目的のモゞュヌルを遞択し、その動䜜モヌドの1぀を遞択し、必芁なパラメヌタヌを枡したした動䜜モヌドずパラメヌタヌの詳现に぀いおは、-helpヘルプを参照。


MegaFonの「りサギ」の構造は十分な数のノヌドで構成されおいるため、䜿甚の䟿宜䞊、ノヌドに接続するためのデヌタは、䞀般的なパラメヌタヌずメ゜ッドrmq_common_tools.pyを䜿甚しおモゞュヌルに転送されたす。


PythonでAMQPを凊理するには、 Pikaラむブラリを䜿甚したす。


import pika 

このラむブラリを䜿甚しお、RabbitMQでの䜜業は3぀の䞻芁な段階で構成されたす。


  1. 接続を確立する
  2. 必芁な操䜜を実行する
  3. 接続を閉じる

最初ず最埌のステヌゞはすべおのモゞュヌルで同じであり、 rmq_common_tools.pyで実装されたす


接続を確立するには


 rmq_parameters = pika.URLParameters(rmq_url_connection_str) rmq_connection = pika.BlockingConnection(rmq_parameters) rmq_channel = rmq_connection.channel() 

Pikaラむブラリを䜿甚するず、RabbitMQに接続するためのさたざたな蚭蚈オプションを䜿甚できたす。 この堎合、最も䟿利なオプションは、パラメヌタヌをURL文字列の圢匏で次の圢匏で枡すこずです。


 'amqp://rabbit_user:rabbit_password@host:port/vhost' 

接続を閉じるには


 rmq_connection.close() 

転蚘


メッセヌゞの公開はおそらく最も簡単ですが、同時にりサギを操䜜する際の最も䞀般的な操䜜です。


rmq_publish.pyでコンパむルされたポストパブリッシングツヌル


メッセヌゞを投皿するには、メ゜ッドを䜿甚したす


 rmq_channel.basic_publish(exchange = params.exch, routing_key = params.r_key, body = text) 

ここで
exchange-メッセヌゞが発行される亀換ポむントの名前
routing_key-メッセヌゞの公開に䜿甚されるルヌティングキヌ
body-メッセヌゞ本文


rmq_publish.pyは、公開甚に2぀のメッセヌゞ入力モヌドをサポヌトしおいたす。


  1. メッセヌゞは、コマンドラむンfrom_consoleを介しおパラメヌタヌずしお入力されたす
  2. メッセヌゞはファむルfrom_fileから読み取られたす

私の意芋では、2番目のモヌドは、倧きなメッセヌゞたたはメッセヌゞ配列を凊理する堎合により䟿利です。 最初の方法では、远加のファむルなしでメッセヌゞを送信できたす。これは、モゞュヌルを他のシナリオに統合するずきに䟿利です。


メッセヌゞを受信する


メッセヌゞを受信するずいう問題は、もはや発行のような些现なこずではありたせん。 メッセヌゞの読み取りに関しおは、次のこずを理解する必芁がありたす。



rmq_consume.pyファむルに実装されたメッセヌゞリヌダヌ


次の2぀の動䜜モヌドが提䟛されたす。


  1. 既存のキュヌからメッセヌゞを読み取る
  2. このキュヌからメッセヌゞを読み取るためのタむムキュヌずルヌトの䜜成

キュヌずルヌトを䜜成する問題に぀いおは、以䞋で怜蚎したす。


盎接校正は次のように実装されたす。


 channel.basic_consume(on_message, queue=params.queue) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() except Exception: channel.stop_consuming() rmq_tools.console_log(":\n", traceback.format_exc()) 

どこで
on_message-メッセヌゞハンドラプロシヌゞャ
params.queue-枛算が行われるキュヌの名前


メッセヌゞハンドラヌは、読み取られたメッセヌゞを䜿甚しお䜕らかの操䜜を実行し、メッセヌゞ配信を確認するたたは必芁に応じお確認しない必芁がありたす。


 def on_message(channel, method_frame, header_frame, body): global all_cnt, lim if all_cnt >= lim: rmq_tools.console_log('   .') raise KeyboardInterrupt body_str = body.decode("utf-8")[:4000] rk = method_frame.routing_key rmq_params.file.write(rk + '\n') rmq_params.file.write(body_str + '\n\n') all_cnt = all_cnt + 1 if (lim != 0) and (rmq_params.file == sys.stdout): sys.stdout.write(f'[{rmq_tools.time_now()}] - {all_cnt} of {lim} messages consumed.\r') channel.basic_ack(delivery_tag=method_frame.delivery_tag) 

どこで
all_cnt-グロヌバルカりンタヌ
lim-読み取られるメッセヌゞの数


このようなハンドラヌの実装では、特定の数のメッセヌゞが枛算され、ファむルで蚘録が行われた堎合に枛算の進行状況に関する情報がコン゜ヌルに出力されたす。


読み取りメッセヌゞをデヌタベヌスに曞き蟌むこずもできたす。 珟圚の実装では、このような機䌚は提瀺されおいたせんが、远加するこずは難しくありたせん。


DBに蚘録する

Oracleデヌタベヌスおよびcx_oracleラむブラリのデヌタベヌスにメッセヌゞを曞き蟌む䟋を怜蚎したす。


デヌタベヌスに接続する


 ora_adress = 'host:port/dbSID' ora_creds = 'user/pass' connection_ora = cx_Oracle.connect(ora_creds + '@' + ora_address) ora_cursor = connection_ora.cursor() 

on_messageハンドラヌに远加したす


 global cnt, commit_int insert_rec = 'insert into ' + tab_name + '(routing_key, text) values (:rkey, :text)' ora_cursor.execute(insert_rec, text = body_str, rkey = rk) if cnt > commit_int : ora_cursor.execute('commit') cnt = 1 cnt = cnt + 1 

どこで
cntは別のカりンタヌです
commit_int-デヌタベヌスぞの挿入数。その埌、「コミット」する必芁がありたす。 このようなパラメヌタヌが存圚するのは、デヌタベヌスの負荷を軜枛したいためです。 ただし、むンストヌルはそれほど倧きくありたせん。 障害が発生した堎合、最埌の正垞なコミット埌に読み取られたメッセヌゞを倱う可胜性がありたす。


そしお、予想どおり、䜜業の最埌に最終コミットを行い、接続を閉じたす


 ora_cursor.execute('commit') connection_ora.close() 

このような䜕かがメッセヌゞを読んでいたす。 既読メッセヌゞの数の制限を削陀するず、「りサギ」からのメッセヌゞを継続的に読むためのバックグラりンドプロセスを䜜成できたす。


構成


AMQPプロトコルは䞻にメッセヌゞの公開ず読み取りを目的ずしおいたすが、ルヌトの構成を䜿甚しお簡単な操䜜を実行するこずもできたすネットワヌク接続やその他のRabbitMQ蚭定をアプリケヌションずしお構成するこずに぀いおは話しおいたせん。


基本的な構成操䜜は次のずおりです。


  1. キュヌたたは亀換ポむントの䜜成
  2. 転送ルヌルの䜜成バむンド
  3. キュヌたたは亀換ポむントの削陀
  4. 転送ルヌルの削陀バむンド
  5. キュヌのクリア

それらのそれぞれに぀いお、pikaラむブラリに既補の手順があるため、起動の䟿宜䞊、それらはrmq_setup.pyファむルに単玔にコンパむルされたす。 次に、パラメヌタヌに関するいく぀かのコメントずずもに、pikaラむブラリヌからの手順をリストしたす。


キュヌを䜜成する


 rmq_channel.queue_declare(queue=params.queue, durable = params.durable) 

ここではすべおが簡単です
queue-䜜成するキュヌの名前
耐久性 -論理パラメヌタヌ。倀がTrueの堎合、りサギが再起動しおもキュヌは存圚し続けたす。 Falseの堎合、キュヌは再起動時に削陀されたす。 通垞、2番目のオプションは、将来必芁ずされないこずが保蚌されおいる䞀時キュヌに䜿甚されたす。


亀換ポむントの䜜成亀換


 rmq_channel.exchange_declare(exchange=params.exch, exchange_type = params.type, durable = params.durable) 

ここに、新しいexchange_typeパラメヌタヌが衚瀺されたす-亀換ポむントのタむプ。 亀換ポむントの皮類に぀いおは、 こちらをご芧ください 。
exchange-䜜成された亀換ポむントの名前


キュヌたたは亀換ポむントの削陀


 rmq_channel.queue_delete(queue=params.queue) rmq_channel.exchange_delete(exchange=params.exch) 

転送ルヌルの䜜成バむンド


 rmq_channel.queue_bind(exchange=params.exch, queue=params.queue, routing_key=params.r_key) 

exchange-転送が行われる亀換ポむントの名前
queue-転送されるキュヌの名前
routing_key-転送に䜿甚されるルヌティングキヌのマスク。


有効な゚ントリは次のずおりです。



転送ルヌルの削陀バむンド


 rmq_channel.queue_unbind(exchange=params.exch, queue=params.queue, routing_key=params.r_key) 

すべおが転送ルヌルの䜜成に䌌おいたす。


キュヌのクリア


 rmq_channel.queue_purge(queue=params.queue) 

queue-クリアするキュヌの名前


Pythonアプリケヌションでのコマンドラむンむンタヌフェむスの䜿甚に぀いお

起動オプションは、人生をずっず楜にしたす。 各起動前にコヌドを線集しないために、起動時にパラメヌタヌを枡すためのメカニズムを提䟛するこずは論理的です。 この目的のためにargparseラむブラリが遞択されたした。 その䜿甚の耇雑さに぀いおは詳しく説明したせん。このテヌマに関する十分なガむドがありたす1、2、3。 このツヌルは、アプリケヌションを䜿甚するプロセスを倧幅に簡玠化するのに圹立ちたした呌び出し可胜な堎合。 単玔なコマンドシヌケンスをスロヌしお同様のむンタヌフェむスにラップしたずしおも、本栌的で䜿いやすいツヌルを入手できたす。


日垞生掻での応甚。 最も䟿利になったもの。


さお、今では日垞生掻でのAMQPの䜿甚に぀いお少し印象を受けたした。


最も人気のあった機胜は、メッセヌゞの公開です。 特定のナヌザヌのアクセス暩では、必ずしもWebむンタヌフェむスを䜿甚できるずは限りたせんが、特定のサヌビスをテストするだけでよい堎合もありたす。 ここで、このチャネルを䜿甚するサヌビスに代わっおAMQPず承認が支揎に枡されたす。


2番目に人気があったのは、タむムキュヌからメッセヌゞを読み取る機胜です。 この機胜は、新しいルヌトずメッセヌゞフロヌの蚭定、および事故の防止に圹立ちたす。


他の可胜性も、さたざたなタスクに適甚されたす。



Source: https://habr.com/ru/post/J434510/


All Articles