package smile
import (
"errors"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
const (
// 允許等待的寫入時間
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 512
)
// 最大的連接ID,每次連接都加1 處理
var maxConnId int64
// 客戶端讀寫消息
type wsMessage struct {
// websocket.TextMessage 消息類型
messageType int
data []byte
}
// ws 的所有連接
// 用于廣播
var wsConnAll map[int64]*wsConnection
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// 允許所有的CORS 跨域請求,正式環境可以關閉
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// 客戶端連接
type wsConnection struct {
wsSocket *websocket.Conn // 底層websocket
inChan chan *wsMessage // 讀隊列
outChan chan *wsMessage // 寫隊列
mutex sync.Mutex // 避免重復關閉管道,加鎖處理
isClosed bool
closeChan chan byte // 關閉通知
id int64
}
func wsHandler(resp http.ResponseWriter, req *http.Request) {
// 應答客戶端告知升級連接為websocket
wsSocket, err := upgrader.Upgrade(resp, req, nil)
if err != nil {
log.Println("升級為websocket失敗", err.Error())
return
}
maxConnId++
// TODO 如果要控制連接數可以計算,wsConnAll長度
// 連接數保持一定數量,超過的部分不提供服務
wsConn := wsConnection{
wsSocket: wsSocket,
inChan: make(chan *wsMessage, 1000),
outChan: make(chan *wsMessage, 1000),
closeChan: make(chan byte),
isClosed: false,
id: maxConnId,
}
wsConnAll[maxConnId] = wsConn
log.Println("當前在線人數", len(wsConnAll))
// 處理器,發送定時信息,避免意外關閉
go wsConn.processLoop()
// 讀協程
go wsConn.wsReadLoop()
// 寫協程
go wsConn.wsWriteLoop()
}
// 處理隊列中的消息
func (wsConn *wsConnection) processLoop() {
// 處理消息隊列中的消息
// 獲取到消息隊列中的消息,處理完成后,發送消息給客戶端
for {
msg, err := wsConn.wsRead()
if err != nil {
log.Println("獲取消息出現錯誤", err.Error())
break
}
log.Println("接收到消息", string(msg.data))
// 修改以下內容把客戶端傳遞的消息傳遞給處理程序
err = wsConn.wsWrite(msg.messageType, msg.data)
if err != nil {
log.Println("發送消息給客戶端出現錯誤", err.Error())
break
}
}
}
// 處理消息隊列中的消息
func (wsConn *wsConnection) wsReadLoop() {
// 設置消息的最大長度
wsConn.wsSocket.SetReadLimit(maxMessageSize)
wsConn.wsSocket.SetReadDeadline(time.Now().Add(pongWait))
for {
// 讀一個message
msgType, data, err := wsConn.wsSocket.ReadMessage()
if err != nil {
websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure)
log.Println("消息讀取出現錯誤", err.Error())
wsConn.close()
return
}
req := wsMessage{
msgType,
data,
}
// 放入請求隊列,消息入棧
select {
case wsConn.inChan - req:
case -wsConn.closeChan:
return
}
}
}
// 發送消息給客戶端
func (wsConn *wsConnection) wsWriteLoop() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
}()
for {
select {
// 取一個應答
case msg := -wsConn.outChan:
// 寫給websocket
if err := wsConn.wsSocket.WriteMessage(msg.messageType, msg.data); err != nil {
log.Println("發送消息給客戶端發生錯誤", err.Error())
// 切斷服務
wsConn.close()
return
}
case -wsConn.closeChan:
// 獲取到關閉通知
return
case -ticker.C:
// 出現超時情況
wsConn.wsSocket.SetWriteDeadline(time.Now().Add(writeWait))
if err := wsConn.wsSocket.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// 寫入消息到隊列中
func (wsConn *wsConnection) wsWrite(messageType int, data []byte) error {
select {
case wsConn.outChan - wsMessage{messageType, data}:
case -wsConn.closeChan:
return errors.New("連接已經關閉")
}
return nil
}
// 讀取消息隊列中的消息
func (wsConn *wsConnection) wsRead() (*wsMessage, error) {
select {
case msg := -wsConn.inChan:
// 獲取到消息隊列中的消息
return msg, nil
case -wsConn.closeChan:
}
return nil, errors.New("連接已經關閉")
}
// 關閉連接
func (wsConn *wsConnection) close() {
log.Println("關閉連接被調用了")
wsConn.wsSocket.Close()
wsConn.mutex.Lock()
defer wsConn.mutex.Unlock()
if wsConn.isClosed == false {
wsConn.isClosed = true
// 刪除這個連接的變量
delete(wsConnAll, wsConn.id)
close(wsConn.closeChan)
}
}
// 啟動程序
func StartWebsocket(addrPort string) {
wsConnAll = make(map[int64]*wsConnection)
http.HandleFunc("/ws", wsHandler)
http.ListenAndServe(addrPort, nil)
}