Hello everyone!
We continue the free translation of the book "ZeroMQ. Use ZeroMQ and learn how to apply different message patterns".
I apologize in advance for the long gap since the previous installment, but as the saying goes, "laziness was born before us". Well, lyrical digressions aside, let's get on with it.
Contents
- Chapter 1: Getting Started
- Chapter 2: Introduction to Sockets
- Chapter 3: Using Socket Topology
- Chapter 4: Advanced Patterns
Having examined the basic structure of ZeroMQ in the previous chapters, we now look at socket topologies through two patterns:
- The publish-subscribe pattern
- The pipeline pattern
The publish-subscribe pattern
First, let's introduce the classic publish-subscribe pattern, which is one-directional by the nature of its distribution: a server sends messages to a certain list of clients. It is a one-to-many model. The main idea of this pattern is that the server sends a message and the connected clients receive it, while disconnected clients simply miss it. The server is loosely coupled to its clients and does not care at all whether any clients exist. This is similar to how a TV channel or a radio station works. A TV channel always broadcasts its programs, and only the viewers decide whether to receive the broadcast. If you miss the right time, you cannot watch your favorite show (unless you have something like TiVo; let's assume the scenario takes place in a world where recording has not been invented). The advantage of the publish-subscribe pattern is that it provides a dynamic network topology.
The publish-subscribe model can be viewed in terms of the following main aspects:
- Publishing: a message is published by the publisher
- Notifying: subscribers are notified of the published message
- Subscribing: a new subscription is issued by a subscriber
- Unsubscribing: an existing subscription is removed by a subscriber
Let's look at an example to make things clearer. Consider a scenario in which we want to write a stock exchange program. There are brokers, and they want to know what is happening on the market. Our server will be the stock exchange, and the brokers will be the clients.
Instead of real stock prices, we will simply generate some numbers.
Before moving on to the code, let's first see what the publish-subscribe model looks like.

The code for the publisher (server) is as follows:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include "zmq.h"

    int main (int argc, char const *argv[])
    {
        void* context = zmq_ctx_new();
        void* publisher = zmq_socket(context, ZMQ_PUB);
        printf("Starting server...\n");
        int conn = zmq_bind(publisher, "tcp://*:4040");
        if(conn != 0) {
            printf("Could not bind: %s\n", zmq_strerror(zmq_errno()));
            return 1;
        }
        const char* companies[2] = {"Company1", "Company2"};
        for(;;) {
            // Random numbers stand in for a real company and price.
            int price = rand() % 1000 + 1;
            int which_company = rand() % 2;
            // The update is "<company> <price>", as in the sample output.
            char update[32];
            int length = snprintf(update, sizeof update, "%s %d",
                                  companies[which_company], price);
            zmq_msg_t message;
            zmq_msg_init_size(&message, length);
            memcpy(zmq_msg_data(&message), update, length);
            zmq_msg_send(&message, publisher, 0);
            zmq_msg_close(&message);
        }
        // Not reached: the loop above runs until the process is killed.
        zmq_close(publisher);
        zmq_ctx_destroy(context);
        return 0;
    }

The client code is as follows:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include "zmq.h"

    int main (int argc, char const *argv[])
    {
        void* context = zmq_ctx_new();
        void* subscriber = zmq_socket(context, ZMQ_SUB);
        printf("Collecting stock information from the server.\n");
        int conn = zmq_connect(subscriber, "tcp://localhost:4040");
        // An empty filter of length zero subscribes to every message.
        conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0);
        int i;
        for(i = 0; i < 10; i++) {
            zmq_msg_t reply;
            zmq_msg_init(&reply);
            zmq_msg_recv(&reply, subscriber, 0);
            int length = zmq_msg_size(&reply);
            // Messages are not C strings: reserve a byte for the terminator.
            char* value = malloc(length + 1);
            memcpy(value, zmq_msg_data(&reply), length);
            value[length] = '\0';
            zmq_msg_close(&reply);
            printf("%s\n", value);
            free(value);
        }
        zmq_close(subscriber);
        zmq_ctx_destroy(context);
        return 0;
    }

Whenever you use a SUB socket, you must set a subscription with zmq_setsockopt() and ZMQ_SUBSCRIBE; otherwise you will not receive any messages. This is a very common mistake.
A client can set many subscriptions, and it receives a message if the update matches any one of them. It can also cancel specific subscriptions. Subscriptions are length-specified blobs, that is, arbitrary byte sequences passed together with their length.
The client receives messages with zmq_msg_recv(). zmq_msg_recv() receives a message and stores it in the given message object; any content the object held before is released.
int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);
The flags argument can take only one value, ZMQ_DONTWAIT. If the flag is ZMQ_DONTWAIT, the operation is performed in non-blocking mode. When a message is received, zmq_msg_recv() returns the size of the message in bytes; otherwise it returns -1 and sets errno to indicate the error.
The publish-subscribe model is asynchronous, and sending a message on a SUB socket results in an error. You can call zmq_msg_send() to send messages on a PUB socket, but you must never call zmq_msg_recv() on a PUB socket.
Here is sample output on the client side:

    Company2 570
    Company2 878
    Company2 981
    Company2 783
    Company1 855
    Company1 524
    Company2 639
    Company1 984
    Company1 158
    Company2 145

The server always sends messages, even when there are no clients. You can test this and see for yourself. If you do, you will see something like this:

    Sending... Company2 36
    Sending... Company2 215
    Sending... Company2 712
    Sending... Company2 924
    Sending... Company2 721
    Sending... Company1 668
    Sending... Company2 83
    Sending... Company2 209
    Sending... Company1 450
    Sending... Company1 940
    Sending... Company1 57
    Sending... Company2 3
    Sending... Company1 100
    Sending... Company2 947

Suppose we want to receive results only for Company1, or for another company whose name is passed as an argument. In that case, the client program has to be changed as follows:
The output will look something like this:

    Company1 575
    Company1 504
    Company1 513
    Company1 584
    Company1 444
    Company1 1010
    Company1 524
    Company1 963
    Company1 929
    Company1 718

Message filtering
Our basic stock exchange application sends messages to the clients, and it looks as though all messages are delivered as expected. Unfortunately, they are not.
Let's change the server code so that it publishes quotes for three companies: Company1, Company10, and Company101.
Let's leave the client subscribed to Company1 only.
In this case the output looks like this:

    Collecting stock information from the server.
    Company101 950
    Company10 707
    Company101 55
    Company101 343
    Company10 111
    Company1 651
    Company10 287
    Company101 8
    Company1 889
    Company101 536

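Our client code clearly says that we want to see results for Company1, yet we also receive results for Company10 and Company101. The reason is that a subscription is matched as a prefix, and both of those names begin with "Company1". This is certainly not what we want, so we need to solve this little problem.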
We could get what we want with a bit of hacking, but using a delimiter is simpler.
We need to make a few changes in both the client and the server code so that company names are filtered with the help of a delimiter.
Below is the updated server code that fixes the earlier problem. Pay attention to the highlighted lines: they show how to send messages to the clients using a delimiter.
Now let's look at the updated client code that filters the results.
After these changes to the client and server code, we see exactly the results we expect.
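Why the delimiter helps can be shown without any networking. The function below is a sketch that mirrors the prefix rule in plain C; `passes_filter` is a hypothetical helper, and the `|` character is only an assumed delimiter, since any character that cannot appear in a company name would do.

```c
#include <string.h>

/* Hypothetical helper mirroring the prefix rule of ZMQ_SUBSCRIBE:
 * a message passes when its first bytes equal the filter bytes. */
int passes_filter(const char *message, const char *filter)
{
    size_t filter_len = strlen(filter);
    return strlen(message) >= filter_len
        && memcmp(message, filter, filter_len) == 0;
}
```

With the filter `"Company1"`, the message `"Company10 707"` passes, which is the bug seen above. If the server sends `"Company10| 707"` and the client subscribes to `"Company1|"`, the prefix no longer matches, while `"Company1| 651"` still does.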
Socket options
Since we are using the publish-subscribe model, we use the option named ZMQ_SUBSCRIBE:

    int conn = zmq_connect(subscriber, "tcp://localhost:4040");
    conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, option_value, strlen(option_value));

Socket options are set with the zmq_setsockopt() function. It takes four parameters:
- Socket
- Option name
- Option value
- Option length
This can be seen from its prototype:

    int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);

Subscribing
ZMQ_SUBSCRIBE establishes a new message filter on a ZMQ_SUB socket. If the option_value argument is not empty, we subscribe to all messages that begin with option_value; an empty option_value of length zero subscribes to all messages. Several filters can be attached to a single ZMQ_SUB socket.
Unsubscribing
ZMQ_UNSUBSCRIBE removes a message filter from a ZMQ_SUB socket. It removes only one filter, even if several filters are attached; if the same filter was set more than once, only one instance of it is removed.
The main thing to learn about publish-subscribe sockets is that we do not know when the subscriber starts receiving messages. For that reason it is a good idea to start the client first and the server second. The client always misses the first messages, because connecting to the server takes time and the server may already be sending messages by then.
Later, however, we will discuss how to synchronize the server and the client so that no messages are sent until the client is actually connected.
Notes on the publish-subscribe pattern
In the publish-subscribe model, keep the following points in mind:
- With TCP, if a client is slow to receive, messages queue up on the server side. We will show later how to protect an application against this.
- A client can connect to more than one server. The data then arrives interleaved, using a fair-queuing strategy.
- The server sends all messages to all clients, and filtering is performed on the client side, as in the stock exchange program above. (This describes ZeroMQ 2.x; from version 3.x on, filtering over tcp:// and ipc:// takes place on the publisher side.)
In Chapter 4 we will return to the publish-subscribe model, look at more complex examples, and show how to deal with slow clients.
The pipeline pattern
Let's move on to the pipeline model. The pipeline pattern transfers data between ordered nodes in a pipeline. Data flows continuously, and each step of the pipeline is attached to one of several nodes. A round-robin strategy is used to distribute the data among the nodes. It is somewhat similar to the request-reply model.
The divide and conquer strategy
There seems to be no escape from the divide and conquer strategy when you program. Remember when you had just started learning programming, your teacher used it for merge sort, and a week later half of the group stopped coming to class? I am sure we all remember that well. And here it is again: divide and conquer!
Let's write something parallel with ZeroMQ. Consider a scenario with a generator that produces random numbers, workers that find the square roots of those numbers using Newton's method, and a collector that gathers the results from the workers.
Here is the server code:

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <sys/time.h>
    #include <time.h>
    #include "zmq.h"

    int main (int argc, char const *argv[])
    {
        void* context = zmq_ctx_new();
        // This is the socket on which we send tasks to the workers.
        void* socket = zmq_socket(context, ZMQ_PUSH);
        zmq_bind(socket, "tcp://*:4040");
        // This is the socket on which we signal the start of the batch.
        void* connector = zmq_socket(context, ZMQ_PUSH);
        zmq_connect(connector, "tcp://localhost:5050");
        printf("Please press enter when workers are ready...");
        getchar();
        printf("Sending tasks to workers...\n");
        // The first message. It is also the signal for the start of the batch.
        int length = strlen("-1");
        zmq_msg_t message;
        zmq_msg_init_size(&message, length);
        memcpy(zmq_msg_data(&message), "-1", length);
        zmq_msg_send(&message, connector, 0);
        zmq_msg_close(&message);
        // Seed the random number generator.
        srandom((unsigned) time(NULL));
        // Send the tasks.
        int count;
        int msec = 0;
        for(count = 0; count < 100; count++) {
            int load = (int) ((double) (100) * random () / RAND_MAX);
            msec += load;
            char string[10];
            sprintf(string, "%d", load);
            // Push the number to the next available worker.
            zmq_send(socket, string, strlen(string), 0);
        }
        printf("Total: %d msec\n", msec);
        // Give ZeroMQ time to deliver the queued tasks.
        sleep(1);
        zmq_close(connector);
        zmq_close(socket);
        zmq_ctx_destroy(context);
        return 0;
    }

Now let's look at the worker code, which does the actual computation: it calculates the square root of a number using Newton's method.

    #include <math.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include "zmq.h"

    double square(double x)
    {
        return x * x;
    }

    double average(double x, double y)
    {
        return (x + y) / 2.0;
    }

    // fabs() is required here: abs() would truncate the difference to int.
    int good_enough(double guess, double x)
    {
        return fabs(square(guess) - x) < 0.000001;
    }

    double improve(double guess, double x)
    {
        return average(guess, x / guess);
    }

    double sqrt_inner(double guess, double x)
    {
        if(good_enough(guess, x))
            return guess;
        else
            return sqrt_inner(improve(guess, x), x);
    }

    double newton_sqrt(double x)
    {
        return sqrt_inner(1.0, x);
    }

    int main (int argc, char const *argv[])
    {
        void* context = zmq_ctx_new();
        // Let's initialize a socket to receive messages.
        void* receiver = zmq_socket(context, ZMQ_PULL);
        zmq_connect(receiver, "tcp://localhost:4040");
        // Let's initialize a socket to send the messages.
        void* sender = zmq_socket(context, ZMQ_PUSH);
        zmq_connect(sender, "tcp://localhost:5050");
        for(;;) {
            zmq_msg_t reply;
            zmq_msg_init(&reply);
            zmq_msg_recv(&reply, receiver, 0);
            int length = zmq_msg_size(&reply);
            char* msg = malloc(length + 1);
            memcpy(msg, zmq_msg_data(&reply), length);
            msg[length] = '\0';   // atof() needs a terminated string
            zmq_msg_close(&reply);
            double val = atof(msg);
            printf("%.1f: %.1f\n", val, newton_sqrt(val));
            fflush(stdout);
            sleep(1);
            free(msg);
            // Report to the collector on the PUSH socket, not the PULL one.
            zmq_msg_t message;
            char* ssend = "T";
            int t_length = strlen(ssend);
            zmq_msg_init_size(&message, t_length);
            memcpy(zmq_msg_data(&message), ssend, t_length);
            zmq_msg_send(&message, sender, 0);
            zmq_msg_close(&message);
        }
        zmq_close(receiver);
        zmq_close(sender);
        zmq_ctx_destroy(context);
        return 0;
    }

And here is the collector code:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include "zmq.h"

    int main (int argc, char const *argv[])
    {
        void* context = zmq_ctx_new();
        void* receiver = zmq_socket(context, ZMQ_PULL);
        zmq_bind(receiver, "tcp://*:5050");
        // We receive the first message and discard it since it's the
        // signal start of batch which is -1.
        zmq_msg_t reply;
        zmq_msg_init(&reply);
        zmq_msg_recv(&reply, receiver, 0);
        int length = zmq_msg_size(&reply);
        char* msg = malloc(length + 1);
        memcpy(msg, zmq_msg_data(&reply), length);
        zmq_msg_close(&reply);
        free(msg);
        int count;
        for(count = 0; count < 100; count++) {
            zmq_msg_t reply;
            zmq_msg_init(&reply);
            zmq_msg_recv(&reply, receiver, 0);
            int length = zmq_msg_size(&reply);
            char* value = malloc(length + 1);
            memcpy(value, zmq_msg_data(&reply), length);
            zmq_msg_close(&reply);
            free(value);
            // Report progress after every tenth result.
            if((count + 1) % 10 == 0)
                printf("10 Tasks have been processed.\n");
            fflush(stdout);
        }
        zmq_close(receiver);
        zmq_ctx_destroy(context);
        return 0;
    }

The following diagram illustrates the code above.

So what do we have?
- First, we need to synchronize the moment at which the workers start. As mentioned earlier, connecting takes time. Without synchronization, the first worker receives messages while the other workers are still connecting. To prevent this, we synchronize the start of the batch so that everything runs in parallel.
- The collector's PULL socket receives the results using fair queuing (described in detail in the first part).
- The server's PUSH socket distributes the tasks evenly among the workers.
- The workers connect to both the server and the collector. You can add more workers as needed.
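We said that the workers connect to both the server and the collector. Let's look at these connections more closely.
Consider the following lines from the worker code:

    void* receiver = zmq_socket(context, ZMQ_PULL);
    zmq_connect(receiver, "tcp://localhost:4040");
    void* sender = zmq_socket(context, ZMQ_PUSH);
    zmq_connect(sender, "tcp://localhost:5050");
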
The ZMQ_PULL socket
When we want to retrieve data from upstream nodes, we use ZMQ_PULL. A socket of type ZMQ_PULL is used to receive messages from the upstream nodes of the pipeline. As mentioned earlier, this is done using fair queuing.
The ZMQ_PUSH socket
When we want to communicate with downstream nodes, we use ZMQ_PUSH. A socket of type ZMQ_PUSH is used to send messages to the next nodes of the pipeline.
ZMQ_PUSH never discards messages. If an upstream node is ready to send a message but no downstream node is ready to receive and process it, every zmq_send() call on the socket blocks until at least one node becomes available to receive messages.
Getting a ZeroMQ context
You have most likely noticed that all the examples shown so far start with zmq_ctx_new(). A ZeroMQ application always begins by creating a context. The context is the container for all the sockets of a single process: every socket is created from it, and it is also what makes in-process sockets possible, which are the fastest way to connect threads within one process. A ZeroMQ context is thread safe, so it can easily be shared between threads.
If the ZeroMQ context cannot be created, NULL is returned.
Although you can create several contexts, each of which behaves like a separate ZeroMQ application, the best idea is to create one context and pass it to the other threads.
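Destroying a ZeroMQ context
At the end of every application you must destroy the context you created by calling zmq_ctx_destroy(). Once zmq_ctx_destroy() has been called, all blocking operations in progress on the context's sockets return with the error code ETERM, and any further call on those sockets other than zmq_close() fails with ETERM as well. zmq_ctx_destroy() itself blocks until every open socket has been closed with zmq_close().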
Cleaning up
When you program in languages such as Python or Java, you do not need to worry about memory management, because these languages have built-in garbage collection.
For example, Python uses reference counting: when the counter drops to zero, the memory is released automatically. So when you write a ZeroMQ application in Python, you do not have to close connections explicitly, because they are closed automatically as soon as the object's reference count reaches zero. Note, however, that this does not work in Jython, PyPy, or IronPython. In any case, you can find plenty of information in the Python documentation. Let's get back to our main subject.
When you write in C, memory management is entirely your responsibility. If you neglect it, you end up with memory leaks and an unstable application.
You have to close the sockets and destroy the messages and the ZeroMQ context yourself. There are a few things to consider in order to shut an application down cleanly:
- As mentioned above, to shut down an application you must destroy the ZeroMQ context by calling zmq_ctx_destroy(). However, if there are open sockets, zmq_ctx_destroy() can wait forever for them to be closed. You should therefore close all sockets first and only then call zmq_ctx_destroy() to destroy the context. zmq_ctx_destroy() also waits indefinitely if there are open connections or messages still queued for sending.
- As soon as you have finished processing a message, close it by calling zmq_msg_close(); otherwise your application will leak memory.
- Do not open too many sockets. If you do, it means you are doing something wrong and the application probably needs to be redesigned from scratch.
If you do not shut down properly, you may be surprised by what happens to your application, especially if it is multithreaded. In that case, catching the errors is very hard.
Detecting memory leaks
Since memory management is entirely the programmer's responsibility, applications written in C or C++ need carefully written memory management. To help with this, we will use a wonderful Linux tool called Valgrind. Among its many other features that help with analyzing code, this tool can be used to detect memory leaks.
The next section is a small Valgrind tutorial, in which we take a closer look at how to use Valgrind when writing applications with ZeroMQ.
Introduction to Valgrind
Compile your application with the -g option so that it includes debugging information. The error messages will then contain exact line numbers.
Consider the following example:

    #include <stdio.h>
    #include <stdlib.h>

    int main(int argc, char const *argv[])
    {
        char* a = malloc(4);
        int b;
        printf("b = %d\n", b);
        return 0;
    }

Let's compile it with gcc:

    gcc -g -o test test.c

Next, run Valgrind to check for memory leaks with the following command:

    valgrind --leak-check=full --show-reachable=yes ./test

After you enter this command, Valgrind starts checking the code for memory errors with its memcheck tool. You could request it explicitly with --tool=memcheck, but that is pointless because memcheck is the default tool. The output looks like this:

    ==98190== Conditional jump or move depends on uninitialised value(s)
    ==98190==    at 0x2D923: __vfprintf
    ==98190==    by 0x4AC5A: vfprintf_l
    ==98190==    by 0x952BE: printf
    ==98190==    by 0x1F5E: main (test.c:8)
    ==98190== 4 bytes in 1 blocks are definitely lost in loss record 1 of 5
    ==98190==    at 0xF656: malloc (vg_replace_malloc.c:195)
    ==98190==    by 0x1F46: main (test.c:6)
    ==98190== LEAK SUMMARY:
    ==98190==    definitely lost: 4 bytes in 1 blocks
    ==98190==    indirectly lost: 0 bytes in 0 blocks
    ==98190==      possibly lost: 0 bytes in 0 blocks

Now let's go over this output briefly.
- We will not discuss 98190, because it is simply the process ID.
- "Conditional jump or move depends on uninitialised value(s)" means that the code uses a variable that was never initialized (here, b).
- "definitely lost" means that there is a memory leak that must be fixed.
- "indirectly lost" indicates blocks that are lost indirectly, that is, memory leaked through a pointer-based structure whose root was lost.
- "possibly lost" means that there may be a memory leak.
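For comparison, here is a sketch of the same example with both reported problems fixed. It is written as a function with a hypothetical name (`print_b`) so it can be called from any program; the two changes are initializing b and freeing the allocated block.

```c
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical corrected version of the example above.
 * Returns 0 on success, 1 if the allocation fails. */
int print_b(void)
{
    char *a = malloc(4);
    if (a == NULL)
        return 1;
    int b = 0;               /* initialized: no "uninitialised value" report */
    printf("b = %d\n", b);
    free(a);                 /* released: nothing is "definitely lost" */
    return 0;
}
```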
By default, Valgrind uses the suppressions file $PREFIX/lib/valgrind/default.supp. For use with ZeroMQ, however, we need to create our own suppressions file, which looks like this:

    {
       <socketcall_sendto>
       Memcheck:Param
       socketcall.sendto(msg)
       fun:send
       ...
    }
    {
       <socketcall_sendto>
       Memcheck:Param
       socketcall.send(msg)
       fun:send
       ...
    }

Then you can run Valgrind with the required arguments as follows:

    valgrind --leak-check=full --show-reachable=yes --suppressions=zeromq.supp ./server

Conclusion
In this chapter we discussed sockets and introduced two new models: the publish-subscribe pattern and the pipeline pattern. We also explained how to use these patterns to solve particular problems and demonstrated this with simple examples.
We also looked at Valgrind, an excellent tool for finding memory leaks. Thank you for your attention! Please send comments on the translation by PM.