import Deque from 'double-ended-queue'
import EventEmitter from 'events'
import Stream from '../Stream'
import {noop} from '../util'
import {share} from '../operators/share'

/**
 * Creates a stream source that is fed imperatively through `push()` and
 * consumed by a listener that requests values one at a time.
 *
 * Signals exchanged with the listener use the numeric types seen in this
 * block: `0` (handshake carrying a talkback), `1` (a request when sent by the
 * listener, a value when sent to it) and `2` (termination, optionally with an
 * error). Values pushed while the listener is not waiting are buffered.
 *
 * NOTE(review): the source is wrapped with the imported `share()` operator;
 * its exact multicasting semantics are defined elsewhere - confirm there.
 *
 * @param {Object} [options]
 * @param {number} [options.highWaterMark=-1] Buffer size at which `push()`
 *   starts returning a promise. Values `<= 0` disable back-pressure.
 * @param {Function} [options.onEnd] Called once (without arguments) when the
 *   stream has finished: on `end()` with an empty buffer, on `end(err)`, once
 *   a gracefully ended buffer has been drained, or when the listener
 *   terminates.
 * @returns {Object} An object exposing:
 *   - `[Stream.source]`: the source function;
 *   - `push(value)`: returns `false` if the stream has ended, a promise that
 *     resolves (with no value) once the buffer is below `highWaterMark` again
 *     or the stream has finished, and `true` otherwise;
 *   - `end([err])`: ends the stream. Without `err`, buffered values are still
 *     delivered before the listener is terminated; with `err`, buffered
 *     values are discarded and the listener is terminated immediately;
 *   - `length`: number of buffered values;
 *   - `ended`: `true` once the stream has ended and the buffer is empty.
 */
export const pushable = ({highWaterMark = -1, onEnd = noop} = {}) => {
  const queue = new Deque()
  const ee = new EventEmitter()
  ee.setMaxListeners(0)

  let listenerTalkback
  let inLoop = false
  let got1 = false
  let waiting = false
  let ended = false
  let onEndFired = false

  // Final teardown: drops buffered values, releases producers blocked on
  // back-pressure, terminates the listener (if one is still attached) and
  // fires `onEnd` at most once. State is updated before any callback runs so
  // that re-entrant calls (e.g. a listener echoing the termination) are safe.
  const finish = (err) => {
    ended = true
    waiting = false
    got1 = false
    const talkback = listenerTalkback
    listenerTalkback = undefined
    queue.clear()
    // The queue is empty now, so every pending `push()` promise can resolve.
    ee.emit('shift')
    if (talkback) {
      talkback(2, err)
    }
    if (!onEndFired) {
      onEndFired = true
      onEnd()
    }
  }

  // Serves outstanding requests from the listener. `inLoop` prevents
  // recursion when the listener requests the next value synchronously while
  // it is being handed the current one.
  const loop = () => {
    inLoop = true
    try {
      while (got1 && listenerTalkback) {
        if (queue.length > 0) {
          got1 = false
          waiting = false
          listenerTalkback(1, queue.shift())
          ee.emit('shift')
        } else if (ended) {
          // `end()` was called earlier and the buffer has now been drained.
          finish()
        } else {
          // Nothing buffered: the next `push()` delivers directly.
          got1 = false
          waiting = true
        }
      }
    } finally {
      inLoop = false
    }
  }

  const source = share()((t, d) => {
    if (t !== 0) {
      return
    }
    listenerTalkback = d
    listenerTalkback(0, (st) => {
      if (st === 1) {
        got1 = true
        if (!inLoop) {
          loop()
        }
      } else if (st === 2) {
        // The listener terminated on its own; do not echo the signal back.
        listenerTalkback = undefined
        finish()
      }
    })
  })

  const push = (value) => {
    if (ended) {
      return false
    }
    if (waiting && listenerTalkback) {
      waiting = false
      listenerTalkback(1, value)
    } else {
      queue.push(value)
    }

    // Return a promise if queue is at the high water mark. Resolve as soon as there is room again
    // in the queue by observing when an item is removed from it.
    if (highWaterMark > 0 && queue.length >= highWaterMark) {
      return new Promise(resolve => {
        const check = () => {
          if (queue.length < highWaterMark) {
            ee.removeListener('shift', check)
            resolve()
          }
        }
        ee.on('shift', check)
      })
    }
    return true
  }

  const end = (err) => {
    if (ended) {
      return
    }
    ended = true
    // With buffered values and no error, termination is deferred until the
    // listener has pulled everything (see `loop`).
    if (queue.length === 0 || err) {
      finish(err)
    }
  }

  return {
    [Stream.source]: source,
    push,
    end,
    get length () {
      return queue.length
    },
    get ended () {
      return ended && queue.length === 0
    }
  }
}
