diff --git a/main.go b/main.go index a10fdf8..cf5035e 100644 --- a/main.go +++ b/main.go @@ -29,11 +29,10 @@ func main() { DB: 0, }) - fmt.Printf("Server will start on port %d\n", port) - roomsM := rooms.RedisRM { Rdb: rdb, } sessions := sessions.RedisSM{ Rdb: rdb, } + log.Printf("Server will start on port %d\n", port) routes.RegisterRoutes(sessions, roomsM) log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) diff --git a/rooms/room.go b/rooms/room.go index 5c9d61d..8777fde 100644 --- a/rooms/room.go +++ b/rooms/room.go @@ -35,10 +35,11 @@ func (r *Room)InitMaps() { // if you are speaking - change nothing // if nobody is speaking, set this person as a first speaker func (r *Room) RaiseHand(p PersonId, gesture HandGesture) Room { - if (r.CurrentSpeaker == p) { - // if person already speaking, should first end speaking - return *r - } + // TODO This is temporary + // if (r.CurrentSpeaker == p) { + // // if person already speaking, should first end speaking + // return *r + // } r.ParticipantHands[p] = gesture if r.CurrentSpeaker == PersonId(0) { r.CurrentSpeaker = p diff --git a/rooms/rooms_manager.go b/rooms/rooms_manager.go index ed1be8a..b5f881d 100644 --- a/rooms/rooms_manager.go +++ b/rooms/rooms_manager.go @@ -5,8 +5,8 @@ import ( "encoding/json" "errors" "fmt" - "math/rand" "log" + "math/rand" "github.com/redis/go-redis/v9" ) @@ -15,26 +15,25 @@ type PersonId int func (p PersonId) MarshalBinary() ([]byte, error) { bytes, err := json.Marshal(p) - return bytes, err + return bytes, err } func (p *PersonId) UnmarshalBinary(data []byte) error { err := json.Unmarshal(data, p) - return err + return err } // TODO move to rooms i guess func RandomPersonId() PersonId { randInt := rand.Int() if randInt == 0 { - randInt = 1 + randInt = 1 } return PersonId(randInt) } - type Person struct { - Id PersonId + Id PersonId Name string PasswordHash string } @@ -48,20 +47,21 @@ func (r *Room) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, r) } - // let's check whether it will be possible to save nested structs type RoomManager interface { Get(roomName string) (Room, bool, error) Save(room Room) error Update(ctx context.Context, roomName string, f func(fromRoom Room) (toRoom Room)) error + Subscribe(ctx context.Context, roomName string) <-chan Room } + const roomRedisPrefix = "room" + func roomNameToRedisId(roomName string) string { return fmt.Sprintf("%s:%s", roomRedisPrefix, roomName) } - type RedisRM struct { Rdb *redis.Client } @@ -80,11 +80,70 @@ func (redisRM RedisRM) Get(roomName string) (Room, bool, error) { return readRoom, true, nil } +func (redisRM RedisRM) Subscribe(ctx context.Context, roomName string) <-chan Room { + // pubsub := redisRM.Rdb.Subscribe(ctx, roomNameToRedisId(roomName)) + key := fmt.Sprintf( "__keyspace@0__:room:%s", roomName) + log.Printf("><> about to subscribe to %s\n", key) + pubsub := redisRM.Rdb.Subscribe(ctx, key) + + redisMsgChan := pubsub.Channel() + + roomChannel := make(chan Room) + + go func() { + defer log.Printf(">>> stopping redis subscription to %s\n", roomName) + defer pubsub.Close() // this was my problem, cooool + + var initialRoom Room + err := redisRM.Rdb.Get(ctx, roomNameToRedisId(roomName)).Scan(&initialRoom) + if err != nil { + log.Printf("in stream for %s got initial error %s/n", roomName, err) + close(roomChannel) + return + } + roomChannel <- initialRoom + + for { + select { + case msg := <-redisMsgChan: + log.Printf("> subscribe got message %v\n", msg) + + if msg == nil { + log.Print("> subscribe got nil redis message for some reason") + return + } + log.Printf(">>> chan: %s, patt: %s, payload: %s, payloadSlice: %v\n", msg.Channel, msg.Pattern, msg.Payload, msg.PayloadSlice) + if msg.Payload == "del" { + // room has been deleted, can stop the stream + close(roomChannel) + return + } + // for any other event - read the room state and put into room channel + var room Room + err := redisRM.Rdb.Get(ctx, roomNameToRedisId(roomName)).Scan(&room) + if err != nil { + log.Printf("in stream for %s got error %s/n", roomName, err) + close(roomChannel) + } + roomChannel <- room + case <-ctx.Done(): + log.Println("got Done in subscribe") + close(roomChannel) + return + } + + } + }() + + return roomChannel +} + var maxRetries int = 20 + func (redisRM RedisRM) Update(ctx context.Context, roomName string, f func(fromRoom Room) (toRoom Room)) error { // transactional function roomKey := roomNameToRedisId(roomName) - txf := func (tx *redis.Tx) error { + txf := func(tx *redis.Tx) error { var savedRoom Room err := tx.Get(ctx, roomKey).Scan(&savedRoom) if err != nil { @@ -95,6 +154,7 @@ func (redisRM RedisRM) Update(ctx context.Context, roomName string, f func(fromR room := f(savedRoom) _, err = tx.Pipelined(ctx, func(pipe redis.Pipeliner) error { + log.Printf(">> about to Set %s to %v", roomName, room) pipe.Set(ctx, roomKey, &room, 0) return nil }) diff --git a/routes/room_page.go b/routes/room_page.go index b372248..19bf940 100644 --- a/routes/room_page.go +++ b/routes/room_page.go @@ -1,15 +1,15 @@ package routes import ( + "bytes" "context" "embed" "fmt" "html/template" "log" - "math/rand" "net/http" "strconv" - "time" + "strings" "sunshine.industries/some-automoderation/rooms" "sunshine.industries/some-automoderation/sessions" @@ -17,6 +17,7 @@ import ( const roomPath = "/room/" const raiseHandPath = "/rooms/raise/" +const subscribeRoomPath = "/rooms/subscribe" // registering all routes for page and logic of /room/:roomName func registerPageRoutes( @@ -24,8 +25,6 @@ func registerPageRoutes( sessionSM sessions.SessionManagement, roomsM rooms.RoomManager, ) { - http.HandleFunc("/rooms/random", streamingBsRoute()) - http.Handle(roomPath, // ending in / captures all following path sections, i.e room name authedPageMiddleware( sessionSM, @@ -35,36 +34,65 @@ func registerPageRoutes( authedPageMiddleware( sessionSM, http.StripPrefix(raiseHandPath, raiseGestureHandRoute(templateFs, roomsM)))) + + http.Handle(subscribeRoomPath, + authedPageMiddleware( + sessionSM, + http.StripPrefix(subscribeRoomPath, streamingRoomStates(templateFs, roomsM)))) } -func streamingBsRoute() http.HandlerFunc { +func streamingRoomStates( + templateFs *embed.FS, + roomsM rooms.RoomManager, +) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { r.ParseForm() - queryParam := r.FormValue("mobile") + roomName := r.FormValue("roomName") + defer log.Printf("/rooms/subscribe/%s stream ended\n", roomName) + + session, found := getContextSession(r.Context()) + if !found { + log.Printf("/rooms/raiseGesture session not found, should be impossible") + // TODO return error i guess + return + } + if session.RoomId != roomName { + // not authorized + log.Printf("/rooms/streamingRoom got unauth with session.RoomId (%s) != roomName (%s)", session.RoomId, roomName) + w.WriteHeader(http.StatusUnauthorized) + return + } + log.Printf("Starting stream for room %s for %d\n", roomName, session.PersonId) w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Content-Type", "text/event-stream") - startTime, endTime := 0, 0 - for { - select { - case <-r.Context().Done(): - log.Printf("canlecced streaming!") - return - default: - log.Printf("another step in streaming bs") - data := "data:
Room name is "{{ .Room.Name }}"
+ {{ range .Gestures }} + + {{ end }} +