From b824dc3792dea3c0f908703f0ee94d3f48462032 Mon Sep 17 00:00:00 2001 From: R2m1liA <15258427350@163.com> Date: Tue, 16 Dec 2025 17:15:59 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20Hub=E6=8E=A5=E6=94=B6RESTful=20API?= =?UTF-8?q?=E7=9A=84=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/server/main.go | 8 ++++-- interval/api/dto/publish.go | 5 ++++ interval/api/handler/push.go | 47 ++++++++++++++++++++++++++++++++++++ interval/api/router.go | 4 ++- interval/hub/hub.go | 12 +++++++-- interval/model/message.go | 5 ++-- 6 files changed, 74 insertions(+), 7 deletions(-) create mode 100644 interval/api/dto/publish.go create mode 100644 interval/api/handler/push.go diff --git a/cmd/server/main.go b/cmd/server/main.go index f73a33f..3682192 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -10,16 +10,20 @@ import ( "time" "git.jinshen.cn/remilia/push-server/interval/api" + "git.jinshen.cn/remilia/push-server/interval/hub" "git.jinshen.cn/remilia/push-server/interval/server" ) func main() { - _, serverCancel := context.WithCancel(context.Background()) + serverCtx, serverCancel := context.WithCancel(context.Background()) defer func() { serverCancel() }() - httpServer := server.NewHTTPServer(":8080", api.NewRouter()) + h := hub.NewHub() + go h.Run(serverCtx) + + httpServer := server.NewHTTPServer(":8080", api.NewRouter(h)) go func() { log.Println("Starting HTTP server on :8080") diff --git a/interval/api/dto/publish.go b/interval/api/dto/publish.go new file mode 100644 index 0000000..54124cc --- /dev/null +++ b/interval/api/dto/publish.go @@ -0,0 +1,5 @@ +package dto + +type PublishRequest struct { + Content string `json:"content"` +} diff --git a/interval/api/handler/push.go b/interval/api/handler/push.go new file mode 100644 index 0000000..5140847 --- /dev/null +++ b/interval/api/handler/push.go @@ -0,0 +1,47 @@ +package handler + +import ( + "encoding/json" + "net/http" + "time" + + "git.jinshen.cn/remilia/push-server/interval/api/dto" + "git.jinshen.cn/remilia/push-server/interval/hub" + "git.jinshen.cn/remilia/push-server/interval/model" + "github.com/go-chi/chi/v5" +) + +func PushHandler(hub *hub.Hub) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + topicStr := chi.URLParam(r, "topic") + topic := model.Topic(topicStr) + if !topic.Valid() { + http.Error(w, "invalid topic", http.StatusBadRequest) + return + } + + var req dto.PublishRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "invalid request body", http.StatusBadRequest) + return + } + if req.Content == "" { + http.Error(w, "content cannot be empty", http.StatusBadRequest) + return + } + + msg := model.Message{ + Topic: topic, + Content: []byte(req.Content), + Timestamp: time.Now().Unix(), + } + + if err := hub.BroadcastMessage(r.Context(), msg); err != nil { + http.Error(w, "request cancelled", http.StatusRequestTimeout) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) + } +} diff --git a/interval/api/router.go b/interval/api/router.go index d0ba6bc..5f200ea 100644 --- a/interval/api/router.go +++ b/interval/api/router.go @@ -4,13 +4,15 @@ import ( "net/http" "git.jinshen.cn/remilia/push-server/interval/api/handler" + "git.jinshen.cn/remilia/push-server/interval/hub" "github.com/go-chi/chi/v5" ) -func NewRouter() http.Handler { +func NewRouter(h *hub.Hub) http.Handler { r := chi.NewRouter() r.Post("/health", handler.Health) + r.Post("/push/{topic}", handler.PushHandler(h)) return r } diff --git a/interval/hub/hub.go b/interval/hub/hub.go index 4376a28..6b77443 100644 --- a/interval/hub/hub.go +++ b/interval/hub/hub.go @@ -65,11 +65,18 @@ func (h *Hub) Unsubscribe(sub model.Subscription) { h.unsubscribe <- sub } -func (h *Hub) BroadcastMessage(message model.Message) { - h.broadcast <- message +func (h *Hub) BroadcastMessage(ctx context.Context, msg model.Message) error { + select { + case h.broadcast <- msg: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } func (h *Hub) Run(ctx context.Context) { + log.Println("Hub is running") for { select { case c := <-h.register: @@ -155,6 +162,7 @@ func (h *Hub) onBroadcast(msg model.Message) { log.Printf("Broadcast failed: invalid topic") return } + log.Printf("Receiving message for topic %s: %s", msg.Topic, string(msg.Content)) for _, c := range h.clientsByTopic[msg.Topic] { select { case c.SendChan <- msg.Content: diff --git a/interval/model/message.go b/interval/model/message.go index cf4c31d..b9ab29b 100644 --- a/interval/model/message.go +++ b/interval/model/message.go @@ -1,6 +1,7 @@ package model type Message struct { - Topic Topic - Content []byte + Topic Topic + Content []byte + Timestamp int64 }