class: center, middle, inverse

# Fake it till you make it with FS2

James Collier

@JRCCollier

![FS2 Logo](./assets/fs2-logo.png)

---

## Playback Session

Telemetry events will be emitted from devices.

```scala
import java.time.Instant
import java.util.UUID

sealed trait PlaybackEvent {
  def sessionId: UUID
}

final case class Started(timestamp: Instant, sessionId: UUID) extends PlaybackEvent
final case class Paused(timestamp: Instant, sessionId: UUID) extends PlaybackEvent
final case class Resumed(timestamp: Instant, sessionId: UUID) extends PlaybackEvent
final case class Ended(timestamp: Instant, sessionId: UUID) extends PlaybackEvent
```

---

## 1000ft view

![device to cloud](./assets/device-to-cloud.png)

---
class: inverse

## The catch

😱 it's going to be a while till devices are instrumented!

--

The show must go on!

--

Need to develop while application teams are instrumenting.

---

## Fake it till we instrument it

We have the ~~technology~~ schema, we can build the events.

--

![gen to cloud](./assets/gen-to-cloud.png)

---

## Let's just use scalacheck!

```scala
import org.scalacheck.ScalacheckShapeless._
import org.scalacheck.{Arbitrary, Gen}

private implicit val arbInstant: Arbitrary[Instant] =
  Arbitrary(Gen.calendar.map(_.toInstant))

val arbPlaybackEvt = implicitly[Arbitrary[PlaybackEvent]]
```

Now let's generate some events...

--

```scala
List.fill(10)(arbPlaybackEvt.arbitrary.sample).flatten
// res0: List[PlaybackEvent] = List(
//   Paused(+161162656-01-01T05:00:00Z, 9e233247-2469-4d66-979c-98303c66a5f4),
//   Started(+292269054-12-31T20:02:04.932Z, 986d31d3-7820-4120-9239-8f717e32fdaa),
//   Resumed(0464-09-23T23:59:59.059Z, 6faf7ee4-2cdc-4562-a0d7-39336e83bda4),
//   Resumed(0001-12-29T00:00:00Z, 2fc8b640-3986-4c1f-8b26-6414a2910e04),
//   Paused(1160-09-01T07:13:50.648Z, 2f1d8849-d817-4316-8eb4-ff17c99c3d22),
//   Paused(0000-12-30T08:52:43.593Z, bb07c271-bdd6-48ae-b2f3-62b2ae1e7298),
//   Resumed(1970-01-01T00:00:00Z, 10489302-845a-4856-91dd-ff5e3f33369d),
//   Paused(1969-12-31T23:59:59.999Z, d599d11a-9f69-4c99-92cf-becb27e4f932),
//   Paused(1970-01-01T23:59:59.059Z, e45e1570-c463-4d37-9f18-221dd058a9c3),
//   Resumed(1970-01-01T00:00:00.001Z, 49f08d84-7925-4b0c-9c61-2a4cb535d86c)
// )
```

😱 not useful!

???

* timestamp is nonsense.
* each event has its own session.
* not a representative lifecycle.

---

## Requirements

* Sessions modelled.
* Representative delays between events.
* Configure the number of concurrent sessions.

--

Maybe this can be modelled as streams?

--

Step in FS2: ![FS2 Logo](./assets/fs2-logo.png)

```scala
import cats.effect._
import fs2.Stream

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
```

---

## Model a playback scenario

```scala
def session: Stream[IO, PlaybackEvent] = for {
  now <- Stream.emit(Instant.parse("2019-11-26T12:00:00Z"))
  sessionId <- Stream.eval(IO(UUID.randomUUID()))
  s <- Stream.emits(List(
    Started(now, sessionId),
    Ended(now.plusSeconds(60), sessionId)
  ))
} yield s
```

--

```scala
session.compile.toList.unsafeRunSync()
// res2: List[PlaybackEvent] = List(
//   Started(2019-11-26T12:00:00Z, e6714efa-73e2-4e9c-ae26-fc421baaf1e9),
//   Ended(2019-11-26T12:01:00Z, e6714efa-73e2-4e9c-ae26-fc421baaf1e9)
// )
```

---

## Nap between events 💤💤💤

Useful to model the progress of time between emitting events for a session.

--

```scala
// Relies on a Timer[IO] named `timer` being in scope.
def nowS: Stream[IO, Instant] =
  Stream.eval(timer.clock.realTime(MILLISECONDS).map(Instant.ofEpochMilli))

def idealSession: Stream[IO, PlaybackEvent] = for {
  sessionId <- Stream.eval(IO(UUID.randomUUID()))
  s <- nowS.map(Started(_, sessionId)) ++ nowS.map(Ended(_, sessionId)).delayBy(30.minutes)
} yield s
```

--

```scala
idealSession
  .evalMap(event => IO(println(event)))
  .compile
  .drain
  .unsafeRunAsyncAndForget()
// Started(2018-12-20T00:00:00Z,ff5bcae2-9d0b-4fef-b5f6-3efbd178ea32)

testContext.tick(1.hour)
// Ended(2018-12-20T00:30:00Z,ff5bcae2-9d0b-4fef-b5f6-3efbd178ea32)
```

---

## A stream of sessions

![Session streams](./assets/session-streams.png)

???

Use `parJoin` to choose how many concurrent sessions to have.

---

## Concurrent sessions

```scala
val numConcurrent = 3

Stream.emit(idealSession)
  .repeat
  .take(5) // limit to 5 for example
  .parJoin(numConcurrent)
  .evalMap(event => IO(println(event)))
  .compile
  .drain
  .unsafeRunAsyncAndForget()
```

--

```scala
testContext.tick(1.hour)
// Started(2018-12-20T01:00:00Z,160beb7c-344a-44e6-835a-591034f38091)
// Started(2018-12-20T01:00:00Z,ef29f449-328e-4a4e-9e0d-02162beb9b49)
// Started(2018-12-20T01:00:00Z,ffcb34a6-6ab1-4619-b8c4-c116f2d63f23)
// Ended(2018-12-20T01:30:00Z,ffcb34a6-6ab1-4619-b8c4-c116f2d63f23)
// Ended(2018-12-20T01:30:00Z,ef29f449-328e-4a4e-9e0d-02162beb9b49)
// Ended(2018-12-20T01:30:00Z,160beb7c-344a-44e6-835a-591034f38091)
// Started(2018-12-20T01:30:00Z,c56587aa-185f-4041-8b8a-9a1aae127cb4)
// Started(2018-12-20T01:30:00Z,911c6064-7823-4a4a-91e2-21396a82fd04)
// Ended(2018-12-20T02:00:00Z,c56587aa-185f-4041-8b8a-9a1aae127cb4)
// Ended(2018-12-20T02:00:00Z,911c6064-7823-4a4a-91e2-21396a82fd04)
```

???

Sessions are repeated forever.

---

## Add another scenario!

```scala
def pauseSession: Stream[IO, PlaybackEvent] = for {
  sessionId <- Stream.eval(IO(UUID.randomUUID()))
  s <- nowS.map(Started(_, sessionId)) ++
    nowS.map(Paused(_, sessionId)).delayBy(20.seconds) ++
    nowS.map(Resumed(_, sessionId)).delayBy(5.seconds) ++
    nowS.map(Ended(_, sessionId)).delayBy(20.minutes)
} yield s
```

---

## Running multiple scenarios

```scala
Stream.emits(List(idealSession, pauseSession))
  .repeat
  .take(4) // limit to 4 for example
  .parJoin(numConcurrent)
  .evalMap(event => IO(println(event)))
  .compile
  .drain
  .unsafeRunAsyncAndForget()
```

--

```scala
testContext.tick(1.hour)
// Started(2018-12-20T02:00:00Z,ce8517c2-a2da-424c-be0e-f4e36bd83fff)
// Started(2018-12-20T02:00:00Z,036e1102-e7a4-4715-a4d8-feceb43848ce)
// Started(2018-12-20T02:00:00Z,b7c22310-b479-4578-a8d8-fcf99b97a4cf)
// Paused(2018-12-20T02:00:20Z,036e1102-e7a4-4715-a4d8-feceb43848ce)
// Resumed(2018-12-20T02:00:25Z,036e1102-e7a4-4715-a4d8-feceb43848ce)
// Ended(2018-12-20T02:20:25Z,036e1102-e7a4-4715-a4d8-feceb43848ce)
// Started(2018-12-20T02:20:25Z,63e3f38b-b799-4cc0-8872-5e7e5c0eb54e)
// Paused(2018-12-20T02:20:45Z,63e3f38b-b799-4cc0-8872-5e7e5c0eb54e)
// Resumed(2018-12-20T02:20:50Z,63e3f38b-b799-4cc0-8872-5e7e5c0eb54e)
// Ended(2018-12-20T02:30:00Z,ce8517c2-a2da-424c-be0e-f4e36bd83fff)
// Ended(2018-12-20T02:30:00Z,b7c22310-b479-4578-a8d8-fcf99b97a4cf)
// Ended(2018-12-20T02:40:50Z,63e3f38b-b799-4cc0-8872-5e7e5c0eb54e)
```

---

## Testing

Can use `cats.effect.laws.util.TestContext` to simulate time.
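
The examples in this deck use `testContext` and `timer` without showing where they come from. Below is a minimal sketch of one possible setup, assuming cats-effect 2.x with the `cats-effect-laws` module on the classpath. The names `testContext`, `timer` and `contextShift` mirror the slides, but the setup actually used for the talk is not shown, and `nap` is a hypothetical stand-in for a session.

```scala
import cats.effect.{ContextShift, IO, Timer}
import cats.effect.laws.util.TestContext
import scala.concurrent.duration._
import scala.util.Success

// Assumed setup (not shown in the slides): a simulated clock and scheduler.
val testContext = TestContext()
implicit val timer: Timer[IO] = testContext.timer[IO]
implicit val contextShift: ContextShift[IO] = testContext.contextShift[IO]

// Hypothetical stand-in for a session: completes after 30 simulated minutes.
val nap = IO.sleep(30.minutes).map(_ => "done").unsafeToFuture()

val before = nap.value       // pending until the simulated clock advances
testContext.tick(30.minutes) // advance simulated time; no real waiting
val after = nap.value        // now completed
```

Nothing sleeps for real here: `tick` advances the simulated clock and runs whatever was scheduled up to that point.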

```scala
val f = idealSession
  .compile
  .toList
  .unsafeToFuture()
```

--

```scala
f.value
// res9: Option[scala.util.Try[List[PlaybackEvent]]] = None
```

--

```scala
testContext.tick(1.hour)
f.value
// res11: Option[scala.util.Try[List[PlaybackEvent]]] = Some(
//   Success(
//     List(
//       Started(2018-12-20T03:00:00Z, ce624b0f-56b6-49ea-b83a-4da8726ca076),
//       Ended(2018-12-20T03:30:00Z, ce624b0f-56b6-49ea-b83a-4da8726ca076)
//     )
//   )
// )
```

---

## Counting sessions

```scala
val f2 = Stream.emit(idealSession)
  .repeat
  .parJoin(1000)
  .interruptAfter(1.hour)
  .compile
  .toList
  // count sessions
  .map(_.groupBy(_.sessionId).keySet.size)
  .unsafeToFuture()
```

`idealSession` lasts 30 minutes, so with 1000 concurrent sessions there should be ~2000 sessions after 1 hour.

--

```scala
testContext.tick(1.hour)
f2.value
// res13: Option[scala.util.Try[Int]] = Some(Success(2000))
```

👌

---

## Further reading and links

* [FS2](https://github.com/functional-streams-for-scala/fs2) by [@mpilquist](https://github.com/mpilquist)
* [cats-effect](https://github.com/typelevel/cats-effect)

## Thanks!

* [mdoc](https://github.com/scalameta/mdoc) used to evaluate scala examples

## Questions?