मुझे लगता है कि अधिकांश अजगर प्रोग्रामर पहले से ही कुछ हद तक सेलेरी से परिचित हैं। पहले भाग में मैं आपको बताऊंगा कि अजवाइन के बिना RabbitMQ का उपयोग कैसे करें, और दूसरे भाग में - अजवाइन 3.0 की नई विशेषताओं का एक संक्षिप्त अवलोकन।
आप यहां Django-Celery-RabbitMQ बंडल स्थापित करने के बारे में पढ़ सकते हैं।
RabbitMQ के उपयोग के बारे में अच्छी तरह से
यहाँ लिखा गया
है , और
यहाँ , अच्छी तरह से, RabbitMQ वेबसाइट पर।
संक्षेप में स्थापना और कॉन्फ़िगरेशन को याद रखें:
RabbitMQ:
sudo apt-get install rabbitmq-server
उपयोगकर्ता जोड़ें:
$ rabbitmqctl add_user myuser mypassword $ rabbitmqctl add_vhost '/' $ rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
संक्षेप में अजवाइन, खरगोश की रूपरेखाएँ:
सेटिंग्स में
import djcelery os.environ["CELERY_LOADER"] = "django" djcelery.setup_loader() AMQP_HOST = 'localhost' BROKER_HOST='localhost' BROKER_PORT = 5672 BROKER_VHOST = "/" BROKER_USER = "myuser" BROKER_PASSWORD = "mypassword" INSTALLED_APPS+='djcelery'
कथन: एक छोटे से कार्य को अतुल्यकालिक बनाने के लिए अजवाइन का उपयोग करना बिल्कुल भी आवश्यक नहीं है। आप RabbitMQ के द्वारा प्राप्त कर सकते हैं।
सबूत:
चलो विपरीत से शुरू करते हैं:
कार्य: निर्दिष्ट प्रेषक से एक पत्र की उपस्थिति के लिए ईमेल की जांच करें, यदि कोई पत्र नहीं है, तो एक मिनट में चेक दोहराएं, अगर वहाँ है, तो आगे बढ़ें (उदाहरण के लिए इसे पार्स करें ...)
हम poplib, ईमेल का उपयोग करते हैं।
हम एक फ़ंक्शन लिखेंगे जो पहले से दिए गए प्रेषक से ईमेल प्राप्त करता है और इसे टास्क डेकोरेटर के साथ लपेटता है
फ़ंक्शन एक ईमेल पता, पासवर्ड और ईमेल पता स्वीकार करता है, जिसमें से पत्र आना चाहिए और स्थिति (ओके, त्रुटि) और संदेश लौटाता है
कार्यों में
from celery.task import task, periodic_task from celery.task.schedules import crontab import poplib import email @task def mail_content(user_mail, mail_pass, mail_from): mail_server = 'pop.'+user_mail.split('@')[1] mail_login = user_mail.split('@')[0] p = poplib.POP3(mail_server) print p.getwelcome() try: p.user(mail_login) p.pass_(mail_pass) except poplib.error_proto: return 'Error', 'Email is blocked' try: print p.list() except: return 'Error', 'dont receive list of mails' numMessages = len(p.list()[1]) print numMessages for i in range(numMessages): m = email.message_from_string("\n".join(p.top(i+1, 1)[1])) try: this_email_from = m['From'] if this_email_from.find(mail_from) >= 0: print this_email_from m = email.message_from_string('\n'.join(p.retr(i+1)[1])) content = get_text_body(m) print content return 'Ok', content else: pass except Exception, e: return 'Error', unicode(e, 'utf8') raise mail_content.retry(exc=e, countdown=30)
कोड की अंतिम पंक्ति 30 सेकंड के बाद कार्य को पुनः आरंभ करने का वर्णन करती है यदि संदेश नहीं मिला।
अब हम इस तरह कार्य चला सकते हैं:
>>>res = mail_content.delay('user@domen', 'password', 'email_from@domen.email.from')
इस मामले में, निष्पादन तुरंत या इस तरह शुरू होगा:
>>>res = mail_content.apply_async(('user@domen', 'password', 'email_from@domen.email.from'), countdown=30)
इस मामले में, निष्पादन 30 सेकंड के बाद शुरू होगा।
(पहले आपको अजवाइन सर्वर शुरू करने की आवश्यकता है:
अजगर प्रबंधन के लिए अजवाइन
और एक अन्य विंडो रन शेल में:
अजगर प्रबंधन ओरेकल शेल,
और पहले से ही शेल इन कमांड को कॉल करने के लिए आया)
हम करके परिणाम प्राप्त कर सकते हैं
>>>res.get() () >>>res.info
(यदि कोई परिणाम अभी तक नहीं हुआ है और यदि कोई है तो कोई नहीं लौटाता)
लेकिन यह जांचने के लिए कि क्या कोई परिणाम हमेशा सुविधाजनक नहीं होता है और हमेशा इसका मतलब है अनावश्यक क्रियाएं करना।
कार्य पूरा होने के बाद किसी फ़ंक्शन को कॉल करने के लिए, कॉलबैक लागू किया जा सकता है। यदि आपके पास अजवाइन स्थापित है और आप कार्य को कार्य को स्वीकार करने के लिए कार्य कर सकते हैं, तो आप अगली उपधारा के लिए आगे बढ़ सकते हैं। अजवाइन के बिना कौन करना चाहता है, पिका और खरगोश पर आधारित कॉलबैक को व्यवस्थित करने का एक तरीका है।
AMQP के साथ काम करने के लिए, पिका पैकेज स्थापित करें:
$ sudo pip install pika==0.9.5
विवरण इस पुस्तकालय और RabbitMQ का उपयोग कर नमस्ते दुनिया
यहाँ वर्णित
हैसज्जाकार में:
import pika import pickle import time importr settings def callback(function_to_decorate): user = settings.BROKER_USER broker_host = settings.BROKER_HOST password = settings.BROKER_PASSWORD credentials = pika.PlainCredentials(user, password) parameters = pika.ConnectionParameters(host=broker_host, credentials=credentials) def receiver(*args, **kw): (backend_function, data) = function_to_decorate(*args, **kw) pickled_obj = pickle.dumps(data) queue_name = str(time.time()) print "call_backend", backend_function.__name__ connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare( queue = queue_name) channel.basic_publish(exchange='', routing_key=queue_name, body=pickled_obj) channel.basic_consume( backend_function, queue=queue_name, no_ack = True) channel.queue_delete(queue=queue_name) connection.close() return receiver
यह डेकोरेटर है जिसके साथ हम
कार्य सज्जाकार को लपेटने से पहले mail_content फ़ंक्शन को लपेटेंगे
डेकोरेटर खरगोश के लिए एक संदेश भेजने के लिए अतिरिक्त निर्देशों के साथ हमारे mail_content फ़ंक्शन को लौटाता है
मैं केवल फ़ंक्शन में संपूर्ण फ़ंक्शन को फिर से नहीं लिखूंगा, बस बदल दिया गया है
कार्यशैली में:
from decorators import * from tasks_backend import mail_analizer, mail_error @task @callback def mail_content(...): ... if (...): ... return mail_analizer, (content,) return mail_error, ('error',)
हम फ़ंक्शन को पहले तर्क के रूप में वापस करते हैं, दूसरा उन तर्कों की सूची है जिन्हें हम फ़ंक्शन में पास करना चाहते हैं
कार्यों में_बैकेंडहोम
import tasks def mail_analizer(ch, method, properties, body): email_text = pickle.loads(body) if emai_text.find(u'Hello'): tasks.send_emails.delay(email_text) else: tasks.send_twitter_status.delay(email_text)
ईमेल को स्वीकार कर लिया, इसे पहचान लिया और नए कार्यों का शुभारंभ किया।
ध्यान दें कि तर्क बहुत सुविधाजनक नहीं हैं, इसे ठीक करें:
सज्जाकारों में
def backend(function_to_decorate): def receive(ch, method, properties, body): data=pickle.loads(body) args = data function_to_decorate(*args) return receive
अब हम इस तरह mail_analizer फ़ंक्शन को फिर से लिख सकते हैं:
@backend def mail_analizer(email_text): if emai_text.find(u'Hello'): tasks.send_emails.delay(email_text) else: tasks.send_twitter_status.delay(email_text)
निम्नलिखित कार्यों को चलाने के लिए, डेकोरेटर का उपयोग करें
@callback
mail_content में समान:
@backend @callback def mail_analizer(cont): print cont return send_twitter_status, (cont,)
इस इंटरफ़ेस के साथ फ़ंक्शंस की श्रृंखला के निर्माण का एक सरल उदाहरण:
@callback def first(*args): print first.__name__ print args return senders, args @backend @callback def senders(*args): print args return analizer, args @backend @callback def analizer( *args): print args return ended_fun, args @backend def ended_fun(*args): print ended_fun.__name__ print args
पहला फ़ंक्शन केवल डेकोरेटर द्वारा लपेटा गया है
@callback
क्योंकि वह खरगोश से कुछ नहीं लेती है, और केवल आखिरी होती है
@backend
- क्योंकि वह कुछ भी प्रसारित नहीं करती है।
ध्यान दें कि एक फ़ंक्शन स्वयं कॉल कर सकता है। यह भी ध्यान दें कि बैकएंड डेकोरेटर द्वारा लपेटे गए फ़ंक्शन को केवल खरगोशबिटक से बुलाया जा सकता है।
शुरू करने के लिए, हम एक फ़ंक्शन का उपयोग करते हैं जो केवल कॉलबैक द्वारा लपेटा जाता है।
@callback def runer(*args): return test_func, (args) @backend @callback def test_func( *args): print args return test_func, args
कार्यों का अंतिम संस्करण mail_content, email_analizer, run_email:
@backend @call_backend def mail_content(user_mail, mail_pass, mail_from): mail_server = 'pop.'+user_mail.split('@')[1] mail_login = user_mail.split('@')[0] p = poplib.POP3(mail_server) print p.getwelcome() try: p.user(mail_login) p.pass_(mail_pass) except poplib.error_proto: return mail_error, 'Email is blocked' try: print p.list() except: return mail_error, 'dont receive list of mails' numMessages = len(p.list()[1]) print numMessages for i in range(numMessages): m = email.message_from_string("\n".join(p.top(i+1, 1)[1])) try: this_from = m['From'] this_from = this_from.decode('cp1251').split('<'.decode('cp1251'))[1] if this_from.find(mail_from) >= 0: print m['From'] m = email.message_from_string('\n'.join(p.retr(i+1)[1])) content = get_text_body(m) print content return email_analizer, (content, email_from) else: pass except Exception, e: return email_error, (unicode(e, 'utf8'),) return mail_content, (user_mail, mail_pass, mail_from) @backend @call_backend def email_analizer(content, email_from): if content.find(u'Hello'): email_to = email_from text=u'Hello, my dear friend' return send_mail, (email_to, text) return send_twitter_status, (cont,) @call_backend def run_email(): ''' , , email, password, email_from ''' return mail_content, (email, password, email_from)
उप-योग:
मुझे उम्मीद है कि कुछ भी जटिल नहीं था। इसका उपयोग अजवाइन के बजाय किया जा सकता है, उदाहरण के लिए यदि आपके पास एक छोटा कार्य (कार्य) है।
अजवाइन 3.0 के साथ यह कैसे करें
3.0 अजवाइन में, आप उस कार्य के नाम को पास कर सकते हैं, जिसे आप कार्य के लिए कार्य (कार्य) के लिए पास करना चाहते हैं
प्रलेखन से उदाहरण:
@celery.task def add(x, y): return x + y add.apply_async((2, 2), link=add.s(16))
जहाँ Add हमारा कार्य (कार्य) है, add.s वह सबटैस्क (सबटैस्क) है, जो ऐड (2, 2) के बाद शुरू होता है, निष्पादित किया जाता है, सबस्कॉक का पहला तर्क ऐड (2, 2) का परिणाम है, दूसरा तर्क 16: कुल है। (२ + २) + १६ = २०।
यहां उपमा क्या है
हमारे कार्य के संबंध में, हम mail_analizer कार्य फ़ंक्शन बनाते हैं, एक तर्क - सामग्री छोड़ते हैं, @call_backend डेकोरेटर को हटाते हैं और इसे इस तरह कहते हैं:
>>> mail_content.apply_async (mail_addres, mail_password, email_from, लिंक = mail_analizer.s ())
जब कोई त्रुटि उत्पन्न होती है, तो केस के लिए link_error चर भी प्रदान किया जाता है।
इसके बारे में
यहाँ और पढ़ें
।अजवाइन 3.0 के अलावा दिखाई दिया:
समूह
समूह, उन कार्यों की सूची को स्वीकार करता है जिन्हें समानांतर में लागू किया जाना चाहिए:
प्रलेखन से उदाहरण:
>>> from celery import group >>> res = group(add.s(i, i) for i in xrange(10))() >>> res.get(timeout=1) [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] >>> g = group(add.s(i, i) for i in xrange(10)) >>> g.apply_async()
श्रृंखला:
जंजीरों को बुलाओ
अब कार्यों को जंजीरों में कहा जा सकता है, उदाहरण के लिए:
>>> from celery import chain @task def mul(x,y): return x*y @task def div(x,y): return x/y
तार
एकॉर्ड:
समानांतर में किए जाने वाले कार्यों की एक सूची को स्वीकार करता है और एक कार्य जो कार्यों की सूची के निष्पादन के परिणामों की एक सूची को स्वीकार करता है। यह मुड़ा हुआ है।
@task def xsum(res_list): return sum(res_list) >>> from celery import chord >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())() >>> res.get() 90
चेन (समूह) का उपयोग करके हमें कॉर्ड मिलता है:
>>> c3 = (group(add.s(i, i) for i in xrange(10) | xsum.s())) >>> res = c3() >>> res.get() 90
नक्शा
नक्शे की तरह (मज़ेदार, [1,2,3])
res=task.map([1,2]) res=[task(1), task(2)]
Starmap
res=add.starmap([(1,2), (2,4)]) res=[add(1,2), add(2,4)]
chuncs
विभिन्न कार्यों में तर्कों की लंबी सूची तोड़ता है,
>>> from proj.tasks import add >>> res = add.chunks(zip(range(100), range(100)), 10)() >>> res.get() [[0, 2, 4, 6, 8, 10, 12, 14, 16, 18], [20, 22, 24, 26, 28, 30, 32, 34, 36, 38], [40, 42, 44, 46, 48, 50, 52, 54, 56, 58], [60, 62, 64, 66, 68, 70, 72, 74, 76, 78], [80, 82, 84, 86, 88, 90, 92, 94, 96, 98], [100, 102, 104, 106, 108, 110, 112, 114, 116, 118], [120, 122, 124, 126, 128, 130, 132, 134, 136, 138], [140, 142, 144, 146, 148, 150, 152, 154, 156, 158], [160, 162, 164, 166, 168, 170, 172, 174, 176, 178], [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]
उप-योग:
अजवाइन 3.0 बहुत ही उपयोगी बन्स प्रदान करता है जो उपयोग करने पर एक खुशी है।
परिणाम:
अजवाइन कई सुविधाजनक उपकरण प्रदान करता है, लेकिन छोटे कार्यों के लिए जहां इन कार्यों का 90% आवश्यक नहीं है, यह संदेश कतार (खरगोश) के साथ करना संभव है, इस प्रकार अजवाइन को कॉन्फ़िगर करने की आवश्यकता को समाप्त करना, सर्वर पर लोड को कम करना, और अतिरिक्त परियोजना निर्भरता से छुटकारा पाना है।
ध्यान देने के लिए आप सभी का धन्यवाद।