GParsã©ã€ãã©ãªã®APIãšãè€é床ãäžçšåºŠã®ãã«ãã¹ã¬ããã¿ã¹ã¯ã®ãœãªã¥ãŒã·ã§ã³ïŒãã®çµæããåœæ°çµæžãã«åœ¹ç«ã€å¯èœæ§ãããïŒã«ã€ããŠç°¡åã«æ€èšããŸãã
ãã®èšäºã¯ã
ãJavaã§ã®ãã«ãã³ã¢ããã°ã©ãã³ã°ãã³ãŒã¹ãèªãããã®æºåãšããŠãJavaããã°ã©ããŒãå©çšã§ããããŸããŸãªã¢ã¯ã¿ãŒã©ã€ãã©ãªã®ç 究äžã«æžãããŸããã
ãªã³ã©ã€ã³æè²ãã©ãããã©ãŒã udemy.comã§
Scala for Java Developersã³ãŒã¹ãæã
ãŠããŸãïŒCoursera / EdXã«äŒŒãŠããŸãïŒã
ããã¯ãAkkaã¢ã¯ã¿ãŒã®APIãããã©ãŒãã³ã¹ãããã³å®è£
ããã¢ãã«ã®åé¡ã«é¢ããä»ã®ã©ã€ãã©ãªã®å®è£
ãšæ¯èŒããããšãç®çãšããäžé£ã®èšäºã®æåã®èšäºã§ãã ãã®èšäºã§ã¯ãGParsã§ãã®ãããªåé¡ãšè§£æ±ºçãæäŸããŸãã
GParsã¯Clojureçšã«äœæãããã©ã€ãã©ãªã§ãããŸããŸãªäžŠåã³ã³ãã¥ãŒãã£ã³ã°ã¢ãããŒããå¹
åºããµããŒãããŠããŸãã
GParsã®é·æ
- ãœãŒã¹ã³ãŒãã¯Javaã§èšè¿°ãããŠããŸãïŒAkkaã¯Scalaã§èšè¿°ãããŠããŸãïŒã ããã€ãã£ããããã°ã©ãã³ã°èšèªã§ããã³ãããã®äžã«ãããã®ããèŠãã®ã¯åžžã«èå³æ·±ã
- GParsã¯ã¢ãããŒãïŒä¿³åªããšãŒãžã§ã³ããSTMãCSPãããŒã¿ãããŒïŒã®ãåç©åãã§ãã
- GParsã¯ãJavaã§èšè¿°ãããClojureã©ã³ã¿ã€ã ã©ã€ãã©ãªã®ã¯ã©ã¹ã䜿çšããŸãã èšæ€ã«èå³ããã
ãã€ã³ã¹ããŒã«ãGPar
Maven GParsãšGroovyãæ¥ç¶ãã
<dependency> <groupId>org.codehaus.gpars</groupId> <artifactId>gpars</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.codehaus.groovy</groupId> <artifactId>groovy-all</artifactId> <version>2.2.2</version> </dependency>
Mavenããªããã°ããªããžããªãã
GPars-1.1.0 ïŒ
sources ïŒãš
Groovy-2.2.2 ïŒ
sources ïŒãããŠã³ããŒãããŠããããžã§ã¯ãã«æ¥ç¶ããŸãã
ã¹ããŒãã¬ã¹ä¿³åª
ç°¡åãªäŸããå§ããŸãããã
ã¢ã¯ã¿ãŒã«ã¡ãã»ãŒãžãéä¿¡ããŠããŸãã
import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String msg) { System.out.println("receive: " + msg); } }.start(); actor.send("Hello!"); System.in.read(); } } >> receive: Hello!
ã¡ãã»ãŒãžãéä¿¡ããŠå¿çãåŸ
ã€
import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String msg) { System.out.println("ping: " + msg); getSender().send(msg.toUpperCase()); } }.start(); System.out.println("pong: " + actor.sendAndWait("Hello!")); } } >> ping: Hello! >> pong: HELLO!
ã¡ãã»ãŒãžãéä¿¡ããéåæã³ãŒã«ããã¯ãåããŸã
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String msg) { System.out.println("ping: " + msg); getSender().send(msg.toUpperCase()); } }.start(); actor.sendAndContinue("Hello!", new MessagingRunnable<String>() { protected void doRun(String msg) { System.out.println("pong: " + msg); } }); System.in.read(); } } >> ping: Hello! >> pong: HELLO!
åä¿¡ã¡ãã»ãŒãžã®çš®é¡ããšã«ãã¿ãŒã³ãããã³ã°ãè¡ã
import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String arg) { getSender().send(arg.toUpperCase()); } public void onMessage(Long arg) { getSender().send(1000 + arg); } }.start(); System.out.println("42.0 -> " + actor.sendAndWait(42.0)); } } >> Hello! -> HELLO! >> 42 -> 1042
ãã¿ãŒã³ãããã³ã°ã§é©åãªãã³ãã©ãèŠã€ãããŸããã§ãã
import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String arg) { getSender().send(arg.toUpperCase()); } public void onMessage(Long arg) { getSender().send(1000 + arg); } }.start(); System.out.println("42.0 -> " + actor.sendAndWait(42.0)); } } >> An exception occurred in the Actor thread Actor Thread 1 >> groovy.lang.MissingMethodException: No signature of method: >> net.golovach.Demo_4$1.onMessage() is applicable for argument types: (java.lang.Double) values: [42.0] >> Possible solutions: onMessage(java.lang.Long), onMessage(java.lang.String) >> at org.codehaus.groovy.runtime.ScriptBytecodeAdapter ... >> ...
ç®ã«èŠãããã®
-ããã¿ãŒã³ãããã³ã°ãã¯ãé©åãªãªãŒããŒããŒãããŒãžã§ã³ã®onMessageïŒ<one-arg>ïŒã¡ãœãããéžæããŸããååšããªãå Žåã¯ãäŸå€ãçºçããŸãã
-ã¢ã¯ã¿ãŒã¯ãããŒã¢ã³ãã¹ã¬ããã®ããŒã«ã«åºã¥ããŠåäœãããããJVMãæ©æã«ã·ã£ããããŠã³ããªãããã«ãäœããã®æ¹æ³ã§mainïŒïŒã¡ãœããïŒSystem.in.readïŒïŒã䜿çšïŒã®æäœãäžæåæ¢ããå¿
èŠããããŸãã
-replyïŒïŒã¡ãœããã®äŸã§ã¯ãDynamicDispatchActorããç¶æ¿ãããšãå€ãã®ã¡ãœãããã¢ã¯ã¿ãŒã®ãåå空éãã«åé¡ãããããšãããããŸãïŒreplyãreplyIfExistsãgetSenderãterminateã...ïŒ
GParsã®äœæè
ã¯DynamicDispatchActorã¯ã©ã¹ã®
ã¹ããŒãã¬ã¹ã¢ã¯ã¿ãŒã®çžç¶äººãåŒã³åºããŸããããããã¯å€æŽãã£ãŒã«ããæã€ããšãã§ãããã®äžã«ç¶æ
ãæ ŒçŽã§ããjavaã¯ã©ã¹ã®éåžžã®ã€ã³ã¹ã¿ã³ã¹ã§ãã ãããå®èšŒãã
import groovyx.gpars.actor.*; import java.util.ArrayList; import java.util.List; public class StatelessActorTest { public static void main(String[] args) throws InterruptedException { Actor actor = new DynamicDispatchActor() { private final List<Double> state = new ArrayList<>(); public void onMessage(final Double msg) { state.add(msg); reply(state); } }.start(); System.out.println("answer: " + actor.sendAndWait(1.0)); System.out.println("answer: " + actor.sendAndWait(2.0)); System.out.println("answer: " + actor.sendAndWait(3.0)); System.out.println("answer: " + actor.sendAndWait(4.0)); System.out.println("answer: " + actor.sendAndWait(5.0)); } } >> answer: [1.0] >> answer: [1.0, 2.0] >> answer: [1.0, 2.0, 3.0] >> answer: [1.0, 2.0, 3.0, 4.0] >> answer: [1.0, 2.0, 3.0, 4.0, 5.0]
ã¹ããŒããã«ã¢ã¯ã¿ãŒ
ã¹ããŒãã¬ã¹/ã¹ããŒããã«éšéã®çŽ¹ä»ã«ãããèè
ã¯ã¹ããŒããã«ã¢ã¯ã¿ãŒã«ããã¹ããŒããã³ãã¬ãŒãã®å®è£
ãææ©çã«äœæã§ããããšãæå³ããŸãã ç°¡åãªäŸãèŠãŠã¿ãŸãããïŒDefaultActorã®åå«-ã¹ããŒããã«ã¢ã¯ã¿ãŒïŒ
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; import static java.util.Arrays.asList; public class StatefulActorTest { public static void main(String[] args) throws Exception { Actor actor = new MyStatefulActor().start(); actor.send("A"); actor.send(1.0); actor.send(Arrays.asList(1, 2, 3)); actor.send("B"); actor.send(2.0); actor.send(Arrays.asList(4, 5, 6)); System.in.read(); } private static class MyStatefulActor extends DefaultActor { protected void act() { loop(new Runnable() { public void run() { react(new MessagingRunnable<Object>(this) { protected void doRun(final Object msg) { System.out.println("react: " + msg); } }); } }); } } } >> react: A >> react: 1.0 >> react: [1, 2, 3] >> react: B >> react: 2.0 >> react: [4, 5, 6]
ãã ããStateãã³ãã¬ãŒãã®çŽæãããå®è£
ã¯ãŸã£ããèããããŸããã ãã®æ¹æ³ã§è¡ããïŒJavaã¯ãã®ãããªããªãã¯ã«æé©ãªèšèªã§ã¯ãããŸãããClojure/ Scalaã§ã¯ããã®ã³ãŒãã¯ã¯ããã«ã³ã³ãã¯ãã«èŠããŸãïŒ
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; import java.util.List; import static java.util.Arrays.asList; public class StatefulActorTest { public static void main(String[] args) throws Exception { Actor actor = new MyStatefulActor().start(); actor.send("A"); actor.send(1.0); actor.send(asList(1, 2, 3)); actor.send("B"); actor.send(2.0); actor.send(asList(4, 5, 6)); System.in.read(); } private static class MyStatefulActor extends DefaultActor { protected void act() { loop(new Runnable() { public void run() { react(new MessagingRunnable<String>(this) { protected void doRun(final String msg) { System.out.println("Stage #0: " + msg); react(new MessagingRunnable<Double>() { protected void doRun(final Double msg) { System.out.println(" Stage #1: " + msg); react(new MessagingRunnable<List<Integer>>() { protected void doRun(final List<Integer> msg) { System.out.println(" Stage #2: " + msg + "\n"); } }); } }); } }); } }); } } } >> Stage #0: A >> Stage #1: 1.0 >> Stage #2: [1, 2, 3] >> >> Stage #0: B >> Stage #1: 2.0 >> Stage #2: [4, 5, 6]
ããŠããŸãã¯ãã®å¿åã¯ã©ã¹ã®ã²ã©ããã¹ããåãé€ãããç¶æ
ãå
·äœåãããŸããã
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; import java.util.List; import static java.util.Arrays.asList; public class StatefulActorTest { public static void main(String[] args) throws Exception { Actor actor = new MyStatefulActor().start(); actor.send("A"); actor.send(1.0); actor.send(asList(1, 2, 3)); actor.send("B"); actor.send(2.0); actor.send(asList(4, 5, 6)); System.in.read(); } private static class MyStatefulActor extends DefaultActor { protected void act() { loop(new Runnable() { public void run() { react(new Stage0(MyStatefulActor.this)); } }); } } private static class Stage0 extends MessagingRunnable<String> { private final DefaultActor owner; private Stage0(DefaultActor owner) {this.owner = owner;} protected void doRun(final String msg) { System.out.println("Stage #0: " + msg); owner.react(new Stage1(owner)); } } private static class Stage1 extends MessagingRunnable<Double> { private final DefaultActor owner; private Stage1(DefaultActor owner) {this.owner = owner;} protected void doRun(final Double msg) { System.out.println(" Stage #1: " + msg); owner.react(new Stage2()); } } private static class Stage2 extends MessagingRunnable<List<Integer>> { protected void doRun(final List<Integer> msg) { System.out.println(" Stage #2: " + msg + "\n"); } } }
ã¯ããã¯ããç§ã¯ããªãã«å®å
šã«åæããŸããJavaã¯éåžžã«åé·ãªèšèªã§ãã
é·ç§»å³ã¯æ¬¡ã®ããã«ãªããŸãïŒåŒæ°ãåå²ããŸããã§ããïŒ
// START // ----- // | // | // | // | +--------+ // +->| Stage0 | ---String----+ // +--------+ | // ^ v // | +--------+ // | | Stage1 | // List<Integer> +--------+ // | | // | +--------+ Double // +--| Stage2 |<-------+ // +--------+
ã¿ã€ããŒ
ç§ã®åé¡ã解決ããã«ã¯ãã¿ã€ããŒãå¿
èŠã«ãªããŸããã¿ã€ããŒã¯ãäžå®æéã®çµäºãéç¥ããããã«ããã°ã©ã ã§ããŸãã ãéåžžã®ãJavaã§ã¯ãææªã§ãjava.util.concurrent.ScheduledThreadPoolExecutorãŸãã¯java.util.Timerã䜿çšããŸãã ããããç§ãã¡ã¯ä¿³åªã®äžçã«ããŸãïŒ
ããã¯ãã¿ã€ã ã¢ãŠãä»ãã®reactïŒïŒã¡ãœããã§ã¡ãã»ãŒãžãåŸ
ã£ãŠãã³ã°ããã¹ããŒããã«ã¢ã¯ã¿ãŒã§ãã ãã®æéäžã«ã¡ãã»ãŒãžãå±ããªãå ŽåãGParsã€ã³ãã©ã¹ãã©ã¯ãã£ã¯Actor.TIMEOUTã¡ãã»ãŒãžïŒããã¯åãªããTIMEOUTãè¡ã§ãïŒãéä¿¡ããtimeoutMsgã³ã³ã¹ãã©ã¯ã¿ãŒããäœæè
ã«ã¡ãã»ãŒãžããè¿ããŸããã ã¿ã€ããŒãããªããã«ããå Žåã¯ãä»ã®ã¡ãã»ãŒãžãéä¿¡ããŸãïŒãKILLããšããæååãéä¿¡ããŸãïŒ
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; import groovyx.gpars.actor.impl.MessageStream; import static java.util.concurrent.TimeUnit.MILLISECONDS; public class Timer<T> extends DefaultActor { private final long timeout; private final T timeoutMsg; private final MessageStream replyTo; public Timer(long timeout, T timeoutMsg, MessageStream replyTo) { this.timeout = timeout; this.timeoutMsg = timeoutMsg; this.replyTo = replyTo; } protected void act() { loop(new Runnable() { public void run() { react(timeout, MILLISECONDS, new MessagingRunnable() { protected void doRun(Object argument) { if (Actor.TIMEOUT.equals(argument)) { replyTo.send(timeoutMsg); } terminate(); } }); } }); } }
ã¿ã€ããŒã®äœ¿çšäŸã
2ã€ã®ã¿ã€ããŒtimerXãštimerYãäœæããŸãããããã¯ãããã1000msã®é
延ã§ã¡ãã»ãŒãžãXããšãYããéä¿¡ããŸãã ãããã500msåŸã«ç§ã¯æ°ãå€ãã£ãŠtimerXããéä»ããããŸããã
import groovyx.gpars.actor.Actor; import groovyx.gpars.actor.impl.MessageStream; public class TimerDemo { public static void main(String[] args) throws Exception { Actor timerX = new Timer<>(1000, "X", new MessageStream() { public MessageStream send(Object msg) { System.out.println("timerX send timeout message: '" + msg + "'"); return this; } }).start(); Actor timerY = new Timer<>(1000, "Y", new MessageStream() { public MessageStream send(Object msg) { System.out.println("timerY send timeout message: '" + msg + "'"); return this; } }).start(); Thread.sleep(500); timerX.send("KILL"); System.in.read(); } } >> timerY send timeout message: 'Y'
åé¡æãšè§£æ±ºç
次ã®éåžžã«äžè¬çãªåé¡ãèæ
®ããŠãã ããã
1.ããªãã®é »åºŠã§äœããã®æ©èœãåŒãèµ·ããå€ãã®ã¹ã¬ããããããŸãã
2.ãã®é¢æ°ã«ã¯2ã€ã®ãªãã·ã§ã³ããããŸãã1ã€ã®åŒæ°ã®åŠçãšåŒæ°ã®ãªã¹ãã®åŠçã§ãã
3.ãã®é¢æ°ã¯ãåŒæ°ãªã¹ãã®åŠçããåã
ã®åŠçã®åèšããå°ãªãã·ã¹ãã ãªãœãŒã¹ãæ¶è²»ãããããªãã®ã§ãã
4.ã¿ã¹ã¯ã¯ããããŒãšé¢æ°ã®éã«ãããã£ãŒãé
眮ãããããŒããåŒæ°ãããã³ãã«ãã«åéããé¢æ°ã«æž¡ãããªã¹ããåŠçãããããã£ãŒãéä¿¡è
ãããŒã«çµæããåé
ãããŸãã
5.ãããã£ãŒã¯ã2ã€ã®å Žåã«åŒæ°ã®ãªã¹ããæž¡ããŸããååãªãµã€ãºã®ããã³ãã«ããåéããããå®å
šãªããã³ãã«ããåéã§ããªãã£ãã¿ã€ã ã¢ãŠãæéã®åŸãã¹ã¬ãããçµæãè¿ããšãã§ãã
ãœãªã¥ãŒã·ã§ã³ã¹ããŒã ãèŠãŠã¿ãŸãããã
ã¿ã€ã ã¢ãŠã100ããªç§ãããã³ãã«ãã®æ倧ãµã€ãº-3ã€ã®åŒæ°
æé0ã§ããããŒT-0ã¯åŒæ°ãAããéä¿¡ããŸãã ãããã£ãŒã¯ãã¯ãªãŒã³ãç¶æ
ãäžä»£0ã§ã
//time:0 // // T-0 --"A"-----> +-------+ generationId=0 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[]
ãã°ãããããšãBatcherã¯ãAããçãããŠT-0ã«æ»ãå¿
èŠãããããšãèªèããŸãã äžä»£0ã®ã¿ã€ããŒãéå§ããŸãã
// +-----+ timeoutMsg=0 // |Timer| timeout=100 //time:0.001 +-----+ // // T-0 +-------+ generationId=0 // T-1 |Batcher| argList=["A"] // T-2 +-------+ replyToList=[T-0]
æé25ããªç§ã§ãT-1ã¹ããªãŒã ã¯åŠçã®ããã«ãBããéä¿¡ããŸã
// +-----+ timeoutMsg=0 // |Timer| timeout=100 //time:25 +-----+ // // T-0 +-------+ generationId=0 // T-1 ---"B"----> |Batcher| argList=["A"] // T-2 +-------+ replyToList=[T-0]
ãã°ãããããšãBatcherã¯ããAããšãBããçãããŠããããŒãT-0ãšT-1ã«æ»ãå¿
èŠãããããšãèªèããŸã
// +-----+ timeoutMsg=0 // |Timer| timeout=100 //time:25.001 +-----+ // // T-0 +-------+ generationId=0 // T-1 |Batcher| argList=["A","B"] // T-2 +-------+ replyToList=[T-0,T-1]
50ããªç§ã®æç¹ã§ãT-2ã¹ããªãŒã ã¯åŠçã®ããã«ãCããéä¿¡ããŸã
// +-----+ timeoutMsg=0 // |Timer| timeout=100 //time:50 +-----+ // // T-0 +-------+ generationId=0 // T-1 |Batcher| argList=["A","B"] // T-2 ----"C"---> +-------+ replyToList=[T-0,T-1]
ãã°ãããããšãBatcherã¯ããAãããBããããã³ãCããèšç®ããããããããŒT-0ãT-1ãããã³T-2ã«æ»ãå¿
èŠãããããšãèªèããŸãã ããã³ãã«ããäžæ¯ã§ãã¿ã€ããŒãã殺ããããšãããããŸã
// +-----+ timeoutMsg=0 // +-"KILL"->|Timer| timeout=100 //time:50.001 | +-----+ // | // T-0 +-------+ generationId=0 // T-1 |Batcher| argList=["A","B","C"] // T-2 +-------+ replyToList=[T-0,T-1,T-2]
ãã°ãããããšãBatcherã¯èšç®çšã®ããŒã¿ãå¥ã®ã¢ã¯ã¿ãŒïŒå¿åïŒã«æž¡ããç¶æ
ãã¯ãªã¢ããŠãäžä»£ã0ãã1ã«å€æŽããŸã
//time:50.002 // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[] // // +---------+ argList=["A","B","C"] // |anonymous| replyToList=[T-0,T-1,T-2] // +---------+
ãã°ãããããšïŒãã¹ããŒãªãŒããŒããã®å Žåãèšç®ã¯ç¬æã§ãããšæ³å®ããŸãïŒãå¿åã®ã¢ã¯ã¿ãŒãåŒæ°ã®ãªã¹ãã«å¯ŸããŠã¢ã¯ã·ã§ã³ãå®è¡ããŸã[ãAãããBãããCã]-> [ãresïŒAãããresïŒBãã resïŒC "]
//time:50.003 // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[] // // +---------+ resultList=["res#A","res#B","res#B"] // |anonymous| replyToList=[T-0,T-1,T-2] // +---------+
ãã°ãããããšãå¿åã®ã¢ã¯ã¿ãŒãèšç®çµæãã¹ã¬ããã«é
åžããŸã
//time:50.004 // // T-0 <-----------+ +-------+ generationId=1 // T-1 <---------+ | |Batcher| argList=[] // T-2 <-------+ | | +-------+ replyToList=[] // | | | // | | +---"res#A"--- +---------+ // | +---"res#B"----- |anonymous| // +--"res#C"-------- +---------+
ãã°ãããããšãã·ã¹ãã ã¯å
ã®ãçŽç²ãªãç¶æ
ã«æ»ããŸã
//time:50.005 // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[]
ãã®åŸãæé75ã§ãT-2ã¹ããªãŒã ã¯ãDãã®èšç®ã«æž¡ãããŸãã
//time:75 // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=[] // T-2 ----"D"---> +-------+ replyToList=[]
ãã°ãããããšãBatcherã¯ãDããçãããŠT-2ã¹ããªãŒã ã«æ»ãå¿
èŠãããããšãèªèããããã«ç¬¬1äžä»£ã®ã¿ã€ããŒãéå§ãããŸããã
// +-----+ timeoutMsg=1 // |Timer| timeout=100 //time:75.001 +-----+ // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=["D"] // T-2 +-------+ replyToList=[T-2]
100ããªç§åŸïŒ175ããªç§ïŒãGParsã€ã³ãã©ã¹ãã©ã¯ãã£ã¯ã¿ã€ããŒã«åŸ
æ©æéã®æºäºãéç¥ããŸã
// +--"TIMEOUT"-- // | // v // +-----+ timeoutMsg=1 // |Timer| timeout=100 //time:175 +-----+ // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=["D"] // T-2 +-------+ replyToList=[T-2]
ãã°ãããããšãã¿ã€ããŒã¯Batcherã«ç¬¬1äžä»£ãã¿ã€ã ã¢ãŠãããããšãéç¥ããterminateïŒïŒãåŒã³åºããŠèªæ®ºããŸãã
// +-----+ timeoutMsg=1 // +----1-----|Timer| timeout=100 //time:175.001 | +-----+ // v // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=["D"] // T-2 +-------+ replyToList=[T-2]
åŒæ°ãªã¹ãïŒåŒæ°ã1ã€ãããªãïŒã§èšç®ãå®è¡ããå¿åã¢ã¯ã¿ãŒãäœæãããŸãã ãžã§ãã¬ãŒã·ã§ã³1ãã2ãžã®å€æŽ
//time:175.002 // // T-0 +-------+ generationId=2 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[] // // +---------+ argList=["D"] // |anonymous| replyToList=[T-2] // +---------+
俳åªã¯ä»äºãããŸãã
//time:175.003 // // T-0 +-------+ generationId=2 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[] // // +---------+ resultList=["res#D"] // |anonymous| replyToList=[T-2] // +---------+
俳åªã¯çµæãäžãã
//time:175.004 // // T-0 +-------+generationId=2 // T-1 |Batcher|argList=[] // T-2 <-------+ +-------+replyToList=[] // | // | +---------+ // +--"res#C"----- |anonymous| // +---------+
å
ã®ãçŽç²ãªãç¶æ
ã®ã·ã¹ãã
//time:175.005 // // T-0 +-------+ generationId=2 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[]
åé¡è§£æ±º
BatchProcessor-ãé¢æ°ãã®ã€ã³ã¿ãŒãã§ãŒã¹ã ããããã¢ãŒããåŠç
import java.util.List; public interface BatchProcessor<ARG, RES> { List<RES> onBatch(List<ARG> argList) throws Exception; }
Batcher-åŒæ°ããããã¯ãããã¯ã©ã¹ã ã³ã¢ãœãªã¥ãŒã·ã§ã³
import groovyx.gpars.actor.*; import groovyx.gpars.actor.impl.MessageStream; import java.util.*; public class Batcher<ARG, RES> extends DynamicDispatchActor { // fixed parameters private final BatchProcessor<ARG, RES> processor; private final int maxBatchSize; private final long batchWaitTimeout; // current state private final List<ARG> argList = new ArrayList<>(); private final List<MessageStream> replyToList = new ArrayList<>(); private long generationId = 0; private Actor lastTimer; public Batcher(BatchProcessor<ARG, RES> processor, int maxBatchSize, long batchWaitTimeout) { this.processor = processor; this.maxBatchSize = maxBatchSize; this.batchWaitTimeout = batchWaitTimeout; } public void onMessage(final ARG elem) { argList.add(elem); replyToList.add(getSender()); if (argList.size() == 1) { lastTimer = new Timer<>(batchWaitTimeout, ++generationId, this).start(); } else if (argList.size() == maxBatchSize) { lastTimer.send("KILL"); lastTimer = null; nextGeneration(); } } public void onMessage(final long timeOutId) { if (generationId == timeOutId) {nextGeneration();} } private void nextGeneration() { new DynamicDispatchActor() { public void onMessage(final Work<ARG, RES> work) throws Exception { List<RES> resultList = work.batcher.onBatch(work.argList); for (int k = 0; k < resultList.size(); k++) { work.replyToList.get(k).send(resultList.get(k)); } terminate(); } }.start().send(new Work<>(processor, new ArrayList<>(argList), new ArrayList<>(replyToList))); argList.clear(); replyToList.clear(); generationId = generationId + 1; } private static class Work<ARG, RES> { public final BatchProcessor<ARG, RES> batcher; public final List<ARG> argList; public final List<MessageStream> replyToList; public Work(BatchProcessor<ARG, RES> batcher, List<ARG> argList, List<MessageStream> replyToList) { this.batcher = batcher; this.argList = argList; this.replyToList = replyToList; } } }
BatcherDemoã¯ãBatcherã¯ã©ã¹ã®ãã¢ã§ãã åè·¯å³ãšåã
import groovyx.gpars.actor.Actor; import java.io.IOException; import java.util.*; import java.util.concurrent.*; import static java.util.concurrent.Executors.newCachedThreadPool; public class BatcherDemo { public static final int BATCH_SIZE = 3; public static final long BATCH_TIMEOUT = 100; public static void main(String[] args) throws InterruptedException, IOException { final Actor actor = new Batcher<>(new BatchProcessor<String, String>() { public List<String> onBatch(List<String> argList) { System.out.println("onBatch(" + argList + ")"); ArrayList<String> result = new ArrayList<>(argList.size()); for (String arg : argList) { result.add("res#" + arg); } return result; } }, BATCH_SIZE, BATCH_TIMEOUT).start(); ExecutorService exec = newCachedThreadPool(); exec.submit(new Callable<Void>() { // T-0 public Void call() throws Exception { System.out.println(actor.sendAndWait(("A"))); return null; } }); exec.submit(new Callable<Void>() { // T-1 public Void call() throws Exception { Thread.sleep(25); System.out.println(actor.sendAndWait(("B"))); return null; } }); exec.submit(new Callable<Void>() { // T-2 public Void call() throws Exception { Thread.sleep(50); System.out.println(actor.sendAndWait(("C"))); Thread.sleep(25); System.out.println(actor.sendAndWait(("D"))); return null; } }); exec.shutdown(); } } >> onBatch([A, B, C]) >> res#A >> res#B >> res#C >> onBatch([D]) >> res#D
ãããã«
ç§ã®æèŠã§ã¯ãã¢ã¯ã¿ãŒã¯ãã«ãã¹ã¬ããããªããã£ãã®ããã°ã©ãã³ã°ã«é©ããŠããŸããããã¯ãç¹ã«é·ç§»åŒæ°ã«äŸåããè€éãªé·ç§»å³ãæã€æéç¶æ
ãã·ã³ã§ãã
ãã®èšäºã®äŸã«ã¯ã
gpars.org/guideãªã©ãããŸããŸãªå Žæã§ãªã³ã©ã€ã³ã§èŠã€ãã£ãã³ãŒãã®ããªãšãŒã·ã§ã³ããããŸãã
第äºéšã§ã¯
- ææ¡ããããœãªã¥ãŒã·ã§ã³ã®é床ã枬å®ããŸã
- åã
ã®ãã©ã³ã¶ã¯ã·ã§ã³ããã®ããŸããŸãªã¹ã¬ããããã®èŠæ±ã1ã€ã®å€§ããªRDBMSãã©ã³ã¶ã¯ã·ã§ã³ã«çµåããããšã«ãããJDBCãšã®äœæ¥ãå éããŸãã ã€ãŸããåãConnectionå
ã§ã¯ãªããç°ãªãConnectionéã§ããããå®è¡ããŸãã
UPDãã¥ãŒãªãŒã³ã¡ã³ããããããšãïŒ
GParsã¯Java + Groovyã®ããã¯ã¹ã§æžãããŠããŸãã
ãœãŒã¹ã³ãŒãã¯ãGroovyããã±ãŒãžãèšè¿°ãããŠããããšã瀺ããŠããŸã
-groovyx.gpars.csp *
-groovyx.gpars.paã*
-groovyx.gpars *ïŒéšåçã«ïŒ
é£çµ¡å
Javaãã¬ãŒãã³ã°ããªã³ã©ã€ã³ã§è¡ãïŒ
ããã°ã©ãã³ã°ã³ãŒã¹ã¯ãã¡ãïŒ
ãJava Coreã³ãŒã¹ã®åèšèš
ã®äžç°ãšããŠãã¬ãŒãã³ã°è³æã®äžéšãå
¬éã
ãŠããŸã ã
ãã®èšäºã§ã¯ãèŠèŽè
ã®è¬çŸ©ã®ãããªé²ç»ã
youtubeãã£ã³ãã«ã§èŠãããšãã§ããŸããããããã
ãã£ã³ãã«ã®ãããªãããäœç³»åãã
ãŠããŸã ã
ã¹ã«ã€ãïŒGolovachCourses
ã¡ãŒã«ïŒGolovachCourses@gmail.com