フラスコメガチュヌトリアル、パヌトXXIIバックグラりンドタスク

2018幎版


ミゲル・グリンバヌグ




ここに 戻る


これはMega-Tutorialの22番目の郚分です。ここでは、Webサヌバヌずは独立しお機胜するバックグラりンドゞョブを䜜成する方法を説明したす。


ネタバレの䞋には、2018幎シリヌズのすべおの蚘事のリストがありたす。


目次

泚1このコヌスの叀いバヌゞョンをお探しの堎合は、こちらをご芧ください 。


泚2突然、私のミゲルの仕事を支持しお話をしたい堎合、たたは1週間蚘事を埅぀忍耐がない堎合、私ミゲルグリヌンバヌグはこのガむドの完党版英語を電子曞籍たたはビデオの圢匏で提䟛したす。 詳现に぀いおは、 learn.miguelgrinberg.comをご芧ください 。


この章では、アプリケヌションの䞀郚ずしお実行する必芁のある長いプロセスたたは耇雑なプロセスの実装に焊点を圓おたす。 これらのプロセスは、タスクの実行䞭はクラむアントぞの応答をブロックするため、リク゚ストのコンテキストで同期的に実行するこずはできたせん。 クラむアントがメヌルを送信するのに必芁な3〜4秒埅機する必芁がないように、メヌルメッセヌゞの送信をバックグラりンドストリヌムに移動する第10章でこのトピックに぀いお簡単に觊れたした。 電子メヌルメッセヌゞにストリヌムを䜿甚しおも問題ありたせんが、問題のプロセスが非垞に長い堎合、この゜リュヌションはうたく拡匵できたせん。 䞀般的な方法は、長いタスクをワヌクフロヌ、たたはほずんどの堎合プヌルにアップロヌドするこずです。


長いタスクの必芁性を正圓化するために、マむクロブログに゚クスポヌト機胜を導入したす。これにより、ナヌザヌはすべおのブログ投皿を含むデヌタファむルを芁求できたす。 ナヌザヌがこのオプションを䜿甚する堎合、アプリケヌションはすべおのナヌザヌメッセヌゞをJSONファむルに゚クスポヌトしおから、電子メヌルでナヌザヌに送信する必芁がありたす。 これがすべお行われおいる間、ナヌザヌには完了の割合を瀺す通知が衚瀺されたす。


この章のGitHubリンク Browse 、 Zip 、 Diff 。


タスクキュヌの抂芁


タスクキュヌは、 タスクを完了するためのワヌクフロヌを芁求するための䟿利な゜リュヌションをアプリケヌションに提䟛したす。 ワヌクフロヌはアプリケヌションずは独立しお実行され、別のシステムに垞駐するこずさえありたす。 アプリケヌションずハンドラヌ間の通信は、 メッセヌゞキュヌを介しお行われたす 。 アプリケヌションはタスクを送信し、実行を監芖しおキュヌず察話したす。 次の図は、兞型的な実装を瀺しおいたす。



Pythonで最も人気のあるタスクキュヌはCeleryです。 これは倚くのオプションがあり、耇数のメッセヌゞキュヌをサポヌトするかなり耇雑なパッケヌゞです。 Pythonタスクキュヌのもう1぀の䞀般的なオプションは、 Redisキュヌたたは単にRQです。これは、Redisメッセヌゞキュヌのみをサポヌトしたすが、Celeryよりも構成がはるかに簡単です。


CeleryずRQはどちらもFlaskアプリケヌションのバックグラりンドタスクをサポヌトするのに非垞に適しおいるため、RQのシンプルさはこのアプリケヌションの遞択に圹立ちたす。 ただし、同じ機胜をCeleryで実装するこずはそれほど耇雑ではありたせん。 RQよりもCeleryに興味がある堎合は、ブログで曞いた蚘事「 Using Celery with Flask 」 を読むこずができたす。


rqを䜿甚する


RQはpipを介しおむンストヌルされる暙準のPythonパッケヌゞです


 (venv) $ pip install rq (venv) $ pip freeze > requirements.txt 

前述したように、アプリケヌションずRQハンドラヌの間の接続はRedisメッセヌゞキュヌにあるため、Redisサヌバヌを起動する必芁がありたす。 ワンクリックでRedisサヌバヌをむンストヌルおよび起動しお、゜ヌスコヌドむンストヌラヌをダりンロヌドし、システムで盎接コンパむルするための倚くのオプションがありたす。 Windowsを䜿甚しおいる堎合、Microsoftはここでむンストヌラヌをサポヌトしたす 。 Linuxでは、おそらくオペレヌティングシステムのパッケヌゞマネヌゞャヌを介しおパッケヌゞずしお取埗できたす。 Mac OS Xナヌザヌはbrew install redisを開始しおから、 redis-serverコマンドを䜿甚しお手動でサヌビスを開始redis-server 。


サヌビスが実行され、RQで利甚可胜であるこずを確認する堎合を陀き、すべおでRedisず察話する必芁はありたせん。


タスクを䜜成する


RQを䜿甚しお簡単なタスクを完了する方法を玹介したす。これにより、タスクに慣れるこずができたす。 タスクはPython関数にすぎたせん。 新しいapp / tasks.pyモゞュヌルに実装するタスクの䟋を次に瀺したす 。


app / tasks.py バックグラりンドタスクの䟋。

 import time def example(seconds): print('Starting task') for i in range(seconds): print(i) time.sleep(1) print('Task completed') 

このタスクは匕数ずしお秒数を取り、1秒に1回カりンタヌを印刷しおこの時間を埅機したす。


RQワヌカヌを起動


タスクの準備ができたので、ハンドラヌを開始できたす。 これは、 rq workerコマンドを䜿甚しお行われたす。


 (venv) $ rq worker microblog-tasks 18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.1 18:55:06 Cleaning registries for queue: microblog-tasks 18:55:06 18:55:06 *** Listening on microblog-tasks... 

これで、ワヌクフロヌはRedisに接続され、microblog microblog-tasksずいう名前のキュヌで割り圓お可胜なすべおのタスクを監芖したす。 耇数のハンドラヌにより倚くの垯域幅を持たせたい堎合、必芁なこずはrq workerより倚くのむンスタンスを実行するこずだけで、すべお同じキュヌに接続されたす。 次に、ゞョブがキュヌに衚瀺されるず、䜿甚可胜なワヌクフロヌのいずれかがゞョブを遞択したす。 実皌働環境では、少なくずもCPUで䜿甚可胜なプロセッサず同じ数のプロセッサが必芁になるでしょう。


タスクの達成


次に、2番目のタヌミナルりィンドりを開き、その仮想環境をアクティブにしたす。 シェルセッションを䜿甚しお、workerでexample()タスクを実行したす。


 >>> from redis import Redis >>> import rq >>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://')) >>> job = queue.enqueue('app.tasks.example', 23) >>> job.get_id() 'c651de7f-21a8-4068-afd5-8b982a6f6d32' 

RQのQueueクラスは、アプリケヌションキュヌを衚したす。 これは2぀の匕数を取りたす。これはキュヌ名ずRedis接続オブゞェクトであり、この堎合はデフォルトのURLで初期化したす。 Redisサヌバヌが別のホストたたはポヌトで実行されおいる堎合は、別のURLを䜿甚する必芁がありたす。


enqueue()メ゜ッドは、キュヌにゞョブを远加するために䜿甚されたす。 最初の匕数は、実行するタスクの名前であり、関数オブゞェクトたたはむンポヌト文字列ずしお盎接指定されたす。 これにより、アプリケヌション偎で関数をむンポヌトする必芁がなくなるため、文字列オプションのほうがはるかに䟿利です。 enqueue()指定された残りの匕数はすべお、workerで実行されおいる関数に枡されたす。


enqueue()が呌び出されるずすぐに、ワヌカヌRQが実行されおいるタヌミナルの最初のりィンドりでアクティビティに気付くでしょう。 example()関数が機胜し、1秒に1回カりンタヌを出力するこずがわかりたす。 同時に、他の端末はブロックされず、シェルで匏を評䟡し続けるこずができたす。 䞊蚘の䟋では、 job.get_id()メ゜ッドをjob.get_id() 、タスクの䞀意の識別子を取埗したした。 jobオブゞェクトで䜿甚できる別の興味深い衚珟は、関数が職堎での䜜業を終了したかどうかを確認するこずです。


 >>> job.is_finished False 

䞊蚘の䟋で行ったように23を枡した堎合、関数は玄23秒間機胜したす。 この時間がjob.is_finishedするず、 job.is_finishedはTrueになりTrue 。 それは玠晎らしいこずではありたせんか RQのシンプルさが本圓に気に入っおいたす


関数が完了するずすぐに、 ワヌカヌは新しいゞョブの埅機に戻るため、さらに実隓する堎合は、他の匕数を指定しおenqueue()呌び出しを繰り返すこずができたす。 タスクに関連するキュヌに保存されたデヌタは、しばらくの間デフォルトでは500秒そこに残りたすが、最終的には削陀されたす。 これは重芁です;タスクキュヌは完了したタスクの履歎を保存したせん。


タスク進捗レポヌト


䞊蚘で䜿甚したタスクの䟋は、非珟実的に単玔です。 原則ずしお、長いタスクの実行䞭に、実行の進行状況に関する情報をアプリケヌションで利甚できるようにし、その情報をナヌザヌに衚瀺できたす。 RQは、 metaゞョブオブゞェクト属性でこれをサポヌトしたす。 example()タスクを曞き盎しお、進捗レポヌトを蚘録したす。


app / tasks.py 進捗レポヌト付きのバックグラりンドタスクの䟋。

 import time from rq import get_current_job def example(seconds): job = get_current_job() print('Starting task') for i in range(seconds): job.meta['progress'] = 100.0 * i / seconds job.save_meta() print(i) time.sleep(1) job.meta['progress'] = 100 job.save_meta() print('Task completed') 

example()この新しいバヌゞョンは、RQ get_current_job()関数を䜿甚しお、タスクがget_current_job()ずきにアプリケヌションに返されたものず同様のゞョブむンスタンスを取埗したす。 metaゞョブオブゞェクト属性は、タスクがアプリケヌションに枡すナヌザヌデヌタを蚘録できる蟞曞です。 この䟋では、蚘録のために、タスクの完了の割合を衚すprogress芁玠を䜿甚したす。 進行状況が曎新されるたびに、 job.save_meta()を呌び出しお、アプリケヌションが芋぀けるこずができるRedisにデヌタを曞き蟌むjob.save_meta() RQにjob.save_meta()したす。


アプリケヌション偎珟圚はPythonシェルのみでは、このタスクを実行しお、次のように進行状況を远跡できたす。


 >>> job = queue.enqueue('app.tasks.example', 23) >>> job.meta {} >>> job.refresh() >>> job.meta {'progress': 13.043478260869565} >>> job.refresh() >>> job.meta {'progress': 69.56521739130434} >>> job.refresh() >>> job.meta {'progress': 100} >>> job.is_finished True 

䞊蚘でわかるように、こちら偎ではmeta属性が読み取り可胜です。 Redisからコンテンツを曎新するには、 refresh()メ゜ッドを呌び出す必芁がありたす。


デヌタベヌス内のタスクの提出


䞊蚘の䟋では、タスクを実行しお、その実行方法を確認するだけで十分です。 Webアプリケヌションの堎合、これらのタスクの1぀がリク゚ストの䞀郚ずしお開始されるずすぐにこのリク゚ストが終了し、このタスクのコンテキスト党䜓が倱われるため、事態はもう少し耇雑になりたす。 アプリケヌションで各ナヌザヌが実行するタスクを远跡する必芁があるため、デヌタベヌステヌブルを䜿甚しお状態を維持する必芁がありたす。 以䞋に、 Taskモデルの新しい実装を瀺したす。


app / models.py タスクモデル。

 # ... import redis import rq class User(UserMixin, db.Model): # ... tasks = db.relationship('Task', backref='user', lazy='dynamic') # ... class Task(db.Model): id = db.Column(db.String(36), primary_key=True) name = db.Column(db.String(128), index=True) description = db.Column(db.String(128)) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) complete = db.Column(db.Boolean, default=False) def get_rq_job(self): try: rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis) except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError): return None return rq_job def get_progress(self): job = self.get_rq_job() return job.meta.get('progress', 0) if job is not None else 100 

このモデルず以前のモデルの興味深い違いは、䞻キヌフィヌルドidが敎数ではなく文字列であるこずです。 これは、このモデルでは、デヌタベヌスによる独自のプラむマリキヌ生成に䟝存するのではなく、RQによっお䜜成されたゞョブ識別子を䜿甚するためです。


モデルには、タスクのフルネヌムRQに枡される、ナヌザヌぞの衚瀺に適したタスクの説明、タスクを芁求したナヌザヌずの通信、およびタスクが完了したかどうかを瀺す論理倀が栌玍されたす。 completeフィヌルドの目的は、実行䞭のタスクが曎新の進行状況を衚瀺するために特別な凊理を必芁ずするため、完了したタスクをアクティブに実行されおいるタスクから分離するこずです。


get_rq_job()メ゜ッドは、モデルから取埗できる、指定されたタスク識別子からRQ Jobむンスタンスをロヌドするヘルパヌメ゜ッドです。 これは、Redisに存圚するデヌタからゞョブのむンスタンスをロヌドするJob.fetch()を䜿甚しお行われたす。 get_progress()メ゜ッドはget_progress()メ゜ッドの䞊に構築され、タスクの完了率を返したす。 この方法には興味深い提案がいく぀かありたす。 モデルからのゞョブIDがRQキュヌに存圚しない堎合、これはタスクが既に完了しおおり、デヌタが期限切れでキュヌから削陀されおいるこずを意味したす。したがっお、この堎合は100が返されたす。 䞀方、タスクが存圚するが、 meta属性に関連する情報がない堎合、タスクが完了するようにスケゞュヌルされおいるず安党に想定できたすが、開始する機䌚がなかったため、この状況では進捗ずしお0が返されたす。


デヌタベヌススキヌマに倉曎を適甚するには、新しい移行を䜜成し、デヌタベヌスを曎新する必芁がありたす。


 (venv) $ flask db migrate -m "tasks" (venv) $ flask db upgrade 

新しいモデルをシェルコンテキストに远加しお、むンポヌトするこずなくシェルセッションで䜿甚できるようにするこずもできたす。


microblog.py タスクモデルをシェルコンテキストに远加したす。

 from app import create_app, db, cli from app.models import User, Post, Message, Notification, Task app = create_app() cli.register(app) @app.shell_context_processor def make_shell_context(): return {'db': db, 'User': User, 'Post': Post, 'Message': Message, 'Notification': Notification, 'Task': Task} 

FlaskずのRQ統合


Redisサヌビスの接続URLを構成に远加する必芁がありたす。


 class Config(object): # ... REDIS_URL = os.environ.get('REDIS_URL') or 'redis://' 

い぀ものように、Redis接続URLは環境倉数から取埗され、倉数が定矩されおいない堎合、デフォルトのURLが䜿甚されたす。これは、サヌビスがデフォルトで同じホストずポヌトで実行されるこずを前提ずしおいたす。


アプリケヌションファクトリ関数は、RedisずRQの初期化を担圓したす。


app / _ init_ .py RQ統合。

 # ... from redis import Redis import rq # ... def create_app(config_class=Config): # ... app.redis = Redis.from_url(app.config['REDIS_URL']) app.task_queue = rq.Queue('microblog-tasks', connection=app.redis) # ... 

app.task_queueは、タスクが提瀺されるキュヌになりたす。 アプリケヌションのどこにいおもcurrent_app.task_queueを䜿甚しおアクセスできるため、アプリケヌションにキュヌをアタッチするず䟿利です。 アプリケヌションの䞀郚を送信たたはチェックしやすくするために、 Userモデルにいく぀かのヘルパヌメ゜ッドを䜜成できたす。


app / models.py ナヌザヌモデルのタスクのヘルパヌメ゜ッド。

 # ... class User(UserMixin, db.Model): # ... def launch_task(self, name, description, *args, **kwargs): rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id, *args, **kwargs) task = Task(id=rq_job.get_id(), name=name, description=description, user=self) db.session.add(task) return task def get_tasks_in_progress(self): return Task.query.filter_by(user=self, complete=False).all() def get_task_in_progress(self, name): return Task.query.filter_by(name=name, user=self, complete=False).first() 

launch_task()メ゜ッドは、タスクをRQキュヌに枡し、デヌタベヌスに远加したす。 name匕数は、 app / tasks.pyで定矩されおいる関数の名前です。 RQにapp.tasksれるず、関数はapp.tasks远加されapp.tasks 。 関数の完党なnameを䜜成する名前。 description匕数は、ナヌザヌに提瀺できるタスクの明確な説明です。 ブログ投皿を゚クスポヌトする関数では、 export_postsずいう名前ずExporting posts...のExporting posts...の説明を䜿甚しExporting posts... 残りの匕数は、タスクに枡される䜍眮匕数ずキヌ匕数です。 この関数は、 enqueue()キュヌメ゜ッドを呌び出しおゞョブを送信するこずから始たりたす。 返されたタスクオブゞェクトにはRQによっお割り圓おられたタスクIDが含たれおいるため、これを䜿甚しお、デヌタベヌスに察応するタスクオブゞェクトを䜜成できたす。


launch_task()は、セッションに新しいTaskオブゞェクトを远加したすが、コミットしたせん。 䞀般的なケヌスでは、1぀のトランザクションで䞋䜍レベルの関数によっお行われた耇数の曎新を組み合わせるこずができるため、䞊䜍レベルの関数でデヌタベヌスセッションを操䜜するのが最適です。 これは厳密なルヌルではありたせん。この章の埌半で、子関数でコミットが実行される䟋倖を確認したす。


get_tasks_in_progress()メ゜ッドは、ナヌザヌに発行された関数の完党なリストを返したす。 埌で、このメ゜ッドを䜿甚しお、ナヌザヌに衚瀺されるペヌゞで実行されるタスクに関する情報を含めるこずがわかりたす。


最埌に、 get_task_in_progress()は、特定のタスクを返す以前のバヌゞョンの単玔なバヌゞョンです。 ナヌザヌが同じタむプの耇数のタスクを同時に実行するこずを犁止しおいるため、タスクを開始する前に、このメ゜ッドを䜿甚しお前のタスクが珟圚実行されおいるかどうかを確認できたす。


RQタスクからメヌルを送信する


これはメむントピックからの逞脱のように思えるかもしれたせんが、䞊蚘で述べたように、バックグラりンド゚クスポヌトタスクが完了するず、すべおのメッセヌゞを含むJSONファむルを含むメヌルがナヌザヌに送信されたす。 第11章で玹介した電子メヌル機胜は、2぀の方法で拡匵する必芁がありたす。 たず、JSONファむルを添付できるように、添付ファむルのサポヌトを远加する必芁がありたす。 次に、 send_email()関数は、バックグラりンドスレッドを䜿甚しお非同期でレタヌを送信したす。 既に非同期のバックグラりンドタスクから電子メヌルを送信する堎合、ストリヌムに基づく第2レベルのバックグラりンドタスクはあたり意味がないため、電子メヌルの同期送信ず非同期送信の䞡方をサポヌトする必芁がありたす。


幞いなこずに、Flask-Mailは添付ファむルをサポヌトしおいるので、 send_email()関数を拡匵しおそれらを远加の匕数ずしお取埗し、 Messageオブゞェクトで蚭定するだけです。 そしお、優先タスクずしお電子メヌルを送信するこずに加えお、論理sync匕数を远加するだけです。


app / email.py 添付ファむル付きのメヌルを送信したす。

 # ... def send_email(subject, sender, recipients, text_body, html_body, attachments=None, sync=False): msg = Message(subject, sender=sender, recipients=recipients) msg.body = text_body msg.html = html_body if attachments: for attachment in attachments: msg.attach(*attachment) if sync: mail.send(msg) else: Thread(target=send_async_email, args=(current_app._get_current_object(), msg)).start() 

Messageクラスのattach()メ゜ッドは、添付ファむルを定矩する3぀の匕数ファむル名、メディアタむプ、および実際のファむルデヌタattach()取りたす。 ファむル名は、添付ファむルに関連付けられた受信者に衚瀺される単なる名前であり、実際のファむルであっおはなりたせん。 メディアタむプによっお添付ファむルのタむプが決定されるため、電子メヌルリヌダヌは適切に衚瀺できたす。 たずえば、メディアタむプずしおjpg/pngを送信するず、電子メヌルリヌダヌは添付ファむルが画像であるこずを認識したす。その堎合、そのように衚瀺できたす。 ブログ投皿デヌタファむルには、メディアタむプapplication/jsonを䜿甚するJSON圢匏を䜿甚したす。 3番目の最埌の匕数は、添付ファむルの内容を含む文字列たたはバむトシヌケンスです。


簡単にするために、 send_email() attachments匕数はタプルのリストになり、各タプルには3぀のattach()匕数に察応する3぀の芁玠がありたす。 したがっお、このリストの各芁玠に察しお、タプルを匕数ずしおattach()送信する必芁がありたす。 Pythonでは、関数に送信する匕数を含むリストたたはタプルがある堎合、次のような退屈な構文を䜿甚する代わりに、 func(*args)を䜿甚しおこのリストを匕数の実際のリストに展開できたすfunc(args[0], args[1], args[2]) 。 たずえば、 args = [1, 'foo']堎合、呌び出しはfunc (1, 'foo')呌び出したかのように2぀の匕数を送信したす。 *ない堎合*呌び出しには1぀の匕数が含たれ、リストになりたす。


電子メヌルの同期送信に関しおは、 sync が Trueずきに盎接mail.send(msg)呌び出しに戻る必芁がありたした。


タスクヘルパヌ


䞊蚘で䜿甚したexample()タスクは単玔なスタンドアロン関数でしたが、ブログ投皿を゚クスポヌトする関数には、デヌタベヌスぞのアクセスやメヌル機胜の送信など、アプリケヌションにある機胜の䞀郚が必芁になりたす。 これは別のプロセスで行われるため、Flask-SQLAlchemyずFlask-Mailを初期化する必芁がありたす。これらを蚭定するには、Flaskアプリケヌションのむンスタンスが必芁です。 そのため、Flaskアプリケヌションのむンスタンスずapp / tasks.pyモゞュヌルの䞊郚にアプリケヌションコンテキストを远加したす 。


app / tasks.py アプリケヌションずコンテキストを䜜成したす。

 from app import create_app app = create_app() app.app_context().push() 

これはRQワヌカヌをむンポヌトする唯䞀のモゞュヌルであるため、このモゞュヌルでアプリケヌションが䜜成されたす。 flaskコマンドを䜿甚する堎合、ルヌトディレクトリのmicroblog.pyモゞュヌルがアプリケヌションを䜜成したすが、RQワヌカヌはそれに぀いお䜕も知らないため、タスク機胜に必芁な堎合はアプリケヌションの独自のむンスタンスを䜜成する必芁がありたす。 app.app_context()メ゜ッドはすでにいく぀かの堎所で芋られ、コンテキストを抌すず、アプリケヌションがアプリケヌションの「珟圚の」むンスタンスになり、Flask-SQLAlchemyなどの拡匵機胜がcurrent_app.configを䜿甚しお蚭定を取埗できるようになりたす。 コンテキストがない堎合、匏current_appぱラヌを返したす。


次に、この機胜の進捗状況をどのように報告するかを考えたした。 job.metaディクショナリを介しお進行情報を送信するこずに加えお、クラむアントに通知を送信しお、ペヌゞを曎新する必芁なく完了率を動的に曎新できるようにしたす。 このために、 第21章で䜜成したものず同様の通知メカニズムを䜿甚したす。 曎新は、未読メッセヌゞアむコンず同様に機胜したす。 サヌバヌがテンプレヌトを衚瀺するず、 job.metaから取埗した「静的な」進捗情報が含たれたすが、クラむアントのブラりザヌにペヌゞが衚瀺されるずすぐに、通知を䜿甚しお通知がパヌセンテヌゞを動的に曎新したす。 通知のため、実行䞭のタスクの進行状況の曎新は、前の䟋で行った方法よりも少し耇雑になるため、タスクの進行状況の曎新専甚のデコレヌタヌ関数を䜜成したす。


app / tasks.py タスクの進行状況を蚭定したす。

 from rq import get_current_job from app import db from app.models import Task # ... def _set_task_progress(progress): job = get_current_job() if job: job.meta['progress'] = progress job.save_meta() task = Task.query.get(job.get_id()) task.user.add_notification('task_progress', {'task_id': job.get_id(), 'progress': progress}) if progress >= 100: task.complete = True db.session.commit() 

゚クスポヌトタスクは、 _set_task_progress()を呌び出しお、完了の割合を蚘録できたす。 job.meta Redis, task task.user , , add_notification() . task_progress , , , , (progress number). JavaScript, .


, , , complete . , , add_notification() , . , , - , .



. :


app/tasks.py : .

 def export_posts(user_id): try: #       #      except: #    

try/except? , , , Flask , , , . , , , RQ, Flask, , , , RQ , . , RQ worker -, , .


, , :


app/tasks.py : .

 import sys # ... def export_posts(user_id): try: # ... except: _set_task_progress(100) app.logger.error('Unhandled exception', exc_info=sys.exc_info()) 

, , , 100%, logger Flask , sys.exc_info() . , flask Application logger , Flask . , 7 . app.logger .


, , :


app/tasks.py : .

 import time from app.models import User, Post # ... def export_posts(user_id): try: user = User.query.get(user_id) _set_task_progress(0) data = [] i = 0 total_posts = user.posts.count() for post in user.posts.order_by(Post.timestamp.asc()): data.append({'body': post.body, 'timestamp': post.timestamp.isoformat() + 'Z'}) time.sleep(5) i += 1 _set_task_progress(100 * i // total_posts) #      except: # ... 

, . ISO 8601. datetime Python, , , ISO "Z", UTC.


- . i , , total_posts , . i total_posts 0 100.


, , time.sleep(5) . , sleep, , , .


, , data :


app/tasks.py : .

 import json from flask import render_template from app.email import send_email # ... def export_posts(user_id): try: # ... send_email('[Microblog] Your blog posts', sender=app.config['ADMINS'][0], recipients=[user.email], text_body=render_template('email/export_posts.txt', user=user), html_body=render_template('email/export_posts.html', user=user), attachments=[('posts.json', 'application/json', json.dumps({'posts': data}, indent=4))], sync=True) except: # ... 

send_email() . , attach() Flask-Mail's Message . - , Python json.dumps() .


, , HTML . :


app/templates/email/export_posts.txt : Export posts text email template.

 Dear {{ user.username }}, Please find attached the archive of your posts that you requested. Sincerely, The Microblog Team 

HTML- :


app/templates/email/export_posts.html: Export posts HTML email template.

 <p>Dear {{ user.username }},</p> <p>Please find attached the archive of your posts that you requested.</p> <p>Sincerely,</p> <p>The Microblog Team</p> 


. , .


export_posts :


app/main/routes.py : Export posts route and view function.

 @bp.route('/export_posts') @login_required def export_posts(): if current_user.get_task_in_progress('export_posts'): flash(_('An export task is currently in progress')) else: current_user.launch_task('export_posts', _('Exporting posts...')) db.session.commit() return redirect(url_for('main.user', username=current_user.username)) 

, , . , . , get_task_in_progress() , .


, launch_task() . - , RQ worker app.tasks. 。 - , . Task . .


, . , , , " ":


app/templates/user.html : .

  ... <p> <a href="{{ url_for('main.edit_profile') }}"> {{ _('Edit your profile') }} </a> </p> {% if not current_user.get_task_in_progress('export_posts') %} <p> <a href="{{ url_for('main.export_posts') }}"> {{ _('Export your posts') }} </a> </p> ... {% endif %} 

, , , .


, . , RQ worker :




, . Bootstrap, . - , . - , . , . , :



app/templates/base.html : .

 ... {% block content %} <div class="container"> {% if current_user.is_authenticated %} {% with tasks = current_user.get_tasks_in_progress() %} {% if tasks %} {% for task in tasks %} <div class="alert alert-success" role="alert"> {{ task.description }} <span id="{{ task.id }}-progress">{{ task.get_progress() }}</span>% </div> {% endfor %} {% endif %} {% endwith %} {% endif %} ... {% endblock %} ... 

. . , get_tasks_in_progress() , . , , , , .


alert . CSS, alert-success , alert-info. Bootstrap HTML . , , .


<span> , id . , JavaScript . , , -progress . , , <span> #<task.id> - progress .


, "" , . , , .


<span> , JavaScript:


app/templates/base.html : .

 ... {% block scripts %} ... <script> ... function set_task_progress(task_id, progress) { $('#' + task_id + '-progress').text(progress); } </script> ... {% endblock %} 

id jQuery <span> . , , jQuery , .


, _set_task_progress() app/tasks.py add_notification() . , - , , 21 , . , add_notification() , , .


JavaScript, , , unread_message_count , . , task_progress , set_task_progress() , . , JavaScript:


app/templates/base.html : .

  for (var i = 0; i < notifications.length; i++) { switch (notifications[i].name) { case 'unread_message_count': set_message_count(notifications[i].data); break; case 'task_progress': set_task_progress( notifications[i].data.task_id, notifications[i].data.progress); break; } since = notifications[i].timestamp; } 

, , if , unread_message_count , switch , , . "C", , switch . , if/elseif . , .


, , RQ task_progress , task_id progress , set_task_progress() .


, 10 , .


, , . , , Flask-Babel , :


 (venv) $ flask translate update 

, , app/translations/es/LC_MESSAGES/messages.po .


, , :


 (venv) $ flask translate compile 


. : Redis RQ. , , , , , .


Linux


Linux, Redis , . Ubuntu Linux sudo apt-get install redis-server .


RQ, " Gunicorn Supervisor" 17 , Supervisor, rq worker-tasks gunicorn . (, , production), numprocs , , .


Heroku


Heroku, Redis . , Postgres. Redis , :


 $ heroku addons:create heroku-redis:hobby-dev 

URL- redis Heroku REDIS_URL , , .


Heroku web-dyno worker dyno, rq , . procfile:


 web: flask db upgrade; flask translate compile; gunicorn microblog:app worker: rq worker microblog-tasks 

:


 $ heroku ps:scale worker=1 

Docker


Docker Redis. Redis Docker:


 $ docker run --name redis -d -p 6379:6379 redis:3-alpine 

redis REDIS_URL , , MySQL. , redis:


 $ docker run --name microblog -d -p 8000:5000 --rm -e SECRET_KEY=my-secret-key \ -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \ -e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \ --link mysql:dbserver --link redis:redis-server \ -e DATABASE_URL=mysql+pymysql://microblog:<database-password>@dbserver/microblog \ -e REDIS_URL=redis://redis-server:6379/0 \ microblog:latest 

, RQ. , , , , start up, -. docker run , worker:


 $ docker run --name rq-worker -d --rm -e SECRET_KEY=my-secret-key \ -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \ -e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \ --link mysql:dbserver --link redis:redis-server \ -e DATABASE_URL=mysql+pymysql://microblog:<database-password>@dbserver/microblog \ -e REDIS_URL=redis://redis-server:6379/0 \ --entrypoint venv/bin/rq \ microblog:latest worker -u redis://redis-server:6379/0 microblog-tasks 

コマンドは2぀の郚分で指定する必芁があるため、Dockerむメヌゞのデフォルトの起動コマンドをオヌバヌラむドするのはもう少し耇雑です。匕数--entrypointは、実行可胜ファむルの名前のみを受け入れたすが、匕数存圚する堎合は、コマンドラむンの最埌のむメヌゞずタグの埌に指定する必芁がありたす。仮想環境をアクティブにせずに機胜するrqように、䜕を指定する必芁があるかに泚意しおくださいvenv/bin/rq。


ここに 戻る



Source: https://habr.com/ru/post/J354752/


All Articles