import Stream from '../Stream'

/**
 * Callbag-style operator that groups values from `source` into arrays of
 * `size` items and delivers each array downstream as a single value.
 *
 * The sink receives the source's own talkback, so every downstream request
 * goes straight to the source. While a batch is still filling, this operator
 * requests the next value from the source itself.
 *
 * When the source ends without an error, any partially filled batch is
 * delivered before the end signal. When it ends with an error, the partial
 * batch is discarded and only the error is forwarded.
 *
 * A batch is emitted as soon as it holds at least `size` items, so a size of
 * zero or below yields single-item batches and a fractional size is
 * effectively rounded up. A `NaN` size never fills, so everything is
 * delivered as one batch when the source ends.
 *
 * @param {{size?: number}} [options] - `size` is the number of items per batch.
 * @returns {function} Operator taking a source and returning a batched source.
 */
const fixedBatch = ({size} = {}) => (source) => (start, sink) => {
  if (start !== 0) {
    return
  }
  let talkback
  // Grown on demand instead of pre-sized with `new Array(size)`, which throws
  // a RangeError for negative or fractional sizes.
  let current = []
  source(0, (t, d) => {
    if (t === 0) {
      talkback = d
      sink(0, d)
    } else if (t === 1) {
      current.push(d)
      // `>=` rather than `===`: a strict comparison never matches for a
      // zero or fractional size and would buffer the whole stream.
      if (current.length >= size) {
        // Swap the buffer before emitting so a re-entrant delivery triggered
        // by the sink lands in the fresh array.
        const b = current
        current = []
        sink(1, b)
      } else {
        // Batch not full yet: keep pulling from the source.
        talkback(1)
      }
    } else if (t === 2) {
      // Flush the remainder only on a clean end (no error payload).
      if (!d && current.length > 0) {
        const rest = current
        current = []
        sink(1, rest)
      }
      sink(2, d)
    } else {
      sink(t, d)
    }
  })
}

/**
 * Callbag-style operator that collects values from `source` and delivers
 * them downstream as arrays as soon as possible.
 *
 * A batch is delivered immediately once it holds at least `limit` items.
 * Otherwise a flush is scheduled with `process.nextTick` and another value is
 * requested from the source straight away, so everything that arrives before
 * that tick ends up in the same batch.
 *
 * When the source ends without an error, the buffered items are delivered
 * before the end signal. When it ends with an error, they are discarded.
 * Once the sink terminates, the scheduled flush no longer delivers anything.
 *
 * @param {{limit?: number}} [options] - `limit` is the item count that forces
 *   an immediate flush.
 * @returns {function} Operator taking a source and returning a batched source.
 */
const asapBatch = ({limit} = {}) => (source) => (start, sink) => {
  if (start !== 0) {
    return
  }

  let upstream
  let pullPending = false
  let flushScheduled = false
  let done = false
  let buffer = []

  // Hands the buffered items to the sink, starting a fresh buffer first.
  const emitBuffer = () => {
    const ready = buffer
    buffer = []
    sink(1, ready)
  }

  // Deferred flush: delivers whatever accumulated since it was scheduled.
  const flushOnTick = () => {
    flushScheduled = false
    if (done || buffer.length === 0) {
      return
    }
    emitBuffer()
  }

  // Talkback given to the sink in place of the source's own.
  const downstreamTalkback = (type, payload) => {
    switch (type) {
      case 1:
        // Skip the request when this operator already has one outstanding.
        if (!pullPending) {
          upstream(1)
        }
        break
      case 2:
        done = true
        upstream(2, payload)
        break
      default:
        upstream(type, payload)
    }
  }

  source(0, (type, payload) => {
    switch (type) {
      case 0:
        upstream = payload
        sink(0, downstreamTalkback)
        return
      case 1:
        pullPending = false
        buffer.push(payload)
        if (buffer.length >= limit) {
          emitBuffer()
          return
        }
        if (!flushScheduled) {
          flushScheduled = true
          process.nextTick(flushOnTick)
        }
        // Mark the request before issuing it: the source may answer
        // synchronously and re-enter this handler.
        pullPending = true
        upstream(1)
        return
      case 2:
        done = true
        // Deliver the remainder only on a clean end (no error payload).
        if (!payload && buffer.length > 0) {
          sink(1, buffer)
        }
        sink(2, payload)
        return
      default:
        sink(type, payload)
    }
  })
}

/**
 * Creates a batching operator.
 *
 * When `size` is a number, the result is `fixedBatch({size})`, which groups
 * values into arrays of that size. Otherwise the result is
 * `asapBatch({limit})`, which delivers whatever has arrived by the next tick
 * and flushes early once `limit` items are buffered.
 *
 * @param {{size?: number, limit?: number}} [options]
 *   `size` selects fixed-size batching; `limit` (default `Infinity`) caps a
 *   batch when `size` is not a number.
 * @returns {function} Operator taking a source and returning a batched source.
 */
export const batch = ({size, limit = Infinity} = {}) =>
  typeof size === 'number' ? fixedBatch({size}) : asapBatch({limit})
