query.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. 'use strict';
  2. const process = require('process');
  3. const Timers = require('timers');
  4. const Readable = require('stream').Readable;
  5. const Command = require('./command.js');
  6. const Packets = require('../packets/index.js');
  7. const getTextParser = require('../parsers/text_parser.js');
  8. const staticParser = require('../parsers/static_text_parser.js');
  9. const ServerStatus = require('../constants/server_status.js');
  10. const EmptyPacket = new Packets.Packet(0, Buffer.allocUnsafe(4), 0, 4);
  11. // http://dev.mysql.com/doc/internals/en/com-query.html
  12. class Query extends Command {
  13. constructor(options, callback) {
  14. super();
  15. this.sql = options.sql;
  16. this.values = options.values;
  17. this._queryOptions = options;
  18. this.namedPlaceholders = options.namedPlaceholders || false;
  19. this.onResult = callback;
  20. this.timeout = options.timeout;
  21. this.queryTimeout = null;
  22. this._fieldCount = 0;
  23. this._rowParser = null;
  24. this._fields = [];
  25. this._rows = [];
  26. this._receivedFieldsCount = 0;
  27. this._resultIndex = 0;
  28. this._localStream = null;
  29. this._unpipeStream = function () {};
  30. this._streamFactory = options.infileStreamFactory;
  31. this._connection = null;
  32. }
  33. then() {
  34. const err =
  35. "You have tried to call .then(), .catch(), or invoked await on the result of query that is not a promise, which is a programming error. Try calling con.promise().query(), or require('mysql2/promise') instead of 'mysql2' for a promise-compatible version of the query interface. To learn how to use async/await or Promises check out documentation at https://sidorares.github.io/node-mysql2/docs#using-promise-wrapper, or the mysql2 documentation at https://sidorares.github.io/node-mysql2/docs/documentation/promise-wrapper";
  36. console.log(err);
  37. throw new Error(err);
  38. }
  39. /* eslint no-unused-vars: ["error", { "argsIgnorePattern": "^_" }] */
  40. start(_packet, connection) {
  41. if (connection.config.debug) {
  42. console.log(' Sending query command: %s', this.sql);
  43. }
  44. this._connection = connection;
  45. this.options = Object.assign({}, connection.config, this._queryOptions);
  46. this._setTimeout();
  47. const cmdPacket = new Packets.Query(
  48. this.sql,
  49. connection.config.charsetNumber
  50. );
  51. connection.writePacket(cmdPacket.toPacket(1));
  52. return Query.prototype.resultsetHeader;
  53. }
  54. done() {
  55. this._unpipeStream();
  56. // if all ready timeout, return null directly
  57. if (this.timeout && !this.queryTimeout) {
  58. return null;
  59. }
  60. // else clear timer
  61. if (this.queryTimeout) {
  62. Timers.clearTimeout(this.queryTimeout);
  63. this.queryTimeout = null;
  64. }
  65. if (this.onResult) {
  66. let rows, fields;
  67. if (this._resultIndex === 0) {
  68. rows = this._rows[0];
  69. fields = this._fields[0];
  70. } else {
  71. rows = this._rows;
  72. fields = this._fields;
  73. }
  74. if (fields) {
  75. process.nextTick(() => {
  76. this.onResult(null, rows, fields);
  77. });
  78. } else {
  79. process.nextTick(() => {
  80. this.onResult(null, rows);
  81. });
  82. }
  83. }
  84. return null;
  85. }
  86. doneInsert(rs) {
  87. if (this._localStreamError) {
  88. if (this.onResult) {
  89. this.onResult(this._localStreamError, rs);
  90. } else {
  91. this.emit('error', this._localStreamError);
  92. }
  93. return null;
  94. }
  95. this._rows.push(rs);
  96. this._fields.push(void 0);
  97. this.emit('fields', void 0);
  98. this.emit('result', rs);
  99. if (rs.serverStatus & ServerStatus.SERVER_MORE_RESULTS_EXISTS) {
  100. this._resultIndex++;
  101. return this.resultsetHeader;
  102. }
  103. return this.done();
  104. }
  105. resultsetHeader(packet, connection) {
  106. const rs = new Packets.ResultSetHeader(packet, connection);
  107. this._fieldCount = rs.fieldCount;
  108. if (connection.config.debug) {
  109. console.log(
  110. ` Resultset header received, expecting ${rs.fieldCount} column definition packets`
  111. );
  112. }
  113. if (this._fieldCount === 0) {
  114. return this.doneInsert(rs);
  115. }
  116. if (this._fieldCount === null) {
  117. return this._streamLocalInfile(connection, rs.infileName);
  118. }
  119. this._receivedFieldsCount = 0;
  120. this._rows.push([]);
  121. this._fields.push([]);
  122. return this.readField;
  123. }
  124. _streamLocalInfile(connection, path) {
  125. if (this._streamFactory) {
  126. this._localStream = this._streamFactory(path);
  127. } else {
  128. this._localStreamError = new Error(
  129. `As a result of LOCAL INFILE command server wants to read ${path} file, but as of v2.0 you must provide streamFactory option returning ReadStream.`
  130. );
  131. connection.writePacket(EmptyPacket);
  132. return this.infileOk;
  133. }
  134. const onConnectionError = () => {
  135. this._unpipeStream();
  136. };
  137. const onDrain = () => {
  138. this._localStream.resume();
  139. };
  140. const onPause = () => {
  141. this._localStream.pause();
  142. };
  143. const onData = function (data) {
  144. const dataWithHeader = Buffer.allocUnsafe(data.length + 4);
  145. data.copy(dataWithHeader, 4);
  146. connection.writePacket(
  147. new Packets.Packet(0, dataWithHeader, 0, dataWithHeader.length)
  148. );
  149. };
  150. const onEnd = () => {
  151. connection.removeListener('error', onConnectionError);
  152. connection.writePacket(EmptyPacket);
  153. };
  154. const onError = (err) => {
  155. this._localStreamError = err;
  156. connection.removeListener('error', onConnectionError);
  157. connection.writePacket(EmptyPacket);
  158. };
  159. this._unpipeStream = () => {
  160. connection.stream.removeListener('pause', onPause);
  161. connection.stream.removeListener('drain', onDrain);
  162. this._localStream.removeListener('data', onData);
  163. this._localStream.removeListener('end', onEnd);
  164. this._localStream.removeListener('error', onError);
  165. };
  166. connection.stream.on('pause', onPause);
  167. connection.stream.on('drain', onDrain);
  168. this._localStream.on('data', onData);
  169. this._localStream.on('end', onEnd);
  170. this._localStream.on('error', onError);
  171. connection.once('error', onConnectionError);
  172. return this.infileOk;
  173. }
  174. readField(packet, connection) {
  175. this._receivedFieldsCount++;
  176. // Often there is much more data in the column definition than in the row itself
  177. // If you set manually _fields[0] to array of ColumnDefinition's (from previous call)
  178. // you can 'cache' result of parsing. Field packets still received, but ignored in that case
  179. // this is the reason _receivedFieldsCount exist (otherwise we could just use current length of fields array)
  180. if (this._fields[this._resultIndex].length !== this._fieldCount) {
  181. const field = new Packets.ColumnDefinition(
  182. packet,
  183. connection.clientEncoding
  184. );
  185. this._fields[this._resultIndex].push(field);
  186. if (connection.config.debug) {
  187. console.log(' Column definition:');
  188. console.log(` name: ${field.name}`);
  189. console.log(` type: ${field.columnType}`);
  190. console.log(` flags: ${field.flags}`);
  191. }
  192. }
  193. // last field received
  194. if (this._receivedFieldsCount === this._fieldCount) {
  195. const fields = this._fields[this._resultIndex];
  196. this.emit('fields', fields);
  197. if (this.options.disableEval) {
  198. this._rowParser = staticParser(fields, this.options, connection.config);
  199. } else {
  200. this._rowParser = new (getTextParser(
  201. fields,
  202. this.options,
  203. connection.config
  204. ))(fields);
  205. }
  206. return Query.prototype.fieldsEOF;
  207. }
  208. return Query.prototype.readField;
  209. }
  210. fieldsEOF(packet, connection) {
  211. // check EOF
  212. if (!packet.isEOF()) {
  213. return connection.protocolError('Expected EOF packet');
  214. }
  215. return this.row;
  216. }
  217. row(packet, _connection) {
  218. if (packet.isEOF()) {
  219. const status = packet.eofStatusFlags();
  220. const moreResults = status & ServerStatus.SERVER_MORE_RESULTS_EXISTS;
  221. if (moreResults) {
  222. this._resultIndex++;
  223. return Query.prototype.resultsetHeader;
  224. }
  225. return this.done();
  226. }
  227. let row;
  228. try {
  229. row = this._rowParser.next(
  230. packet,
  231. this._fields[this._resultIndex],
  232. this.options
  233. );
  234. } catch (err) {
  235. this._localStreamError = err;
  236. return this.doneInsert(null);
  237. }
  238. if (this.onResult) {
  239. this._rows[this._resultIndex].push(row);
  240. } else {
  241. this.emit('result', row, this._resultIndex);
  242. }
  243. return Query.prototype.row;
  244. }
  245. infileOk(packet, connection) {
  246. const rs = new Packets.ResultSetHeader(packet, connection);
  247. return this.doneInsert(rs);
  248. }
  249. stream(options) {
  250. options = options || Object.create(null);
  251. options.objectMode = true;
  252. const stream = new Readable({
  253. ...options,
  254. emitClose: true,
  255. autoDestroy: true,
  256. read: () => {
  257. this._connection && this._connection.resume();
  258. },
  259. });
  260. // Prevent a breaking change for users that rely on `end` event
  261. stream.once('close', () => {
  262. if (!stream.readableEnded) {
  263. stream.emit('end');
  264. }
  265. });
  266. const onResult = (row, index) => {
  267. if (stream.destroyed) return;
  268. if (!stream.push(row)) {
  269. this._connection && this._connection.pause();
  270. }
  271. stream.emit('result', row, index); // replicate old emitter
  272. };
  273. const onFields = (fields) => {
  274. if (stream.destroyed) return;
  275. stream.emit('fields', fields); // replicate old emitter
  276. };
  277. const onEnd = () => {
  278. if (stream.destroyed) return;
  279. stream.push(null); // pushing null, indicating EOF
  280. };
  281. const onError = (err) => {
  282. stream.destroy(err);
  283. };
  284. stream._destroy = (err, cb) => {
  285. this._connection && this._connection.resume();
  286. this.removeListener('result', onResult);
  287. this.removeListener('fields', onFields);
  288. this.removeListener('end', onEnd);
  289. this.removeListener('error', onError);
  290. cb(err); // Pass on any errors
  291. };
  292. this.on('result', onResult);
  293. this.on('fields', onFields);
  294. this.on('end', onEnd);
  295. this.on('error', onError);
  296. return stream;
  297. }
  298. _setTimeout() {
  299. if (this.timeout) {
  300. const timeoutHandler = this._handleTimeoutError.bind(this);
  301. this.queryTimeout = Timers.setTimeout(timeoutHandler, this.timeout);
  302. }
  303. }
  304. _handleTimeoutError() {
  305. if (this.queryTimeout) {
  306. Timers.clearTimeout(this.queryTimeout);
  307. this.queryTimeout = null;
  308. }
  309. const err = new Error('Query inactivity timeout');
  310. err.errorno = 'PROTOCOL_SEQUENCE_TIMEOUT';
  311. err.code = 'PROTOCOL_SEQUENCE_TIMEOUT';
  312. err.syscall = 'query';
  313. if (this.onResult) {
  314. this.onResult(err);
  315. } else {
  316. this.emit('error', err);
  317. }
  318. }
  319. }
  320. Query.prototype.catch = Query.prototype.then;
  321. module.exports = Query;