This is my summary, in which I briefly and to the point touch on the following Kafka concepts:
- Topic
- Subscribers (consumers)
- Publisher (producer)
- Group, partition
- Streams
Kafka - basics
While studying Kafka, I ran into questions whose answers I had to obtain experimentally, using examples; those findings are what this summary outlines. How and where to start is covered in one of the links in the materials below.
Apache Kafka is a message broker that runs on the Java platform. In Kafka, publishers write messages to a topic, and subscribers to that topic read those messages. Every message that passes through is written to disk and does not depend on the consumers.

Kafka ships with a set of utilities for creating topics and partitions, along with ready-made publishers, subscribers, and so on. Kafka needs the ZooKeeper coordinator in order to run. (The batch files and utilities are in the corresponding bin folders.)
First, create a Kafka topic with the bundled utility:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic out-topic
Here --zookeeper points to the ZooKeeper server, --replication-factor is the number of replicas of the message log, --partitions is the number of partitions in the topic (see below), and the topic itself is "out-topic".
For simple testing you can use the bundled kafka-console-consumer and kafka-console-producer applications, but I will write my own. In practice, subscribers are combined into groups, which lets different applications read messages from a topic in parallel.

Each application has its own queue; as it reads, it advances a pointer to the last message read (the offset), which is called a commit. So when a publisher sends a message to a topic, a receiver of that topic is guaranteed to read it, either right away if it is running or as soon as it connects. Moreover, if different clients (client.id) read from the same topic but belong to different groups, they receive messages independently of one another, whenever each is ready.
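To make the offset-and-commit idea concrete, here is a minimal sketch in plain Java. It is a toy model, not the Kafka client API: the class TopicLogSketch and its methods are hypothetical names of mine. It also assumes that a brand-new group starts reading at offset 0; in real Kafka the starting point for a new group depends on the consumer's auto.offset.reset setting.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one topic: an append-only log plus a committed offset per group.
// This is not the Kafka client API; all names here are hypothetical.
class TopicLogSketch {
    private final List<String> log = new ArrayList<>();
    private final Map<String, Integer> committed = new HashMap<>();

    // The publisher appends; nothing here depends on who is reading.
    void publish(String message) {
        log.add(message);
    }

    // Returns everything the group has not read yet, then commits the new offset.
    // Assumption: a group with no committed offset starts at 0.
    List<String> poll(String groupId) {
        int from = committed.getOrDefault(groupId, 0);
        List<String> unread = new ArrayList<>(log.subList(from, log.size()));
        committed.put(groupId, log.size());
        return unread;
    }

    public static void main(String[] args) {
        TopicLogSketch topic = new TopicLogSketch();
        topic.publish("a");
        topic.publish("b");
        System.out.println(topic.poll("testGroup01")); // first read for this group
        topic.publish("c");
        System.out.println(topic.poll("testGroup01")); // only what arrived since the commit
        System.out.println(topic.poll("testGroup02")); // another group reads independently
    }
}
```

Each group keeps its own position, so one group falling behind or connecting late has no effect on what another group sees.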

So you can picture a sequence of messages and consumers reading it independently from a single topic.
There is, however, a situation where messages start arriving in the topic faster than they leave, that is, the consumers take longer to process them. To handle this, a topic can be split into partitions, and you can run several consumers in one group for that topic.

The load is then spread out, so not every message in the topic and group passes through a single consumer. You also choose how messages are distributed across partitions. There are several strategies: round-robin, by the hash of the key, or by explicitly specifying the partition number to write to. The subscribers, in turn, are distributed evenly across the partitions. If, for example, a group has more subscribers than the topic has partitions, some of them will receive no messages. Partitions therefore exist to improve scalability.
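The point about a group having more subscribers than partitions can be sketched as follows. This is a simplified illustration under my own assumptions, not Kafka's real assignor: GroupAssignmentSketch and assign are hypothetical names, and partitions are simply handed out in turn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy assignment of partitions to the consumers of one group.
// Not Kafka's real assignor; it only illustrates the even spread.
class GroupAssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return result;
        }
        // Each partition goes to exactly one consumer of the group.
        for (int p = 0; p < partitions; p++) {
            result.get(consumers.get(p % consumers.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // Two consumers, two partitions: one partition each.
        System.out.println(assign(Arrays.asList("client01", "client02"), 2));
        // Three consumers, two partitions: the third one is left with nothing to read.
        System.out.println(assign(Arrays.asList("client01", "client02", "client03"), 2));
    }
}
```

Because a partition is never shared inside a group, adding consumers beyond the partition count does not add throughput.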
For example, after creating the topic with one partition, I changed it to two partitions:
kafka-topics.bat --zookeeper localhost:2181 --alter --topic out-topic --partitions 2
I then started my publisher and two subscribers in one group on one topic (the Java programs are shown below). There is no need to configure group names and client IDs on the Kafka side; Kafka keeps track of them itself.
my_kafka_run.cmd com.home.SimpleProducer out-topic (publisher)
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01 (first subscriber)
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client02 (second subscriber)
Once you start typing key:value pairs into the publisher, you can see who receives them. For example, under the key-hash distribution strategy, the message m:1 went to client client01, and the message n:1 went to client client02.

If you enter values without the key:value form (I built this option into the publisher), the round-robin strategy is used. The first message, "m", went to client01, and the third went to client02.

Another option is to specify the partition explicitly, for example in the form key:value:partition.

Earlier, under the hash strategy, m:1 went to the other client (client01); now, with the partition specified explicitly (No. 1, numbering starts at 0), it goes to client02.
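The three input forms (value, key:value, key:value:partition) and the strategy each one triggers can be sketched like this. It is an illustration only: PartitionChoiceSketch is a hypothetical helper, and it hashes keys with String.hashCode, whereas Kafka's default partitioner applies a murmur2 hash to the serialized key. The partition a given key lands in will therefore differ from a real cluster; what carries over is that the same key always maps to the same partition.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy partition chooser for lines typed as "value", "key:value" or "key:value:partition".
// Hypothetical helper, not the Kafka partitioner: the hash here is String.hashCode.
class PartitionChoiceSketch {
    private final int partitions;
    private final AtomicInteger roundRobin = new AtomicInteger();

    PartitionChoiceSketch(int partitions) {
        this.partitions = partitions;
    }

    int choose(String line) {
        String[] parts = line.split(":");
        if (parts.length == 3) {
            // Explicit partition number, counted from 0 (no range check in this sketch).
            return Integer.parseInt(parts[2]);
        }
        if (parts.length == 2) {
            // Same key always maps to the same partition.
            return Math.floorMod(parts[0].hashCode(), partitions);
        }
        // No key: take the partitions in turn.
        return Math.floorMod(roundRobin.getAndIncrement(), partitions);
    }

    public static void main(String[] args) {
        PartitionChoiceSketch chooser = new PartitionChoiceSketch(2);
        for (String line : new String[] {"m", "n", "k", "m:1", "m:2", "m:1:1"}) {
            System.out.println(line + " -> partition " + chooser.choose(line));
        }
    }
}
```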
If you start a subscriber with a different group name, testGroup02, on the same topic, messages go to the subscribers in parallel and independently. That is, if the first one has read a message while the second was not active, the second reads it as soon as it becomes active.

You can view the descriptions of the group and of the topic, respectively:
kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group testGroup01

kafka-topics.bat --describe --zookeeper localhost:2181 --topic out-topic

SimpleProducer code:
public class SimpleProducer { public static void main(String[] args) throws Exception {
SimpleConsumer code:
public class SimpleConsumer { public static void main(String[] args) throws Exception { if (args.length != 3) { System.out.println("Enter topic name, groupId, clientId"); return; }
To run the programs, I created a batch file, my_kafka_run.cmd:
@echo off
set CLASSPATH="C:\Project\myKafka\target\classes";
for %%i in (C:\kafka_2.11-1.1.0\libs\*) do (
    call :concat "%%i"
)
set COMMAND=java -classpath %CLASSPATH% %*
%COMMAND%
:concat
IF not defined CLASSPATH (
  set CLASSPATH="%~1"
) ELSE (
  set CLASSPATH=%CLASSPATH%;"%~1"
)
Launch example:
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup02 client01
Kafka Streams
A stream in Kafka is a sequence of events read from a topic, on which you can perform certain operations and transformations and then pass the result on, for example to another topic, or save it to a database, or generally anywhere. The operations can be filtering (filter), transformation (map), or aggregation (count, sum, avg). For this there are the corresponding classes KStream and KTable, where a KTable can be thought of as a table of current aggregated values that is updated continuously as new messages arrive in the topic. How does that work?

Suppose a publisher writes events (messages) to a topic. Kafka saves every message in the message log, which has a retention policy of, say, 7 days. Stock quote changes, for example, form a stream. To find the average price, we create a Stream that takes the history from the log and computes the average, where the key is the stock and the value is the average (this is already a table with state). There is a subtlety here: unlike filtering, for example, aggregation operations keep state. Messages (events) that arrive in the topic are fed into the calculation and the result is saved (in the state store); as further messages are written to the log, the Stream processes them and applies the changes to the state already saved. Filtering operations do not need to save state.
Here too, the stream works independently of the publisher. For example, the publisher writes messages while the stream program is not running. Nothing is lost: all messages are kept in the log, and as soon as the stream program becomes active it performs the calculation, saves the state, and commits the offset for the messages it has read (marking them as read), so it does not return to them later; eventually those messages leave the log (kafka-logs) altogether. Evidently the key point is that the log (kafka-logs) and its retention policy allow this. By default, Kafka Streams keeps state in RocksDB. The message log and everything related to it (topics, offsets, streams, clients, and so on) is located at the path given by the parameter "log.dirs=kafka-logs" in the configuration file "config\server.properties", which is also where the log retention policy, "log.retention.hours=48", is set.

The path to the database holding the stream state is also set in the application parameters:
config.put(StreamsConfig.STATE_DIR_CONFIG, "C:/kafka_2.11-1.1.0/state");
State is stored separately for each application ID (StreamsConfig.APPLICATION_ID_CONFIG).
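As a rough sketch of how these settings fit together, the same configuration can be written with plain string keys, so it runs without the Kafka libraries on the classpath. "application.id" and "state.dir" are the string values behind StreamsConfig.APPLICATION_ID_CONFIG and StreamsConfig.STATE_DIR_CONFIG. The class name StreamSettingsSketch, the helper forApp, and the IDs app_01 and app_02 are only illustrative, and the broker address assumes a local server on port 9092.

```java
import java.util.Properties;

// Builds the settings discussed above with plain string keys.
// "application.id" and "state.dir" are the values behind
// StreamsConfig.APPLICATION_ID_CONFIG and StreamsConfig.STATE_DIR_CONFIG.
class StreamSettingsSketch {
    static Properties forApp(String appId) {
        Properties config = new Properties();
        config.put("application.id", appId);
        config.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        config.put("state.dir", "C:/kafka_2.11-1.1.0/state");
        return config;
    }

    public static void main(String[] args) {
        // Two IDs share one state.dir but keep separate state.
        System.out.println(forApp("app_01"));
        System.out.println(forApp("app_02"));
    }
}
```

In a real application this Properties object would be handed to the Kafka Streams runtime; here it only shows that the application ID, not the state path, is what separates one stream's state from another's.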

Now let's check how a Stream works. We prepare a Stream application based on the example in the distribution (slightly modified for the experiment) that counts occurrences of identical words, along with the publisher and subscriber applications. We will write to the topic in-topic:
my_kafka_run.cmd com.home.SimpleProducer in-topic
The Stream application reads this topic, counts the occurrences of identical words, saves state without our asking explicitly, and forwards the result to another topic, out-topic. Here I want to clarify how the log relates to the state (state store). ZooKeeper and the Kafka server are running. I start the Stream with App-ID = app_01:
my_kafka_run.cmd com.home.KafkaCountStream in-topic app_01
The publisher and the subscriber, respectively:
my_kafka_run.cmd com.home.SimpleProducer in-topic
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01

We start typing words and watch their counts, each labeled with the Stream App-ID that counted it.

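The work proceeds independently: you can stop the Stream and keep writing to the topic, and it will catch up on the count when it starts again. Now connect a second Stream with App-ID = app_02 (the same application, but with a different ID). It reads the log (the sequence of events retained according to the retention policy), counts the words, saves the state, and prints the result. So two streams that started at different times arrive at the same result.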

Now suppose our log has gone stale (retention policy) or we have deleted it (which is possible), and we connect a third stream with App-ID = app_03 (for this I stopped Kafka, deleted kafka-logs, and started it again) and enter a new message into the topic. You can see that the first stream (app_01) continued its count, while the new third stream started from zero.

If we then start the app_02 stream, it catches up with the first one and their values are equal. This example made it clear how Kafka processes the current log, adds to previously saved state, and so on.
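The whole experiment can be replayed as a toy model in plain Java. This is my own simplification, not how Kafka Streams is implemented: WordCountReplaySketch and its App class are hypothetical, the state store is an in-memory map rather than RocksDB, and wiping the log is modeled by clearing a list and resetting each stream's read position.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy replay of the experiment: a shared log plus, per App-ID, saved counts and a position.
// Hypothetical names throughout; Kafka Streams keeps this state in RocksDB, not in a map.
class WordCountReplaySketch {
    // One stream application: its saved counts (state store) and its position in the log.
    static class App {
        final Map<String, Integer> counts = new HashMap<>();
        int offset = 0;

        // Process whatever is in the log past our position, then advance the position.
        void catchUp(List<String> log) {
            for (; offset < log.size(); offset++) {
                counts.merge(log.get(offset), 1, Integer::sum);
            }
        }

        // Models a wiped log: positions start over, saved counts survive.
        void logWasWiped() {
            offset = 0;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        App app01 = new App();
        App app02 = new App();

        log.add("kafka");
        log.add("kafka");
        app01.catchUp(log);
        app02.catchUp(log);      // started later, reaches the same counts

        log.clear();             // kafka-logs deleted
        app01.logWasWiped();
        app02.logWasWiped();
        log.add("kafka");        // one new message after the wipe

        App app03 = new App();   // brand-new App-ID, no saved state
        app01.catchUp(log);      // continues from its saved count
        app03.catchUp(log);      // starts from zero
        app02.catchUp(log);      // catches up with app_01
        System.out.println("app_01=" + app01.counts
                + " app_02=" + app02.counts
                + " app_03=" + app03.counts);
    }
}
```

The streams that kept their state add the new message to their earlier totals, while the new App-ID only ever sees what is still in the log.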
KafkaCountStream code:
public class KafkaCountStream { public static void main(final String[] args) throws Exception {
Kafka is a very broad topic; this was a first general introduction that I put together for myself. :-)
Materials:
How and where to start