Golang实现websocket服务


需要用到第三方库:gorilla/websocket(示例中的 HTTP 路由使用 gin-gonic/gin)
采用Hub&Client结构管理websocket客户端。

Hub(连接管理中心)逻辑

package ws

// Hub keeps the set of registered clients and exposes the channels through
// which clients are registered, unregistered and sent broadcast messages.
// All of these fields are serviced by Run.
type Hub struct {
    // Clients is the set of currently registered clients.
    Clients    map[*Client]bool
    // Register receives clients that Run should add to Clients.
    Register   chan *Client
    // Unregister receives clients that Run should remove from Clients.
    Unregister chan *Client
    // Broadcast receives messages that Run queues on every registered client.
    Broadcast  chan []byte
}

// NewHub returns a Hub with an empty client set and unbuffered Register,
// Unregister and Broadcast channels.
func NewHub() *Hub {
    hub := new(Hub)
    hub.Clients = make(map[*Client]bool)
    hub.Register = make(chan *Client)
    hub.Unregister = make(chan *Client)
    hub.Broadcast = make(chan []byte)
    return hub
}

// Run is the hub's event loop. It never returns: on each iteration it handles
// one registration, one unregistration, or one broadcast message.
func (h *Hub) Run() {
    // evict forgets a client and closes its Send channel.
    evict := func(client *Client) {
        delete(h.Clients, client)
        close(client.Send)
    }

    for {
        select {
        case joined := <-h.Register:
            h.Clients[joined] = true

        case left := <-h.Unregister:
            // Only evict clients that are still registered, so Send is never
            // closed twice for a client the broadcast path already dropped.
            if _, known := h.Clients[left]; !known {
                continue
            }
            evict(left)

        case payload := <-h.Broadcast:
            for client := range h.Clients {
                select {
                case client.Send <- payload:
                default:
                    // The client's Send buffer is full; drop the client
                    // rather than block the whole hub on it.
                    evict(client)
                }
            }
        }
    }
}

Client客户端读写协程

package ws

import (
    "time"

    "github.com/gorilla/websocket"
)

const (
    // writeWait is the deadline writePump sets before each write to the connection.
    writeWait  = 10 * time.Second
    // pongWait is how far readPump pushes the read deadline out, both at
    // start-up and each time a pong is received.
    pongWait   = 60 * time.Second
    // pingPeriod is the interval at which writePump sends pings. It is nine
    // tenths of pongWait, so a ping goes out before the read deadline expires.
    pingPeriod = (pongWait * 9) / 10
)

// Client pairs one websocket connection with its queue of outbound messages.
type Client struct {
    // Conn is the websocket connection that readPump reads from and writePump writes to.
    Conn *websocket.Conn
    // Send carries messages for writePump to write to Conn. The Hub closes it
    // when the client is removed.
    Send chan []byte
}

// readPump reads messages from the connection and forwards each one to
// hub.Broadcast until a read fails. On exit it unregisters the client from
// hub and closes the connection.
func (c *Client) readPump(hub *Hub) {
    defer func() {
        hub.Unregister <- c
        c.Conn.Close()
    }()

    // extendDeadline pushes the read deadline pongWait into the future.
    extendDeadline := func() {
        c.Conn.SetReadDeadline(time.Now().Add(pongWait))
    }

    extendDeadline()
    // Every pong from the peer extends the read deadline again.
    c.Conn.SetPongHandler(func(string) error {
        extendDeadline()
        return nil
    })

    for {
        _, payload, readErr := c.Conn.ReadMessage()
        if readErr != nil {
            return
        }
        // Broadcast every received message.
        hub.Broadcast <- payload
    }
}

// writePump writes messages queued on c.Send to the connection as text
// messages and sends a ping every pingPeriod. It returns when Send is closed
// or a write fails, and on exit stops the ticker and closes the connection.
func (c *Client) writePump() {
    heartbeat := time.NewTicker(pingPeriod)
    defer func() {
        heartbeat.Stop()
        c.Conn.Close()
    }()

    // armDeadline gives the next write writeWait to complete.
    armDeadline := func() {
        c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
    }

    for {
        select {
        case <-heartbeat.C:
            armDeadline()
            if pingErr := c.Conn.WriteMessage(websocket.PingMessage, nil); pingErr != nil {
                return
            }

        case payload, open := <-c.Send:
            armDeadline()
            if !open {
                // Send was closed: send a close message to the peer and stop.
                c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }
            if writeErr := c.Conn.WriteMessage(websocket.TextMessage, payload); writeErr != nil {
                return
            }
        }
    }
}

websocket handler

package ws

import (
    "net/http"

    "github.com/gin-gonic/gin"
    "github.com/gorilla/websocket"
)

// upgrader holds the settings ServeWS uses to upgrade an HTTP request to a
// websocket connection: 1024-byte read and write buffers and a permissive
// origin check.
var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    // CheckOrigin accepts every request regardless of its origin.
    CheckOrigin: func(r *http.Request) bool {
        return true // validate the origin in production
    },
}

// ServeWS returns a gin handler that upgrades the request to a websocket
// connection, registers a new Client for it with hub, and starts the client's
// write and read pumps in their own goroutines.
func ServeWS(hub *Hub) gin.HandlerFunc {
    handle := func(ctx *gin.Context) {

        // Example: simple token authentication (may be removed)
        // token := ctx.Query("token")
        // if token != "123" {
        //  ctx.AbortWithStatus(http.StatusUnauthorized)
        //  return
        // }

        wsConn, upgradeErr := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
        if upgradeErr != nil {
            return
        }

        peer := &Client{Conn: wsConn, Send: make(chan []byte, 256)}
        hub.Register <- peer

        go peer.writePump()
        go peer.readPump(hub)
    }
    return handle
}

main入口函数

package main

import (
    "log"

    "ws-demo/ws"

    "github.com/gin-gonic/gin"
)

// main starts the hub's event loop in a goroutine, mounts the websocket
// handler at /ws and serves HTTP on :8080.
func main() {
    hub := ws.NewHub()
    go hub.Run()

    r := gin.Default()

    r.GET("/ws", ws.ServeWS(hub))

    log.Println("WebSocket server running at :8080")
    // Run blocks while serving and only returns with an error (for example
    // when the port is already in use). Report it and exit non-zero instead
    // of silently falling out of main.
    if err := r.Run(":8080"); err != nil {
        log.Fatalf("websocket server stopped: %v", err)
    }
}

本文地址:https://www.blear.cn/article/golang-websocket

转载时请以链接形式注明出处

评论
受监管部门要求,个人网站不允许评论功能,评论已关闭,抱歉!