//filter will reemit the data if cb(err,pass) pass is truthy

// reduce is more tricky
// maybe we want to group the reductions or emit progress updates occasionally
// the most basic reduce just emits one 'data' event after it has recieved 'end'

var Stream = require('stream').Stream
  , es = exports
  , through = require('through')
  , from = require('from')
  , duplex = require('duplexer')
  , map = require('map-stream')
  , pause = require('pause-stream')
  , split = require('split')
  , pipeline = require('stream-combiner')
  , immediately = global.setImmediate || process.nextTick;

es.Stream = Stream //re-export Stream from core
es.through = through
es.from = from
es.duplex = duplex
es.map = map
es.pause = pause
es.split = split
es.pipeline = es.connect = es.pipe = pipeline
// merge / concat
//
// combine multiple streams into a single stream.
// will emit end only once

es.concat = //actually this should be called concat
es.merge = function (/*streams...*/) {
  var toMerge = [].slice.call(arguments)
  if (toMerge.length === 1 && (toMerge[0] instanceof Array)) {
    toMerge = toMerge[0] //handle array as arguments object
  }
  var stream = new Stream()
  stream.setMaxListeners(0) // allow adding more than 11 streams
  var endCount = 0
  stream.writable = stream.readable = true

  if (toMerge.length) {
    toMerge.forEach(function (e) {
      e.pipe(stream, {end: false})
      var ended = false
      e.on('end', function () {
        if(ended) return
        ended = true
        endCount ++
        if(endCount == toMerge.length)
          stream.emit('end')
      })
    })
  } else {
    process.nextTick(function () {
      stream.emit('end')
    })
  }
  
  stream.write = function (data) {
    this.emit('data', data)
  }
  stream.destroy = function () {
    toMerge.forEach(function (e) {
      if(e.destroy) e.destroy()
    })
  }
  return stream
}


// writable stream, collects all events into an array
// and calls back when 'end' occurs
// mainly I'm using this to test the other functions

es.writeArray = function (done) {
  if ('function' !== typeof done)
    throw new Error('function writeArray (done): done must be function')

  var a = new Stream ()
    , array = [], isDone = false
  a.write = function (l) {
    array.push(l)
  }
  a.end = function () {
    isDone = true
    done(null, array)
  }
  a.writable = true
  a.readable = false
  a.destroy = function () {
    a.writable = a.readable = false
    if(isDone) return
    done(new Error('destroyed before end'), array)
  }
  return a
}

//return a Stream that reads the properties of an object
//respecting pause() and resume()

es.readArray = function (array) {
  var stream = new Stream()
    , i = 0
    , paused = false
    , ended = false

  stream.readable = true
  stream.writable = false

  if(!Array.isArray(array))
    throw new Error('event-stream.read expects an array')

  stream.resume = function () {
    if(ended) return
    paused = false
    var l = array.length
    while(i < l && !paused && !ended) {
      stream.emit('data', array[i++])
    }
    if(i == l && !ended)
      ended = true, stream.readable = false, stream.emit('end')
  }
  process.nextTick(stream.resume)
  stream.pause = function () {
     paused = true
  }
  stream.destroy = function () {
    ended = true
    stream.emit('close')
  }
  return stream
}

//
// readable (asyncFunction)
// return a stream that calls an async function while the stream is not paused.
//
// the function must take: (count, callback) {...
//

es.readable =
function (func, continueOnError) {
  var stream = new Stream()
    , i = 0
    , paused = false
    , ended = false
    , reading = false

  stream.readable = true
  stream.writable = false

  if('function' !== typeof func)
    throw new Error('event-stream.readable expects async function')

  stream.on('end', function () { ended = true })

  function get (err, data) {

    if(err) {
      stream.emit('error', err)
      if(!continueOnError) stream.emit('end')
    } else if (arguments.length > 1)
      stream.emit('data', data)

    immediately(function () {
      if(ended || paused || reading) return
      try {
        reading = true
        func.call(stream, i++, function () {
          reading = false
          get.apply(null, arguments)
        })
      } catch (err) {
        stream.emit('error', err)
      }
    })
  }
  stream.resume = function () {
    paused = false
    get()
  }
  process.nextTick(get)
  stream.pause = function () {
     paused = true
  }
  stream.destroy = function () {
    stream.emit('end')
    stream.emit('close')
    ended = true
  }
  return stream
}


//
// map sync
//

es.mapSync = function (sync) {
  return es.through(function write(data) {
    var mappedData
    try {
      mappedData = sync(data)
    } catch (err) {
      return this.emit('error', err)
    }
    if (mappedData !== undefined)
      this.emit('data', mappedData)
  })
}

//
// log just print out what is coming through the stream, for debugging
//

es.log = function (name) {
  return es.through(function (data) {
    var args = [].slice.call(arguments)
    if(name) console.error(name, data)
    else     console.error(data)
    this.emit('data', data)
  })
}


//
// child -- pipe through a child process
//

es.child = function (child) {

  return es.duplex(child.stdin, child.stdout)

}

//
// parse
//
// must be used after es.split() to ensure that each chunk represents a line
// source.pipe(es.split()).pipe(es.parse())

es.parse = function (options) {
  var emitError = !!(options ? options.error : false)
  return es.through(function (data) {
    var obj
    try {
      if(data) //ignore empty lines
        obj = JSON.parse(data.toString())
    } catch (err) {
      if (emitError)
        return this.emit('error', err)
      return console.error(err, 'attempting to parse:', data)
    }
    //ignore lines that where only whitespace.
    if(obj !== undefined)
      this.emit('data', obj)
  })
}
//
// stringify
//

es.stringify = function () {
  var Buffer = require('buffer').Buffer
  return es.mapSync(function (e){
    return JSON.stringify(Buffer.isBuffer(e) ? e.toString() : e) + '\n'
  })
}

//
// replace a string within a stream.
//
// warn: just concatenates the string and then does str.split().join().
// probably not optimal.
// for smallish responses, who cares?
// I need this for shadow-npm so it's only relatively small json files.

es.replace = function (from, to) {
  return es.pipeline(es.split(from), es.join(to))
}

//
// join chunks with a joiner. just like Array#join
// also accepts a callback that is passed the chunks appended together
// this is still supported for legacy reasons.
//

es.join = function (str) {

  //legacy api
  if('function' === typeof str)
    return es.wait(str)

  var first = true
  return es.through(function (data) {
    if(!first)
      this.emit('data', str)
    first = false
    this.emit('data', data)
    return true
  })
}


//
// wait. callback when 'end' is emitted, with all chunks appended as string.
//

es.wait = function (callback) {
  var arr = []
  return es.through(function (data) { arr.push(data) },
    function () {
      var body = Buffer.isBuffer(arr[0]) ? Buffer.concat(arr)
        : arr.join('')
      this.emit('data', body)
      this.emit('end')
      if(callback) callback(null, body)
    })
}

es.pipeable = function () {
  throw new Error('[EVENT-STREAM] es.pipeable is deprecated')
}