// WorkerPool.js
  'use strict';

  // Babel-style ES-module interop marker for CommonJS consumers.
  Object.defineProperty(exports, "__esModule", {
    value: true
  });

  // Each dependency is loaded twice over: the raw module, then the result of
  // _interopRequireDefault (a hoisted function declaration further down),
  // which guarantees a `.default` property to call through.
  const _child_process = require('child_process');
  const _child_process2 = _interopRequireDefault(_child_process);

  const _queue = require('neo-async/queue');
  const _queue2 = _interopRequireDefault(_queue);

  const _mapSeries = require('neo-async/mapSeries');
  const _mapSeries2 = _interopRequireDefault(_mapSeries);

  const _readBuffer = require('./readBuffer');
  const _readBuffer2 = _interopRequireDefault(_readBuffer);

  const _WorkerError = require('./WorkerError');
  const _WorkerError2 = _interopRequireDefault(_WorkerError);
  /**
   * Normalises a required module so callers can always read `.default`.
   *
   * @param {*} obj - The value returned by `require`.
   * @returns {*} `obj` itself when it is truthy and flagged `__esModule`;
   *   otherwise a fresh wrapper object `{ default: obj }`.
   */
  function _interopRequireDefault(obj) {
    return obj && obj.__esModule ? obj : { default: obj };
  }
  /* eslint-disable no-console */

  // Resolved path of the sibling `./worker` module; passed to every spawned
  // child process as its entry script.
  const workerPath = require.resolve('./worker');

  // Module-wide counter used to hand out a unique id to each PoolWorker.
  let workerId = 0;
  /**
   * Owns one child process running the worker script and exchanges messages
   * with it over two extra stdio pipes (stdio[3] is read from, stdio[4] is
   * written to). Every outgoing message is a 4-byte big-endian length
   * followed by that many bytes of UTF-8 JSON.
   */
  class PoolWorker {
    /**
     * Spawns the child process and starts the read loop.
     *
     * @param {{nodeArgs: (Array|undefined), parallelJobs: *}} options -
     *   `nodeArgs` are prepended to the node command line (falsy entries are
     *   dropped); `parallelJobs` is appended after the worker script path.
     * @param {Function} onJobDone - Invoked (without arguments) each time a
     *   job of this worker finishes.
     * @throws {Error} When the spawned process exposes no `stdio` array.
     */
    constructor(options, onJobDone) {
      this.disposed = false;
      this.nextJobId = 0;
      this.jobs = Object.create(null);
      this.activeJobs = 0;
      this.onJobDone = onJobDone;
      this.id = workerId;
      workerId += 1;

      // The forwarders are registered as stream listeners, where `this`
      // would otherwise be the stream. Binding once keeps the `disposed`
      // check working and gives removeListener the same function reference.
      this.writeToStdout = this.writeToStdout.bind(this);
      this.writeToStderr = this.writeToStderr.bind(this);

      // Empty or invalid node args would break the child process
      const sanitizedNodeArgs = (options.nodeArgs || []).filter(opt => !!opt);
      this.worker = _child_process2.default.spawn(process.execPath, [].concat(sanitizedNodeArgs).concat(workerPath, options.parallelJobs), {
        detached: true,
        stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe']
      });
      this.worker.unref();

      // This prevents a problem where the worker stdio can be undefined
      // when the kernel hits the limit of open files.
      // More info can be found on: https://github.com/webpack-contrib/thread-loader/issues/2
      if (!this.worker.stdio) {
        // Report this worker's own id; the module counter has already moved on.
        throw new Error(`Failed to create the worker pool with workerId: ${this.id} and configuration: ${JSON.stringify(options)}. Please verify if you hit the OS open files limit.`);
      }

      const [,,, readPipe, writePipe] = this.worker.stdio;
      this.readPipe = readPipe;
      this.writePipe = writePipe;
      this.listenStdOutAndErrFromWorker(this.worker.stdout, this.worker.stderr);
      this.readNextMessage();
    }

    /**
     * Starts forwarding the worker's stdout/stderr data to this process.
     * Either stream may be missing, in which case it is skipped.
     */
    listenStdOutAndErrFromWorker(workerStdout, workerStderr) {
      if (workerStdout) {
        workerStdout.on('data', this.writeToStdout);
      }
      if (workerStderr) {
        workerStderr.on('data', this.writeToStderr);
      }
    }

    /**
     * Stops forwarding the worker's stdout/stderr data (inverse of
     * listenStdOutAndErrFromWorker).
     */
    ignoreStdOutAndErrFromWorker(workerStdout, workerStderr) {
      if (workerStdout) {
        workerStdout.removeListener('data', this.writeToStdout);
      }
      if (workerStderr) {
        workerStderr.removeListener('data', this.writeToStderr);
      }
    }

    /** Copies a chunk to this process's stdout unless the worker is disposed. */
    writeToStdout(data) {
      if (!this.disposed) {
        process.stdout.write(data);
      }
    }

    /** Copies a chunk to this process's stderr unless the worker is disposed. */
    writeToStderr(data) {
      if (!this.disposed) {
        process.stderr.write(data);
      }
    }

    /**
     * Registers a job under a fresh id and sends it to the child process.
     *
     * @param {*} data - Job payload; it is JSON-serialised for the worker and
     *   also kept locally so later worker messages can call methods on it.
     * @param {Function} callback - Node-style `(err, result)` completion callback.
     */
    run(data, callback) {
      const jobId = this.nextJobId;
      this.nextJobId += 1;
      this.jobs[jobId] = { data, callback };
      this.activeJobs += 1;
      this.writeJson({
        type: 'job',
        id: jobId,
        data
      });
    }

    /** Sends a `warmup` message carrying `requires` to the child process. */
    warmup(requires) {
      this.writeJson({
        type: 'warmup',
        requires
      });
    }

    /**
     * Writes one framed message: a 4-byte big-endian byte length followed by
     * the UTF-8 encoded JSON of `data`.
     */
    writeJson(data) {
      const lengthBuffer = Buffer.alloc(4);
      const messageBuffer = Buffer.from(JSON.stringify(data), 'utf-8');
      lengthBuffer.writeInt32BE(messageBuffer.length, 0);
      this.writePipe.write(lengthBuffer);
      this.writePipe.write(messageBuffer);
    }

    /**
     * Writes a zero length prefix with no payload.
     * NOTE(review): presumably the worker treats this as an end marker; the
     * worker side is not visible from this file.
     */
    writeEnd() {
      const lengthBuffer = Buffer.alloc(4);
      lengthBuffer.writeInt32BE(0, 0);
      this.writePipe.write(lengthBuffer);
    }

    /**
     * Reads one framed message from the worker, dispatches it, then
     * schedules the next read. `this.state` records the current step.
     * Any failure is logged and ends the loop.
     */
    readNextMessage() {
      this.state = 'read length';
      this.readBuffer(4, (lengthReadError, lengthBuffer) => {
        if (lengthReadError) {
          console.error(`Failed to communicate with worker (read length) ${lengthReadError}`);
          return;
        }
        this.state = 'length read';
        const length = lengthBuffer.readInt32BE(0);
        this.state = 'read message';
        this.readBuffer(length, (messageError, messageBuffer) => {
          if (messageError) {
            console.error(`Failed to communicate with worker (read message) ${messageError}`);
            return;
          }
          this.state = 'message read';
          const messageString = messageBuffer.toString('utf-8');
          let message;
          try {
            message = JSON.parse(messageString);
          } catch (parseError) {
            // A malformed payload is handled like the other communication
            // failures instead of throwing out of the stream callback.
            console.error(`Failed to communicate with worker (parse message) ${parseError}`);
            return;
          }
          this.state = 'process message';
          this.onWorkerMessage(message, err => {
            if (err) {
              console.error(`Failed to communicate with worker (process message) ${err}`);
              return;
            }
            this.state = 'soon next';
            setImmediate(() => this.readNextMessage());
          });
        });
      });
    }

    /**
     * Handles one decoded worker message.
     *
     * @param {Object} message - Decoded message; `type` selects the handler
     *   and `id` names the job it belongs to.
     * @param {Function} finalCallback - Called once the message has been
     *   consumed so the read loop can continue.
     */
    onWorkerMessage(message, finalCallback) {
      const { type, id } = message;
      switch (type) {
        case 'job':
          {
            const { data, error, result } = message;
            // `data` lists byte lengths of raw buffers that follow the JSON
            // message on the pipe; they must be read in order.
            (0, _mapSeries2.default)(data, (length, callback) => this.readBuffer(length, callback), (eachErr, buffers) => {
              // Tolerate a message for an unknown/already finished job
              // instead of throwing while destructuring `undefined`.
              const job = this.jobs[id];
              const jobCallback = job ? job.callback : undefined;
              const callback = (err, arg) => {
                if (jobCallback) {
                  delete this.jobs[id];
                  this.activeJobs -= 1;
                  this.onJobDone();
                  if (err) {
                    jobCallback(err instanceof Error ? err : new Error(err), arg);
                  } else {
                    jobCallback(null, arg);
                  }
                }
                finalCallback();
              };
              if (eachErr) {
                callback(eachErr);
                return;
              }
              let bufferPosition = 0;
              if (result && result.result) {
                // Entries flagged `buffer` are placeholders for the raw
                // buffers read above (decoded to text when also flagged
                // `string`); other entries carry their value in `data`.
                result.result = result.result.map(r => {
                  if (r.buffer) {
                    const buffer = buffers[bufferPosition];
                    bufferPosition += 1;
                    if (r.string) {
                      return buffer.toString('utf-8');
                    }
                    return buffer;
                  }
                  return r.data;
                });
              }
              if (error) {
                callback(this.fromErrorObj(error), result);
                return;
              }
              callback(null, result);
            });
            break;
          }
        case 'resolve':
          {
            const { context, request, questionId } = message;
            const { data } = this.jobs[id];
            // The answer is written back asynchronously; the read loop is
            // released right away.
            data.resolve(context, request, (error, result) => {
              this.writeJson({
                type: 'result',
                id: questionId,
                error: error ? {
                  message: error.message,
                  details: error.details,
                  missing: error.missing
                } : null,
                result
              });
            });
            finalCallback();
            break;
          }
        case 'emitWarning':
          {
            const { data } = message;
            const { data: jobData } = this.jobs[id];
            jobData.emitWarning(this.fromErrorObj(data));
            finalCallback();
            break;
          }
        case 'emitError':
          {
            const { data } = message;
            const { data: jobData } = this.jobs[id];
            jobData.emitError(this.fromErrorObj(data));
            finalCallback();
            break;
          }
        default:
          {
            console.error(`Unexpected worker message ${type} in WorkerPool.`);
            finalCallback();
            break;
          }
      }
    }

    /**
     * Builds a WorkerError from a worker-supplied error description.
     *
     * @param {string|Object} arg - A bare message string (wrapped as
     *   `{ message }`) or an error-like object passed through unchanged.
     * @returns {Object} A WorkerError tagged with this worker's id.
     */
    fromErrorObj(arg) {
      let obj;
      if (typeof arg === 'string') {
        obj = { message: arg };
      } else {
        obj = arg;
      }
      return new _WorkerError2.default(obj, this.id);
    }

    /** Reads `length` bytes from the worker's read pipe via the readBuffer helper. */
    readBuffer(length, callback) {
      (0, _readBuffer2.default)(this.readPipe, length, callback);
    }

    /**
     * Marks the worker disposed, stops forwarding its output and writes the
     * end marker. Calling it again is a no-op.
     */
    dispose() {
      if (!this.disposed) {
        this.disposed = true;
        this.ignoreStdOutAndErrFromWorker(this.worker.stdout, this.worker.stderr);
        this.writeEnd();
      }
    }
  }
  /**
   * Distributes jobs over a bounded set of PoolWorker instances, creating
   * workers on demand and disposing of them after an idle timeout.
   */
  class WorkerPool {
    /**
     * @param {Object} [options] - Pool configuration. Fields read here:
     *   `numberOfWorkers` (upper bound on workers), `poolTimeout` (idle
     *   milliseconds before workers are disposed; non-finite disables it),
     *   `workerNodeArgs` and `workerParallelJobs` (forwarded to each
     *   worker), `poolParallelJobs` (queue concurrency) and `poolRespawn`
     *   (when falsy, the idle timeout terminates the pool for good).
     */
    constructor(options) {
      // Read every setting through the defaulted object so that calling the
      // constructor without options does not throw.
      this.options = options || {};
      this.numberOfWorkers = this.options.numberOfWorkers;
      this.poolTimeout = this.options.poolTimeout;
      this.workerNodeArgs = this.options.workerNodeArgs;
      this.workerParallelJobs = this.options.workerParallelJobs;
      this.workers = new Set();
      this.activeJobs = 0;
      this.timeout = null;
      this.poolQueue = (0, _queue2.default)(this.distributeJob.bind(this), this.options.poolParallelJobs);
      this.terminated = false;
      this.exitListener = null;
      this.setupLifeCycle();
    }

    /** @returns {boolean} `false` once the pool has been terminated. */
    isAbleToRun() {
      return !this.terminated;
    }

    /**
     * Permanently shuts the pool down: kills the queue, disposes all
     * workers, cancels a pending idle timeout and unregisters the process
     * exit hook. Calling it again is a no-op.
     */
    terminate() {
      if (this.terminated) {
        return;
      }
      this.terminated = true;

      // A pending idle timer has nothing left to do and would only keep the
      // event loop alive.
      if (this.timeout) {
        clearTimeout(this.timeout);
        this.timeout = null;
      }

      // Drop the exit hook so terminated pools do not pile up listeners.
      if (this.exitListener) {
        process.removeListener('exit', this.exitListener);
        this.exitListener = null;
      }

      this.poolQueue.kill();
      this.disposeWorkers(true);
    }

    /** Registers a process `exit` hook that terminates this pool. */
    setupLifeCycle() {
      this.exitListener = () => {
        this.terminate();
      };
      process.on('exit', this.exitListener);
    }

    /**
     * Queues a job for execution and cancels any pending idle timeout.
     *
     * @param {*} data - Job payload handed to a worker.
     * @param {Function} callback - Node-style completion callback.
     */
    run(data, callback) {
      if (this.timeout) {
        clearTimeout(this.timeout);
        this.timeout = null;
      }
      this.activeJobs += 1;
      this.poolQueue.push(data, callback);
    }

    /**
     * Queue task: hands the job to the least busy worker, creating a new
     * worker when every existing one is busy and the limit is not reached.
     */
    distributeJob(data, callback) {
      // use worker with the fewest jobs
      let bestWorker;
      for (const worker of this.workers) {
        if (!bestWorker || worker.activeJobs < bestWorker.activeJobs) {
          bestWorker = worker;
        }
      }
      if (bestWorker && (bestWorker.activeJobs === 0 || this.workers.size >= this.numberOfWorkers)) {
        bestWorker.run(data, callback);
        return;
      }
      const newWorker = this.createWorker();
      newWorker.run(data, callback);
    }

    /**
     * Spins up a new worker, adds it to the pool and returns it.
     *
     * @returns {PoolWorker} The freshly created worker.
     */
    createWorker() {
      // spin up a new worker
      const newWorker = new PoolWorker({
        nodeArgs: this.workerNodeArgs,
        parallelJobs: this.workerParallelJobs
      }, () => this.onJobDone());
      this.workers.add(newWorker);
      return newWorker;
    }

    /** Creates workers up to `numberOfWorkers` and sends each a warmup request. */
    warmup(requires) {
      while (this.workers.size < this.numberOfWorkers) {
        this.createWorker().warmup(requires);
      }
    }

    /**
     * Bookkeeping after a job finished; arms the idle timeout once no jobs
     * remain and `poolTimeout` is finite.
     */
    onJobDone() {
      this.activeJobs -= 1;
      // Global isFinite is kept on purpose: it coerces its argument, exactly
      // as the configuration value has always been interpreted here.
      if (this.activeJobs === 0 && isFinite(this.poolTimeout)) {
        this.timeout = setTimeout(() => this.disposeWorkers(), this.poolTimeout);
      }
    }

    /**
     * Disposes all workers when the pool is idle or is being terminated.
     *
     * @param {boolean} [fromTerminate] - Set by terminate(). Without it, a
     *   pool whose `poolRespawn` option is falsy is terminated instead.
     */
    disposeWorkers(fromTerminate) {
      if (!this.options.poolRespawn && !fromTerminate) {
        this.terminate();
        return;
      }
      if (this.activeJobs === 0 || fromTerminate) {
        for (const worker of this.workers) {
          worker.dispose();
        }
        this.workers.clear();
      }
    }
  }
  // WorkerPool is the module's default export.
  exports.default = WorkerPool;