feat: stream room updates SSE endpoint

via subscription to redis with enriched channel
This commit is contained in:
efim 2023-11-12 18:01:42 +00:00
parent 34d610a8c8
commit b90fcc3f20
6 changed files with 163 additions and 88 deletions

View File

@ -29,11 +29,10 @@ func main() {
DB: 0, DB: 0,
}) })
fmt.Printf("Server will start on port %d\n", port)
roomsM := rooms.RedisRM { Rdb: rdb, } roomsM := rooms.RedisRM { Rdb: rdb, }
sessions := sessions.RedisSM{ Rdb: rdb, } sessions := sessions.RedisSM{ Rdb: rdb, }
log.Printf("Server will start on port %d\n", port)
routes.RegisterRoutes(sessions, roomsM) routes.RegisterRoutes(sessions, roomsM)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))

View File

@ -35,10 +35,11 @@ func (r *Room)InitMaps() {
// if you are speaking - change nothing // if you are speaking - change nothing
// if nobody is speaking, set this person as a first speaker // if nobody is speaking, set this person as a first speaker
func (r *Room) RaiseHand(p PersonId, gesture HandGesture) Room { func (r *Room) RaiseHand(p PersonId, gesture HandGesture) Room {
if (r.CurrentSpeaker == p) { // TODO This is temporary
// if person already speaking, should first end speaking // if (r.CurrentSpeaker == p) {
return *r // // if person already speaking, should first end speaking
} // return *r
// }
r.ParticipantHands[p] = gesture r.ParticipantHands[p] = gesture
if r.CurrentSpeaker == PersonId(0) { if r.CurrentSpeaker == PersonId(0) {
r.CurrentSpeaker = p r.CurrentSpeaker = p

View File

@ -5,8 +5,8 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"math/rand"
"log" "log"
"math/rand"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
@ -15,26 +15,25 @@ type PersonId int
func (p PersonId) MarshalBinary() ([]byte, error) { func (p PersonId) MarshalBinary() ([]byte, error) {
bytes, err := json.Marshal(p) bytes, err := json.Marshal(p)
return bytes, err return bytes, err
} }
func (p *PersonId) UnmarshalBinary(data []byte) error { func (p *PersonId) UnmarshalBinary(data []byte) error {
err := json.Unmarshal(data, p) err := json.Unmarshal(data, p)
return err return err
} }
// TODO move to rooms i guess // TODO move to rooms i guess
func RandomPersonId() PersonId { func RandomPersonId() PersonId {
randInt := rand.Int() randInt := rand.Int()
if randInt == 0 { if randInt == 0 {
randInt = 1 randInt = 1
} }
return PersonId(randInt) return PersonId(randInt)
} }
type Person struct { type Person struct {
Id PersonId Id PersonId
Name string Name string
PasswordHash string PasswordHash string
} }
@ -48,20 +47,21 @@ func (r *Room) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, r) return json.Unmarshal(data, r)
} }
// let's check whether it will be possible to save nested structs // let's check whether it will be possible to save nested structs
type RoomManager interface { type RoomManager interface {
Get(roomName string) (Room, bool, error) Get(roomName string) (Room, bool, error)
Save(room Room) error Save(room Room) error
Update(ctx context.Context, roomName string, f func(fromRoom Room) (toRoom Room)) error Update(ctx context.Context, roomName string, f func(fromRoom Room) (toRoom Room)) error
Subscribe(ctx context.Context, roomName string) <-chan Room
} }
const roomRedisPrefix = "room" const roomRedisPrefix = "room"
func roomNameToRedisId(roomName string) string { func roomNameToRedisId(roomName string) string {
return fmt.Sprintf("%s:%s", roomRedisPrefix, roomName) return fmt.Sprintf("%s:%s", roomRedisPrefix, roomName)
} }
type RedisRM struct { type RedisRM struct {
Rdb *redis.Client Rdb *redis.Client
} }
@ -80,11 +80,70 @@ func (redisRM RedisRM) Get(roomName string) (Room, bool, error) {
return readRoom, true, nil return readRoom, true, nil
} }
func (redisRM RedisRM) Subscribe(ctx context.Context, roomName string) <-chan Room {
// pubsub := redisRM.Rdb.Subscribe(ctx, roomNameToRedisId(roomName))
key := fmt.Sprintf( "__keyspace@0__:room:%s", roomName)
log.Printf("><> about to subscribe to %s\n", key)
pubsub := redisRM.Rdb.Subscribe(ctx, key)
redisMsgChan := pubsub.Channel()
roomChannel := make(chan Room)
go func() {
defer log.Printf(">>> stopping redis subscription to %s\n", roomName)
defer pubsub.Close() // this was my problem, cooool
var initialRoom Room
err := redisRM.Rdb.Get(ctx, roomNameToRedisId(roomName)).Scan(&initialRoom)
if err != nil {
log.Printf("in stream for %s got initial error %s/n", roomName, err)
close(roomChannel)
return
}
roomChannel <- initialRoom
for {
select {
case msg := <-redisMsgChan:
log.Printf("> subscribe got message %v\n", msg)
if msg == nil {
log.Print("> subscribe got nil redis message for some reason")
return
}
log.Printf(">>> chan: %s, patt: %s, payload: %s, payloadSlice: %v\n", msg.Channel, msg.Pattern, msg.Payload, msg.PayloadSlice)
if msg.Payload == "del" {
// room has been deleted, can stop the stream
close(roomChannel)
return
}
// for any other event - read the room state and put into room channel
var room Room
err := redisRM.Rdb.Get(ctx, roomNameToRedisId(roomName)).Scan(&room)
if err != nil {
log.Printf("in stream for %s got error %s/n", roomName, err)
close(roomChannel)
}
roomChannel <- room
case <-ctx.Done():
log.Println("got Done in subscribe")
close(roomChannel)
return
}
}
}()
return roomChannel
}
var maxRetries int = 20 var maxRetries int = 20
func (redisRM RedisRM) Update(ctx context.Context, roomName string, f func(fromRoom Room) (toRoom Room)) error { func (redisRM RedisRM) Update(ctx context.Context, roomName string, f func(fromRoom Room) (toRoom Room)) error {
// transactional function // transactional function
roomKey := roomNameToRedisId(roomName) roomKey := roomNameToRedisId(roomName)
txf := func (tx *redis.Tx) error { txf := func(tx *redis.Tx) error {
var savedRoom Room var savedRoom Room
err := tx.Get(ctx, roomKey).Scan(&savedRoom) err := tx.Get(ctx, roomKey).Scan(&savedRoom)
if err != nil { if err != nil {
@ -95,6 +154,7 @@ func (redisRM RedisRM) Update(ctx context.Context, roomName string, f func(fromR
room := f(savedRoom) room := f(savedRoom)
_, err = tx.Pipelined(ctx, func(pipe redis.Pipeliner) error { _, err = tx.Pipelined(ctx, func(pipe redis.Pipeliner) error {
log.Printf(">> about to Set %s to %v", roomName, room)
pipe.Set(ctx, roomKey, &room, 0) pipe.Set(ctx, roomKey, &room, 0)
return nil return nil
}) })

View File

@ -1,15 +1,15 @@
package routes package routes
import ( import (
"bytes"
"context" "context"
"embed" "embed"
"fmt" "fmt"
"html/template" "html/template"
"log" "log"
"math/rand"
"net/http" "net/http"
"strconv" "strconv"
"time" "strings"
"sunshine.industries/some-automoderation/rooms" "sunshine.industries/some-automoderation/rooms"
"sunshine.industries/some-automoderation/sessions" "sunshine.industries/some-automoderation/sessions"
@ -17,6 +17,7 @@ import (
const roomPath = "/room/" const roomPath = "/room/"
const raiseHandPath = "/rooms/raise/" const raiseHandPath = "/rooms/raise/"
const subscribeRoomPath = "/rooms/subscribe"
// registering all routes for page and logic of /room/:roomName // registering all routes for page and logic of /room/:roomName
func registerPageRoutes( func registerPageRoutes(
@ -24,8 +25,6 @@ func registerPageRoutes(
sessionSM sessions.SessionManagement, sessionSM sessions.SessionManagement,
roomsM rooms.RoomManager, roomsM rooms.RoomManager,
) { ) {
http.HandleFunc("/rooms/random", streamingBsRoute())
http.Handle(roomPath, // ending in / captures all following path sections, i.e room name http.Handle(roomPath, // ending in / captures all following path sections, i.e room name
authedPageMiddleware( authedPageMiddleware(
sessionSM, sessionSM,
@ -35,36 +34,65 @@ func registerPageRoutes(
authedPageMiddleware( authedPageMiddleware(
sessionSM, sessionSM,
http.StripPrefix(raiseHandPath, raiseGestureHandRoute(templateFs, roomsM)))) http.StripPrefix(raiseHandPath, raiseGestureHandRoute(templateFs, roomsM))))
http.Handle(subscribeRoomPath,
authedPageMiddleware(
sessionSM,
http.StripPrefix(subscribeRoomPath, streamingRoomStates(templateFs, roomsM))))
} }
func streamingBsRoute() http.HandlerFunc { func streamingRoomStates(
templateFs *embed.FS,
roomsM rooms.RoomManager,
) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
r.ParseForm() r.ParseForm()
queryParam := r.FormValue("mobile") roomName := r.FormValue("roomName")
defer log.Printf("/rooms/subscribe/%s stream ended\n", roomName)
session, found := getContextSession(r.Context())
if !found {
log.Printf("/rooms/raiseGesture session not found, should be impossible")
// TODO return error i guess
return
}
if session.RoomId != roomName {
// not authorized
log.Printf("/rooms/streamingRoom got unauth with session.RoomId (%s) != roomName (%s)", session.RoomId, roomName)
w.WriteHeader(http.StatusUnauthorized)
return
}
log.Printf("Starting stream for room %s for %d\n", roomName, session.PersonId)
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Content-Type", "text/event-stream")
startTime, endTime := 0, 0
for { templFile := "templates/room.gohtml"
select { tmpl := template.Must(template.ParseFS(templateFs, templFile))
case <-r.Context().Done():
log.Printf("canlecced streaming!") roomStream := roomsM.Subscribe(r.Context(), roomName)
return for room := range roomStream {
default: // log.Printf("/rooms/streamingRoom iterating with %+v", room)
log.Printf("another step in streaming bs") fmt.Fprint(w, "data: ")
data := "data: <div>hello with data %d! waited %d. mobile is %s</div> \n\n"
startTime = time.Now().Nanosecond() var buffer bytes.Buffer
diff := endTime - startTime
fmt.Fprintf(w, data, rand.Intn(100), diff, queryParam) err := tmpl.ExecuteTemplate(&buffer, "simpleRoomShow", room)
w.(http.Flusher).Flush() if err != nil {
time.Sleep(3 * time.Second) log.Printf("/rooms/subscribe/%s got error on template %s", roomName, err)
endTime = time.Now().Nanosecond()
} }
templateStr := buffer.String()
templateLine := strings.ReplaceAll(templateStr, "\n", "")
fmt.Fprint(w, templateLine)
fmt.Fprint(w, "\n\n")
w.(http.Flusher).Flush()
} }
} }
} }
// if currently speaking? i guess first lower the hand and then raise new // if currently speaking? i guess first lower the hand and then raise new
func raiseGestureHandRoute( func raiseGestureHandRoute(
templateFs *embed.FS, templateFs *embed.FS,
roomsM rooms.RoomManager, roomsM rooms.RoomManager,
@ -131,6 +159,14 @@ func roomPageRoute(
return return
} }
room, found, err := roomsM.Get(roomName)
if err != nil || !found {
log.Printf("/room room for name %s not found or err: %s / found %d", roomName, err, found)
// TODO here should be append to error place
w.Header().Add("HX-Redirect", "/")
return
}
// now we should have a session for this specific room // now we should have a session for this specific room
fmt.Printf("all checks for room %s passed with %+v", roomName, session) fmt.Printf("all checks for room %s passed with %+v", roomName, session)
@ -150,14 +186,14 @@ func roomPageRoute(
} }
pageData := struct { pageData := struct {
RoomName string Room rooms.Room
Gestures []GestureData Gestures []GestureData
}{ }{
RoomName: roomName, Room: room,
Gestures: gesturesData, Gestures: gesturesData,
} }
err := tmpl.Execute(w, pageData) err = tmpl.Execute(w, pageData)
if err != nil { if err != nil {
log.Printf("/room/%s my error in executing template, huh\n %s", roomName, err) log.Printf("/room/%s my error in executing template, huh\n %s", roomName, err)
} }

View File

@ -550,10 +550,6 @@ video {
display: grid; display: grid;
} }
.hidden {
display: none;
}
.h-full { .h-full {
height: 100%; height: 100%;
} }
@ -617,14 +613,9 @@ video {
background-color: rgb(251 191 36 / var(--tw-bg-opacity)); background-color: rgb(251 191 36 / var(--tw-bg-opacity));
} }
.bg-blue-100 { .bg-blue-200 {
--tw-bg-opacity: 1; --tw-bg-opacity: 1;
background-color: rgb(219 234 254 / var(--tw-bg-opacity)); background-color: rgb(191 219 254 / var(--tw-bg-opacity));
}
.bg-blue-300 {
--tw-bg-opacity: 1;
background-color: rgb(147 197 253 / var(--tw-bg-opacity));
} }
.bg-green-300 { .bg-green-300 {
@ -663,13 +654,3 @@ video {
--tw-bg-opacity: 1; --tw-bg-opacity: 1;
background-color: rgb(185 28 28 / var(--tw-bg-opacity)); background-color: rgb(185 28 28 / var(--tw-bg-opacity));
} }
@media (min-width: 768px) {
.md\:block {
display: block;
}
.md\:hidden {
display: none;
}
}

View File

@ -3,7 +3,7 @@
<head> <head>
<meta charset="utf-8" /> <meta charset="utf-8" />
<meta http-equiv="x-ua-compatible" content="ie=edge" /> <meta http-equiv="x-ua-compatible" content="ie=edge" />
<title>Room {{ .RoomName }} : Some Automoderation</title> <title>Room {{ .Room.Name }} : Some Automoderation</title>
<meta name="description" content="" /> <meta name="description" content="" />
<meta name="viewport" content="width=device-width, initial-scale=1" /> <meta name="viewport" content="width=device-width, initial-scale=1" />
@ -30,32 +30,30 @@
</p> </p>
<![endif]--> <![endif]-->
<div class="h-full w-full grid"> <div class="h-full w-full grid">
<script src="https://unpkg.com/htmx.org/dist/ext/sse.js"></script> <script src="https://unpkg.com/htmx.org/dist/ext/sse.js"></script>
<!-- <div id="state-receival" class="bg-blue-300 hidden md:block" --> <div
<!-- hx-ext="sse" sse-connect="/rooms/random" --> id="roomTextContainer"
<!-- > --> class="bg-blue-200"
<!-- <div> yoyo </div> --> hx-ext="sse"
<!-- <div sse-swap="message"> qweopop </div> --> sse-connect="/rooms/subscribe?roomName=test"
<!-- </div> --> >
<!-- should be done with custom event for screen change --> {{ block "simpleRoomShow" .Room }}
<!-- <div id="state-receival" class="bg-blue-100 md:hidden" --> <!-- TODO use template, not block, have only 'loader' in base place -->
<!-- hx-ext="sse" sse-connect="/rooms/random?mobile=true" --> <!-- use different template based on 'mobile' query param -->
<!-- > --> <div sse-swap="message">{{ . }}</div>
<!-- <div> qoqo </div> --> {{ end }}
<!-- <div sse-swap="message"> qweopop </div> --> </div>
<!-- </div> --> <div id="controls" class="bg-green-300">
<div id="controls" class="bg-green-300"> <p>Room name is "{{ .Room.Name }}"</p>
koko {{ range .Gestures }}
{{ range .Gestures }} <button
<button hx-get="{{ .Url }}"
hx-get="{{ .Url }}" class="bg-white rounded border-blue-700 border-2"
class="bg-white rounded border-blue-700 border-2" >
> {{ .Name }}
{{ .Name }} </button>
</button> {{ end }}
{{ end }} </div>
</div>
</div> </div>
</body> </body>
</html> </html>