class: center, middle, inverse # Fake it till you make it with FS2 James Collier @JRCCollier  --- ## Playback Session Telemetry events will be emitted from devices. ```scala import java.time.Instant import java.util.UUID sealed trait PlaybackEvent { def sessionId: UUID } final case class Started(timestamp: Instant, sessionId: UUID) extends PlaybackEvent final case class Paused(timestamp: Instant, sessionId: UUID) extends PlaybackEvent final case class Resumed(timestamp: Instant, sessionId: UUID) extends PlaybackEvent final case class Ended(timestamp: Instant, sessionId: UUID) extends PlaybackEvent ``` --- ## 1000ft view  --- class: inverse ## The catch 😱 it's going to be a while till devices are instrumented! -- The show must go on! -- Need to develop while application teams are instrumenting. --- ## Fake it till we instrument it We have the ~~technology~~ schema, we can build the events. --  --- ## Let's just use scalacheck! ```scala import org.scalacheck.ScalacheckShapeless._ import org.scalacheck.{Arbitrary, Gen} private implicit val arbInstant: Arbitrary[Instant] = Arbitrary(Gen.calendar.map(_.toInstant)) val arbPlaybackEvt = implicitly[Arbitrary[PlaybackEvent]] ``` Now let's generate some events... 
-- ```scala List.fill(10)(arbPlaybackEvt.arbitrary.sample).flatten // res0: List[PlaybackEvent] = List( // Ended(0790-12-27T17:17:26.001Z, a26c2e5f-e937-4c29-b884-4770c8e6b2d6), // Ended(+128333713-01-01T00:00:06.820Z, 3177559b-0c6d-43bc-86b7-ff8fb5d76360), // Resumed(1970-01-01T00:00:00.001Z, 50b2bd0a-9fac-44d3-b745-9e81907dea7b), // Resumed(+202817544-02-25T09:56:13.340Z, d6f80a6a-0986-4a9a-8d12-6f402bdda826), // Paused(1496-04-09T06:19:21.324Z, 3f3eea27-7a26-4d74-b984-b8e03c398e6a), // Started(+292269054-01-01T13:34:10.329Z, 9288bc72-1e38-4670-bfb2-e6a4d2074d1b), // Started(1349-05-03T11:46:05.811Z, a4c1faf1-e3cf-44ff-989b-5355af817267), // Paused(4000-11-03T00:00:00Z, 4072c826-65fc-498f-80dc-12b5a7bec466), // Ended(3735-03-26T07:32:04.720Z, ed52e921-1097-4537-abd6-6a3bcd00a4b3), // Resumed(+292269054-12-31T18:42:44.308Z, c1d27125-1b4c-4932-a97d-fe3e09a8b044) // ) ``` 😱 not useful! ??? * timestamp is nonsense. * each event has its own session. * not a representative lifecycle. --- ## Requirements * Sessions modelled. * Representative delays between events. * Configure the number of concurrent sessions. -- Maybe this can be modelled as streams? -- Step in FS2:  ```scala import cats.effect._ import fs2.Stream import scala.concurrent.ExecutionContext import scala.concurrent.duration._ ``` --- ## Model a playback scenario ```scala def session: Stream[IO, PlaybackEvent] = for { now <- Stream.emit(Instant.parse("2019-11-26T12:00:00Z")) sessionId <- Stream.eval(IO(UUID.randomUUID())) s <- Stream.emits(List( Started(now, sessionId), Ended(now.plusSeconds(60), sessionId) )) } yield s ``` -- ```scala session.compile.toList.unsafeRunSync() // res2: List[PlaybackEvent] = List( // Started(2019-11-26T12:00:00Z, 920a89ac-1c5c-465f-b7f7-85a4d98e6ef7), // Ended(2019-11-26T12:01:00Z, 920a89ac-1c5c-465f-b7f7-85a4d98e6ef7) // ) ``` --- ## Nap between events 💤💤💤 Useful to model the progress of time between emitting events for a session. 
-- ```scala def nowS: Stream[IO, Instant] = Stream.eval(timer.clock.realTime(MILLISECONDS).map(Instant.ofEpochMilli)) def idealSession: Stream[IO, PlaybackEvent] = for { sessionId <- Stream.eval(IO(UUID.randomUUID())) s <- nowS.map(Started(_, sessionId)) ++ nowS.map(Ended(_, sessionId)).delayBy(30.minutes) } yield s ``` -- ```scala idealSession .evalMap(event => IO(println(event))) .compile .drain .unsafeRunAsyncAndForget() // Started(2018-12-20T00:00:00Z,27be149c-8b6f-4153-91de-349721532588) testContext.tick(1.hour) // Ended(2018-12-20T00:30:00Z,27be149c-8b6f-4153-91de-349721532588) ``` --- ## A stream of sessions  ??? Par join to choose how many concurrent sessions to have. --- ## Concurrent sessions ```scala val numConcurrent = 3 Stream.emit(idealSession) .repeat .take(5) // limit to 5 for example .parJoin(numConcurrent) .evalMap(event => IO(println(event))) .compile .drain .unsafeRunAsyncAndForget() ``` -- ```scala testContext.tick(1.hour) // Started(2018-12-20T01:00:00Z,f8a54e64-9985-4cf3-9865-9b2b196ab1eb) // Started(2018-12-20T01:00:00Z,1f67e892-cc33-463b-a6ff-5562704a3967) // Started(2018-12-20T01:00:00Z,935c3b01-a28e-4b69-8c00-d4244b0e604a) // Ended(2018-12-20T01:30:00Z,1f67e892-cc33-463b-a6ff-5562704a3967) // Ended(2018-12-20T01:30:00Z,935c3b01-a28e-4b69-8c00-d4244b0e604a) // Ended(2018-12-20T01:30:00Z,f8a54e64-9985-4cf3-9865-9b2b196ab1eb) // Started(2018-12-20T01:30:00Z,9ac4dc36-352c-465b-a92a-961da19b2490) // Started(2018-12-20T01:30:00Z,43031daf-fc0a-4bfa-9835-a5584e957ace) // Ended(2018-12-20T02:00:00Z,43031daf-fc0a-4bfa-9835-a5584e957ace) // Ended(2018-12-20T02:00:00Z,9ac4dc36-352c-465b-a92a-961da19b2490) ``` ??? Sessions are repeated forever. --- ## Add another scenario! 
```scala def pauseSession: Stream[IO, PlaybackEvent] = for { sessionId <- Stream.eval(IO(UUID.randomUUID())) s <- nowS.map(Started(_, sessionId)) ++ nowS.map(Paused(_, sessionId)).delayBy(20.seconds) ++ nowS.map(Resumed(_, sessionId)).delayBy(5.seconds) ++ nowS.map(Ended(_, sessionId)).delayBy(20.minutes) } yield s ``` --- ## Running multiple scenarios ```scala Stream.emits(List(idealSession, pauseSession)) .repeat .take(4) // limit to 4 for example .parJoin(numConcurrent) .evalMap(event => IO(println(event))) .compile .drain .unsafeRunAsyncAndForget() ``` -- ```scala testContext.tick(1.hour) // Started(2018-12-20T02:00:00Z,3a5a230e-dcb3-4b66-aa82-1586c3bc34af) // Started(2018-12-20T02:00:00Z,986272c1-90e1-4baa-8d8c-ecb81bee9a80) // Started(2018-12-20T02:00:00Z,23ce6c11-f27c-4816-8d75-47456dab7449) // Paused(2018-12-20T02:00:20Z,23ce6c11-f27c-4816-8d75-47456dab7449) // Resumed(2018-12-20T02:00:25Z,23ce6c11-f27c-4816-8d75-47456dab7449) // Ended(2018-12-20T02:20:25Z,23ce6c11-f27c-4816-8d75-47456dab7449) // Started(2018-12-20T02:20:25Z,2c7510e7-2c4b-420d-a7fc-59e50233c0ee) // Paused(2018-12-20T02:20:45Z,2c7510e7-2c4b-420d-a7fc-59e50233c0ee) // Resumed(2018-12-20T02:20:50Z,2c7510e7-2c4b-420d-a7fc-59e50233c0ee) // Ended(2018-12-20T02:30:00Z,3a5a230e-dcb3-4b66-aa82-1586c3bc34af) // Ended(2018-12-20T02:30:00Z,986272c1-90e1-4baa-8d8c-ecb81bee9a80) // Ended(2018-12-20T02:40:50Z,2c7510e7-2c4b-420d-a7fc-59e50233c0ee) ``` --- ## Testing Can use `cats.effect.laws.util.TestContext` to simulate time. 
```scala val f = idealSession .compile .toList .unsafeToFuture() ``` -- ```scala f.value // res9: Option[scala.util.Try[List[PlaybackEvent]]] = None ``` -- ```scala testContext.tick(1.hour) f.value // res11: Option[scala.util.Try[List[PlaybackEvent]]] = Some( // Success( // List( // Started(2018-12-20T03:00:00Z, 3b6feeae-ff8d-4301-8e55-956e25bb90cd), // Ended(2018-12-20T03:30:00Z, 3b6feeae-ff8d-4301-8e55-956e25bb90cd) // ) // ) // ) ``` --- ## Counting sessions ```scala val f2 = Stream.emit(idealSession) .repeat .parJoin(1000) .interruptAfter(1.hour) .compile .toList // count sessions .map(_.groupBy(_.sessionId).keySet.size) .unsafeToFuture() ``` `idealSession` lasts 30 minutes, so after 1 hour there should be ~2000 sessions. -- ```scala testContext.tick(1.hour) f2.value // res13: Option[scala.util.Try[Int]] = Some(Success(2000)) ``` 👌 --- ## Further reading and links * [FS2](https://github.com/functional-streams-for-scala/fs2) by [@mpilquist](https://github.com/mpilquist) * [cats-effect](https://github.com/typelevel/cats-effect) ## Thanks! * [mdoc](https://github.com/scalameta/mdoc) used to evaluate scala examples ## Questions?