fix: 解决json无法正常解析的问题
This commit is contained in:
@ -57,13 +57,12 @@ func main() {
|
|||||||
|
|
||||||
func ReadBroadCastLoop(ctx context.Context, c *websocket.Conn) {
|
func ReadBroadCastLoop(ctx context.Context, c *websocket.Conn) {
|
||||||
for {
|
for {
|
||||||
// var msg protocol.BroadcastMessage
|
var msg protocol.BroadcastMessage
|
||||||
var msg []byte
|
|
||||||
if err := wsjson.Read(ctx, c, &msg); err != nil {
|
if err := wsjson.Read(ctx, c, &msg); err != nil {
|
||||||
log.Println("read broadcast error:", err)
|
log.Println("Read broadcast error:", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("Received broadcast message:", string(msg))
|
log.Printf("Received broadcast message: [%s] %s", msg.Topic, msg.Payload)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,9 +12,10 @@ type ControlMessage struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type BroadcastMessage struct {
|
type BroadcastMessage struct {
|
||||||
Type MessageType `json:"type"`
|
Type MessageType `json:"type"`
|
||||||
Topic Topic `json:"topic"`
|
Topic Topic `json:"topic"`
|
||||||
Payload json.RawMessage `json:"payload"`
|
Payload json.RawMessage `json:"payload"`
|
||||||
|
Timestamp int64 `json:"timestamp"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m ControlMessage) Validate() error {
|
func (m ControlMessage) Validate() error {
|
||||||
|
|||||||
@ -2,8 +2,8 @@ package handler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"log"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.jinshen.cn/remilia/push-server/interval/protocol"
|
"git.jinshen.cn/remilia/push-server/interval/protocol"
|
||||||
"git.jinshen.cn/remilia/push-server/interval/server/ws"
|
"git.jinshen.cn/remilia/push-server/interval/server/ws"
|
||||||
@ -11,7 +11,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type PublishRequest struct {
|
type PublishRequest struct {
|
||||||
Content string `json:"content"`
|
Payload json.RawMessage `json:"payload"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func PushHandler(hub *ws.Hub) http.HandlerFunc {
|
func PushHandler(hub *ws.Hub) http.HandlerFunc {
|
||||||
@ -24,23 +24,25 @@ func PushHandler(hub *ws.Hub) http.HandlerFunc {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var req PublishRequest
|
var req PublishRequest
|
||||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
dec := json.NewDecoder(r.Body)
|
||||||
|
dec.DisallowUnknownFields()
|
||||||
|
if err := dec.Decode(&req); err != nil {
|
||||||
http.Error(w, "invalid request body", http.StatusBadRequest)
|
http.Error(w, "invalid request body", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if req.Content == "" {
|
|
||||||
|
if len(req.Payload) == 0 {
|
||||||
http.Error(w, "content cannot be empty", http.StatusBadRequest)
|
http.Error(w, "content cannot be empty", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := protocol.BroadcastMessage{
|
msg := protocol.BroadcastMessage{
|
||||||
Type: protocol.MsgBroadcast,
|
Type: protocol.MsgBroadcast,
|
||||||
Topic: topic,
|
Topic: topic,
|
||||||
Payload: json.RawMessage(req.Content),
|
Payload: req.Payload,
|
||||||
|
Timestamp: time.Now().Unix(),
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Received push request for topic %s: %s", topic, req.Content)
|
|
||||||
|
|
||||||
if err := hub.BroadcastMessage(r.Context(), msg); err != nil {
|
if err := hub.BroadcastMessage(r.Context(), msg); err != nil {
|
||||||
http.Error(w, "request cancelled", http.StatusRequestTimeout)
|
http.Error(w, "request cancelled", http.StatusRequestTimeout)
|
||||||
return
|
return
|
||||||
|
|||||||
@ -14,7 +14,7 @@ import (
|
|||||||
type Client struct {
|
type Client struct {
|
||||||
ID string
|
ID string
|
||||||
Conn *websocket.Conn
|
Conn *websocket.Conn
|
||||||
SendChan chan []byte
|
SendChan chan protocol.BroadcastMessage
|
||||||
Hub *Hub
|
Hub *Hub
|
||||||
|
|
||||||
Ctx context.Context
|
Ctx context.Context
|
||||||
@ -29,7 +29,7 @@ func NewClient(id string, conn *websocket.Conn, hub *Hub, parentCtx context.Cont
|
|||||||
return &Client{
|
return &Client{
|
||||||
ID: id,
|
ID: id,
|
||||||
Conn: conn,
|
Conn: conn,
|
||||||
SendChan: make(chan []byte, 32),
|
SendChan: make(chan protocol.BroadcastMessage, 32),
|
||||||
Hub: hub,
|
Hub: hub,
|
||||||
|
|
||||||
Ctx: ctx,
|
Ctx: ctx,
|
||||||
@ -49,7 +49,7 @@ func (c *Client) ReadLoop() {
|
|||||||
if websocket.CloseStatus(err) == websocket.StatusNormalClosure {
|
if websocket.CloseStatus(err) == websocket.StatusNormalClosure {
|
||||||
log.Println("WebSocket closed normally:", err)
|
log.Println("WebSocket closed normally:", err)
|
||||||
} else {
|
} else {
|
||||||
log.Println("WebSocket read error:", err)
|
log.Println("[Server Client.ReadLoop] WebSocket read error:", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -76,9 +76,10 @@ func (c *Client) WriteLoop() {
|
|||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.Printf("Sending message to client %s: %+v", c.ID, msg)
|
||||||
err := wsjson.Write(c.Ctx, c.Conn, msg)
|
err := wsjson.Write(c.Ctx, c.Conn, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("WebSocket write error:", err)
|
log.Println("[Server Client.WriteLoop] WebSocket write error:", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -159,8 +159,8 @@ func (h *Hub) onBroadcast(msg protocol.BroadcastMessage) {
|
|||||||
log.Printf("Receiving message for topic %s: %s", msg.Topic, string(msg.Payload))
|
log.Printf("Receiving message for topic %s: %s", msg.Topic, string(msg.Payload))
|
||||||
for _, c := range h.clientsByTopic[msg.Topic] {
|
for _, c := range h.clientsByTopic[msg.Topic] {
|
||||||
select {
|
select {
|
||||||
case c.SendChan <- msg.Payload:
|
case c.SendChan <- msg:
|
||||||
log.Printf("Sending message to client %s: %s", c.ID, string(msg.Payload))
|
log.Printf("[%d] Sending message to client %s: [%s] %v", msg.Timestamp, c.ID, msg.Type, string(msg.Payload))
|
||||||
default:
|
default:
|
||||||
h.UnregisterClient(c)
|
h.UnregisterClient(c)
|
||||||
if c.Conn != nil {
|
if c.Conn != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user