feat: Hub接收RESTful API的消息

This commit is contained in:
2025-12-16 17:15:59 +08:00
parent 18874711ea
commit b824dc3792
6 changed files with 74 additions and 7 deletions

View File

@ -10,16 +10,20 @@ import (
"time"
"git.jinshen.cn/remilia/push-server/interval/api"
"git.jinshen.cn/remilia/push-server/interval/hub"
"git.jinshen.cn/remilia/push-server/interval/server"
)
func main() {
_, serverCancel := context.WithCancel(context.Background())
serverCtx, serverCancel := context.WithCancel(context.Background())
defer func() {
serverCancel()
}()
httpServer := server.NewHTTPServer(":8080", api.NewRouter())
h := hub.NewHub()
go h.Run(serverCtx)
httpServer := server.NewHTTPServer(":8080", api.NewRouter(h))
go func() {
log.Println("Starting HTTP server on :8080")

View File

@ -0,0 +1,5 @@
package dto
type PublishRequest struct {
Content string `json:"content"`
}

View File

@ -0,0 +1,47 @@
package handler
import (
"encoding/json"
"net/http"
"time"
"git.jinshen.cn/remilia/push-server/interval/api/dto"
"git.jinshen.cn/remilia/push-server/interval/hub"
"git.jinshen.cn/remilia/push-server/interval/model"
"github.com/go-chi/chi/v5"
)
func PushHandler(hub *hub.Hub) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
topicStr := chi.URLParam(r, "topic")
topic := model.Topic(topicStr)
if !topic.Valid() {
http.Error(w, "invalid topic", http.StatusBadRequest)
return
}
var req dto.PublishRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid request body", http.StatusBadRequest)
return
}
if req.Content == "" {
http.Error(w, "content cannot be empty", http.StatusBadRequest)
return
}
msg := model.Message{
Topic: topic,
Content: []byte(req.Content),
Timestamp: time.Now().Unix(),
}
if err := hub.BroadcastMessage(r.Context(), msg); err != nil {
http.Error(w, "request cancelled", http.StatusRequestTimeout)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
}
}

View File

@ -4,13 +4,15 @@ import (
"net/http"
"git.jinshen.cn/remilia/push-server/interval/api/handler"
"git.jinshen.cn/remilia/push-server/interval/hub"
"github.com/go-chi/chi/v5"
)
func NewRouter() http.Handler {
func NewRouter(h *hub.Hub) http.Handler {
r := chi.NewRouter()
r.Post("/health", handler.Health)
r.Post("/push/{topic}", handler.PushHandler(h))
return r
}

View File

@ -65,11 +65,18 @@ func (h *Hub) Unsubscribe(sub model.Subscription) {
h.unsubscribe <- sub
}
func (h *Hub) BroadcastMessage(message model.Message) {
h.broadcast <- message
func (h *Hub) BroadcastMessage(ctx context.Context, msg model.Message) error {
select {
case h.broadcast <- msg:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (h *Hub) Run(ctx context.Context) {
log.Println("Hub is running")
for {
select {
case c := <-h.register:
@ -155,6 +162,7 @@ func (h *Hub) onBroadcast(msg model.Message) {
log.Printf("Broadcast failed: invalid topic")
return
}
log.Printf("Receiving message for topic %s: %s", msg.Topic, string(msg.Content))
for _, c := range h.clientsByTopic[msg.Topic] {
select {
case c.SendChan <- msg.Content:

View File

@ -3,4 +3,5 @@ package model
type Message struct {
Topic Topic
Content []byte
Timestamp int64
}