class: center, middle, inverse # Fake it till you make it with FS2 James Collier @JRCCollier  --- ## Playback Session Telemetry events will be emitted from devices. ```scala import java.time.Instant import java.util.UUID sealed trait PlaybackEvent { def sessionId: UUID } final case class Started(timestamp: Instant, sessionId: UUID) extends PlaybackEvent final case class Paused(timestamp: Instant, sessionId: UUID) extends PlaybackEvent final case class Resumed(timestamp: Instant, sessionId: UUID) extends PlaybackEvent final case class Ended(timestamp: Instant, sessionId: UUID) extends PlaybackEvent ``` --- ## 1000ft view  --- class: inverse ## The catch 😱 it's going to be a while till devices are instrumented! -- The show must go on! -- Need to develop while application teams are instrumenting. --- ## Fake it till we instrument it We have the ~~technology~~ schema, we can build the events. --  --- ## Let's just use scalacheck! ```scala import org.scalacheck.ScalacheckShapeless._ import org.scalacheck.{Arbitrary, Gen} private implicit val arbInstant: Arbitrary[Instant] = Arbitrary(Gen.calendar.map(_.toInstant)) val arbPlaybackEvt = implicitly[Arbitrary[PlaybackEvent]] ``` Now let's generate some events... 
-- ```scala List.fill(10)(arbPlaybackEvt.arbitrary.sample).flatten // res0: List[PlaybackEvent] = List( // Resumed(+126277065-12-31T13:44:47.070Z, 4a4eeb1d-9a63-4ae7-8155-d36232cf74d1), // Started(2645-04-11T18:10:53.400Z, c4bcbd33-316b-4b49-8383-8aade7cb1a36), // Paused(1689-01-16T23:59:59.059Z, 32d2bcc1-08e1-48a6-847d-fbdda2c8630c), // Ended(+292269054-01-01T09:48:33.156Z, ef603ae0-6673-42ec-a23f-745a3a8c8f31), // Resumed(+117080944-05-01T06:57:10.672Z, 5f56377a-46c9-479d-87cd-5976b6552289), // Started(1389-03-18T18:28:07.055Z, 1165c2c1-3e6c-4f93-976b-537a50024d64), // Started(1050-04-07T17:36:58.706Z, 6a99ef6a-3cbc-4ebe-a1f6-df4ef015e4c5), // Started(4000-11-03T20:17:29.116Z, ff10e7d0-4b35-4e16-a4a7-8002cb83327b), // Started(0487-06-16T06:15:34.635Z, 55605a8d-0a96-440d-9659-50bac163b50a), // Ended(1273-06-26T23:59:59.059Z, 816239b2-8478-4c3a-90c5-4a94b4191a87) // ) ``` 😱 not useful! ??? * timestamp is nonsense. * each event has its own session. * not a representative lifecycle. --- ## Requirements * Sessions modelled. * Representative delays between events. * Configure the number of concurrent sessions. -- Maybe this can be modelled as streams? -- Step in FS2:  ```scala import cats.effect._ import fs2.Stream import scala.concurrent.ExecutionContext import scala.concurrent.duration._ ``` --- ## Model a playback scenario ```scala def session: Stream[IO, PlaybackEvent] = for { now <- Stream.emit(Instant.parse("2019-11-26T12:00:00Z")) sessionId <- Stream.eval(IO(UUID.randomUUID())) s <- Stream.emits(List( Started(now, sessionId), Ended(now.plusSeconds(60), sessionId) )) } yield s ``` -- ```scala session.compile.toList.unsafeRunSync() // res2: List[PlaybackEvent] = List( // Started(2019-11-26T12:00:00Z, 58ec5fd1-a340-4f55-91b3-1245c4347174), // Ended(2019-11-26T12:01:00Z, 58ec5fd1-a340-4f55-91b3-1245c4347174) // ) ``` --- ## Nap between events 💤💤💤 Useful to model the progress of time between emitting events for a session. 
-- ```scala def nowS: Stream[IO, Instant] = Stream.eval(timer.clock.realTime(MILLISECONDS).map(Instant.ofEpochMilli)) def idealSession: Stream[IO, PlaybackEvent] = for { sessionId <- Stream.eval(IO(UUID.randomUUID())) s <- nowS.map(Started(_, sessionId)) ++ nowS.map(Ended(_, sessionId)).delayBy(30.minutes) } yield s ``` -- ```scala idealSession .evalMap(event => IO(println(event))) .compile .drain .unsafeRunAsyncAndForget() // Started(2018-12-20T00:00:00Z,52b1827f-e086-4918-8012-d8c13f7a11ce) testContext.tick(1.hour) // Ended(2018-12-20T00:30:00Z,52b1827f-e086-4918-8012-d8c13f7a11ce) ``` --- ## A stream of sessions  ??? Par join to choose how many concurrent sessions to have. --- ## Concurrent sessions ```scala val numConcurrent = 3 Stream.emit(idealSession) .repeat .take(5) // limit to 5 for example .parJoin(numConcurrent) .evalMap(event => IO(println(event))) .compile .drain .unsafeRunAsyncAndForget() ``` -- ```scala testContext.tick(1.hour) // Started(2018-12-20T01:00:00Z,546051d6-03ee-4ca1-a8dc-d32a8f187c0b) // Started(2018-12-20T01:00:00Z,36ad5f8e-5f18-44c1-8935-70bec227b117) // Started(2018-12-20T01:00:00Z,38c20b28-d5f8-4b66-8040-6215f461b7db) // Ended(2018-12-20T01:30:00Z,38c20b28-d5f8-4b66-8040-6215f461b7db) // Ended(2018-12-20T01:30:00Z,36ad5f8e-5f18-44c1-8935-70bec227b117) // Ended(2018-12-20T01:30:00Z,546051d6-03ee-4ca1-a8dc-d32a8f187c0b) // Started(2018-12-20T01:30:00Z,476dc9db-67bf-4cfd-8c73-fcfe6a675bf1) // Started(2018-12-20T01:30:00Z,124267b2-a700-46cd-acb5-0e3a248a0ff4) // Ended(2018-12-20T02:00:00Z,476dc9db-67bf-4cfd-8c73-fcfe6a675bf1) // Ended(2018-12-20T02:00:00Z,124267b2-a700-46cd-acb5-0e3a248a0ff4) ``` ??? Sessions are repeated forever. --- ## Add another scenario! 
```scala def pauseSession: Stream[IO, PlaybackEvent] = for { sessionId <- Stream.eval(IO(UUID.randomUUID())) s <- nowS.map(Started(_, sessionId)) ++ nowS.map(Paused(_, sessionId)).delayBy(20.seconds) ++ nowS.map(Resumed(_, sessionId)).delayBy(5.seconds) ++ nowS.map(Ended(_, sessionId)).delayBy(20.minutes) } yield s ``` --- ## Running multiple scenarios ```scala Stream.emits(List(idealSession, pauseSession)) .repeat .take(4) // limit to 4 for example .parJoin(numConcurrent) .evalMap(event => IO(println(event))) .compile .drain .unsafeRunAsyncAndForget() ``` -- ```scala testContext.tick(1.hour) // Started(2018-12-20T02:00:00Z,f5d39ef3-87cf-421d-87e9-02c20bc0fb6c) // Started(2018-12-20T02:00:00Z,35b4f438-a3ac-4f00-8e52-6bedf5e83450) // Started(2018-12-20T02:00:00Z,5ea62418-53f1-4625-afe0-962f79735efc) // Paused(2018-12-20T02:00:20Z,35b4f438-a3ac-4f00-8e52-6bedf5e83450) // Resumed(2018-12-20T02:00:25Z,35b4f438-a3ac-4f00-8e52-6bedf5e83450) // Ended(2018-12-20T02:20:25Z,35b4f438-a3ac-4f00-8e52-6bedf5e83450) // Started(2018-12-20T02:20:25Z,e5f625e9-91b4-4222-8ef1-e74d02618ceb) // Paused(2018-12-20T02:20:45Z,e5f625e9-91b4-4222-8ef1-e74d02618ceb) // Resumed(2018-12-20T02:20:50Z,e5f625e9-91b4-4222-8ef1-e74d02618ceb) // Ended(2018-12-20T02:30:00Z,f5d39ef3-87cf-421d-87e9-02c20bc0fb6c) // Ended(2018-12-20T02:30:00Z,5ea62418-53f1-4625-afe0-962f79735efc) // Ended(2018-12-20T02:40:50Z,e5f625e9-91b4-4222-8ef1-e74d02618ceb) ``` --- ## Testing Can use `cats.effect.laws.util.TestContext` to simulate time. 
```scala val f = idealSession .compile .toList .unsafeToFuture() ``` -- ```scala f.value // res9: Option[scala.util.Try[List[PlaybackEvent]]] = None ``` -- ```scala testContext.tick(1.hour) f.value // res11: Option[scala.util.Try[List[PlaybackEvent]]] = Some( // Success( // List( // Started(2018-12-20T03:00:00Z, 67434a2f-1cf0-4702-a947-e2b81ef60587), // Ended(2018-12-20T03:30:00Z, 67434a2f-1cf0-4702-a947-e2b81ef60587) // ) // ) // ) ``` --- ## Counting sessions ```scala val f2 = Stream.emit(idealSession) .repeat .parJoin(1000) .interruptAfter(1.hour) .compile .toList // count sessions .map(_.groupBy(_.sessionId).keySet.size) .unsafeToFuture() ``` `idealSession` lasts 30 minutes, so after 1 hour there should be ~2000 sessions. -- ```scala testContext.tick(1.hour) f2.value // res13: Option[scala.util.Try[Int]] = Some(Success(2000)) ``` 👌 --- ## Further reading and links * [FS2](https://github.com/functional-streams-for-scala/fs2) by [@mpilquist](https://github.com/mpilquist) * [cats-effect](https://github.com/typelevel/cats-effect) ## Thanks! * [mdoc](https://github.com/scalameta/mdoc) used to evaluate Scala examples ## Questions?