diff --git a/backend/src/main/scala/industries/sunshine/planningpoker/RoomService.scala b/backend/src/main/scala/industries/sunshine/planningpoker/RoomService.scala index cc4f7a7..1dd8f85 100644 --- a/backend/src/main/scala/industries/sunshine/planningpoker/RoomService.scala +++ b/backend/src/main/scala/industries/sunshine/planningpoker/RoomService.scala @@ -1,8 +1,11 @@ package industries.sunshine.planningpoker import industries.sunshine.planningpoker.common.Models.* -import cats.effect.{Ref, Sync} +import cats.effect.{Ref, Concurrent} import cats.syntax.all._ +import fs2.Stream +import fs2.concurrent.Topic +import cats.data.EitherT enum RoomError { case RoomAlreadyExists(name: String) @@ -22,38 +25,69 @@ trait RoomService[F[_]] { ): F[Either[RoomError, PlayerID]] def deleteRoom(roomID: RoomID): F[Unit] def getRoom(roomID: RoomID): F[Option[Room]] + def subscribe(roomID: RoomID): Stream[F, Room] } -class InMemoryRoomService[F[_]: Sync](stateRef: Ref[F, Map[RoomID, Room]]) extends RoomService[F] { +class InMemoryRoomService[F[_]: Concurrent](stateRef: Ref[F, Map[RoomID, (Room, Topic[F, Room])]]) + extends RoomService[F] { + override def createRoom(newRoom: Room): F[Either[RoomError, Room]] = { - stateRef.modify { rooms => - rooms.get(newRoom.id) match { - case Some(_) => - rooms -> RoomError.RoomAlreadyExists(newRoom.id.name).asLeft[Room] - case None => - (rooms.updated(newRoom.id, newRoom)) -> newRoom.asRight[RoomError] + for { + updatesTopic <- Topic[F, Room] + room <- stateRef.modify { rooms => + rooms.get(newRoom.id) match { + case Some(_) => + rooms -> RoomError.RoomAlreadyExists(newRoom.id.name).asLeft[Room] + case None => + (rooms.updated(newRoom.id, (newRoom, updatesTopic))) -> newRoom.asRight[RoomError] + } } - } + } yield room } - override def updateRoom(room: Room): F[Unit] = stateRef.update { state => - state.get(room.id).fold(state)(oldRoom => state.updated(room.id, room)) + override def updateRoom(room: Room): F[Unit] = { + for { + // modify is function to update state and compute auxillary value to return, here - topic + topic <- stateRef.modify[Topic[F, Room]] { state => + state.get(room.id) match { + case Some((oldRoom, topic)) => state.updated(room.id, (room, topic)) -> topic + case None => + throw new IllegalStateException(s"updateRoom with ${room.id} on nonexistent room") + } + } + _ <- topic.publish1(room) // update and publish are not atomic, sadly races can happen + } yield () } - override def deleteRoom(roomID: RoomID): F[Unit] = stateRef.update(_.removed(roomID)) + override def deleteRoom(roomID: RoomID): F[Unit] = { + for { + topic <- stateRef.modify[Topic[F, Room]](state => + state.get(roomID) match { + case Some((oldRoom, topic)) => state.removed(roomID) -> topic + case None => + throw new IllegalStateException(s"call to delete with $roomID on nonexistent room") + // TODO - i'd prefer to swallow these errors + } + ) + _ <- topic.close + } yield () + } - override def getRoom(roomID: RoomID): F[Option[Room]] = stateRef.get.map(_.get(roomID)) + override def getRoom(roomID: RoomID): F[Option[Room]] = { + stateRef.get.map(_.get(roomID).map(_._1)) + } override def joinRoom( id: RoomID, nickName: String, nickPassword: String, roomPassword: String - ): F[Either[RoomError, PlayerID]] = stateRef.modify { rooms => - // need to cover cases: - // - player already present, then return as is, i guess - // - nick not known - add new player and new nick-password mapping - // - nick known - add new player + ): F[Either[RoomError, PlayerID]] = { + /** pure function that adds the player to the room need to cover cases: + * - player already present, then return as is, i guess + * - nick not known - add new player and new nick-password mapping + * - nick known - add new player + */ def addPlayer(room: Room): (PlayerID, Room) = { room.players.find(_.name == nickName) match { case Some(player) => player.id -> room @@ -70,8 +104,19 @@ class InMemoryRoomService[F[_]: Sync](stateRef: Ref[F, Map[RoomID, Room]]) exten } } - val joiningWithChecks = for { - room <- rooms.get(id).toRight(RoomError.RoomMissing(id.name)) + /** to be executed under Ref.modify (i.e with state acquired) checks of whether player can be + * added to the room: + * - room password is correct + * - nickname is either not taken, or correct password was provided + * @returns + * playerId (new or existing), updatedRoom (to be put into state), topic (to send the udpdate + * notification) + */ + def getWithChecks( + rooms: Map[RoomID, (Room, Topic[F, Room])] + ): Either[RoomError, (PlayerID, Room, Topic[F, Room])] = for { + roomAndTopic <- rooms.get(id).toRight(RoomError.RoomMissing(id.name)) + (room, topic) = roomAndTopic _ <- Either.cond(room.password == roomPassword, (), RoomError.RoomPassIncorrect) isNickPassCorrect = room.playersPasswords .get(nickName) @@ -82,14 +127,54 @@ class InMemoryRoomService[F[_]: Sync](stateRef: Ref[F, Map[RoomID, Room]]) exten RoomError.NickPassIncorrect ) (playerId, updatedRoom) = addPlayer(room) - } yield playerId + } yield (playerId, updatedRoom, topic) - rooms -> joiningWithChecks + // modify returns tuple (updatedState, valueToReturn) + // this particular update either keeps room as is, or adds the player + // and returns playerId and topic to be used outside + // + // NOTE here i have a lot of handwaving to pass topic outside + // because it's not possible to send F[Unit] update + // inside of the stateRef update (which works with pure functions?), + // so room notification change has to be returned to outside + val maybeUpdatedStateAndNotification = stateRef.modify { rooms => + val maybeAddedUser = getWithChecks(rooms) + val updatedState = + maybeAddedUser.fold( + _ => rooms, + { case (playerId, updatedRoom, topic) => rooms.updated(id, (updatedRoom, topic)) } + ) + val toReturn = maybeAddedUser.map { case (id, updatedRoom, topic) => + (id, topic.publish1(updatedRoom).void) + } + updatedState -> toReturn + } + + // now combining the effects : getting (updatedState & notificationEffect) or error + // executing notification + // returning only playerId + val result = for { + updatedState <- EitherT(maybeUpdatedStateAndNotification) + (playerId, notification) = updatedState + _ <- EitherT.liftF(notification) + } yield (playerId) + + result.value } + override def subscribe(roomID: RoomID): Stream[F, Room] = + Stream + .eval(stateRef.get) + .flatMap(rooms => + rooms.get(roomID) match { + case Some((room, topic)) => topic.subscribe(10) + case None => Stream.empty + } + ) + } object RoomService { - def make[F[_]: Sync]: F[RoomService[F]] = { - Ref.of[F, Map[RoomID, Room]](TestModels.testRooms).map(new InMemoryRoomService[F](_)) + def make[F[_]: Concurrent]: F[RoomService[F]] = { + Ref.of[F, Map[RoomID, (Room, Topic[F, Room])]](Map.empty).map(new InMemoryRoomService[F](_)) } } diff --git a/common/src/main/scala/industries/sunshine/planningpoker/Models.scala b/common/src/main/scala/industries/sunshine/planningpoker/Models.scala index f509bf5..39dd083 100644 --- a/common/src/main/scala/industries/sunshine/planningpoker/Models.scala +++ b/common/src/main/scala/industries/sunshine/planningpoker/Models.scala @@ -63,7 +63,7 @@ object Models { password: String, allowedCards: List[String], round: RoundState, - playersPasswords: Map[String, String] = Map.empty // nickname into password + playersPasswords: Map[String, String] = Map.empty, // nickname into password ) { def getViewFor(playerId: PlayerID): RoomStateView = { players