import stream from 'stream'
import Stream from '../Stream'
import {pull} from '../pull'
import {pushable} from '../sources/pushable'
import {abortable} from '../operators/abortable'
import {drain as forEach} from '../sinks/drain'

/**
 * @todo
 */
export const fromReadableStream = (reader, options = {}) => {
  const {
    objectMode = reader._readableState.objectMode,
    lowWaterMark = 0,
    highWaterMark = objectMode ? 64 : 64 * 1024,
    end = true,
    debugLabel
  } = options

  let error
  let ended = false
  const triggerEnd = (err, cb) => {
    if (!error && err) {
      error = err
      if (debugLabel) {
        console.log(`fromReadableStream(${debugLabel}) register error`)
      }
    }
    if (!ended) {
      if (debugLabel) {
        console.log(`fromReadableStream(${debugLabel}) schedule end`)
      }
      ended = true
      process.nextTick(() => {
        if (debugLabel) {
          console.log(`fromReadableStream(${debugLabel}) ending queue`)
        }
        queue.end(error)
        cb && cb()
      })
    } else {
      cb && cb()
    }
  }

  const cleanup = () => {
    if (debugLabel) {
      console.log(`fromReadableStream(${debugLabel}) cleanup`)
    }
    reader.removeListener('error', onError)
    if (end === true && typeof reader.destroy === 'function') {
      reader.destroy()
    } else {
      reader.unpipe(writer)
    }
  }
  const onError = (err) => {
    if (debugLabel) {
      console.log(`fromReadableStream(${debugLabel}) onError:`, err)
    }
    triggerEnd(err)
  }
  reader.on('error', onError)

  const queue = pushable({
    highWaterMark: 1,
    onEnd: cleanup
  })

  const writer = new stream.Writable({
    objectMode,
    highWaterMark,
    write (chunk, enc, cb) {
      let r
      try {
        r = queue.push(chunk)
      } catch (err) {
        onError(err)
        return cb(err)
      }
      if (r && typeof r.then === 'function') {
        r.then(cb)
      } else {
        cb()
      }
    },
    final (cb) {
      if (debugLabel) {
        console.log(`fromReadableStream(${debugLabel}) final`)
      }
      triggerEnd(null, cb)
    }
  })
  writer.on('error', onError)
  reader.pipe(writer)

  return queue[Stream.source]
}

/**
 * @todo
 */
export const fromWritableStream = (writer, options = {}) => {
  const {
    end = true,
    debugLabel
  } = options

  let error
  let inLoop = false
  let closed = false
  let ended = false
  const abortHandle = abortable()
  const cleanup = () => {
    if (!ended && !inLoop) {
      abortHandle.abort(error)
      ended = true
    }
    writer.removeListener('error', onError)
    writer.removeListener('close', onClose)
    writer.removeListener('end', onEnd)
  }
  const onError = (err) => {
    error = err
    if (debugLabel) {
      console.log(`fromWritableStream(${debugLabel}) onError:`, err)
    }
    cleanup()
  }
  const onClose = (err) => {
    closed = true
    if (debugLabel) {
      console.log(`fromWritableStream(${debugLabel}) onClose`)
    }
    cleanup()
  }
  const onEnd = (err) => {
    closed = true
    if (debugLabel) {
      console.log(`fromWritableStream(${debugLabel}) onEnd`)
    }
    cleanup()
  }
  writer.on('error', onError)
  writer.on('close', onClose)
  writer.on('end', onEnd)
  writer.setMaxListeners(128)

  return (source) => pull(
    source,
    abortHandle,
    forEach(
      (buffer, cb) => {
        if (closed) {
          return false
        }
        inLoop = true
        const drained = writer.write(buffer)
        inLoop = false
        if (error) {
          return cb(error)
        }
        if (drained) {
          cb()
        } else {
          writer.once('drain', cb)
        }
      },
      (buffer, cb) => {
        if (debugLabel) {
          console.log(`fromWritableStream(${debugLabel}) forEach end`)
        }
        inLoop = true
        if (end === true) {
          // TODO: Check if error can happen later and if we need an event listener for it here.
          if (closed) {
            cb(error)
          } else {
            writer.once('finish', () => {
              cb()
            })
            writer.end(buffer)
            if (error) {
              cb(error)
            }
          }
        } else if (buffer) {
          writer.write(buffer)
          cb(error)
        } else {
          cb(error)
        }
        inLoop = false
        ended = true
      }
    )
  )
}

/**
 * @todo
 */
export const fromStream = (nodeStream, options) => {
  if (typeof nodeStream.read === 'function' && typeof nodeStream.write === 'function') {
    let sink, source
    return {
      get [Stream.sink] () {
        if (typeof nodeStream.write !== 'function') {
          throw new Error('Node stream is not writable.')
        }
        if (!sink) {
          sink = fromWritableStream(nodeStream, options)
        }
        return sink
      },
      get [Stream.source] () {
        if (typeof nodeStream.read !== 'function') {
          throw new Error('Node stream is not readable.')
        }
        if (!source) {
          source = fromReadableStream(nodeStream, options)
        }
        return source
      }
    }
  } else if (typeof nodeStream.read === 'function') {
    return fromReadableStream(nodeStream, options)
  } else if (typeof nodeStream.write === 'function') {
    return fromWritableStream(nodeStream, options)
  } else {
    throw new Error('Node stream is neither readable or writable.')
  }
}
