
This document will help you learn about futures, a Rust crate that provides a zero-cost implementation of futures and streams. Futures are available in many other programming languages, such as C++, Java, and Scala, and the futures crate draws inspiration from the libraries of those languages. It stands out, however, by being ergonomic while also following Rust's philosophy of zero-cost abstractions: creating and composing futures requires no memory allocation, and the Task that drives them needs only a single allocation. Futures are intended to be the foundation for asynchronous, composable, high-performance I/O in Rust, and early benchmarks show that a simple HTTP server built on futures is very fast.
This post is a translation of the official futures-rs tutorial.
This document is divided into several sections:
- Hello, world!
- The Future trait
- The Stream trait
- Concrete futures and streams
- Returning futures
- Task and Future
- Task-local data
Hello, world!
The futures crate requires Rust 1.10.0 or later, which is easy to install with rustup. The crate is tested and known to work on Windows, macOS, and Linux, and PRs for other platforms are always welcome. You can add futures to your project's Cargo.toml as follows:
    [dependencies]
    futures = { git = "https://github.com/alexcrichton/futures-rs" }
    tokio-core = { git = "https://github.com/tokio-rs/tokio-core" }
    tokio-tls = { git = "https://github.com/tokio-rs/tokio-tls" }
泚ïŒãã®ã©ã€ãã©ãªã¯ã¢ã¯ãã£ãã«éçºäžã§ãããgitã§ãœãŒã¹ãçŽæ¥ååŸããå¿
èŠããããŸãããåŸã§ã³ã³ãããååŸããå¿
èŠããããŸã
crates.ioã§å
¬éãããŸãã
Here we add three crates:
- futures: the definition and core implementation of Future and Stream.
- tokio-core: bindings to the mio crate that provide concrete Future and Stream implementations for TCP and UDP.
- tokio-tls: an SSL/TLS implementation built on futures.
The futures crate itself is a low-level implementation of futures that does not assume any particular runtime or I/O layer. The examples below use the concrete implementations available in tokio-core to show how futures and streams can perform sophisticated I/O with no overhead.
Now that we have everything we need, let's write our first program. As a "Hello, world!" example, we will download the Rust home page:
    extern crate futures;
    extern crate tokio_core;
    extern crate tokio_tls;

    use std::net::ToSocketAddrs;

    use futures::Future;
    use tokio_core::reactor::Core;
    use tokio_core::net::TcpStream;
    use tokio_tls::ClientContext;

    fn main() {
        let mut core = Core::new().unwrap();
        let addr = "www.rust-lang.org:443".to_socket_addrs().unwrap().next().unwrap();

        let socket = TcpStream::connect(&addr, &core.handle());

        let tls_handshake = socket.and_then(|socket| {
            let cx = ClientContext::new().unwrap();
            cx.handshake("www.rust-lang.org", socket)
        });
        let request = tls_handshake.and_then(|socket| {
            tokio_core::io::write_all(socket, "\
                GET / HTTP/1.0\r\n\
                Host: www.rust-lang.org\r\n\
                \r\n\
            ".as_bytes())
        });
        let response = request.and_then(|(socket, _)| {
            tokio_core::io::read_to_end(socket, Vec::new())
        });

        let (_, data) = core.run(response).unwrap();
        println!("{}", String::from_utf8_lossy(&data));
    }
If you put this code in src/main.rs and run cargo run, you will see the HTML of the Rust home page.
Note: rustc 1.10 compiles this example slowly; with 1.11 compilation is faster.
There is too much code here to take in at once, so let's go through it line by line. Start with the beginning of main():
    let mut core = Core::new().unwrap();
    let addr = "www.rust-lang.org:443".to_socket_addrs().unwrap().next().unwrap();
This creates the event loop on which all I/O will be performed. We then resolve the host name "www.rust-lang.org" using the standard library's to_socket_addrs method.
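The name resolution step can be tried on its own with nothing but the standard library. The sketch below is a minimal illustration and not part of the original program: the helper name first_addr is hypothetical, and it is called with an IP literal so that no DNS lookup or network access is needed.

```rust
use std::net::{SocketAddr, ToSocketAddrs};

/// Hypothetical helper: resolve a "host:port" string to the first address it maps to.
fn first_addr(host_and_port: &str) -> Option<SocketAddr> {
    // `to_socket_addrs` returns an iterator because one name may map to several addresses.
    host_and_port.to_socket_addrs().ok()?.next()
}

fn main() {
    // An IP literal is parsed locally, so this call performs no DNS lookup.
    let addr = first_addr("127.0.0.1:443").expect("a literal address should resolve");
    println!("resolved to {}", addr);
}
```

With a real host name the same call performs a blocking DNS lookup, which is why the tutorial program does it once, before any futures are built.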
Next:
    let socket = TcpStream::connect(&addr, &core.handle());
We get a handle to the event loop and connect to the host with TcpStream::connect. Note that TcpStream::connect returns a future: the socket is not connected yet, and the connection will be established at some later point.
Once the socket is available, three steps are needed to download the rust-lang.org home page:
- Perform the TLS handshake. The site is only served over HTTPS, so we connect to port 443 and have to follow the TLS protocol.
- Send an HTTP GET request. For this guide we write the request by hand; in a serious program you would use an HTTP client built on futures.
- Finally, download the response by reading all the data from the socket.
Let's look at each of these steps in detail.
The first step:
    let tls_handshake = socket.and_then(|socket| {
        let cx = ClientContext::new().unwrap();
        cx.handshake("www.rust-lang.org", socket)
    });
Here we use the and_then method of the Future trait, calling it on the result of TcpStream::connect. The and_then method takes a closure that receives the resolved value of the previous future; in this case socket has type TcpStream. Note that the closure passed to and_then is not run if TcpStream::connect returns an error.
Once we have the socket, we create a client TLS context with ClientContext::new. This type from the tokio-tls crate represents the client side of a TLS connection. We then call the handshake method to perform the TLS handshake. Its first argument is the domain name we are connecting to, and the second is the I/O object (here, the socket).
Like TcpStream::connect before it, handshake returns a future. The TLS handshake can take some time, because the client and server need to perform I/O, verify certificates, and so on. Once resolved, the future yields a TlsStream, which is similar to the TcpStream above.
The and_then combinator does a lot of work behind the scenes: it makes sure futures run in the right order and keeps track of them along the way. The value returned by and_then also implements the Future trait, so computations can be chained further.
Next, we send the HTTP request:
    let request = tls_handshake.and_then(|socket| {
        tokio_core::io::write_all(socket, "\
            GET / HTTP/1.0\r\n\
            Host: www.rust-lang.org\r\n\
            \r\n\
        ".as_bytes())
    });
Here we take the future from the previous step (tls_handshake) and use and_then again to continue the computation. The write_all combinator writes the entire HTTP request, issuing multiple writes if necessary.
The future returned by write_all resolves once all the data has been written to the socket. Note that the TlsStream transparently encrypts everything we write before sending it to the underlying socket.
The third and final part of the request looks like this:
    let response = request.and_then(|(socket, _)| {
        tokio_core::io::read_to_end(socket, Vec::new())
    });
The previous future, request, is chained again, this time to the read_to_end combinator. This future reads all the data from the socket into the provided buffer and returns the buffer when the connection reaches EOF.
As before, reads from the socket transparently decrypt the data received from the server, so what we read is the decrypted version.
If the program ended at this point, you might be surprised to find that nothing happens when it runs. That is because everything so far has only built a future-based computation without actually running it. Up to this point we have performed no I/O and issued no HTTP request.
To actually run the future and drive it to completion, we need to run the event loop:
    let (_, data) = core.run(response).unwrap();
    println!("{}", String::from_utf8_lossy(&data));
Here the response future is handed to the event loop, which is asked to resolve it. The event loop runs until the result is available.
Note that the call to core.run(..) blocks the calling thread until the future resolves. This means that data has type Vec<u8>, and we can then print it to standard output as usual.
Phew! We have looked at futures that initiate a TCP connection, build a chain of computations, and read data from a socket. This is only one example of what futures can do, so let's look at the details.
The Future trait
The Future trait is the core of the futures crate. It represents an asynchronous computation and its eventual result.
Take a look at the following code:
    trait Future {
        type Item;
        type Error;

        fn poll(&mut self) -> Poll<Self::Item, Self::Error>;

        // ...
    }
This definition probably raises a number of questions, which fall under three topics:
- Item and Error
- poll
- future combinators
Let's go through them in detail.
Item and Error
    type Item;
    type Error;
As you can see, the first feature of the Future trait is that it contains two associated types. These are the types of the values a future can resolve to. Each Future instance can be thought of as resolving to a Result<Self::Item, Self::Error>.
These two types appear very often in where clauses of functions that accept futures and in the type signatures of functions that return them.
For example, when returning a future you might write:
    fn foo() -> Box<Future<Item = u32, Error = io::Error>> {
        // ...
    }
Or, when accepting a future:
    fn foo<F>(future: F)
        where F: Future<Error = io::Error>,
              F::Item: Clone,
    {
        // ...
    }
poll
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
The Future trait is built around this method. The poll method is the only entry point for extracting the value a future resolves to. As a user of futures you will rarely need to call it directly; most of the time you interact with futures through combinators, which build higher-level abstractions around them. Still, it is useful to know how futures work under the hood.
Let's take a closer look at poll. Notice the &mut self argument, which implies a number of restrictions and capabilities:
- a future can be polled by only one thread at a time;
- while poll runs, a future can mutate its own state;
- once poll has returned, ownership of the future can be handed to another entity.
Poll is in fact a type alias:
    type Poll<T, E> = Result<Async<T>, E>;
And here is the Async enum:
    pub enum Async<T> {
        Ready(T),
        NotReady,
    }
Through this enum a future communicates whether its value is ready to be used. If an error occurs, Err is returned immediately. Otherwise, Async indicates whether the value of the future is fully available or not ready yet.
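To make the poll contract concrete, here is a minimal std-only sketch. It redeclares toy versions of Async, Poll, and Future that mirror the definitions above rather than importing the futures crate. Countdown and run_to_completion are hypothetical names invented for this illustration, and the busy loop stands in for a real task, which would wait for a notification instead of spinning.

```rust
// Toy, std-only model of the definitions above; not the `futures` crate itself.
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub type Poll<T, E> = Result<Async<T>, E>;

pub trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

/// Hypothetical future that needs `remaining` extra polls before it yields `value`.
pub struct Countdown {
    remaining: u32,
    value: u32,
}

impl Future for Countdown {
    type Item = u32;
    type Error = String;

    fn poll(&mut self) -> Poll<u32, String> {
        if self.remaining == 0 {
            Ok(Async::Ready(self.value))
        } else {
            // `&mut self` lets the future update its own state between polls.
            self.remaining -= 1;
            Ok(Async::NotReady)
        }
    }
}

/// Naive driver: polls in a busy loop and counts the calls.
/// A real executor would sleep until it is notified instead of spinning.
pub fn run_to_completion<F: Future>(mut future: F) -> Result<(F::Item, u32), F::Error> {
    let mut polls = 0;
    loop {
        polls += 1;
        // `?` returns early on `Err`, matching "an error is returned immediately".
        if let Async::Ready(item) = future.poll()? {
            return Ok((item, polls));
        }
    }
}

fn main() {
    let outcome = run_to_completion(Countdown { remaining: 2, value: 7 });
    println!("{:?}", outcome);
}
```

The driver returns the resolved item together with the number of poll calls it took, which makes the NotReady round trips visible.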
Like Iterator, the Future trait does not specify what happens if poll is called after the future has already resolved. This means that implementors of the Future trait do not need to keep state to check whether poll has already returned successfully.
If a call to poll returns NotReady, the future still needs to know when it should be polled again. To achieve this, a future must guarantee the following: when NotReady is returned, the current task must be arranged to receive a notification when the value becomes available.
The park method is the main entry point for delivering notifications. It returns a Task, which implements Send and 'static and has one main method, unpark. Calling unpark indicates that the future can make progress and may be able to resolve to a value.
More detailed documentation is available in the futures API documentation.
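The standard library offers a loosely analogous mechanism for threads, which may help build intuition for park and unpark. The sketch below relies on that analogy only: it uses std::thread::park and Thread::unpark, not the task API of the futures crate, and wait_for_notification is a hypothetical helper.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Blocks the calling thread until a helper thread reports that a value is ready.
fn wait_for_notification() -> bool {
    let ready = Arc::new(AtomicBool::new(false));
    let waiter = thread::current(); // handle the notifier uses to wake us up

    let flag = Arc::clone(&ready);
    let notifier = thread::spawn(move || {
        flag.store(true, Ordering::SeqCst); // the value is now available
        waiter.unpark(); // tell the waiter to check again
    });

    // `park` may wake up spuriously, so the flag is re-checked in a loop.
    while !ready.load(Ordering::SeqCst) {
        thread::park();
    }
    notifier.join().unwrap();
    ready.load(Ordering::SeqCst)
}

fn main() {
    println!("notified: {}", wait_for_notification());
}
```

The shape is the same as with tasks: the waiting side records how it can be woken, and the producing side calls unpark once progress is possible.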
Future combinators
By now it may seem that the poll method would be somewhat painful to work with directly. What if you have a future that returns a String and want to convert it into a future that returns a u32? To allow this kind of composition, the Future trait provides a large number of combinators.
These combinators are similar to iterator combinators: each of them consumes a future and returns a new future.
For example, we could write:
    fn parse<F>(future: F) -> Box<Future<Item=u32, Error=F::Error>>
        where F: Future<Item=String> + 'static,
    {
        Box::new(future.map(|string| {
            string.parse::<u32>().unwrap()
        }))
    }
Here map is used to transform a future that returns a String into a future that returns a u32. Boxing is not always necessary and is discussed in more detail in the section on returning futures.
Combinators let you express the following concepts:
- changing the type of a future (map, map_err);
- running another future once the first one has completed (then, and_then, or_else);
- continuing when at least one of several futures has completed (select);
- waiting for two futures to complete (join);
- defining the behaviour of poll after the future has resolved (fuse).
Using combinators feels much like using Iterator in Rust or futures in Scala. Most work with futures comes down to using these combinators. All combinators are zero-cost: they allocate no memory, and the implementation optimizes down to what you would have written by hand.
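The zero-cost claim is easier to see with a hand-written combinator. The following std-only sketch again uses toy versions of Async, Poll, and Future instead of the futures crate, and its Map only approximates how a real combinator might be written. Finished and parse_ready are hypothetical names introduced for the illustration.

```rust
// Toy, std-only model; the real `futures` crate defines its own `Future` and `Map`.
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub type Poll<T, E> = Result<Async<T>, E>;

pub trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;

    /// Combinator: consumes `self` and returns a new future with a mapped item.
    fn map<F, U>(self, f: F) -> Map<Self, F>
    where
        Self: Sized,
        F: FnOnce(Self::Item) -> U,
    {
        Map { future: self, f: Some(f) }
    }
}

/// Hypothetical future that is ready immediately.
pub struct Finished<T>(Option<T>);

impl<T> Future for Finished<T> {
    type Item = T;
    type Error = ();

    fn poll(&mut self) -> Poll<T, ()> {
        Ok(Async::Ready(self.0.take().expect("polled after completion")))
    }
}

/// Plain struct holding the inner future and the closure: no `Box`, no heap allocation.
pub struct Map<A, F> {
    future: A,
    f: Option<F>,
}

impl<A, F, U> Future for Map<A, F>
where
    A: Future,
    F: FnOnce(A::Item) -> U,
{
    type Item = U;
    type Error = A::Error;

    fn poll(&mut self) -> Poll<U, A::Error> {
        match self.future.poll()? {
            Async::Ready(item) => {
                // The closure runs only once the inner future has produced a value.
                let f = self.f.take().expect("polled after completion");
                Ok(Async::Ready(f(item)))
            }
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}

/// Turns a ready future of `String` into a future of `u32` and polls it once.
pub fn parse_ready(text: &str) -> Option<u32> {
    let mut future = Finished(Some(text.to_string())).map(|s| s.parse::<u32>().unwrap());
    match future.poll() {
        Ok(Async::Ready(n)) => Some(n),
        _ => None,
    }
}

fn main() {
    println!("{:?}", parse_ready("42"));
}
```

Because Map is an ordinary generic struct, the compiler knows the whole chain statically, so it can inline the closure call just as it does for iterator adapters.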
The Stream trait
Earlier we looked at the Future trait, which is useful when only a single value is computed. Sometimes, though, a computation is better modelled as a stream of values. For example, a TCP listener produces many TCP connections over its lifetime. Let's see which standard library entities are the counterparts of Future and Stream:
| Number of items | Sync | Async | Common operations |
|---|---|---|---|
| 1 | Result | Future | map, and_then |
| ∞ | Iterator | Stream | map, fold, collect |
Let's look at the Stream trait:
    trait Stream {
        type Item;
        type Error;

        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
    }
You may notice that the Stream trait is very similar to the Future trait. The main difference is that the poll method returns Option<Self::Item> instead of Self::Item.
Over time a stream produces many optional values and signals its end by returning Ready(None). At its core, a Stream is an asynchronous stream that produces values in a particular order.
In fact, a Stream is a special instance of the Future trait and can be converted into a future with the into_future method. The returned future resolves to the next value from the stream together with the stream itself, so that more values can be obtained later. This also makes it possible to compose streams with arbitrary other futures using the basic future combinators.
Like the Future trait, the Stream trait provides a large number of combinators. In addition to future-like combinators (for example, then), stream-specific combinators such as fold are supported.
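The difference between the two poll signatures can be seen in a small std-only sketch. As before, Async, Poll, and Stream are toy redeclarations that mirror the definitions above, not the futures crate. Counter and sum are hypothetical names, and the spinning loop stands in for a real task that would wait to be woken.

```rust
// Toy, std-only model of the `Stream` definition above; not the crate's API.
pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub type Poll<T, E> = Result<Async<T>, E>;

pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

/// Hypothetical stream yielding next..=limit, reporting `NotReady` before each item.
pub struct Counter {
    next: u32,
    limit: u32,
    stalled: bool,
}

impl Stream for Counter {
    type Item = u32;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<u32>, ()> {
        if self.next > self.limit {
            return Ok(Async::Ready(None)); // end of the stream
        }
        if !self.stalled {
            self.stalled = true;
            return Ok(Async::NotReady); // pretend the item has not arrived yet
        }
        self.stalled = false;
        let item = self.next;
        self.next += 1;
        Ok(Async::Ready(Some(item)))
    }
}

/// Fold-style consumer: keeps polling and adds up items until `Ready(None)`.
pub fn sum<S: Stream<Item = u32>>(mut stream: S) -> Result<u32, S::Error> {
    let mut total = 0;
    loop {
        match stream.poll()? {
            Async::Ready(Some(item)) => total += item,
            Async::Ready(None) => return Ok(total),
            Async::NotReady => continue, // a real executor would wait for a wake-up
        }
    }
}

fn main() {
    let stream = Counter { next: 1, limit: 4, stalled: false };
    println!("{:?}", sum(stream));
}
```

The only structural change from the future model is the extra Option layer: Some carries the next item, and None marks the end of the stream.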
Stream example
We looked at an example of futures at the beginning of this tutorial; now let's look at an example of streams, using the incoming method of TcpListener. This simple server accepts connections, writes the word "Hello!" to each of them, and closes the socket:
    extern crate futures;
    extern crate tokio_core;

    use futures::stream::Stream;
    use tokio_core::reactor::Core;
    use tokio_core::net::TcpListener;

    fn main() {
        let mut core = Core::new().unwrap();
        let address = "127.0.0.1:8080".parse().unwrap();
        let listener = TcpListener::bind(&address, &core.handle()).unwrap();

        let addr = listener.local_addr().unwrap();
        println!("Listening for connections on {}", addr);

        let clients = listener.incoming();
        let welcomes = clients.and_then(|(socket, _peer_addr)| {
            tokio_core::io::write_all(socket, b"Hello!\n")
        });
        let server = welcomes.for_each(|(_socket, _welcome)| {
            Ok(())
        });

        core.run(server).unwrap();
    }
As before, let's go through it line by line.
    let mut core = Core::new().unwrap();
    let address = "127.0.0.1:8080".parse().unwrap();
    let listener = TcpListener::bind(&address, &core.handle()).unwrap();
Here we initialize the event loop and create a TCP listener that accepts sockets by calling TcpListener::bind with a handle to the event loop.
Note that in this listing TcpListener::bind, unlike TcpStream::connect, does not return a future: it returns the TcpListener directly, wrapped in a Result that we unwrap, so the listener is available right away.
Now that we have the TCP listener, we can inspect its state:
    let addr = listener.local_addr().unwrap();
    println!("Listening for connections on {}", addr);
Here we call local_addr to print the address the listener is bound to. At this point the port is successfully bound, so clients can connect.
Next, we create the Stream:
    let clients = listener.incoming();
Here the incoming method returns a Stream of TcpStream and SocketAddr pairs. This is similar to the standard library's TcpListener and its accept method, except that we receive all the events as a stream instead of accepting sockets by hand.
The clients stream produces sockets forever. This mirrors how servers work: they accept clients in a loop and hand them off to the rest of the system for processing.
Now that we have a stream of client connections, we can manipulate it with the standard methods of the Stream trait:
    let welcomes = clients.and_then(|(socket, _peer_addr)| {
        tokio_core::io::write_all(socket, b"Hello!\n")
    });
Here we use the and_then method of the Stream trait to perform an action on each item of the stream. In this case we chain a computation onto each item (a TcpStream). It is the write_all method we saw earlier, which writes the entire given buffer to the given socket.
This block means that welcomes is now a stream of sockets to which "Hello!" has been written. For the purposes of this tutorial we are finished with the connection at that point, so we use for_each to turn the whole welcomes stream into a future:
    welcomes.for_each(|(_socket, _welcome)| {
        Ok(())
    })
Here we take the results of the previous future, write_all, and discard them, which closes the socket.
Note that an important limitation of this server is its lack of concurrency. Streams represent ordered processing of data; here the order of the source stream is the order in which sockets are accepted, and and_then and for_each preserve that order. Chaining therefore has the effect that each socket is taken from the stream and all operations chained on it are processed before moving on to the next socket.
If we want to handle all clients concurrently instead, we can use the spawn method:
    let clients = listener.incoming();
    let welcomes = clients.map(|(socket, _peer_addr)| {
        tokio_core::io::write_all(socket, b"Hello!\n")
    });
    let handle = core.handle();
    let server = welcomes.for_each(|future| {
        handle.spawn(future.then(|_| Ok(())));
        Ok(())
    });
Instead of and_then we use map, which turns the stream of clients into a stream of futures. We then change the closure passed to for_each to call spawn, which lets the futures run concurrently on the event loop. Note that spawn requires a future whose item and error types are both ().
Concrete futures and streams
At this point we have a clear understanding of the Future and Stream traits, how they are implemented, and how they are composed. But where do all these futures come from in the first place?
Let's look at a few concrete implementations of futures and streams.
First, any value that is already available is trivially a future in the ready state. The done, failed, and finished functions are enough for this. The done function takes a Result<T,E> and returns a Future<Item=T, Error=E>. The failed and finished functions take an E or a T respectively and leave the other associated type generic.
For streams, the equivalent of an immediately ready value is the iter function, which creates a stream that yields the items of the given iterator. For situations where a value is not immediately ready, there are also more general implementations of Future and Stream, the first of which is the oneshot function:
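    extern crate futures;

    use std::thread;
    use futures::Future;

    fn expensive_computation() -> u32 {
        // ... (placeholder result)
        200
    }

    fn main() {
        let (tx, rx) = futures::oneshot();

        thread::spawn(move || {
            tx.complete(expensive_computation());
        });

        let rx = rx.map(|x| x + 3);
    }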
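Here the oneshot function returns two halves, much like mpsc::channel. The first half, tx ("transmitter"), has type Complete and is used to complete the oneshot by providing a value to the future on the other end. The Complete::complete method passes that value on.
The second half, rx ("receiver"), has type Oneshot, which implements the Future trait. Its Item type is T, the type of the oneshot. Its Error type is Canceled, which occurs when the Complete half is dropped without completing the computation.
This concrete future can be used (as here) to pass values between threads. Each half implements Send and is a separate owned entity that can be passed around. In general, though, it is better not to overuse this kind of future and to prefer combinators wherever possible.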
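Since the text compares oneshot to mpsc::channel, a blocking analogue built only on the standard library may help. This is an analogy under stated assumptions, not the futures API: recv blocks the calling thread where a Oneshot would be polled by a task. The names compute_on_thread and expensive_computation, and the value 200, are placeholders.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for a slow computation.
fn expensive_computation() -> u32 {
    200
}

/// Blocking analogue of the oneshot pattern: a worker sends one value, the caller maps it.
fn compute_on_thread() -> Result<u32, mpsc::RecvError> {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        // Comparable to completing the sending half: hand the result to the receiver.
        // The send error (receiver already gone) is deliberately ignored.
        let _ = tx.send(expensive_computation());
    });

    // `recv` blocks until the value arrives. If the sender were dropped without
    // sending, `recv` would return an error, which plays the role of `Canceled`.
    rx.recv().map(|x| x + 3)
}

fn main() {
    println!("{:?}", compute_on_thread());
}
```

The map call on the Result mirrors the map combinator applied to the receiving half of a oneshot.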
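For the Stream trait a similar primitive, channel, is available. This type also has two halves: one is used to send messages, and the other implements Stream.
The channel's Sender type differs from the standard library's in an important way: when a value is sent into the channel, the sender is consumed and a future is returned, which resolves to the original sender only once the sent value has been consumed. This creates backpressure, so a producer cannot get ahead of the consumer.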
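Backpressure can be observed with the standard library as well. The sketch below is only an analogy and assumes nothing about the futures channel API: it uses std::sync::mpsc::sync_channel with capacity 0, where each send blocks until the consumer takes the value. The helper name rendezvous is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends `count` values through a zero-capacity channel and returns what arrived.
fn rendezvous(count: u32) -> Vec<u32> {
    // Capacity 0: every `send` blocks until the consumer receives the value,
    // so the producer can never run ahead of the consumer.
    let (tx, rx) = mpsc::sync_channel::<u32>(0);

    let producer = thread::spawn(move || {
        for i in 0..count {
            tx.send(i).expect("receiver hung up");
        }
        // `tx` is dropped here, which ends the receiving iterator below.
    });

    let received: Vec<u32> = rx.iter().collect();
    producer.join().unwrap();
    received
}

fn main() {
    println!("{:?}", rendezvous(3));
}
```

The futures channel reaches a similar effect without blocking a thread, by returning the sender only after the previous value has been consumed.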
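Returning futures
When working with futures, one of the first things you are likely to need is to return a Future from a function. As with Iterator, this is not yet as easy as it could be.
There are several options:
- trait objects
- custom types
- named types
- impl Trait
Trait objects
First, you can return a boxed trait object:
    fn foo() -> Box<Future<Item = u32, Error = io::Error>> {
        // ...
    }
The advantage of this approach is that it is easy to write and easy to create. It is also maximally flexible with respect to later changes, because any type of future can be returned as an opaque boxed Future.
Note that the boxed method returns a BoxFuture, which is a type alias for Box<Future + Send>:
    fn foo() -> BoxFuture<u32, u32> {
        finished(1).boxed()
    }
The drawback is that an allocation happens at run time when the future is created: the Box is allocated on the heap and the future is placed inside it. This is the only allocation, though; nothing else is allocated while the future runs. The cost is also often not that high, because there are no boxed futures internally (chains of combinators generally introduce no allocations), so a Box only appears at the edges.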
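Custom types
If you do not want to return a Box, you can wrap the future in a type of your own.
Example:
    struct MyFuture {
        inner: Oneshot<i32>,
    }

    fn foo() -> MyFuture {
        let (tx, rx) = oneshot();
        // ...
        MyFuture { inner: rx }
    }

    impl Future for MyFuture {
        // ...
    }
In this example we return a custom type, MyFuture, and implement the Future trait for it directly. This future wraps an Oneshot<i32>, but any other future could be used in the same way.
The advantage of this approach is that it needs no Box allocation and remains maximally flexible. The implementation details of MyFuture are hidden from the outside world, so they can change without breaking other code.
The drawback is that it is not always ergonomic. Defining new types becomes cumbersome, and if futures are returned very often it can be too much.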
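Named types
The next alternative is to name the return type directly:
    fn add_10<F>(f: F) -> Map<F, fn(i32) -> i32>
        where F: Future<Item = i32>,
    {
        fn do_map(i: i32) -> i32 { i + 10 }
        f.map(do_map)
    }
Here we name the return type exactly as the compiler sees it. The map function returns the Map struct, which contains the future and the function passed to map.
The advantage is that this is more ergonomic than a custom future type and has none of the run-time overhead of a Box.
The drawback is that it is often difficult to name the type. Sometimes the types become very large. Here a function pointer (fn(i32) -> i32) is used, although ideally we would use a closure. Unfortunately, a return type cannot name a closure yet.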
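impl Trait
Thanks to a new Rust feature, impl Trait, there is one more way to return a future.
Example:
    fn add_10<F>(f: F) -> impl Future<Item = i32, Error = F::Error>
        where F: Future<Item = i32>,
    {
        f.map(|i| i + 10)
    }
Here we state that the return type is "something that implements Future" with the given associated types. Apart from that, the future combinators are used as usual.
The advantages of this approach: there is no overhead because no Box is needed, it is maximally flexible because the actual type of the future is hidden, and it is as ergonomic to write as the Box version.
The drawback is that, at the time of writing, impl Trait is not yet part of stable Rust. The good news is that as soon as it is stabilized, all crates that use futures can benefit immediately. Moving return types from Box to impl Trait should be a backwards-compatible change.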
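The same trade-offs exist for Iterator, which the text uses as a comparison, so they can be tried with the standard library alone. The sketch below is an analogy rather than futures code, and it assumes a toolchain on which impl Trait is available. The function names boxed, named, and opaque are hypothetical, and the custom-type option is left out for brevity.

```rust
use std::iter::Map;
use std::ops::Range;

// 1. Trait object: easy to write, but costs one heap allocation per call.
fn boxed() -> Box<dyn Iterator<Item = i32>> {
    Box::new((0..3).map(|i| i + 10))
}

// 2. Named type: no allocation, but the signature spells out the implementation
//    and needs a function pointer because a closure type cannot be named.
fn named() -> Map<Range<i32>, fn(i32) -> i32> {
    fn add_10(i: i32) -> i32 {
        i + 10
    }
    (0..3).map(add_10 as fn(i32) -> i32)
}

// 3. `impl Trait`: no allocation, and the concrete type stays hidden from callers.
fn opaque() -> impl Iterator<Item = i32> {
    (0..3).map(|i| i + 10)
}

fn main() {
    println!("{:?}", boxed().collect::<Vec<_>>());
    println!("{:?}", named().collect::<Vec<_>>());
    println!("{:?}", opaque().collect::<Vec<_>>());
}
```

All three functions produce the same items; they differ only in what the signature reveals and in whether a heap allocation is needed.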
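Task and Future
So far we have talked about how to build computations out of futures, but have barely touched on how they are actually run. When poll was discussed earlier, it was noted that if poll returns NotReady a notification is arranged, but who delivers that notification? And who calls poll in the first place?
This is where Task comes in.
A Task is what drives a computation made of futures. Any particular future may be short-lived and may be only one part of a larger computation. In the "Hello, world!" example there were several futures, but only one of them was running at any time. A Task followed the logical "thread of execution" for the whole program as each future resolved.
When a future is spawned, it is fused with a task, and the resulting structure can then be polled to completion. How and when it is polled is left to the function that spawned the future. Usually you do not call spawn directly but use CpuPool::spawn with a thread pool or Handle::spawn with an event loop. These use spawn internally and take care of calling poll for you.
The implementation of Task is the key to the performance of the futures crate: when a Task is created, every Future in the chain of computations is combined into a single state machine and moved from the stack to the heap. This is the only allocation imposed by the futures library. In effect, the Task behaves as if you had written an efficient state machine by hand, while letting you express it as a straightforward sequence of computations.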
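Task-local data
In the previous section we saw that each individual future is only one piece of a larger asynchronous computation. Futures come and go, but there may be data that lives for the whole computation and that many futures need to access.
Futures are often required to be 'static, which leaves two ways of sharing data between futures:
- if the data is only used by one future at a time, ownership of the data can be passed from future to future;
- if the data has to be accessed concurrently, it has to be stored in a reference-counted pointer (Arc / Rc) or, if it must be mutated, behind synchronization as well (Arc<Mutex>).
Both solutions are relatively heavyweight, so let's see whether we can do better.
In the Task and Future section we saw that an asynchronous computation has access to a Task for its whole lifetime, and that the task is accessible while poll runs. The Task API builds on this and lets you store data inside a Task. Data can be associated with a Task in two ways:
- the task_local! macro, which is very similar to thread_local! in the standard library. Data declared this way is initialized lazily on first access within a Task and is destroyed when the Task is destroyed;
- the TaskRc struct, which creates reference-counted data that can only be accessed on the appropriate task. Like Rc, it can be cloned.
Note that both of these methods fuse the data with the currently running task, which is not always desirable, so use them with care.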
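Because task_local! is described as being similar to thread_local!, the standard-library macro gives a feel for the behaviour. The sketch below shows only thread_local! and assumes nothing about the exact syntax of task_local!. The names REQUESTS_SEEN, record_request, and count_on_new_thread are hypothetical.

```rust
use std::cell::Cell;
use std::thread;

thread_local! {
    // Lazily initialized on first access in each thread, dropped when the thread exits.
    static REQUESTS_SEEN: Cell<u32> = Cell::new(0);
}

/// Increments the calling thread's counter and returns the new value.
fn record_request() -> u32 {
    REQUESTS_SEEN.with(|count| {
        count.set(count.get() + 1);
        count.get()
    })
}

/// Calls `record_request` `n` times on a fresh thread and returns the last value seen.
fn count_on_new_thread(n: u32) -> u32 {
    thread::spawn(move || (0..n).map(|_| record_request()).last().unwrap_or(0))
        .join()
        .unwrap()
}

fn main() {
    // Each thread starts from its own freshly initialized counter.
    println!("{}", count_on_new_thread(3));
    println!("{}", count_on_new_thread(2));
}
```

Each thread sees its own copy of the counter without any Arc or Mutex, which is the property that task-local data provides per task instead of per thread.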