package com.nikolastojiljkovic.algot.client.airstream

import com.raquo.airstream.common.{InternalTryObserver, SingleParentObservable}
import com.raquo.airstream.core.{EventStream, Signal, Transaction, WritableEventStream}
import com.raquo.airstream.ownership.ManualOwner
import com.raquo.airstream.state.StrictSignal

import scala.scalajs.js
import scala.util.Try
import scala.collection.mutable

/** Event stream that relays events from `parent` while `availability` is true and
  * buffers them, oldest first, while it is false.
  *
  * Emission rules, as implemented below:
  *  - An event arriving while available is emitted right away, unless older events
  *    are still buffered; in that case the new event joins the back of the buffer and
  *    the oldest buffered event is emitted in its place, so events are always emitted
  *    in the order in which they arrived.
  *  - Each false -> true transition of `availability` releases exactly one buffered
  *    event (if any).
  *  - Every emission is fired in a fresh [[Transaction]].
  *
  * Both successful values and errors are buffered (the buffer holds `Try[A]`).
  *
  * NOTE(review): a backlog is released one event per availability transition / per
  * incoming event rather than drained at once. This looks intentional (a consumer that
  * flips `availability` off while it handles each event) -- confirm with callers.
  *
  * @param parent       stream whose events are relayed or buffered
  * @param availability signal gating emission; `true` means events may be emitted
  * @tparam A type of the events
  */
class QueuedEventStream[A]
(
  override protected[this] val parent: EventStream[A],
  availability: StrictSignal[Boolean],
) extends WritableEventStream[A] with SingleParentObservable[A, A] with InternalTryObserver[A] {

  // Owner of the internal subscription to `availability`; its subscriptions are killed in onStop.
  private[this] given internalOwner: ManualOwner = new ManualOwner

  // Most recent availability value observed, seeded with the signal's value at construction.
  private[this] var prevAvailable = availability.now()

  // Events held back while unavailable, oldest first.
  private[this] val queue = mutable.Queue.empty[Try[A]]

  // NOTE(review): fixed at 1, presumably because every emission starts its own
  // transaction instead of propagating within the parent's -- confirm against Airstream docs.
  override protected val topoRank: Int = 1

  /** Handles an event from `parent`: emits it (or the oldest buffered event, when a
    * backlog exists) if available, otherwise buffers it.
    */
  override protected def onTry(nextValue: Try[A], transaction: Transaction): Unit = {
    if (prevAvailable) {
      // Preserve FIFO order: a new event must never overtake events that are still buffered.
      val toFire =
        if (queue.isEmpty) {
          nextValue
        } else {
          queue.enqueue(nextValue)
          queue.dequeue()
        }
      new Transaction(fireTry(toFire, _))
    } else {
      queue.enqueue(nextValue)
    }
  }

  /** Starts observing `availability` (before delegating to `super.onStart()`), releasing
    * one buffered event whenever the signal changes from false to true.
    */
  override protected[this] def onStart(): Unit = {
    availability.foreach { available =>
      if (!prevAvailable && available && queue.nonEmpty) {
        // Dequeue eagerly, next to the non-empty check, so the element is already in hand
        // by the time the transaction body runs and dequeue can never hit an empty queue.
        val released = queue.dequeue()
        new Transaction(fireTry(released, _))
      }
      prevAvailable = available
    }
    super.onStart()
  }

  /** Drops the internal `availability` subscription. Buffered events are kept. */
  override protected[this] def onStop(): Unit = {
    internalOwner.killSubscriptions()
    super.onStop()
  }
}
