connection.js 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845
  1. 'use strict'
  2. var util = require('util')
  3. var transport = require('../spdy-transport')
  4. var debug = {
  5. server: require('debug')('spdy:connection:server'),
  6. client: require('debug')('spdy:connection:client')
  7. }
  8. var EventEmitter = require('events').EventEmitter
  9. var Stream = transport.Stream
  10. function Connection (socket, options) {
  11. EventEmitter.call(this)
  12. var state = {}
  13. this._spdyState = state
  14. // NOTE: There's a big trick here. Connection is used as a `this` argument
  15. // to the wrapped `connection` event listener.
  16. // socket end doesn't necessarly mean connection drop
  17. this.httpAllowHalfOpen = true
  18. state.timeout = new transport.utils.Timeout(this)
  19. // Protocol info
  20. state.protocol = transport.protocol[options.protocol]
  21. state.version = null
  22. state.constants = state.protocol.constants
  23. state.pair = null
  24. state.isServer = options.isServer
  25. // Root of priority tree (i.e. stream id = 0)
  26. state.priorityRoot = new transport.Priority({
  27. defaultWeight: state.constants.DEFAULT_WEIGHT,
  28. maxCount: transport.protocol.base.constants.MAX_PRIORITY_STREAMS
  29. })
  30. // Defaults
  31. state.maxStreams = options.maxStreams ||
  32. state.constants.MAX_CONCURRENT_STREAMS
  33. state.autoSpdy31 = options.protocol.name !== 'h2' && options.autoSpdy31
  34. state.acceptPush = options.acceptPush === undefined
  35. ? !state.isServer
  36. : options.acceptPush
  37. if (options.maxChunk === false) { state.maxChunk = Infinity } else if (options.maxChunk === undefined) { state.maxChunk = transport.protocol.base.constants.DEFAULT_MAX_CHUNK } else {
  38. state.maxChunk = options.maxChunk
  39. }
  40. // Connection-level flow control
  41. var windowSize = options.windowSize || 1 << 20
  42. state.window = new transport.Window({
  43. id: 0,
  44. isServer: state.isServer,
  45. recv: {
  46. size: state.constants.DEFAULT_WINDOW,
  47. max: state.constants.MAX_INITIAL_WINDOW_SIZE
  48. },
  49. send: {
  50. size: state.constants.DEFAULT_WINDOW,
  51. max: state.constants.MAX_INITIAL_WINDOW_SIZE
  52. }
  53. })
  54. // It starts with DEFAULT_WINDOW, update must be sent to change it on client
  55. state.window.recv.setMax(windowSize)
  56. // Boilerplate for Stream constructor
  57. state.streamWindow = new transport.Window({
  58. id: -1,
  59. isServer: state.isServer,
  60. recv: {
  61. size: windowSize,
  62. max: state.constants.MAX_INITIAL_WINDOW_SIZE
  63. },
  64. send: {
  65. size: state.constants.DEFAULT_WINDOW,
  66. max: state.constants.MAX_INITIAL_WINDOW_SIZE
  67. }
  68. })
  69. // Various state info
  70. state.pool = state.protocol.compressionPool.create(options.headerCompression)
  71. state.counters = {
  72. push: 0,
  73. stream: 0
  74. }
  75. // Init streams list
  76. state.stream = {
  77. map: {},
  78. count: 0,
  79. nextId: state.isServer ? 2 : 1,
  80. lastId: {
  81. both: 0,
  82. received: 0
  83. }
  84. }
  85. state.ping = {
  86. nextId: state.isServer ? 2 : 1,
  87. map: {}
  88. }
  89. state.goaway = false
  90. // Debug
  91. state.debug = state.isServer ? debug.server : debug.client
  92. // X-Forwarded feature
  93. state.xForward = null
  94. // Create parser and hole for framer
  95. state.parser = state.protocol.parser.create({
  96. // NOTE: needed to distinguish ping from ping ACK in SPDY
  97. isServer: state.isServer,
  98. window: state.window
  99. })
  100. state.framer = state.protocol.framer.create({
  101. window: state.window,
  102. timeout: state.timeout
  103. })
  104. // SPDY has PUSH enabled on servers
  105. if (state.protocol.name === 'spdy') {
  106. state.framer.enablePush(state.isServer)
  107. }
  108. if (!state.isServer) { state.parser.skipPreface() }
  109. this.socket = socket
  110. this._init()
  111. }
  112. util.inherits(Connection, EventEmitter)
  113. exports.Connection = Connection
  114. Connection.create = function create (socket, options) {
  115. return new Connection(socket, options)
  116. }
  117. Connection.prototype._init = function init () {
  118. var self = this
  119. var state = this._spdyState
  120. var pool = state.pool
  121. // Initialize session window
  122. state.window.recv.on('drain', function () {
  123. self._onSessionWindowDrain()
  124. })
  125. // Initialize parser
  126. state.parser.on('data', function (frame) {
  127. self._handleFrame(frame)
  128. })
  129. state.parser.once('version', function (version) {
  130. self._onVersion(version)
  131. })
  132. // Propagate parser errors
  133. state.parser.on('error', function (err) {
  134. self._onParserError(err)
  135. })
  136. // Propagate framer errors
  137. state.framer.on('error', function (err) {
  138. self.emit('error', err)
  139. })
  140. this.socket.pipe(state.parser)
  141. state.framer.pipe(this.socket)
  142. // Allow high-level api to catch socket errors
  143. this.socket.on('error', function onSocketError (e) {
  144. self.emit('error', e)
  145. })
  146. this.socket.once('close', function onclose (hadError) {
  147. var err
  148. if (hadError) {
  149. err = new Error('socket hang up')
  150. err.code = 'ECONNRESET'
  151. }
  152. self.destroyStreams(err)
  153. self.emit('close')
  154. if (state.pair) {
  155. pool.put(state.pair)
  156. }
  157. state.framer.resume()
  158. })
  159. // Reset timeout on close
  160. this.once('close', function () {
  161. self.setTimeout(0)
  162. })
  163. function _onWindowOverflow () {
  164. self._onWindowOverflow()
  165. }
  166. state.window.recv.on('overflow', _onWindowOverflow)
  167. state.window.send.on('overflow', _onWindowOverflow)
  168. // Do not allow half-open connections
  169. this.socket.allowHalfOpen = false
  170. }
  171. Connection.prototype._onVersion = function _onVersion (version) {
  172. var state = this._spdyState
  173. var prev = state.version
  174. var parser = state.parser
  175. var framer = state.framer
  176. var pool = state.pool
  177. state.version = version
  178. state.debug('id=0 version=%d', version)
  179. // Ignore transition to 3.1
  180. if (!prev) {
  181. state.pair = pool.get(version)
  182. parser.setCompression(state.pair)
  183. framer.setCompression(state.pair)
  184. }
  185. framer.setVersion(version)
  186. if (!state.isServer) {
  187. framer.prefaceFrame()
  188. if (state.xForward !== null) {
  189. framer.xForwardedFor({ host: state.xForward })
  190. }
  191. }
  192. // Send preface+settings frame (once)
  193. framer.settingsFrame({
  194. max_header_list_size: state.constants.DEFAULT_MAX_HEADER_LIST_SIZE,
  195. max_concurrent_streams: state.maxStreams,
  196. enable_push: state.acceptPush ? 1 : 0,
  197. initial_window_size: state.window.recv.max
  198. })
  199. // Update session window
  200. if (state.version >= 3.1 || (state.isServer && state.autoSpdy31)) { this._onSessionWindowDrain() }
  201. this.emit('version', version)
  202. }
  203. Connection.prototype._onParserError = function _onParserError (err) {
  204. var state = this._spdyState
  205. // Prevent further errors
  206. state.parser.kill()
  207. // Send GOAWAY
  208. if (err instanceof transport.protocol.base.utils.ProtocolError) {
  209. this._goaway({
  210. lastId: state.stream.lastId.both,
  211. code: err.code,
  212. extra: err.message,
  213. send: true
  214. })
  215. }
  216. this.emit('error', err)
  217. }
  218. Connection.prototype._handleFrame = function _handleFrame (frame) {
  219. var state = this._spdyState
  220. state.debug('id=0 frame', frame)
  221. state.timeout.reset()
  222. // For testing purposes
  223. this.emit('frame', frame)
  224. var stream
  225. // Session window update
  226. if (frame.type === 'WINDOW_UPDATE' && frame.id === 0) {
  227. if (state.version < 3.1 && state.autoSpdy31) {
  228. state.debug('id=0 switch version to 3.1')
  229. state.version = 3.1
  230. this.emit('version', 3.1)
  231. }
  232. state.window.send.update(frame.delta)
  233. return
  234. }
  235. if (state.isServer && frame.type === 'PUSH_PROMISE') {
  236. state.debug('id=0 server PUSH_PROMISE')
  237. this._goaway({
  238. lastId: state.stream.lastId.both,
  239. code: 'PROTOCOL_ERROR',
  240. send: true
  241. })
  242. return
  243. }
  244. if (!stream && frame.id !== undefined) {
  245. // Load created one
  246. stream = state.stream.map[frame.id]
  247. // Fail if not found
  248. if (!stream &&
  249. frame.type !== 'HEADERS' &&
  250. frame.type !== 'PRIORITY' &&
  251. frame.type !== 'RST') {
  252. // Other side should destroy the stream upon receiving GOAWAY
  253. if (this._isGoaway(frame.id)) { return }
  254. state.debug('id=0 stream=%d not found', frame.id)
  255. state.framer.rstFrame({ id: frame.id, code: 'INVALID_STREAM' })
  256. return
  257. }
  258. }
  259. // Create new stream
  260. if (!stream && frame.type === 'HEADERS') {
  261. this._handleHeaders(frame)
  262. return
  263. }
  264. if (stream) {
  265. stream._handleFrame(frame)
  266. } else if (frame.type === 'SETTINGS') {
  267. this._handleSettings(frame.settings)
  268. } else if (frame.type === 'ACK_SETTINGS') {
  269. // TODO(indutny): handle it one day
  270. } else if (frame.type === 'PING') {
  271. this._handlePing(frame)
  272. } else if (frame.type === 'GOAWAY') {
  273. this._handleGoaway(frame)
  274. } else if (frame.type === 'X_FORWARDED_FOR') {
  275. // Set X-Forwarded-For only once
  276. if (state.xForward === null) {
  277. state.xForward = frame.host
  278. }
  279. } else if (frame.type === 'PRIORITY') {
  280. // TODO(indutny): handle this
  281. } else {
  282. state.debug('id=0 unknown frame type: %s', frame.type)
  283. }
  284. }
  285. Connection.prototype._onWindowOverflow = function _onWindowOverflow () {
  286. var state = this._spdyState
  287. state.debug('id=0 window overflow')
  288. this._goaway({
  289. lastId: state.stream.lastId.both,
  290. code: 'FLOW_CONTROL_ERROR',
  291. send: true
  292. })
  293. }
  294. Connection.prototype._isGoaway = function _isGoaway (id) {
  295. var state = this._spdyState
  296. if (state.goaway !== false && state.goaway < id) { return true }
  297. return false
  298. }
  299. Connection.prototype._getId = function _getId () {
  300. var state = this._spdyState
  301. var id = state.stream.nextId
  302. state.stream.nextId += 2
  303. return id
  304. }
  305. Connection.prototype._createStream = function _createStream (uri) {
  306. var state = this._spdyState
  307. var id = uri.id
  308. if (id === undefined) { id = this._getId() }
  309. var isGoaway = this._isGoaway(id)
  310. if (uri.push && !state.acceptPush) {
  311. state.debug('id=0 push disabled promisedId=%d', id)
  312. // Fatal error
  313. this._goaway({
  314. lastId: state.stream.lastId.both,
  315. code: 'PROTOCOL_ERROR',
  316. send: true
  317. })
  318. isGoaway = true
  319. }
  320. var stream = new Stream(this, {
  321. id: id,
  322. request: uri.request !== false,
  323. method: uri.method,
  324. path: uri.path,
  325. host: uri.host,
  326. priority: uri.priority,
  327. headers: uri.headers,
  328. parent: uri.parent,
  329. readable: !isGoaway && uri.readable,
  330. writable: !isGoaway && uri.writable
  331. })
  332. var self = this
  333. // Just an empty stream for API consistency
  334. if (isGoaway) {
  335. return stream
  336. }
  337. state.stream.lastId.both = Math.max(state.stream.lastId.both, id)
  338. state.debug('id=0 add stream=%d', stream.id)
  339. state.stream.map[stream.id] = stream
  340. state.stream.count++
  341. state.counters.stream++
  342. if (stream.parent !== null) {
  343. state.counters.push++
  344. }
  345. stream.once('close', function () {
  346. self._removeStream(stream)
  347. })
  348. return stream
  349. }
  350. Connection.prototype._handleHeaders = function _handleHeaders (frame) {
  351. var state = this._spdyState
  352. // Must be HEADERS frame after stream close
  353. if (frame.id <= state.stream.lastId.received) { return }
  354. // Someone is using our ids!
  355. if ((frame.id + state.stream.nextId) % 2 === 0) {
  356. state.framer.rstFrame({ id: frame.id, code: 'PROTOCOL_ERROR' })
  357. return
  358. }
  359. var stream = this._createStream({
  360. id: frame.id,
  361. request: false,
  362. method: frame.headers[':method'],
  363. path: frame.headers[':path'],
  364. host: frame.headers[':authority'],
  365. priority: frame.priority,
  366. headers: frame.headers,
  367. writable: frame.writable
  368. })
  369. // GOAWAY
  370. if (this._isGoaway(stream.id)) {
  371. return
  372. }
  373. state.stream.lastId.received = Math.max(
  374. state.stream.lastId.received,
  375. stream.id
  376. )
  377. // TODO(indutny) handle stream limit
  378. if (!this.emit('stream', stream)) {
  379. // No listeners was set - abort the stream
  380. stream.abort()
  381. return
  382. }
  383. // Create fake frame to simulate end of the data
  384. if (frame.fin) {
  385. stream._handleFrame({ type: 'FIN', fin: true })
  386. }
  387. return stream
  388. }
  389. Connection.prototype._onSessionWindowDrain = function _onSessionWindowDrain () {
  390. var state = this._spdyState
  391. if (state.version < 3.1 && !(state.isServer && state.autoSpdy31)) {
  392. return
  393. }
  394. var delta = state.window.recv.getDelta()
  395. if (delta === 0) {
  396. return
  397. }
  398. state.debug('id=0 session window drain, update by %d', delta)
  399. state.framer.windowUpdateFrame({
  400. id: 0,
  401. delta: delta
  402. })
  403. state.window.recv.update(delta)
  404. }
  405. Connection.prototype.start = function start (version) {
  406. this._spdyState.parser.setVersion(version)
  407. }
  408. // Mostly for testing
  409. Connection.prototype.getVersion = function getVersion () {
  410. return this._spdyState.version
  411. }
  412. Connection.prototype._handleSettings = function _handleSettings (settings) {
  413. var state = this._spdyState
  414. state.framer.ackSettingsFrame()
  415. this._setDefaultWindow(settings)
  416. if (settings.max_frame_size) { state.framer.setMaxFrameSize(settings.max_frame_size) }
  417. // TODO(indutny): handle max_header_list_size
  418. if (settings.header_table_size) {
  419. try {
  420. state.pair.compress.updateTableSize(settings.header_table_size)
  421. } catch (e) {
  422. this._goaway({
  423. lastId: 0,
  424. code: 'PROTOCOL_ERROR',
  425. send: true
  426. })
  427. return
  428. }
  429. }
  430. // HTTP2 clients needs to enable PUSH streams explicitly
  431. if (state.protocol.name !== 'spdy') {
  432. if (settings.enable_push === undefined) {
  433. state.framer.enablePush(state.isServer)
  434. } else {
  435. state.framer.enablePush(settings.enable_push === 1)
  436. }
  437. }
  438. // TODO(indutny): handle max_concurrent_streams
  439. }
  440. Connection.prototype._setDefaultWindow = function _setDefaultWindow (settings) {
  441. if (settings.initial_window_size === undefined) {
  442. return
  443. }
  444. var state = this._spdyState
  445. // Update defaults
  446. var window = state.streamWindow
  447. window.send.setMax(settings.initial_window_size)
  448. // Update existing streams
  449. Object.keys(state.stream.map).forEach(function (id) {
  450. var stream = state.stream.map[id]
  451. var window = stream._spdyState.window
  452. window.send.updateMax(settings.initial_window_size)
  453. })
  454. }
  455. Connection.prototype._handlePing = function handlePing (frame) {
  456. var self = this
  457. var state = this._spdyState
  458. // Handle incoming PING
  459. if (!frame.ack) {
  460. state.framer.pingFrame({
  461. opaque: frame.opaque,
  462. ack: true
  463. })
  464. self.emit('ping', frame.opaque)
  465. return
  466. }
  467. // Handle reply PING
  468. var hex = frame.opaque.toString('hex')
  469. if (!state.ping.map[hex]) {
  470. return
  471. }
  472. var ping = state.ping.map[hex]
  473. delete state.ping.map[hex]
  474. if (ping.cb) {
  475. ping.cb(null)
  476. }
  477. }
  478. Connection.prototype._handleGoaway = function handleGoaway (frame) {
  479. this._goaway({
  480. lastId: frame.lastId,
  481. code: frame.code,
  482. send: false
  483. })
  484. }
  485. Connection.prototype.ping = function ping (callback) {
  486. var state = this._spdyState
  487. // HTTP2 is using 8-byte opaque
  488. var opaque = Buffer.alloc(state.constants.PING_OPAQUE_SIZE)
  489. opaque.fill(0)
  490. opaque.writeUInt32BE(state.ping.nextId, opaque.length - 4)
  491. state.ping.nextId += 2
  492. state.ping.map[opaque.toString('hex')] = { cb: callback }
  493. state.framer.pingFrame({
  494. opaque: opaque,
  495. ack: false
  496. })
  497. }
  498. Connection.prototype.getCounter = function getCounter (name) {
  499. return this._spdyState.counters[name]
  500. }
  501. Connection.prototype.reserveStream = function reserveStream (uri, callback) {
  502. var stream = this._createStream(uri)
  503. // GOAWAY
  504. if (this._isGoaway(stream.id)) {
  505. var err = new Error('Can\'t send request after GOAWAY')
  506. process.nextTick(function () {
  507. if (callback) { callback(err) } else {
  508. stream.emit('error', err)
  509. }
  510. })
  511. return stream
  512. }
  513. if (callback) {
  514. process.nextTick(function () {
  515. callback(null, stream)
  516. })
  517. }
  518. return stream
  519. }
  520. Connection.prototype.request = function request (uri, callback) {
  521. var stream = this.reserveStream(uri, function (err) {
  522. if (err) {
  523. if (callback) {
  524. callback(err)
  525. } else {
  526. stream.emit('error', err)
  527. }
  528. return
  529. }
  530. if (stream._wasSent()) {
  531. if (callback) {
  532. callback(null, stream)
  533. }
  534. return
  535. }
  536. stream.send(function (err) {
  537. if (err) {
  538. if (callback) { return callback(err) } else { return stream.emit('error', err) }
  539. }
  540. if (callback) {
  541. callback(null, stream)
  542. }
  543. })
  544. })
  545. return stream
  546. }
  547. Connection.prototype._removeStream = function _removeStream (stream) {
  548. var state = this._spdyState
  549. state.debug('id=0 remove stream=%d', stream.id)
  550. delete state.stream.map[stream.id]
  551. state.stream.count--
  552. if (state.stream.count === 0) {
  553. this.emit('_streamDrain')
  554. }
  555. }
  556. Connection.prototype._goaway = function _goaway (params) {
  557. var state = this._spdyState
  558. var self = this
  559. state.goaway = params.lastId
  560. state.debug('id=0 goaway from=%d', state.goaway)
  561. Object.keys(state.stream.map).forEach(function (id) {
  562. var stream = state.stream.map[id]
  563. // Abort every stream started after GOAWAY
  564. if (stream.id <= params.lastId) {
  565. return
  566. }
  567. stream.abort()
  568. stream.emit('error', new Error('New stream after GOAWAY'))
  569. })
  570. function finish () {
  571. // Destroy socket if there are no streams
  572. if (state.stream.count === 0 || params.code !== 'OK') {
  573. // No further frames should be processed
  574. state.parser.kill()
  575. process.nextTick(function () {
  576. var err = new Error('Fatal error: ' + params.code)
  577. self._onStreamDrain(err)
  578. })
  579. return
  580. }
  581. self.on('_streamDrain', self._onStreamDrain)
  582. }
  583. if (params.send) {
  584. // Make sure that GOAWAY frame is sent before dumping framer
  585. state.framer.goawayFrame({
  586. lastId: params.lastId,
  587. code: params.code,
  588. extra: params.extra
  589. }, finish)
  590. } else {
  591. finish()
  592. }
  593. }
  594. Connection.prototype._onStreamDrain = function _onStreamDrain (error) {
  595. var state = this._spdyState
  596. state.debug('id=0 _onStreamDrain')
  597. state.framer.dump()
  598. state.framer.unpipe(this.socket)
  599. state.framer.resume()
  600. if (this.socket.destroySoon) {
  601. this.socket.destroySoon()
  602. }
  603. this.emit('close', error)
  604. }
  605. Connection.prototype.end = function end (callback) {
  606. var state = this._spdyState
  607. if (callback) {
  608. this.once('close', callback)
  609. }
  610. this._goaway({
  611. lastId: state.stream.lastId.both,
  612. code: 'OK',
  613. send: true
  614. })
  615. }
  616. Connection.prototype.destroyStreams = function destroyStreams (err) {
  617. var state = this._spdyState
  618. Object.keys(state.stream.map).forEach(function (id) {
  619. var stream = state.stream.map[id]
  620. stream.destroy()
  621. if (err) {
  622. stream.emit('error', err)
  623. }
  624. })
  625. }
  626. Connection.prototype.isServer = function isServer () {
  627. return this._spdyState.isServer
  628. }
  629. Connection.prototype.getXForwardedFor = function getXForwardFor () {
  630. return this._spdyState.xForward
  631. }
  632. Connection.prototype.sendXForwardedFor = function sendXForwardedFor (host) {
  633. var state = this._spdyState
  634. if (state.version !== null) {
  635. state.framer.xForwardedFor({ host: host })
  636. } else {
  637. state.xForward = host
  638. }
  639. }
  640. Connection.prototype.pushPromise = function pushPromise (parent, uri, callback) {
  641. var state = this._spdyState
  642. var stream = this._createStream({
  643. request: false,
  644. parent: parent,
  645. method: uri.method,
  646. path: uri.path,
  647. host: uri.host,
  648. priority: uri.priority,
  649. headers: uri.headers,
  650. readable: false
  651. })
  652. var err
  653. // TODO(indutny): deduplicate this logic somehow
  654. if (this._isGoaway(stream.id)) {
  655. err = new Error('Can\'t send PUSH_PROMISE after GOAWAY')
  656. process.nextTick(function () {
  657. if (callback) {
  658. callback(err)
  659. } else {
  660. stream.emit('error', err)
  661. }
  662. })
  663. return stream
  664. }
  665. if (uri.push && !state.acceptPush) {
  666. err = new Error(
  667. 'Can\'t send PUSH_PROMISE, other side won\'t accept it')
  668. process.nextTick(function () {
  669. if (callback) { callback(err) } else {
  670. stream.emit('error', err)
  671. }
  672. })
  673. return stream
  674. }
  675. stream._sendPush(uri.status, uri.response, function (err) {
  676. if (!callback) {
  677. if (err) {
  678. stream.emit('error', err)
  679. }
  680. return
  681. }
  682. if (err) { return callback(err) }
  683. callback(null, stream)
  684. })
  685. return stream
  686. }
  687. Connection.prototype.setTimeout = function setTimeout (delay, callback) {
  688. var state = this._spdyState
  689. state.timeout.set(delay, callback)
  690. }