From 36296b6af49faacf221babefd4baecdd82da8781 Mon Sep 17 00:00:00 2001 From: R2m1liA <15258427350@163.com> Date: Wed, 17 Dec 2025 15:54:26 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3json=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E6=AD=A3=E5=B8=B8=E8=A7=A3=E6=9E=90=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/client/main.go | 7 +++---- interval/protocol/message.go | 7 ++++--- interval/server/api/handler/push.go | 20 +++++++++++--------- interval/server/ws/client.go | 9 +++++---- interval/server/ws/hub.go | 4 ++-- 5 files changed, 25 insertions(+), 22 deletions(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index b31602a..fa74b91 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -57,13 +57,12 @@ func main() { func ReadBroadCastLoop(ctx context.Context, c *websocket.Conn) { for { - // var msg protocol.BroadcastMessage - var msg []byte + var msg protocol.BroadcastMessage if err := wsjson.Read(ctx, c, &msg); err != nil { - log.Println("read broadcast error:", err) + log.Println("Read broadcast error:", err) return } - log.Println("Received broadcast message:", string(msg)) + log.Printf("Received broadcast message: [%s] %s", msg.Topic, msg.Payload) } } diff --git a/interval/protocol/message.go b/interval/protocol/message.go index 61d0b6f..67c9fb9 100644 --- a/interval/protocol/message.go +++ b/interval/protocol/message.go @@ -12,9 +12,10 @@ type ControlMessage struct { } type BroadcastMessage struct { - Type MessageType `json:"type"` - Topic Topic `json:"topic"` - Payload json.RawMessage `json:"payload"` + Type MessageType `json:"type"` + Topic Topic `json:"topic"` + Payload json.RawMessage `json:"payload"` + Timestamp int64 `json:"timestamp"` } func (m ControlMessage) Validate() error { diff --git a/interval/server/api/handler/push.go b/interval/server/api/handler/push.go index 63f11a0..b4cf039 100644 --- a/interval/server/api/handler/push.go +++ b/interval/server/api/handler/push.go @@ -2,8 +2,8 @@ package handler import ( "encoding/json" - "log" "net/http" + "time" "git.jinshen.cn/remilia/push-server/interval/protocol" "git.jinshen.cn/remilia/push-server/interval/server/ws" @@ -11,7 +11,7 @@ import ( ) type PublishRequest struct { - Content string `json:"content"` + Payload json.RawMessage `json:"payload"` } func PushHandler(hub *ws.Hub) http.HandlerFunc { @@ -24,23 +24,25 @@ func PushHandler(hub *ws.Hub) http.HandlerFunc { } var req PublishRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + dec := json.NewDecoder(r.Body) + dec.DisallowUnknownFields() + if err := dec.Decode(&req); err != nil { http.Error(w, "invalid request body", http.StatusBadRequest) return } - if req.Content == "" { + + if len(req.Payload) == 0 { http.Error(w, "content cannot be empty", http.StatusBadRequest) return } msg := protocol.BroadcastMessage{ - Type: protocol.MsgBroadcast, - Topic: topic, - Payload: json.RawMessage(req.Content), + Type: protocol.MsgBroadcast, + Topic: topic, + Payload: req.Payload, + Timestamp: time.Now().Unix(), } - log.Printf("Received push request for topic %s: %s", topic, req.Content) - if err := hub.BroadcastMessage(r.Context(), msg); err != nil { http.Error(w, "request cancelled", http.StatusRequestTimeout) return diff --git a/interval/server/ws/client.go b/interval/server/ws/client.go index 8280557..ced0f0b 100644 --- a/interval/server/ws/client.go +++ b/interval/server/ws/client.go @@ -14,7 +14,7 @@ import ( type Client struct { ID string Conn *websocket.Conn - SendChan chan []byte + SendChan chan protocol.BroadcastMessage Hub *Hub Ctx context.Context @@ -29,7 +29,7 @@ func NewClient(id string, conn *websocket.Conn, hub *Hub, parentCtx context.Cont return &Client{ ID: id, Conn: conn, - SendChan: make(chan []byte, 32), + SendChan: make(chan protocol.BroadcastMessage, 32), Hub: hub, Ctx: ctx, @@ -49,7 +49,7 @@ func (c *Client) ReadLoop() { if websocket.CloseStatus(err) == websocket.StatusNormalClosure { log.Println("WebSocket closed normally:", err) } else { - log.Println("WebSocket read error:", err) + log.Println("[Server Client.ReadLoop] WebSocket read error:", err) } return } @@ -76,9 +76,10 @@ func (c *Client) WriteLoop() { if !ok { return } + log.Printf("Sending message to client %s: %+v", c.ID, msg) err := wsjson.Write(c.Ctx, c.Conn, msg) if err != nil { - log.Println("WebSocket write error:", err) + log.Println("[Server Client.WriteLoop] WebSocket write error:", err) return } } diff --git a/interval/server/ws/hub.go b/interval/server/ws/hub.go index 257678d..897e888 100644 --- a/interval/server/ws/hub.go +++ b/interval/server/ws/hub.go @@ -159,8 +159,8 @@ func (h *Hub) onBroadcast(msg protocol.BroadcastMessage) { log.Printf("Receiving message for topic %s: %s", msg.Topic, string(msg.Payload)) for _, c := range h.clientsByTopic[msg.Topic] { select { - case c.SendChan <- msg.Payload: - log.Printf("Sending message to client %s: %s", c.ID, string(msg.Payload)) + case c.SendChan <- msg: + log.Printf("[%d] Sending message to client %s: [%s] %v", msg.Timestamp, c.ID, msg.Type, string(msg.Payload)) default: h.UnregisterClient(c) if c.Conn != nil {