From b90fcc3f208fd76f433dee29828b9dd26f82f1e5 Mon Sep 17 00:00:00 2001
From: efim
Date: Sun, 12 Nov 2023 18:01:42 +0000
Subject: [PATCH] feat: stream room updates SSE endpoint
via subscription to redis with enriched channel
---
main.go | 3 +-
rooms/room.go | 9 ++--
rooms/rooms_manager.go | 78 ++++++++++++++++++++++++++++----
routes/room_page.go | 86 +++++++++++++++++++++++++-----------
routes/static/out.css | 23 +---------
routes/templates/room.gohtml | 52 +++++++++++-----------
6 files changed, 163 insertions(+), 88 deletions(-)
diff --git a/main.go b/main.go
index a10fdf8..cf5035e 100644
--- a/main.go
+++ b/main.go
@@ -29,11 +29,10 @@ func main() {
DB: 0,
})
- fmt.Printf("Server will start on port %d\n", port)
-
roomsM := rooms.RedisRM { Rdb: rdb, }
sessions := sessions.RedisSM{ Rdb: rdb, }
+ log.Printf("Server will start on port %d\n", port)
routes.RegisterRoutes(sessions, roomsM)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
diff --git a/rooms/room.go b/rooms/room.go
index 5c9d61d..8777fde 100644
--- a/rooms/room.go
+++ b/rooms/room.go
@@ -35,10 +35,11 @@ func (r *Room)InitMaps() {
// if you are speaking - change nothing
// if nobody is speaking, set this person as a first speaker
func (r *Room) RaiseHand(p PersonId, gesture HandGesture) Room {
- if (r.CurrentSpeaker == p) {
- // if person already speaking, should first end speaking
- return *r
- }
+ // TODO This is temporary
+ // if (r.CurrentSpeaker == p) {
+ // // if person already speaking, should first end speaking
+ // return *r
+ // }
r.ParticipantHands[p] = gesture
if r.CurrentSpeaker == PersonId(0) {
r.CurrentSpeaker = p
diff --git a/rooms/rooms_manager.go b/rooms/rooms_manager.go
index ed1be8a..b5f881d 100644
--- a/rooms/rooms_manager.go
+++ b/rooms/rooms_manager.go
@@ -5,8 +5,8 @@ import (
"encoding/json"
"errors"
"fmt"
- "math/rand"
"log"
+ "math/rand"
"github.com/redis/go-redis/v9"
)
@@ -15,26 +15,25 @@ type PersonId int
func (p PersonId) MarshalBinary() ([]byte, error) {
bytes, err := json.Marshal(p)
- return bytes, err
+ return bytes, err
}
func (p *PersonId) UnmarshalBinary(data []byte) error {
err := json.Unmarshal(data, p)
- return err
+ return err
}
// TODO move to rooms i guess
func RandomPersonId() PersonId {
randInt := rand.Int()
if randInt == 0 {
- randInt = 1
+ randInt = 1
}
return PersonId(randInt)
}
-
type Person struct {
- Id PersonId
+ Id PersonId
Name string
PasswordHash string
}
@@ -48,20 +47,21 @@ func (r *Room) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, r)
}
-
// let's check whether it will be possible to save nested structs
type RoomManager interface {
Get(roomName string) (Room, bool, error)
Save(room Room) error
Update(ctx context.Context, roomName string, f func(fromRoom Room) (toRoom Room)) error
+ Subscribe(ctx context.Context, roomName string) <-chan Room
}
+
const roomRedisPrefix = "room"
+
func roomNameToRedisId(roomName string) string {
return fmt.Sprintf("%s:%s", roomRedisPrefix, roomName)
}
-
type RedisRM struct {
Rdb *redis.Client
}
@@ -80,11 +80,70 @@ func (redisRM RedisRM) Get(roomName string) (Room, bool, error) {
return readRoom, true, nil
}
+func (redisRM RedisRM) Subscribe(ctx context.Context, roomName string) <-chan Room {
+ // pubsub := redisRM.Rdb.Subscribe(ctx, roomNameToRedisId(roomName))
+ key := fmt.Sprintf( "__keyspace@0__:room:%s", roomName)
+ log.Printf("><> about to subscribe to %s\n", key)
+ pubsub := redisRM.Rdb.Subscribe(ctx, key)
+
+ redisMsgChan := pubsub.Channel()
+
+ roomChannel := make(chan Room)
+
+ go func() {
+ defer log.Printf(">>> stopping redis subscription to %s\n", roomName)
+ defer pubsub.Close() // this was my problem, cooool
+
+ var initialRoom Room
+ err := redisRM.Rdb.Get(ctx, roomNameToRedisId(roomName)).Scan(&initialRoom)
+ if err != nil {
+ log.Printf("in stream for %s got initial error %s/n", roomName, err)
+ close(roomChannel)
+ return
+ }
+ roomChannel <- initialRoom
+
+ for {
+ select {
+ case msg := <-redisMsgChan:
+ log.Printf("> subscribe got message %v\n", msg)
+
+ if msg == nil {
+ log.Print("> subscribe got nil redis message for some reason")
+ return
+ }
+ log.Printf(">>> chan: %s, patt: %s, payload: %s, payloadSlice: %v\n", msg.Channel, msg.Pattern, msg.Payload, msg.PayloadSlice)
+ if msg.Payload == "del" {
+ // room has been deleted, can stop the stream
+ close(roomChannel)
+ return
+ }
+ // for any other event - read the room state and put into room channel
+ var room Room
+ err := redisRM.Rdb.Get(ctx, roomNameToRedisId(roomName)).Scan(&room)
+ if err != nil {
+ log.Printf("in stream for %s got error %s/n", roomName, err)
+ close(roomChannel)
+ }
+ roomChannel <- room
+ case <-ctx.Done():
+ log.Println("got Done in subscribe")
+ close(roomChannel)
+ return
+ }
+
+ }
+ }()
+
+ return roomChannel
+}
+
var maxRetries int = 20
+
func (redisRM RedisRM) Update(ctx context.Context, roomName string, f func(fromRoom Room) (toRoom Room)) error {
// transactional function
roomKey := roomNameToRedisId(roomName)
- txf := func (tx *redis.Tx) error {
+ txf := func(tx *redis.Tx) error {
var savedRoom Room
err := tx.Get(ctx, roomKey).Scan(&savedRoom)
if err != nil {
@@ -95,6 +154,7 @@ func (redisRM RedisRM) Update(ctx context.Context, roomName string, f func(fromR
room := f(savedRoom)
_, err = tx.Pipelined(ctx, func(pipe redis.Pipeliner) error {
+ log.Printf(">> about to Set %s to %v", roomName, room)
pipe.Set(ctx, roomKey, &room, 0)
return nil
})
diff --git a/routes/room_page.go b/routes/room_page.go
index b372248..19bf940 100644
--- a/routes/room_page.go
+++ b/routes/room_page.go
@@ -1,15 +1,15 @@
package routes
import (
+ "bytes"
"context"
"embed"
"fmt"
"html/template"
"log"
- "math/rand"
"net/http"
"strconv"
- "time"
+ "strings"
"sunshine.industries/some-automoderation/rooms"
"sunshine.industries/some-automoderation/sessions"
@@ -17,6 +17,7 @@ import (
const roomPath = "/room/"
const raiseHandPath = "/rooms/raise/"
+const subscribeRoomPath = "/rooms/subscribe"
// registering all routes for page and logic of /room/:roomName
func registerPageRoutes(
@@ -24,8 +25,6 @@ func registerPageRoutes(
sessionSM sessions.SessionManagement,
roomsM rooms.RoomManager,
) {
- http.HandleFunc("/rooms/random", streamingBsRoute())
-
http.Handle(roomPath, // ending in / captures all following path sections, i.e room name
authedPageMiddleware(
sessionSM,
@@ -35,36 +34,65 @@ func registerPageRoutes(
authedPageMiddleware(
sessionSM,
http.StripPrefix(raiseHandPath, raiseGestureHandRoute(templateFs, roomsM))))
+
+ http.Handle(subscribeRoomPath,
+ authedPageMiddleware(
+ sessionSM,
+ http.StripPrefix(subscribeRoomPath, streamingRoomStates(templateFs, roomsM))))
}
-func streamingBsRoute() http.HandlerFunc {
+func streamingRoomStates(
+ templateFs *embed.FS,
+ roomsM rooms.RoomManager,
+) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
- queryParam := r.FormValue("mobile")
+ roomName := r.FormValue("roomName")
+ defer log.Printf("/rooms/subscribe/%s stream ended\n", roomName)
+
+ session, found := getContextSession(r.Context())
+ if !found {
+ log.Printf("/rooms/raiseGesture session not found, should be impossible")
+ // TODO return error i guess
+ return
+ }
+ if session.RoomId != roomName {
+ // not authorized
+ log.Printf("/rooms/streamingRoom got unauth with session.RoomId (%s) != roomName (%s)", session.RoomId, roomName)
+ w.WriteHeader(http.StatusUnauthorized)
+ return
+ }
+ log.Printf("Starting stream for room %s for %d\n", roomName, session.PersonId)
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Content-Type", "text/event-stream")
- startTime, endTime := 0, 0
- for {
- select {
- case <-r.Context().Done():
- log.Printf("canlecced streaming!")
- return
- default:
- log.Printf("another step in streaming bs")
- data := "data: hello with data %d! waited %d. mobile is %s
\n\n"
- startTime = time.Now().Nanosecond()
- diff := endTime - startTime
- fmt.Fprintf(w, data, rand.Intn(100), diff, queryParam)
- w.(http.Flusher).Flush()
- time.Sleep(3 * time.Second)
- endTime = time.Now().Nanosecond()
+
+ templFile := "templates/room.gohtml"
+ tmpl := template.Must(template.ParseFS(templateFs, templFile))
+
+ roomStream := roomsM.Subscribe(r.Context(), roomName)
+ for room := range roomStream {
+ // log.Printf("/rooms/streamingRoom iterating with %+v", room)
+ fmt.Fprint(w, "data: ")
+
+ var buffer bytes.Buffer
+
+ err := tmpl.ExecuteTemplate(&buffer, "simpleRoomShow", room)
+ if err != nil {
+ log.Printf("/rooms/subscribe/%s got error on template %s", roomName, err)
}
+
+ templateStr := buffer.String()
+ templateLine := strings.ReplaceAll(templateStr, "\n", "")
+ fmt.Fprint(w, templateLine)
+
+ fmt.Fprint(w, "\n\n")
+ w.(http.Flusher).Flush()
}
}
}
-// if currently speaking? i guess first lower the hand and then raise new
+// if currently speaking? i guess first lower the hand and then raise new
func raiseGestureHandRoute(
templateFs *embed.FS,
roomsM rooms.RoomManager,
@@ -131,6 +159,14 @@ func roomPageRoute(
return
}
+ room, found, err := roomsM.Get(roomName)
+ if err != nil || !found {
+ log.Printf("/room room for name %s not found or err: %s / found %d", roomName, err, found)
+ // TODO here should be append to error place
+ w.Header().Add("HX-Redirect", "/")
+ return
+ }
+
// now we should have a session for this specific room
fmt.Printf("all checks for room %s passed with %+v", roomName, session)
@@ -150,14 +186,14 @@ func roomPageRoute(
}
pageData := struct {
- RoomName string
+ Room rooms.Room
Gestures []GestureData
}{
- RoomName: roomName,
+ Room: room,
Gestures: gesturesData,
}
- err := tmpl.Execute(w, pageData)
+ err = tmpl.Execute(w, pageData)
if err != nil {
log.Printf("/room/%s my error in executing template, huh\n %s", roomName, err)
}
diff --git a/routes/static/out.css b/routes/static/out.css
index 5e7ff64..e213bda 100644
--- a/routes/static/out.css
+++ b/routes/static/out.css
@@ -550,10 +550,6 @@ video {
display: grid;
}
-.hidden {
- display: none;
-}
-
.h-full {
height: 100%;
}
@@ -617,14 +613,9 @@ video {
background-color: rgb(251 191 36 / var(--tw-bg-opacity));
}
-.bg-blue-100 {
+.bg-blue-200 {
--tw-bg-opacity: 1;
- background-color: rgb(219 234 254 / var(--tw-bg-opacity));
-}
-
-.bg-blue-300 {
- --tw-bg-opacity: 1;
- background-color: rgb(147 197 253 / var(--tw-bg-opacity));
+ background-color: rgb(191 219 254 / var(--tw-bg-opacity));
}
.bg-green-300 {
@@ -663,13 +654,3 @@ video {
--tw-bg-opacity: 1;
background-color: rgb(185 28 28 / var(--tw-bg-opacity));
}
-
-@media (min-width: 768px) {
- .md\:block {
- display: block;
- }
-
- .md\:hidden {
- display: none;
- }
-}
diff --git a/routes/templates/room.gohtml b/routes/templates/room.gohtml
index 8623f0d..7f7834d 100644
--- a/routes/templates/room.gohtml
+++ b/routes/templates/room.gohtml
@@ -3,7 +3,7 @@
- Room {{ .RoomName }} : Some Automoderation
+ Room {{ .Room.Name }} : Some Automoderation
@@ -30,32 +30,30 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- koko
- {{ range .Gestures }}
-
- {{ end }}
-
-
+
+
+ {{ block "simpleRoomShow" .Room }}
+
+
+
{{ . }}
+ {{ end }}
+
+
+
Room name is "{{ .Room.Name }}"
+ {{ range .Gestures }}
+
+ {{ end }}
+