hose.js 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. 'use strict';
  2. var util = require('util');
  3. var EventEmitter = require('events').EventEmitter;
  4. function Hose(socket, options, filter) {
  5. EventEmitter.call(this);
  6. if (typeof options === 'function') {
  7. filter = options;
  8. options = {};
  9. }
  10. this.socket = socket;
  11. this.options = options;
  12. this.filter = filter;
  13. this.buffer = null;
  14. var self = this;
  15. this.listeners = {
  16. error: function(err) {
  17. return self.onError(err);
  18. },
  19. data: function(chunk) {
  20. return self.onData(chunk);
  21. },
  22. end: function() {
  23. return self.onEnd();
  24. }
  25. };
  26. this.socket.on('error', this.listeners.error);
  27. this.socket.on('data', this.listeners.data);
  28. this.socket.on('end', this.listeners.end);
  29. }
  30. util.inherits(Hose, EventEmitter);
  31. module.exports = Hose;
  32. Hose.create = function create(socket, options, filter) {
  33. return new Hose(socket, options, filter);
  34. };
  35. Hose.prototype.detach = function detach() {
  36. // Stop the flow
  37. this.socket.pause();
  38. this.socket.removeListener('error', this.listeners.error);
  39. this.socket.removeListener('data', this.listeners.data);
  40. this.socket.removeListener('end', this.listeners.end);
  41. };
  42. Hose.prototype.reemit = function reemit() {
  43. var buffer = this.buffer;
  44. this.buffer = null;
  45. // Modern age
  46. if (this.socket.unshift) {
  47. this.socket.unshift(buffer);
  48. if (this.socket.listeners('data').length > 0)
  49. this.socket.resume();
  50. return;
  51. }
  52. // Rusty node v0.8
  53. if (this.socket.ondata)
  54. this.socket.ondata(buffer, 0, buffer.length);
  55. this.socket.emit('data', buffer);
  56. this.socket.resume();
  57. };
  58. Hose.prototype.onError = function onError(err) {
  59. this.detach();
  60. this.emit('error', err);
  61. };
  62. Hose.prototype.onData = function onData(chunk) {
  63. if (this.buffer)
  64. this.buffer = Buffer.concat([ this.buffer, chunk ]);
  65. else
  66. this.buffer = chunk;
  67. var self = this;
  68. this.filter(this.buffer, function(err, protocol) {
  69. if (err)
  70. return self.onError(err);
  71. // No protocol selected yet
  72. if (!protocol)
  73. return;
  74. self.detach();
  75. self.emit('select', protocol, self.socket);
  76. self.reemit();
  77. });
  78. };
  79. Hose.prototype.onEnd = function onEnd() {
  80. this.detach();
  81. this.emit('error', new Error('Not enough data to recognize protocol'));
  82. };