(1)業務復雜度介紹
開門見山,假設一個直播間同時500W人在線,那么1秒鐘1000條彈幕,那么彈幕系統的推送頻率就是: 500W * 1000條/秒=50億條/秒
,想想B站2019跨年晚會那次彈幕系統得是多么的NB,況且一個大型網站不可能只有一個直播間!

使用Go做WebSocket開發無非就是三種情況:
- 使用Go原生自帶的庫,也就是
golang.org/x/net
,但是這個官方庫真是出了奇Bug多
- 使用GitHub大佬
gorilla/websocket
庫,可以結合到某些Web開發框架,比如Gin、iris等,只要使用的框架式基于 golang.org/net
的,那么這個庫就可以與這個框架結合
- 手擼一個WebSocket框架
根據估算結果,彈幕推送量很大的時候,Linux內核將會出現瓶頸,因為Linux內核發送TCP包的時候極限包發送頻率是100W。因此可以將同一秒內的彈幕消息合并為1條推送,減少網絡小數據包的發送,從而降低推送頻率。
彈幕系統需要維護在線的用戶長連接來實現定向推送到在線的用戶,通常是使用Hash字典結構,通常推送消息就是遍歷在線用的Hash字典。在彈幕推送期間用戶在不斷的上下線,為了維護上線用戶,那么就得不斷的修改Hash字典,不斷地進行鎖操作,用戶量過大導致鎖瓶頸。因此可以將整個Hash結構拆分為多個Hash結構,分別對多個Hash結構加不同的鎖,并且使用讀寫鎖替代互斥鎖。
通常服務器與客戶端交互使用JSON結構,那么需要不斷的編碼解碼JSON數據,這將會導致CPU瓶頸。將消息先進行合并,然后進行編碼,最后輪詢Hash結構進行推送。
以上是單體架構存在的問題,為了支持更多的用戶負載,通常彈幕系統采用分布式架構,進行彈性擴容縮容。
(2)推送還是拉取?
如果是客戶端拉取服務器端數據,那么將會存在以下幾個問題:
- 直播在線人數多就意味著消息數據更新頻率高,拉取消息意味著彈幕無法滿足時效性
- 如果很多客戶端同時拉取,那么服務器端的壓力無異于DDOS
- 一個彈幕系統應該是通用的,因此對于直播間彈幕較少的場景,意味著消息數據拉取請求都是無效的
因此我們考慮推送模式:當數據發生更新的時候服務器端主動推送到客戶端,這樣可以有效減少客戶端的請求次數。如果需要實現消息推送,那么就意味著服務器端維護大量的長連接。
(3)為什么使用WebSocket?
實現彈幕消息的實時更新一定是使用Socket的方式,那么為啥要使用WebSocket呢?現在大部分直播應用的開發都是跨平臺的,然而跨平臺的開發框架本質就是Web開發,那么一定離不開WebSocket,而且一部分用戶會選擇在Web端看視頻,比如Bilibili,現如今也有一些桌面應用是用Electron等跨平臺框架開發的,比如Lark飛書等,因此實現消息推送的最佳方案就是使用WebSocket。
使用WebSocket可以輕松的維持服務器端長連接,其次WebSocket是架構在HTTP協議之上的,并且也可以使用HTTPS方式,因此WebSocket是可靠傳輸,并且不需要開發者關注底層細節。

為啥要使用Go搞WebSocket呢?首先說到WebSocket你可能會想到Node.js,但是Node.js是單線程模型,如果實現高并發,不得不創建多個Node.js進程,但是這又不容易服務端遍歷整個連接集合;如果使用Java就會顯得比較笨重,Java項目的部署,編寫Dockerfile都不如Go的目標二進制更加簡潔,并且Go協程很容易實現高并發,上一章說到Go語言目前也有成熟的WebSocket輪子。
(4)服務端基本Demo
首先搭建好一個框架:
package main
import (
"fmt"
"net/http"
)
func main() {
fmt.Println("Listen localhost:8080")
// 注冊一個用于WebSocket的路由,實際業務中不可能只有一個路由
http.HandleFunc("/messages", messageHandler)
// 監聽8080端口,沒有實現服務異常處理器,因此第二個參數是nil
http.ListenAndServe("localhost:8080", nil)
}
func messageHandler(response http.ResponseWriter, request *http.Request) {
// TODO: 實現消息處理
response.Write([]byte("HelloWorld"))
}
然后完善messageHandler函數:
func messageHandler(response http.ResponseWriter, request *http.Request) {
var upgrader = websocket.Upgrader{
// 允許跨域
CheckOrigin: func(resquest *http.Request) bool {
return true
},
}
// 建立連接
conn, err := upgrader.Upgrade(response, request, nil)
if err != nil {
return
}
// 收發消息
for {
// 讀取消息
_, bytes, err := conn.ReadMessage()
if err != nil {
_ = conn.Close()
}
// 寫入消息
err = conn.WriteMessage(websocket.TextMessage, bytes)
if err != nil {
_ = conn.Close()
}
}
}
現在基本上實現了WebSocket功能,但是websocket的原生API不是線程安全的(Close方法是線程安全的,并且是可重入的),并且其他模塊無法復用業務邏輯,因此進行封裝:
- 封裝Connection對象描述一個WebSocket連接
- 為Connection對象提供線程安全的關閉、接收、發送API
// main.go
package main
import (
"bluemiaomiao.github.io/websocket-go/service"
"fmt"
"net/http"
"github.com/gorilla/websocket"
)
func main() {
fmt.Println("Listen localhost:8080")
http.HandleFunc("/messages", messageHandler)
_ = http.ListenAndServe("localhost:8080", nil)
}
func messageHandler(response http.ResponseWriter, request *http.Request) {
var upgrader = websocket.Upgrader{
// 允許跨域
CheckOrigin: func(resquest *http.Request) bool {
return true
},
}
// 建立連接
conn, err := upgrader.Upgrade(response, request, nil)
wsConn, err := service.Create(conn)
if err != nil {
return
}
// 收發消息
for {
// 讀取消息
msg, err := wsConn.ReadOne()
if err != nil {
wsConn.Close()
}
// 寫入消息
err = wsConn.WriteOne(msg)
if err != nil {
_ = conn.Close()
}
}
}
// service/messsage_service.go
package service
import (
"errors"
"github.com/gorilla/websocket"
"sync"
)
// 封裝的連接對象
//
// 由于websocket的Close()方法是可重入的,所以可以多次調用,但是關閉Channel的close()
// 方法不是可重入的,因此通過isClosed進行判斷
// isClosed可能發生資源競爭,因此通過互斥鎖避免
// 關閉websocket連接后,也要自動關閉輸入輸出消息流,因此通過signalCloseLoopChan實現
type Connection struct {
conn *websocket.Conn // 具體的連接對象
inputStream chan []byte // 輸入流,使用Channel模擬
outputStream chan []byte // 輸出流,使用chaneel模擬
signalCloseLoopChan chan byte // 關閉信號
isClosed bool // 是否調用過close()方法
lock sync.Mutex // 簡單的鎖
}
// 用于初始化一個連接對象
func Create(conn *websocket.Conn) (connection *Connection, err error) {
connection = Connection{
conn: conn,
inputStream: make(chan []byte, 1000),
outputStream: make(chan []byte, 1000),
signalCloseLoopChan: make(chan byte, 1),
isClosed: false,
}
// 啟動讀寫循環
go connection.readLoop()
go connection.writeLoop()
return
}
// 讀取一條消息
func (c *Connection) ReadOne() (msg []byte, err error) {
select {
case msg = -(*c).inputStream:
case -(*c).signalCloseLoopChan:
err = errors.New("connection is closed")
}
return
}
// 寫入一條消息
func (c *Connection) WriteOne(msg []byte) (err error) {
select {
case (*c).outputStream - msg:
case -(*c).signalCloseLoopChan:
err = errors.New("connection is closed")
}
return
}
// 關閉連接對象
func (c *Connection) Close() {
_ = (*c).conn.Close()
(*c).lock.Lock()
if !(*c).isClosed {
close((*c).signalCloseLoopChan)
}
(*c).lock.Unlock()
}
// 讀取循環
func (c *Connection) readLoop() {
// 不停的讀取長連接中的消息,只要存在消息就將其放到隊列中
for {
_, bytes, err := (*c).conn.ReadMessage()
if err != nil {
(*c).Close()
}
select {
case -(*c).signalCloseLoopChan:
(*c).Close()
case (*c).inputStream - bytes:
}
}
}
// 寫入循環
func (c *Connection) writeLoop() {
// 只要隊列中存在消息,就將其寫入
var data []byte
for {
select {
case data = -(*c).outputStream:
case -(*c).signalCloseLoopChan:
(*c).Close()
}
err := (*c).conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
_ = (*c).conn.Close()
}
}
}
至此,你已經學會了如何使用Go構建WebSocket服務。
到此這篇關于使用Go基于WebSocket構建千萬級視頻直播彈幕系統的代碼詳解的文章就介紹到這了,更多相關go WebSocket視頻直播彈幕內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- golang 實現tcp server端和client端,并計算RTT時間操作
- golang websocket 服務端的實現
- golang socket斷點續傳大文件的實現方法
- golang基于websocket實現的簡易聊天室程序
- golang網絡socket粘包問題的解決方法
- Golang 實現Socket服務端和客戶端使用TCP協議通訊