There are many ways to process messages from Pub-Sub systems: a dedicated service, a separate isolated process, orchestrating a process or thread pool, complex IPC, Poll-over-Http, and so on. Today I want to talk about how to use Pub-Sub over HTTP and about my own service written specifically for that purpose.
Reusing an existing HTTP service backend is an ideal solution for processing a message queue:
- Load balancing out of the box. The backend usually already sits behind a balancer and has infrastructure that is ready for load, which greatly simplifies message processing.
- An ordinary REST controller (any HTTP resource) does the work. Consuming messages over HTTP minimizes the cost of implementing consumers in different languages when the backend is polyglot.
- Simpler use of other services' webhooks. Almost every service today (Jira, Gitlab, Mattermost, Slack, and so on) supports webhooks in some form for talking to the outside world. Life gets easier if you teach the queue to act as an HTTP dispatcher.
The approach also has drawbacks:
- You can forget about a lightweight solution. HTTP is a heavy protocol, and using a framework on the consumer side immediately increases latency and load.
- We lose the strengths of the poll approach and inherit the weaknesses of push.
- Processing messages on the same service instances that serve clients can affect responsiveness. This is not critical, because it can be handled with balancing and isolation.
I implemented this idea as the Queue-Over-Http service, which is described below. The project is written in Kotlin with Spring Boot 2.1. Apache Kafka is currently the only supported broker.
From here on, the article assumes the reader is familiar with Kafka: message commits and offsets, how groups and consumers work, and the difference between a partition and a topic. If there are gaps, I recommend reading the corresponding section of the Kafka documentation before continuing.
Overview
Queue-Over-Http is a service that acts as an intermediary between a message broker and the final HTTP consumer (its design makes it easy to add support for delivering messages to consumers in other ways, for example over various *RPC protocols). For now only subscribing, unsubscribing, and listing consumers are available. Sending messages to the broker over HTTP (produce) is not implemented yet, because message order cannot be guaranteed without special support from the producer.
The key entity of the service is the consumer, which can subscribe either to specific partitions or simply to topics (topic patterns are supported). In the first case, automatic partition balancing is turned off. After subscribing, the specified HTTP resource starts receiving messages from the assigned Kafka partitions. Architecturally, each subscriber is associated with a native Kafka Java client.
An entertaining story about KafkaConsumer
Kafka has an excellent Java client that can do a lot. I use it in the queue adapter to receive messages from the broker and pass them on to the service's local queues. It is worth mentioning that the client works exclusively in the context of a single thread.
The idea of the adapter is simple. We start a single thread and write the simplest possible scheduler of native clients, focused on reducing latency. That is, something like this:
    while (!Thread.interrupted()) {
        var hasWork = false
        for (consumer in kafkaConsumers) {
            val queueGroup = consumers[consumer] ?: continue
            invalidateSubscription(consumer, queueGroup)
            // Non-blocking poll: returns at once with whatever is already buffered
            val records = consumer.poll(Duration.ZERO)
            if (!records.isEmpty) {
                hasWork = true
            }
        }
        val committed = doCommit()
        if (!hasWork && committed == 0) {
            // idle branch: nothing was polled or committed (its body is cut off in the source)
        }
    }
Everything looks great: latency seems minimal even with many consumers. In practice it turned out that KafkaConsumer is a poor fit for this mode of operation and allocates about 1.5 MB/s even while idle. With 100 consumers the allocation rate reaches 150 MB/s, and the GC starts visiting the application far more often. All of this garbage lives in the young generation, so the GC copes with it, but the solution is still far from perfect.
Obviously, we have to use KafkaConsumer the way it is normally used, so each subscriber now gets its own thread. This adds memory and scheduling overhead, but there is no other way.
We rewrite the code above, removing the inner loop and changing Duration.ZERO to Duration.ofMillis(100). The allocation rate drops to an acceptable 80-150 KB/s per consumer. However, a poll with a 100 ms timeout delays the entire queue of commits by those same 100 ms, which is often unacceptable.
While looking for a solution I remembered KafkaConsumer::wakeup, which throws a WakeupException and interrupts any blocking operation on the consumer. With this method the path to low latency is simple: when a new commit request arrives, we put it in a queue and call wakeup on the native consumer. In the work loop we catch the WakeupException and commit whatever has accumulated. Transferring control with exceptions deserves an immediate slap on the wrist, but since there is no other way...
It turns out this option is far from perfect too, because any operation on the native consumer, including the commit itself, now throws a WakeupException. Handling that situation clutters the code with flags that say whether wakeup may be called.
I came to the conclusion that it would be better to modify KafkaConsumer::poll so that it can be interrupted gracefully according to an additional flag. The result is a Frankenstein born of reflection: it copies the original poll method exactly and adds an exit from the loop on a flag. The flag is set by a separate interruptPoll method, which also calls wakeup on the client's selector to release the thread blocked on I/O.
With the client implemented this way, the reaction time from the arrival of a commit request to its processing is at most 100 microseconds, and latency for fetching messages from the broker is excellent.
Each partition is represented by a separate local queue into which the adapter writes messages from the broker. A worker takes messages from that queue and submits them for execution, that is, for sending over HTTP.
The service supports batch message processing to increase throughput. When subscribing, you can specify a concurrencyFactor for each topic (it applies to each assigned partition independently). For example, concurrencyFactor=1000 means that up to 1000 messages can be sent to the consumer simultaneously as HTTP requests. As soon as every message in the batch has been definitively handled by the consumer, the service decides on the next commit of the last message's offset to Kafka. Hence the second meaning of concurrencyFactor: it is the maximum number of messages the consumer will have to process again if Kafka or Queue-Over-Http crashes.
To reduce latency, the queue has loadFactor = concurrencyFactor * 2, which lets it read from the broker twice as many messages as can be sent at once. Because auto-commit is disabled on the native client, this scheme does not violate the At-Least-Once guarantee.
A high concurrencyFactor increases queue throughput by reducing the number of commits, each of which takes up to 10 ms in the worst case. At the same time, the load on the consumer grows.
The order in which messages within a batch are sent is not guaranteed, but it can be enforced by setting concurrencyFactor=1.
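The batch cycle described above (take up to concurrencyFactor messages, send them in parallel, wait for all of them, then commit) can be sketched roughly as follows. This is only an illustration under stated assumptions, not the service's actual code: `Message`, `dispatchBatch`, and the `send` callback are hypothetical names, a plain thread pool stands in for the HTTP client, and the returned value is assumed to be the offset of the next message to read.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical message type: just an offset and a payload.
data class Message(val offset: Long, val body: String)

// Mirrors loadFactor = concurrencyFactor * 2 from the text.
fun loadFactor(concurrencyFactor: Int): Int = concurrencyFactor * 2

/**
 * Takes up to [concurrencyFactor] messages from the local queue, sends them in
 * parallel, waits for every send to finish, and returns the offset to commit
 * (last offset in the batch + 1), or null if the queue was empty.
 */
fun dispatchBatch(
    localQueue: ArrayDeque<Message>,
    concurrencyFactor: Int,
    send: (Message) -> Unit // stands in for the HTTP request to the consumer
): Long? {
    val batch = mutableListOf<Message>()
    while (batch.size < concurrencyFactor && localQueue.isNotEmpty()) {
        batch += localQueue.removeFirst()
    }
    if (batch.isEmpty()) return null

    val pool = Executors.newFixedThreadPool(batch.size)
    try {
        val pending = batch.map { m -> pool.submit(Runnable { send(m) }) }
        pending.forEach { it.get() } // block until the whole batch is done
    } finally {
        pool.shutdown()
    }
    return batch.last().offset + 1
}

fun main() {
    val queue = ArrayDeque((0L until 5L).map { Message(it, "m$it") })
    val sent = AtomicInteger()
    println(dispatchBatch(queue, 3) { sent.incrementAndGet() }) // 3
    println(queue.size)                                         // 2
}
```

Note that the sends inside a batch can complete in any order, which is why ordering only holds when concurrencyFactor is 1.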
Commits
Commits are an important part of the service. When the next batch of data is ready, the offset of the last message in the batch is immediately committed to Kafka, and only after a successful commit does the next batch become available for processing. Often this is not enough and auto-commit is needed. That is what the autoCommitPeriodMs parameter is for. It has little in common with the classic auto-commit period of native clients, which commit the last message read from the partition.
Imagine concurrencyFactor=10. The service has sent all 10 messages and is waiting for each of them to be ready. Message 3 finishes first, then message 1, then message 10. At this point the auto-commit period elapses. It is important not to violate At-Least-Once semantics, so only the first message can be committed, that is, offset 2, because it is the only one processed successfully so far without a gap before it. Then, before the next auto-commit, messages 2, 5, 6, 4, and 8 are processed. Now only offset 7 may be committed, and so on. Auto-commit has almost no effect on throughput.
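The rule in this example amounts to committing the end of the longest unbroken run of processed messages. Here is a minimal sketch, assuming offsets within a batch are consecutive and that the committed value is the offset of the next message to read (as "offset 2" in the example suggests). `committableOffset` is a hypothetical helper, not a function from the service.

```kotlin
/**
 * Returns the offset that is safe to commit: one past the last message in the
 * unbroken run of processed messages starting at [batchStart].
 * Returns [batchStart] itself while the first message is still in flight.
 */
fun committableOffset(batchStart: Long, processed: Set<Long>): Long {
    var next = batchStart
    while (next in processed) next++
    return next
}

fun main() {
    val done = mutableSetOf(3L, 1L, 10L)
    println(committableOffset(1, done)) // 2: only message 1 has no gap before it

    done += listOf(2L, 5L, 6L, 4L, 8L)
    println(committableOffset(1, done)) // 7: messages 1..6 are done, 7 is not
}
```

Messages 8 and 10 are finished in this example but stay uncommitted until message 7 completes, which is exactly what keeps the scheme At-Least-Once.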
Error handling
In normal operation the service sends each message to the consumer once. If for some reason a 4xx or 5xx error comes back, the service resends the message and waits for successful processing. The time between attempts can be configured as a separate parameter.
You can also set the number of attempts after which a message is marked as processed, which stops resending regardless of the response status. I do not recommend this for sensitive data: consumer failures should always be resolved manually. Stuck messages can be tracked through the service logs and by monitoring the consumer's response statuses.
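The retry behaviour can be sketched as a small loop. This is an illustration under assumptions, not the service's implementation: `deliver` and the `send` callback are hypothetical, the parameter names are borrowed from the subscription request shown later in the article (delayOnErrorMs, retryBeforeCommit), and the limit is assumed to count resends after the first attempt, with 0 meaning "retry until success".

```kotlin
/**
 * Sends a message until the consumer answers with a non-error status.
 * [retryBeforeCommit] = 0 means "retry forever"; otherwise the message is
 * given up on (treated as processed) after that many resends.
 * Returns true if the consumer accepted the message, false if it was skipped.
 */
fun deliver(
    delayOnErrorMs: Long,
    retryBeforeCommit: Int,
    send: () -> Int // performs one attempt and returns the HTTP status code
): Boolean {
    var resends = 0
    while (true) {
        val status = send()
        if (status < 400) return true // anything below 4xx counts as success
        if (retryBeforeCommit > 0 && resends >= retryBeforeCommit) return false
        resends++
        Thread.sleep(delayOnErrorMs) // pause between attempts
    }
}

fun main() {
    val statuses = ArrayDeque(listOf(500, 503, 200))
    println(deliver(delayOnErrorMs = 1, retryBeforeCommit = 10) { statuses.removeFirst() }) // true
    println(deliver(delayOnErrorMs = 1, retryBeforeCommit = 2) { 500 })                      // false
}
```

The second call gives up after the initial attempt plus two resends, which is the "skip and move on" behaviour the text warns against for sensitive data.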
About stuck messages
An HTTP server that answers with a 4xx or 5xx status usually also sends a Connection: close header. A TCP connection closed this way stays in the TIME_WAIT state until the operating system cleans it up some time later. The problem is that each such connection occupies a port that cannot be reused until it is released. As a result the machine may run out of free ports for establishing TCP connections, and the service will log an exception on every send. In practice, on Windows 10 the ports run out after 10-20 thousand failed messages are sent within 1-2 minutes. In normal operation this is not a problem.
Messages
Each message fetched from the broker is sent to the consumer over HTTP, to the resource specified at subscription time. By default the message is sent in the body of a POST request. This can be changed by specifying another method. If the method does not support sending data in the body, you can specify the name of the query-string parameter in which the message will be sent. In addition, at subscription time you can specify extra headers to be added to every message, which is convenient for basic token-based authorization. Headers are also added to every message with the identifiers of the consumer, topic, and partition, the message number, the partition key (if applicable), and the broker name.
Performance
To evaluate performance I used a PC (Windows 10, OpenJDK 11 with untuned G1, i7-6700K, 16 GB) running the service, and a laptop (Windows 10, i5-8250U, 8 GB) running the message producer, the HTTP resource consumer, and Kafka with default settings. The PC is connected to the router over a 1 Gb/s wired link, the laptop over 802.11ac. Every 100 ms, for 110 seconds, the producer writes 110-byte messages to the designated topics, to which consumers from different groups are subscribed (concurrencyFactor=500, auto-commit off). The test bench is far from ideal, but it gives a general picture.
The key measured parameter is the service's impact on latency.
Let:
- t_q be the service timestamp at which the message was received from the native client
- d_t0 be the time between t_q and the moment the message was handed from the local queue to the executor pool
- d_t be the time between t_q and the moment the HTTP request was sent. It is d_t that captures the service's impact on message latency.
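The measurements gave the following results (C: consumers, T: topics, M: messages):
[Table: measurement results by number of consumers (C), topics (T), and messages (M)]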
In normal operation the service itself has almost no effect on latency, and memory consumption is minimal. The maximum values of d_t (about 60 ms) are deliberately not shown, because they depend on GC behaviour rather than on the service itself. Special GC tuning, or replacing G1 with Shenandoah, can smooth out the spread of the maximum values.
Everything changes dramatically when the consumer cannot keep up with the flow of messages from the queue and the service switches on throttling mode. In this mode memory consumption grows, because response times increase significantly, which prevents timely cleanup of resources. The impact on latency stays at the same level as in the previous results, and the high d_t values are caused by messages being preloaded into the local queue.
Unfortunately I cannot test under a higher load, because the laptop already buckles at 1300 RPS. If anyone can help organize measurements under high load, I will gladly provide a build for testing.
Demonstration
Now for the demonstration. We will need:
- A Kafka broker, ready to go. I will use an instance from Bitnami running at 192.168.99.100:9092.
- An HTTP resource that will receive messages. For clarity, I took a webhook from Slack.
First we need to bring up the Queue-Over-Http service itself. To do that, create an application.yml file with the following content in an empty directory:
    spring:
      profiles: default

    logging:
      level:
        com:
          viirrtus:
            queueOverHttp: DEBUG

    app:
      persistence:
        file:
          storageDirectory: "persist"
      brokers:
        - name: "Kafka"
          origin: "kafka"
          config:
            bootstrap.servers: "192.168.99.100:9092"
Here we tell the service the connection parameters of a specific broker and where to store subscribers so that they are not lost between restarts. In `app.brokers[].config` you can specify any connection parameters supported by the native Kafka client; the full list is given in the Kafka client documentation.
Because the configuration file is processed by Spring, you can put many other interesting things in it, including logging configuration.
Now we run the service itself. The simplest way is to use a docker-compose.yml:
    version: "2"
    services:
      app:
        image: viirrtus/queue-over-http:0.1.3
        restart: unless-stopped
        command: --debug
        ports:
          - "8080:8080"
        volumes:
          - ./application.yml:/application.yml
          - ./persist:/persist
If this option does not suit you, you can build the service from source. Build instructions are in the project's Readme, which is linked at the end of the article.
The next step is to register the first subscriber. To do that, send the service an HTTP request with a description of the Consumer:
    POST localhost:8080/broker/subscription
    Content-Type: application/json

    {
      "id": "my-first-consumer",
      "group": {
        "id": "consumers"
      },
      "broker": "Kafka",
      "topics": [
        {
          "name": "slack.test",
          "config": {
            "concurrencyFactor": 10,
            "autoCommitPeriodMs": 100
          }
        }
      ],
      "subscriptionMethod": {
        "type": "http",
        "delayOnErrorMs": 1000,
        "retryBeforeCommit": 10,
        "uri": "<slack-wh-uri>",
        "additionalHeaders": {
          "Content-Type": "application/json"
        }
      }
    }
If everything went well, the response will contain almost the same content that was sent.
Let's go through each parameter:
- Consumer.id: the subscriber's identifier
- Consumer.group.id: the group identifier
- Consumer.broker: which of the service's brokers to subscribe to
- Consumer.topics[0].name: the name of the topic to receive messages from
- Consumer.topics[0].config.concurrencyFactor: the maximum number of messages sent simultaneously
- Consumer.topics[0].config.autoCommitPeriodMs: the forced commit period for messages that are ready
- Consumer.subscriptionMethod.type: the subscription type. Only HTTP is currently available.
- Consumer.subscriptionMethod.delayOnErrorMs: the delay before resending a message that ended with an error
- Consumer.subscriptionMethod.retryBeforeCommit: the number of attempts to resend a failed message. If 0, the message keeps spinning until it is processed successfully. In our case a full delivery guarantee matters less than a steady flow.
- Consumer.subscriptionMethod.uri: the resource to which messages are sent
- Consumer.subscriptionMethod.additionalHeaders: extra headers sent with every message. Note that the body of each message will contain JSON, so the Content-Type header lets Slack interpret the request correctly.
The HTTP method is omitted in this request because the default, POST, suits Slack fine. From this point on the service watches the assigned partitions of the slack.test topic for new messages.
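The same subscription can also be registered from code. The sketch below uses the JDK 11 `java.net.http` client and assumes the service is reachable at http://localhost:8080 as in the docker-compose setup; `subscriptionRequest` is a hypothetical helper, and `<slack-wh-uri>` remains a placeholder to be replaced with a real webhook address.

```kotlin
import java.io.IOException
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

/** Builds the POST to <serviceBase>/broker/subscription carrying a JSON Consumer description. */
fun subscriptionRequest(serviceBase: String, consumerJson: String): HttpRequest =
    HttpRequest.newBuilder(URI.create("$serviceBase/broker/subscription"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(consumerJson))
        .build()

fun main() {
    // Same Consumer as in the request above; <slack-wh-uri> is still a placeholder.
    val consumer = """
        {
          "id": "my-first-consumer",
          "group": { "id": "consumers" },
          "broker": "Kafka",
          "topics": [
            { "name": "slack.test", "config": { "concurrencyFactor": 10, "autoCommitPeriodMs": 100 } }
          ],
          "subscriptionMethod": {
            "type": "http",
            "delayOnErrorMs": 1000,
            "retryBeforeCommit": 10,
            "uri": "<slack-wh-uri>",
            "additionalHeaders": { "Content-Type": "application/json" }
          }
        }
    """.trimIndent()

    val request = subscriptionRequest("http://localhost:8080", consumer)
    try {
        // Needs the Queue-Over-Http container to be up and listening on port 8080.
        val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
        println("${response.statusCode()}: ${response.body()}")
    } catch (e: IOException) {
        println("Service is not reachable: ${e.message}")
    }
}
```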
To write messages to the topic, I will use Kafka's built-in utilities, located in /opt/bitnami/kafka/bin of the running Kafka image (the location of the utilities may differ in other Kafka installations):
    kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test
    > {"text": "Hello!"}
At the same moment, Slack notifies us of a new message.
To unsubscribe a consumer, it is enough to make a POST request to broker/unsubscribe with the same content that was used when subscribing.
Conclusion
At the moment only basic functionality is implemented. Next I plan to improve batching, try to implement Exactly-Once semantics, add the ability to send messages to the broker over HTTP and, most importantly, add support for other popular Pub-Sub systems.
The Queue-Over-Http service is under active development. Version 0.1.3 is stable enough for testing on dev and stage environments. It has been tested on Windows 10, Debian 9, and Ubuntu 18.04. Use it in production at your own risk. If you would like to help with development or give feedback on the service, welcome to the project on Github.