
! , Mail.Ru.
, WebSocket- Go.
WebSocket , Go — , , .
1. Introduction
, , .
Mail.Ru , . , . — — . (polling), — — .
, , — . Polling — 50 HTTP- , 60% 304, .
, , publisher-subscriber ( bus, message-broker event-channel), , , , — .
:
+-----------+ +-----------+ +-----------+
| | ◄-------+ | | ◄-------+ | |
| Storage | | API | HTTP | Browser |
| | +-------► | | +-------► | |
+-----------+ +-----------+ +-----------+
:
+-------------+ +---------+ WebSocket +-----------+
| Storage | | API * | +-----------► | Browser |
+-------------+ +---------+ (3) +-----------+
+ (2) ▲
| |
(1) ▼ +
+---------------------------------+
| Bus |
+---------------------------------+
, . API Storage ( ).
— . WebSocket- API, Storage. API Bus ( ; , ). Storage Bus (1), Bus — (2). API , , (3).
, API, WebSocket-. , , 3 . .
2. Idiomatic way
, , Go, .
net/http, . , WebSocket (, json-), . Channel, WebSocket-.
2.1. Channel struct
// Packet is the message type carried by Channel.send.
// NOTE(review): the field list is elided in this listing; the "..." below is
// a placeholder, not Go syntax.
type Packet struct {
...
}
// Channel pairs one client connection with a queue of outgoing packets.
type Channel struct {
conn net.Conn // connection handed to NewChannel
send chan Packet // outgoing packets; drained by writer
}
// NewChannel builds a Channel around conn with a send queue of capacity N and
// starts one goroutine running reader and one running writer before
// returning it.
func NewChannel(conn net.Conn) *Channel {
	ch := new(Channel)
	ch.conn = conn
	ch.send = make(chan Packet, N)

	go ch.reader()
	go ch.writer()

	return ch
}
. , Go 2 8 . , (3 ), 24 ( 4 ). , Channel, ch.send .
2.2. I/O
«» :
// reader reads packets from the connection through a buffered reader and
// passes each one to c.handle. It returns as soon as readPacket reports an
// error.
func (c *Channel) reader() {
	buf := bufio.NewReader(c.conn)
	for {
		pkt, err := readPacket(buf)
		if err != nil {
			// Once the connection is closed or broken every further read
			// fails as well; looping on would spin forever and hand
			// meaningless packets to the handler.
			return
		}
		c.handle(pkt)
	}
}
, ? , syscall’ , buf. . : .
, , . buf : 4 12 . «»:
// writer drains c.send, encoding each packet into a buffered writer on top of
// the connection and flushing after every packet. It returns when c.send is
// closed.
func (c *Channel) writer() {
	buf := bufio.NewWriter(c.conn)
	for pkt := range c.send {
		// Write errors are deliberately not acted upon here: bufio.Writer
		// keeps its first error and returns it from every later Write and
		// Flush, so after a failure this loop simply keeps draining c.send
		// and senders never block on a dead connection.
		_ = writePacket(buf, pkt)
		_ = buf.Flush()
	}
}
c.send . , , 4 12 3 .
2.3. HTTP
Channel , WebSocket-, . Idiomatic way, .
, WebSocket, , WebSocket HTTP, Upgrade. Upgrade- TCP- WebSocket-.
.
import (
"net/http"
"some/websocket"
)
// Registers the WebSocket endpoint: each request is upgraded and the
// resulting connection is wrapped in a Channel.
http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
	conn, err := websocket.Upgrade(r, w)
	if err != nil {
		// Without a successful upgrade there is no connection to wrap.
		return
	}
	// The Channel is not kept here; NewChannel starts the goroutines that
	// service conn.
	NewChannel(conn)
})
, http.ResponseWriter bufio.Writer 4 , *http.Request bufio.Reader 4 .
WebSocket Upgrade- I/O TCP- responseWriter.Hijack().
Hint: in some situations go:linkname can be used to return the buffers to the sync.Pool inside net/http by calling net/http.putBufio{Reader,Writer}.
, 24 3 .
72 , !
3. Optimizations
, , , . WebSocket — . . . ( ping/pong) .
.
, Channel.reader() Channel.writer() . I/O, 4 .
, , ?
3.1. netpoll
Channel.reader(), , conn.Read() bufio.Reader? runtime go «» . . , runtime go , «».
conn.Read(), , net.netFD.Read():
// Read performs a read syscall on the descriptor. If the syscall fails with
// EAGAIN it waits via fd.pd.waitRead and, when that wait succeeds, retries.
// Any other outcome ends the loop. On error n is reported as 0.
func (fd *netFD) Read(p []byte) (n int, err error) {
	for {
		n, err = syscall.Read(fd.sysfd, p)
		if err != nil {
			n = 0
			if err == syscall.EAGAIN {
				// Nothing to read yet: wait, then retry unless the wait
				// itself failed.
				if err = fd.pd.waitRead(); err == nil {
					continue
				}
			}
		}
		break
	}
	return n, err
}
go . EAGAIN , , , , .
, read() . EAGAIN, runtime pollDesc.waitRead():
// waitRead delegates to wait with the 'r' mode.
func (pd *pollDesc) waitRead() error {
	const readMode = 'r'
	return pd.wait(readMode)
}
// wait passes the descriptor's runtime context and the requested mode to
// runtime_pollWait.
func (pd *pollDesc) wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
// NOTE(review): abridged excerpt — the step that turns res into the returned
// error is not shown, so as written res is unused and the function has no
// return statement.
}
, , Linux netpoll epoll. ? , : .
github.com/golang/go issue netpoll.
3.2. Getting rid of goroutines
, netpoll Go. Channel.reader() , «» :
ch := NewChannel(conn)
// Register conn with the poller for read events; the callback starts a
// short-lived goroutine that reads from the channel.
poller.Start(conn, netpoll.EventRead, func() {
	// Receive is declared as a method of *Channel, so it has to be called on
	// ch rather than as a free function.
	go ch.Receive()
})
// Receive reads a single packet from the connection and hands it to
// ch.handle. If the packet cannot be read, nothing is handled.
func (ch *Channel) Receive() {
	buf := bufio.NewReader(ch.conn)
	// readPacket yields a packet and an error (it is used that way in
	// reader), so both results are taken here.
	pkt, err := readPacket(buf)
	if err != nil {
		return
	}
	ch.handle(pkt)
}
Channel.writer() — , :
// Send queues p on ch.send. If ch.noWriterYet reports true, a writer
// goroutine is started first.
func (ch *Channel) Send(p Packet) {
	if ch.noWriterYet() {
		go ch.writer()
	}
	ch.send <- p
}
ch.send ( ) writer .
! 48 — I/O «» .
3.3. Resource control
— . race condition’ deadlock’, self-DDoS — , .
, - ping/pong , idle- (, , ), , , , N .
, , (, nginx) .
, , , 48 — , .
3.3.1 Goroutine pool
. :
package gpool
// New returns a Pool with an unbuffered work channel and a semaphore channel
// of capacity size.
func New(size int) *Pool {
	p := new(Pool)
	p.work = make(chan func())
	p.sem = make(chan struct{}, size)
	return p
}
// Schedule hands task to the pool: either an already running worker receives
// it from p.work, or, if a semaphore slot is free, a new worker goroutine is
// started with it. It blocks until one of the two happens and then returns
// nil.
func (p *Pool) Schedule(task func()) error {
	select {
	case p.work <- task:
	case p.sem <- struct{}{}:
		go p.worker(task)
	}
	return nil
}
func (p *Pool) worker(task func()) {
defer func() { <-p.sem }
for {
task()
task = <-p.work
}
}
netpoll :
pool := gpool.New(128)
// Register conn for read events; the callback schedules the read on the pool
// instead of starting a goroutine directly.
poller.Start(conn, netpoll.EventRead, func() {
	pool.Schedule(func() {
		// Receive is a method of *Channel, so it is invoked on ch.
		ch.Receive()
	})
})
, .
Send():
pool := gpool.New(128)
func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
pool.Schedule(ch.writer)
}
ch.send <- p
}
go ch.writer() . , N , N N + 1 N + 1 . Accept() Upgrade() DDoS.
3.4. Zero-copy upgrade
WebSocket. , WebSocket HTTP- Upgrade. :
GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket
HTTP- , WebSocket. , , http.Request, , , , HTTP- net/http.
http.Request , , Header, . , , Cookie.
?
3.4.1. WebSocket
, upgrade net/http-. , ( ) , . , API WebSocket. , :
// ReadFrame takes an io.Reader and yields a Frame or an error.
// NOTE(review): declaration only — the body is not part of this listing;
// presumably it reads one WebSocket frame from the reader.
func ReadFrame(io.Reader) (Frame, error)
// WriteFrame takes an io.Writer and a Frame and yields an error.
// NOTE(review): declaration only — presumably it writes the frame to the
// writer.
func WriteFrame(io.Writer, Frame) error
API, ( ):
// getReadBuf takes an io.Reader and yields a *bufio.Reader.
// NOTE(review): declaration only — presumably the reader comes from a reuse
// pool; confirm against the implementation.
func getReadBuf(io.Reader) *bufio.Reader
// putReadBuf takes back a *bufio.Reader; readPacket calls it (deferred) once
// it is done with the buffer obtained from getReadBuf.
func putReadBuf(*bufio.Reader)
// readPacket must be called when data could be read from conn.
//
// It borrows a buffered reader via getReadBuf, reads one frame through it and
// passes the frame payload to parsePacket. The buffer is given back with
// putReadBuf before returning. An error from ReadFrame is returned as is;
// otherwise the result is nil.
func readPacket(conn io.Reader) error {
	// getReadBuf is declared with an io.Reader parameter, so conn is passed.
	buf := getReadBuf(conn)
	defer putReadBuf(buf)

	// Explicitly (re)bind the buffer to conn; this also drops any data left
	// over from a previous use.
	buf.Reset(conn)

	frame, err := ReadFrame(buf)
	if err != nil {
		return err
	}
	parsePacket(frame.Payload)
	return nil
}
, .
3.4.2. github.com/gobwas/ws
ws , . io.Reader io.Writer, , I/O.
upgrade- net/http, ws zero-copy upgrade — upgrade- WebSocket . ws.Upgrade() io.ReadWriter (net.Conn ) — . . net.Listen() ln.Accept() ws.Upgrade(). (, Cookie ).
upgrade-: net/http- net.Listen() zero-copy upgrade:
BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op
BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op
ws zero-copy upgrade 24 — , I/O net/http.
3.5. Summary
, .
- — .
: netpoll (epoll, kqueue); . - — .
: , ; . - netpoll .
: . net/http Upgrade WebSocket.
: zero-copy upgrade «» TCP-.
:
import (
	"net"
	"time"

	"github.com/gobwas/ws"
)
ln, err := net.Listen("tcp", ":8080")
if err != nil {
	// Without a listener the Accept calls below cannot work at all.
	panic(err)
}
for {
	// Accepting and upgrading are themselves run on the pool, with a timeout
	// on getting a slot.
	err := pool.ScheduleTimeout(time.Millisecond, func() {
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		if _, err := ws.Upgrade(conn); err != nil {
			// The handshake failed, so the connection is of no further use.
			conn.Close()
			return
		}
		ch := NewChannel(conn)
		// Register conn for read events; each event schedules one Receive on
		// the pool.
		poller.Start(conn, netpoll.EventRead, func() {
			pool.Schedule(func() {
				ch.Receive()
			})
		})
	})
	if err != nil {
		// No slot was obtained within the timeout: pause briefly before
		// trying again.
		time.Sleep(time.Millisecond)
	}
}
4. Conclusion
“Premature optimization is the root of all evil (or at least most of it) in programming.” — Donald Knuth
, . , (, CPU) , , . , , , .
!
5. References