index.js 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. var once = require('once')
  2. var eos = require('end-of-stream')
  3. var fs
  4. try {
  5. fs = require('fs') // we only need fs to get the ReadStream and WriteStream prototypes
  6. } catch (e) {}
  7. var noop = function () {}
  8. var ancient = /^v?\.0/.test(process.version)
  9. var isFn = function (fn) {
  10. return typeof fn === 'function'
  11. }
  12. var isFS = function (stream) {
  13. if (!ancient) return false // newer node version do not need to care about fs is a special way
  14. if (!fs) return false // browser
  15. return (stream instanceof (fs.ReadStream || noop) || stream instanceof (fs.WriteStream || noop)) && isFn(stream.close)
  16. }
  17. var isRequest = function (stream) {
  18. return stream.setHeader && isFn(stream.abort)
  19. }
  20. var destroyer = function (stream, reading, writing, callback) {
  21. callback = once(callback)
  22. var closed = false
  23. stream.on('close', function () {
  24. closed = true
  25. })
  26. eos(stream, {readable: reading, writable: writing}, function (err) {
  27. if (err) return callback(err)
  28. closed = true
  29. callback()
  30. })
  31. var destroyed = false
  32. return function (err) {
  33. if (closed) return
  34. if (destroyed) return
  35. destroyed = true
  36. if (isFS(stream)) return stream.close(noop) // use close for fs streams to avoid fd leaks
  37. if (isRequest(stream)) return stream.abort() // request.destroy just do .end - .abort is what we want
  38. if (isFn(stream.destroy)) return stream.destroy()
  39. callback(err || new Error('stream was destroyed'))
  40. }
  41. }
  42. var call = function (fn) {
  43. fn()
  44. }
  45. var pipe = function (from, to) {
  46. return from.pipe(to)
  47. }
  48. var pump = function () {
  49. var streams = Array.prototype.slice.call(arguments)
  50. var callback = isFn(streams[streams.length - 1] || noop) && streams.pop() || noop
  51. if (Array.isArray(streams[0])) streams = streams[0]
  52. if (streams.length < 2) throw new Error('pump requires two streams per minimum')
  53. var error
  54. var destroys = streams.map(function (stream, i) {
  55. var reading = i < streams.length - 1
  56. var writing = i > 0
  57. return destroyer(stream, reading, writing, function (err) {
  58. if (!error) error = err
  59. if (err) destroys.forEach(call)
  60. if (reading) return
  61. destroys.forEach(call)
  62. callback(error)
  63. })
  64. })
  65. return streams.reduce(pipe)
  66. }
  67. module.exports = pump