package rooms import ( "context" "encoding/json" "errors" "fmt" "log" "math/rand" "github.com/redis/go-redis/v9" ) type PersonId int func (p PersonId) MarshalBinary() ([]byte, error) { bytes, err := json.Marshal(p) return bytes, err } func (p *PersonId) UnmarshalBinary(data []byte) error { err := json.Unmarshal(data, p) return err } // TODO move to rooms i guess func RandomPersonId() PersonId { randInt := rand.Int() if randInt == 0 { randInt = 1 } return PersonId(randInt) } type Person struct { Id PersonId Name string PasswordHash string } // well, it seems that i'd better do marshalling into bytes then // see https://github.com/redis/go-redis/issues/2512 func (r *Room) MarshalBinary() (data []byte, err error) { return json.Marshal(r) } func (r *Room) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, r) } // let's check whether it will be possible to save nested structs type RoomManager interface { Get(roomName string) (Room, bool, error) Save(room Room) error Update(ctx context.Context, roomName string, f func(fromRoom Room) (toRoom Room)) error Subscribe(ctx context.Context, roomName string) <-chan Room } const roomRedisPrefix = "room" func roomNameToRedisId(roomName string) string { return fmt.Sprintf("%s:%s", roomRedisPrefix, roomName) } type RedisRM struct { Rdb *redis.Client } func (redisRM RedisRM) Get(roomName string) (Room, bool, error) { var readRoom Room err := redisRM.Rdb.Get(context.TODO(), roomNameToRedisId(roomName)).Scan(&readRoom) if err == redis.Nil { return Room{}, false, nil } if err != nil { log.Printf("error reading room with id %s : %s", roomName, err) return Room{}, false, err } return readRoom, true, nil } func (redisRM RedisRM) Subscribe(ctx context.Context, roomName string) <-chan Room { // pubsub := redisRM.Rdb.Subscribe(ctx, roomNameToRedisId(roomName)) key := fmt.Sprintf( "__keyspace@0__:room:%s", roomName) log.Printf("><> about to subscribe to %s\n", key) pubsub := redisRM.Rdb.Subscribe(ctx, key) redisMsgChan := pubsub.Channel() roomChannel := make(chan Room) go func() { defer log.Printf(">>> stopping redis subscription to %s\n", roomName) defer pubsub.Close() // this was my problem, cooool var initialRoom Room err := redisRM.Rdb.Get(ctx, roomNameToRedisId(roomName)).Scan(&initialRoom) if err != nil { log.Printf("in stream for %s got initial error %s/n", roomName, err) close(roomChannel) return } roomChannel <- initialRoom for { select { case msg := <-redisMsgChan: log.Printf("> subscribe got message %v\n", msg) if msg == nil { log.Print("> subscribe got nil redis message for some reason") return } log.Printf(">>> chan: %s, patt: %s, payload: %s, payloadSlice: %v\n", msg.Channel, msg.Pattern, msg.Payload, msg.PayloadSlice) if msg.Payload == "del" { // room has been deleted, can stop the stream close(roomChannel) return } // for any other event - read the room state and put into room channel var room Room err := redisRM.Rdb.Get(ctx, roomNameToRedisId(roomName)).Scan(&room) if err != nil { log.Printf("in stream for %s got error %s/n", roomName, err) close(roomChannel) } roomChannel <- room case <-ctx.Done(): log.Println("got Done in subscribe") close(roomChannel) return } } }() return roomChannel } var maxRetries int = 20 func (redisRM RedisRM) Update(ctx context.Context, roomName string, f func(fromRoom Room) (toRoom Room)) error { // transactional function roomKey := roomNameToRedisId(roomName) txf := func(tx *redis.Tx) error { var savedRoom Room err := tx.Get(ctx, roomKey).Scan(&savedRoom) if err != nil { return err } savedRoom.InitMaps() room := f(savedRoom) _, err = tx.Pipelined(ctx, func(pipe redis.Pipeliner) error { log.Printf(">> about to Set %s to %v", roomName, room) pipe.Set(ctx, roomKey, &room, 0) return nil }) return err } for i := 0; i < maxRetries; i++ { err := redisRM.Rdb.Watch(ctx, txf, roomKey) if err == nil { return nil // success } if err == redis.TxFailedErr { // optimistic lock will keep spinning continue } // non tx errror are returned, including redis.Nil return err } return errors.New("update reached maximum amount of retries") } func (redisRM RedisRM) Save(room Room) error { err := redisRM.Rdb.Set(context.TODO(), roomNameToRedisId(room.Name), &room, 0).Err() // maybe even set expiration? return err }