186 lines
4.5 KiB
Go
186 lines
4.5 KiB
Go
package rooms
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"math/rand"
|
|
"time"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
type PersonId int
|
|
|
|
func (p PersonId) MarshalBinary() ([]byte, error) {
|
|
bytes, err := json.Marshal(p)
|
|
return bytes, err
|
|
}
|
|
|
|
func (p *PersonId) UnmarshalBinary(data []byte) error {
|
|
err := json.Unmarshal(data, p)
|
|
return err
|
|
}
|
|
|
|
// TODO move to rooms i guess
|
|
func RandomPersonId() PersonId {
|
|
randInt := rand.Int()
|
|
if randInt == 0 {
|
|
randInt = 1
|
|
}
|
|
return PersonId(randInt)
|
|
}
|
|
|
|
type Person struct {
|
|
Id PersonId
|
|
Name string
|
|
PasswordHash string
|
|
}
|
|
|
|
// well, it seems that i'd better do marshalling into bytes then
|
|
// see https://github.com/redis/go-redis/issues/2512
|
|
func (r *Room) MarshalBinary() (data []byte, err error) {
|
|
return json.Marshal(r)
|
|
}
|
|
func (r *Room) UnmarshalBinary(data []byte) error {
|
|
return json.Unmarshal(data, r)
|
|
}
|
|
|
|
// let's check whether it will be possible to save nested structs
|
|
|
|
type RoomManager interface {
|
|
Get(ctx context.Context, roomName string) (Room, bool, error)
|
|
Save(ctx context.Context, room Room) error
|
|
Update(ctx context.Context, roomName string, f func(fromRoom Room) (toRoom Room)) error
|
|
Subscribe(ctx context.Context, roomName string) <-chan Room
|
|
}
|
|
|
|
const roomRedisPrefix = "room"
|
|
const roomTtl = 24 * time.Hour
|
|
|
|
func roomNameToRedisId(roomName string) string {
|
|
return fmt.Sprintf("%s:%s", roomRedisPrefix, roomName)
|
|
}
|
|
|
|
type RedisRM struct {
|
|
Rdb *redis.Client
|
|
}
|
|
|
|
func (redisRM RedisRM) Get(ctx context.Context, roomName string) (Room, bool, error) {
|
|
var readRoom Room
|
|
err := redisRM.Rdb.Get(ctx, roomNameToRedisId(roomName)).Scan(&readRoom)
|
|
if err == redis.Nil {
|
|
return Room{}, false, nil
|
|
}
|
|
if err != nil {
|
|
log.Printf("error reading room with id %s : %s", roomName, err)
|
|
return Room{}, false, err
|
|
}
|
|
|
|
return readRoom, true, nil
|
|
}
|
|
|
|
func (redisRM RedisRM) Subscribe(ctx context.Context, roomName string) <-chan Room {
|
|
// pubsub := redisRM.Rdb.Subscribe(ctx, roomNameToRedisId(roomName))
|
|
key := fmt.Sprintf( "__keyspace@0__:room:%s", roomName)
|
|
log.Printf("><> about to subscribe to %s\n", key)
|
|
pubsub := redisRM.Rdb.Subscribe(ctx, key)
|
|
|
|
redisMsgChan := pubsub.Channel()
|
|
|
|
roomChannel := make(chan Room)
|
|
|
|
go func() {
|
|
defer log.Printf(">>> stopping redis subscription to %s\n", roomName)
|
|
defer pubsub.Close() // this was my problem, cooool
|
|
|
|
var initialRoom Room
|
|
err := redisRM.Rdb.Get(ctx, roomNameToRedisId(roomName)).Scan(&initialRoom)
|
|
if err != nil {
|
|
log.Printf("in stream for %s got initial error %s/n", roomName, err)
|
|
close(roomChannel)
|
|
return
|
|
}
|
|
roomChannel <- initialRoom
|
|
|
|
for {
|
|
select {
|
|
case msg := <-redisMsgChan:
|
|
log.Printf("> subscribe got message %v\n", msg)
|
|
|
|
if msg == nil {
|
|
log.Print("> subscribe got nil redis message for some reason")
|
|
return
|
|
}
|
|
log.Printf(">>> chan: %s, patt: %s, payload: %s, payloadSlice: %v\n", msg.Channel, msg.Pattern, msg.Payload, msg.PayloadSlice)
|
|
if msg.Payload == "del" {
|
|
// room has been deleted, can stop the stream
|
|
close(roomChannel)
|
|
return
|
|
}
|
|
// for any other event - read the room state and put into room channel
|
|
var room Room
|
|
err := redisRM.Rdb.Get(ctx, roomNameToRedisId(roomName)).Scan(&room)
|
|
if err != nil {
|
|
log.Printf("in stream for %s got error %s/n", roomName, err)
|
|
close(roomChannel)
|
|
}
|
|
roomChannel <- room
|
|
case <-ctx.Done():
|
|
log.Println("got Done in subscribe")
|
|
close(roomChannel)
|
|
return
|
|
}
|
|
|
|
}
|
|
}()
|
|
|
|
return roomChannel
|
|
}
|
|
|
|
var maxRetries int = 20
|
|
|
|
func (redisRM RedisRM) Update(ctx context.Context, roomName string, f func(fromRoom Room) (toRoom Room)) error {
|
|
// transactional function
|
|
roomKey := roomNameToRedisId(roomName)
|
|
txf := func(tx *redis.Tx) error {
|
|
var savedRoom Room
|
|
err := tx.Get(ctx, roomKey).Scan(&savedRoom)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
savedRoom.InitMaps()
|
|
room := f(savedRoom)
|
|
|
|
_, err = tx.Pipelined(ctx, func(pipe redis.Pipeliner) error {
|
|
log.Printf(">> about to Set %s to %v", roomName, room)
|
|
pipe.Set(ctx, roomKey, &room, roomTtl)
|
|
return nil
|
|
})
|
|
|
|
return err
|
|
}
|
|
for i := 0; i < maxRetries; i++ {
|
|
err := redisRM.Rdb.Watch(ctx, txf, roomKey)
|
|
if err == nil {
|
|
return nil // success
|
|
}
|
|
if err == redis.TxFailedErr {
|
|
// optimistic lock will keep spinning
|
|
continue
|
|
}
|
|
// non tx errror are returned, including redis.Nil
|
|
return err
|
|
}
|
|
|
|
return errors.New("update reached maximum amount of retries")
|
|
}
|
|
|
|
func (redisRM RedisRM) Save(ctx context.Context, room Room) error {
|
|
err := redisRM.Rdb.Set(ctx, roomNameToRedisId(room.Name), &room, roomTtl).Err()
|
|
return err
|
|
}
|