connection.js 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988
  1. // This file was modified by Oracle on June 1, 2021.
  2. // The changes involve new logic to handle an additional ERR Packet sent by
  3. // the MySQL server when the connection is closed unexpectedly.
  4. // Modifications copyright (c) 2021, Oracle and/or its affiliates.
  5. // This file was modified by Oracle on June 17, 2021.
  6. // The changes involve logic to ensure the socket connection is closed when
  7. // there is a fatal error.
  8. // Modifications copyright (c) 2021, Oracle and/or its affiliates.
  9. // This file was modified by Oracle on September 21, 2021.
  10. // The changes involve passing additional authentication factor passwords
  11. // to the ChangeUser Command instance.
  12. // Modifications copyright (c) 2021, Oracle and/or its affiliates.
  13. 'use strict';
  14. const Net = require('net');
  15. const Tls = require('tls');
  16. const Timers = require('timers');
  17. const EventEmitter = require('events').EventEmitter;
  18. const Readable = require('stream').Readable;
  19. const Queue = require('denque');
  20. const SqlString = require('sql-escaper');
  21. const { createLRU } = require('lru.min');
  22. const PacketParser = require('../packet_parser.js');
  23. const Packets = require('../packets/index.js');
  24. const Commands = require('../commands/index.js');
  25. const ConnectionConfig = require('../connection_config.js');
  26. const CharsetToEncoding = require('../constants/charset_encodings.js');
  27. let _connectionId = 0;
  28. let convertNamedPlaceholders = null;
  29. class BaseConnection extends EventEmitter {
  30. constructor(opts) {
  31. super();
  32. this.config = opts.config;
  33. // TODO: fill defaults
  34. // if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX )
  35. // if host is given, connect to host:3306
  36. // TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath
  37. // if there is no host/port and no socketPath parameters?
  38. if (!opts.config.stream) {
  39. if (opts.config.socketPath) {
  40. this.stream = Net.connect(opts.config.socketPath);
  41. } else {
  42. this.stream = Net.connect(opts.config.port, opts.config.host);
  43. // Optionally enable keep-alive on the socket.
  44. if (this.config.enableKeepAlive) {
  45. this.stream.on('connect', () => {
  46. this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay);
  47. });
  48. }
  49. // Enable TCP_NODELAY flag. This is needed so that the network packets
  50. // are sent immediately to the server
  51. this.stream.setNoDelay(true);
  52. }
  53. // if stream is a function, treat it as "stream agent / factory"
  54. } else if (typeof opts.config.stream === 'function') {
  55. this.stream = opts.config.stream(opts);
  56. } else {
  57. this.stream = opts.config.stream;
  58. }
  59. this._internalId = _connectionId++;
  60. this._commands = new Queue();
  61. this._command = null;
  62. this._paused = false;
  63. this._paused_packets = new Queue();
  64. this._statements = createLRU({
  65. max: this.config.maxPreparedStatements,
  66. onEviction: function (_, statement) {
  67. statement.close();
  68. },
  69. });
  70. this.serverCapabilityFlags = 0;
  71. this.authorized = false;
  72. this.sequenceId = 0;
  73. this.compressedSequenceId = 0;
  74. this.threadId = null;
  75. this._handshakePacket = null;
  76. this._fatalError = null;
  77. this._protocolError = null;
  78. this._outOfOrderPackets = [];
  79. this.clientEncoding = CharsetToEncoding[this.config.charsetNumber];
  80. this.stream.on('error', this._handleNetworkError.bind(this));
  81. // see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind
  82. this.packetParser = new PacketParser((p) => {
  83. this.handlePacket(p);
  84. });
  85. this.stream.on('data', (data) => {
  86. if (this.connectTimeout) {
  87. Timers.clearTimeout(this.connectTimeout);
  88. this.connectTimeout = null;
  89. }
  90. this.packetParser.execute(data);
  91. });
  92. this.stream.on('end', () => {
  93. // emit the end event so that the pooled connection can close the connection
  94. this.emit('end');
  95. });
  96. this.stream.on('close', () => {
  97. // we need to set this flag everywhere where we want connection to close
  98. if (this._closing) {
  99. return;
  100. }
  101. if (!this._protocolError) {
  102. // no particular error message before disconnect
  103. this._protocolError = new Error(
  104. 'Connection lost: The server closed the connection.'
  105. );
  106. this._protocolError.fatal = true;
  107. this._protocolError.code = 'PROTOCOL_CONNECTION_LOST';
  108. }
  109. this._notifyError(this._protocolError);
  110. });
  111. let handshakeCommand;
  112. if (!this.config.isServer) {
  113. handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags);
  114. handshakeCommand.on('end', () => {
  115. // this happens when handshake finishes early either because there was
  116. // some fatal error or the server sent an error packet instead of
  117. // an hello packet (for example, 'Too many connections' error)
  118. if (
  119. !handshakeCommand.handshake ||
  120. this._fatalError ||
  121. this._protocolError
  122. ) {
  123. return;
  124. }
  125. this._handshakePacket = handshakeCommand.handshake;
  126. this.threadId = handshakeCommand.handshake.connectionId;
  127. this.emit('connect', handshakeCommand.handshake);
  128. });
  129. handshakeCommand.on('error', (err) => {
  130. this._closing = true;
  131. this._notifyError(err);
  132. });
  133. this.addCommand(handshakeCommand);
  134. }
  135. // in case there was no initial handshake but we need to read sting, assume it utf-8
  136. // most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet)
  137. // will be overwritten with actual encoding value as soon as server handshake packet is received
  138. this.serverEncoding = 'utf8';
  139. if (this.config.connectTimeout) {
  140. const timeoutHandler = this._handleTimeoutError.bind(this);
  141. this.connectTimeout = Timers.setTimeout(
  142. timeoutHandler,
  143. this.config.connectTimeout
  144. );
  145. }
  146. }
  147. _addCommandClosedState(cmd) {
  148. const err = new Error(
  149. "Can't add new command when connection is in closed state"
  150. );
  151. err.fatal = true;
  152. if (cmd.onResult) {
  153. cmd.onResult(err);
  154. } else {
  155. this.emit('error', err);
  156. }
  157. }
  158. _handleFatalError(err) {
  159. err.fatal = true;
  160. // stop receiving packets
  161. this.stream.removeAllListeners('data');
  162. this.addCommand = this._addCommandClosedState;
  163. this.write = () => {
  164. this.emit('error', new Error("Can't write in closed state"));
  165. };
  166. this._notifyError(err);
  167. this._fatalError = err;
  168. }
  169. _handleNetworkError(err) {
  170. if (this.connectTimeout) {
  171. Timers.clearTimeout(this.connectTimeout);
  172. this.connectTimeout = null;
  173. }
  174. // Do not throw an error when a connection ends with a RST,ACK packet
  175. if (err.code === 'ECONNRESET' && this._closing) {
  176. return;
  177. }
  178. this._handleFatalError(err);
  179. }
  180. _handleTimeoutError() {
  181. if (this.connectTimeout) {
  182. Timers.clearTimeout(this.connectTimeout);
  183. this.connectTimeout = null;
  184. }
  185. this.stream.destroy && this.stream.destroy();
  186. const err = new Error('connect ETIMEDOUT');
  187. err.errorno = 'ETIMEDOUT';
  188. err.code = 'ETIMEDOUT';
  189. err.syscall = 'connect';
  190. this._handleNetworkError(err);
  191. }
  192. // notify all commands in the queue and bubble error as connection "error"
  193. // called on stream error or unexpected termination
  194. _notifyError(err) {
  195. if (this.connectTimeout) {
  196. Timers.clearTimeout(this.connectTimeout);
  197. this.connectTimeout = null;
  198. }
  199. // prevent from emitting 'PROTOCOL_CONNECTION_LOST' after EPIPE or ECONNRESET
  200. if (this._fatalError) {
  201. return;
  202. }
  203. let command;
  204. // if there is no active command, notify connection
  205. // if there are commands and all of them have callbacks, pass error via callback
  206. let bubbleErrorToConnection = !this._command;
  207. if (this._command && this._command.onResult) {
  208. this._command.onResult(err);
  209. this._command = null;
  210. // connection handshake is special because we allow it to be implicit
  211. // if error happened during handshake, but there are others commands in queue
  212. // then bubble error to other commands and not to connection
  213. } else if (
  214. !(
  215. this._command &&
  216. this._command.constructor === Commands.ClientHandshake &&
  217. this._commands.length > 0
  218. )
  219. ) {
  220. bubbleErrorToConnection = true;
  221. }
  222. while ((command = this._commands.shift())) {
  223. if (command.onResult) {
  224. command.onResult(err);
  225. } else {
  226. bubbleErrorToConnection = true;
  227. }
  228. }
  229. // notify connection if some comands in the queue did not have callbacks
  230. // or if this is pool connection ( so it can be removed from pool )
  231. if (bubbleErrorToConnection || this._pool) {
  232. this.emit('error', err);
  233. }
  234. // close connection after emitting the event in case of a fatal error
  235. if (err.fatal) {
  236. this.close();
  237. }
  238. }
  239. write(buffer) {
  240. const result = this.stream.write(buffer, (err) => {
  241. if (err) {
  242. this._handleNetworkError(err);
  243. }
  244. });
  245. if (!result) {
  246. this.stream.emit('pause');
  247. }
  248. }
  249. // http://dev.mysql.com/doc/internals/en/sequence-id.html
  250. //
  251. // The sequence-id is incremented with each packet and may wrap around.
  252. // It starts at 0 and is reset to 0 when a new command
  253. // begins in the Command Phase.
  254. // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html
  255. _resetSequenceId() {
  256. this.sequenceId = 0;
  257. this.compressedSequenceId = 0;
  258. }
  259. _bumpCompressedSequenceId(numPackets) {
  260. this.compressedSequenceId += numPackets;
  261. this.compressedSequenceId %= 256;
  262. }
  263. _bumpSequenceId(numPackets) {
  264. this.sequenceId += numPackets;
  265. this.sequenceId %= 256;
  266. }
  267. writePacket(packet) {
  268. const MAX_PACKET_LENGTH = 16777215;
  269. const length = packet.length();
  270. let chunk, offset, header;
  271. if (length < MAX_PACKET_LENGTH) {
  272. packet.writeHeader(this.sequenceId);
  273. if (this.config.debug) {
  274. console.log(
  275. `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
  276. );
  277. console.log(
  278. `${this._internalId} ${this.connectionId} <== ${packet.buffer.toString('hex')}`
  279. );
  280. }
  281. this._bumpSequenceId(1);
  282. this.write(packet.buffer);
  283. } else {
  284. if (this.config.debug) {
  285. console.log(
  286. `${this._internalId} ${this.connectionId} <== Writing large packet, raw content not written:`
  287. );
  288. console.log(
  289. `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
  290. );
  291. }
  292. for (offset = 4; offset < 4 + length; offset += MAX_PACKET_LENGTH) {
  293. chunk = packet.buffer.slice(offset, offset + MAX_PACKET_LENGTH);
  294. if (chunk.length === MAX_PACKET_LENGTH) {
  295. header = Buffer.from([0xff, 0xff, 0xff, this.sequenceId]);
  296. } else {
  297. header = Buffer.from([
  298. chunk.length & 0xff,
  299. (chunk.length >> 8) & 0xff,
  300. (chunk.length >> 16) & 0xff,
  301. this.sequenceId,
  302. ]);
  303. }
  304. this._bumpSequenceId(1);
  305. this.write(header);
  306. this.write(chunk);
  307. }
  308. }
  309. }
  310. // 0.11+ environment
  311. startTLS(onSecure) {
  312. if (this.config.debug) {
  313. console.log('Upgrading connection to TLS');
  314. }
  315. const secureContext = Tls.createSecureContext({
  316. ca: this.config.ssl.ca,
  317. cert: this.config.ssl.cert,
  318. ciphers: this.config.ssl.ciphers,
  319. key: this.config.ssl.key,
  320. passphrase: this.config.ssl.passphrase,
  321. minVersion: this.config.ssl.minVersion,
  322. maxVersion: this.config.ssl.maxVersion,
  323. });
  324. const rejectUnauthorized = this.config.ssl.rejectUnauthorized;
  325. const verifyIdentity = this.config.ssl.verifyIdentity;
  326. const servername = Net.isIP(this.config.host)
  327. ? undefined
  328. : this.config.host;
  329. let secureEstablished = false;
  330. this.stream.removeAllListeners('data');
  331. const secureSocket = Tls.connect(
  332. {
  333. rejectUnauthorized,
  334. requestCert: rejectUnauthorized,
  335. checkServerIdentity: verifyIdentity
  336. ? Tls.checkServerIdentity
  337. : function () {
  338. return undefined;
  339. },
  340. secureContext,
  341. isServer: false,
  342. socket: this.stream,
  343. servername,
  344. },
  345. () => {
  346. secureEstablished = true;
  347. if (rejectUnauthorized) {
  348. if (typeof servername === 'string' && verifyIdentity) {
  349. const cert = secureSocket.getPeerCertificate(true);
  350. const serverIdentityCheckError = Tls.checkServerIdentity(
  351. servername,
  352. cert
  353. );
  354. if (serverIdentityCheckError) {
  355. onSecure(serverIdentityCheckError);
  356. return;
  357. }
  358. }
  359. }
  360. onSecure();
  361. }
  362. );
  363. // error handler for secure socket
  364. secureSocket.on('error', (err) => {
  365. if (secureEstablished) {
  366. this._handleNetworkError(err);
  367. } else {
  368. onSecure(err);
  369. }
  370. });
  371. secureSocket.on('data', (data) => {
  372. this.packetParser.execute(data);
  373. });
  374. this.stream = secureSocket;
  375. }
  376. protocolError(message, code) {
  377. // Starting with MySQL 8.0.24, if the client closes the connection
  378. // unexpectedly, the server will send a last ERR Packet, which we can
  379. // safely ignore.
  380. // https://dev.mysql.com/worklog/task/?id=12999
  381. if (this._closing) {
  382. return;
  383. }
  384. const err = new Error(message);
  385. err.fatal = true;
  386. err.code = code || 'PROTOCOL_ERROR';
  387. this.emit('error', err);
  388. }
  389. get state() {
  390. // Error state has highest priority
  391. if (this._fatalError || this._protocolError) {
  392. return 'error';
  393. }
  394. // Closing state has second priority
  395. if (this._closing || (this.stream && this.stream.destroyed)) {
  396. return 'disconnected';
  397. }
  398. // Authenticated state has third priority
  399. if (this.authorized) {
  400. return 'authenticated';
  401. }
  402. // Connected state: handshake completed but not yet authorized
  403. // This matches the original mysql driver's 'connected' state
  404. if (this._handshakePacket) {
  405. return 'connected';
  406. }
  407. // Protocol handshake state: connection established, handshake in progress
  408. if (this.stream && !this.stream.destroyed) {
  409. return 'protocol_handshake';
  410. }
  411. // Default: not connected
  412. return 'disconnected';
  413. }
  414. get fatalError() {
  415. return this._fatalError;
  416. }
  417. handlePacket(packet) {
  418. if (this._paused) {
  419. this._paused_packets.push(packet);
  420. return;
  421. }
  422. if (this.config.debug) {
  423. if (packet) {
  424. console.log(
  425. ` raw: ${packet.buffer
  426. .slice(packet.offset, packet.offset + packet.length())
  427. .toString('hex')}`
  428. );
  429. console.trace();
  430. const commandName = this._command
  431. ? this._command._commandName
  432. : '(no command)';
  433. const stateName = this._command
  434. ? this._command.stateName()
  435. : '(no command)';
  436. console.log(
  437. `${this._internalId} ${this.connectionId} ==> ${commandName}#${stateName}(${[packet.sequenceId, packet.type(), packet.length()].join(',')})`
  438. );
  439. }
  440. }
  441. if (!this._command) {
  442. const marker = packet.peekByte();
  443. // If it's an Err Packet, we should use it.
  444. if (marker === 0xff) {
  445. const error = Packets.Error.fromPacket(packet);
  446. this.protocolError(error.message, error.code);
  447. } else {
  448. // Otherwise, it means it's some other unexpected packet.
  449. this.protocolError(
  450. 'Unexpected packet while no commands in the queue',
  451. 'PROTOCOL_UNEXPECTED_PACKET'
  452. );
  453. }
  454. this.close();
  455. return;
  456. }
  457. if (packet) {
  458. // Note: when server closes connection due to inactivity, Err packet ER_CLIENT_INTERACTION_TIMEOUT from MySQL 8.0.24, sequenceId will be 0
  459. if (this.sequenceId !== packet.sequenceId) {
  460. const err = new Error(
  461. `Warning: got packets out of order. Expected ${this.sequenceId} but received ${packet.sequenceId}`
  462. );
  463. err.expected = this.sequenceId;
  464. err.received = packet.sequenceId;
  465. this.emit('warn', err); // REVIEW
  466. console.error(err.message);
  467. }
  468. this._bumpSequenceId(packet.numPackets);
  469. }
  470. try {
  471. if (this._fatalError) {
  472. // skip remaining packets after client is in the error state
  473. return;
  474. }
  475. const done = this._command.execute(packet, this);
  476. if (done) {
  477. this._command = this._commands.shift();
  478. if (this._command) {
  479. this.sequenceId = 0;
  480. this.compressedSequenceId = 0;
  481. this.handlePacket();
  482. }
  483. }
  484. } catch (err) {
  485. this._handleFatalError(err);
  486. this.stream.destroy();
  487. }
  488. }
  489. addCommand(cmd) {
  490. // this.compressedSequenceId = 0;
  491. // this.sequenceId = 0;
  492. if (this.config.debug) {
  493. const commandName = cmd.constructor.name;
  494. console.log(`Add command: ${commandName}`);
  495. cmd._commandName = commandName;
  496. }
  497. if (!this._command) {
  498. this._command = cmd;
  499. this.handlePacket();
  500. } else {
  501. this._commands.push(cmd);
  502. }
  503. return cmd;
  504. }
  505. format(sql, values) {
  506. if (typeof this.config.queryFormat === 'function') {
  507. return this.config.queryFormat.call(
  508. this,
  509. sql,
  510. values,
  511. this.config.timezone
  512. );
  513. }
  514. const opts = {
  515. sql: sql,
  516. values: values,
  517. };
  518. this._resolveNamedPlaceholders(opts);
  519. return SqlString.format(
  520. opts.sql,
  521. opts.values,
  522. this.config.stringifyObjects,
  523. this.config.timezone
  524. );
  525. }
  526. escape(value) {
  527. return SqlString.escape(value, false, this.config.timezone);
  528. }
  529. escapeId(value) {
  530. return SqlString.escapeId(value, false);
  531. }
  532. raw(sql) {
  533. return SqlString.raw(sql);
  534. }
  535. _resolveNamedPlaceholders(options) {
  536. let unnamed;
  537. if (this.config.namedPlaceholders || options.namedPlaceholders) {
  538. if (Array.isArray(options.values)) {
  539. // if an array is provided as the values, assume the conversion is not necessary.
  540. // this allows the usage of unnamed placeholders even if the namedPlaceholders flag is enabled.
  541. return;
  542. }
  543. if (convertNamedPlaceholders === null) {
  544. convertNamedPlaceholders = require('named-placeholders')();
  545. }
  546. unnamed = convertNamedPlaceholders(options.sql, options.values);
  547. options.sql = unnamed[0];
  548. options.values = unnamed[1];
  549. }
  550. }
  551. query(sql, values, cb) {
  552. let cmdQuery;
  553. if (sql.constructor === Commands.Query) {
  554. cmdQuery = sql;
  555. } else {
  556. cmdQuery = BaseConnection.createQuery(sql, values, cb, this.config);
  557. }
  558. this._resolveNamedPlaceholders(cmdQuery);
  559. const rawSql = this.format(
  560. cmdQuery.sql,
  561. cmdQuery.values !== undefined ? cmdQuery.values : []
  562. );
  563. cmdQuery.sql = rawSql;
  564. return this.addCommand(cmdQuery);
  565. }
  566. pause() {
  567. this._paused = true;
  568. this.stream.pause();
  569. }
  570. resume() {
  571. let packet;
  572. this._paused = false;
  573. while ((packet = this._paused_packets.shift())) {
  574. this.handlePacket(packet);
  575. // don't resume if packet handler paused connection
  576. if (this._paused) {
  577. return;
  578. }
  579. }
  580. this.stream.resume();
  581. }
  582. // TODO: named placeholders support
  583. prepare(options, cb) {
  584. if (typeof options === 'string') {
  585. options = { sql: options };
  586. }
  587. return this.addCommand(new Commands.Prepare(options, cb));
  588. }
  589. unprepare(sql) {
  590. let options = {};
  591. if (typeof sql === 'object') {
  592. options = sql;
  593. } else {
  594. options.sql = sql;
  595. }
  596. const key = BaseConnection.statementKey(options);
  597. const stmt = this._statements.get(key);
  598. if (stmt) {
  599. this._statements.delete(key);
  600. stmt.close();
  601. }
  602. return stmt;
  603. }
  604. execute(sql, values, cb) {
  605. let options = {
  606. infileStreamFactory: this.config.infileStreamFactory,
  607. };
  608. if (typeof sql === 'object') {
  609. // execute(options, cb)
  610. options = {
  611. ...options,
  612. ...sql,
  613. sql: sql.sql,
  614. values: sql.values,
  615. };
  616. if (typeof values === 'function') {
  617. cb = values;
  618. } else {
  619. options.values = options.values || values;
  620. }
  621. } else if (typeof values === 'function') {
  622. // execute(sql, cb)
  623. cb = values;
  624. options.sql = sql;
  625. options.values = undefined;
  626. } else {
  627. // execute(sql, values, cb)
  628. options.sql = sql;
  629. options.values = values;
  630. }
  631. this._resolveNamedPlaceholders(options);
  632. // check for values containing undefined
  633. if (options.values) {
  634. //If namedPlaceholder is not enabled and object is passed as bind parameters
  635. if (!Array.isArray(options.values)) {
  636. throw new TypeError(
  637. 'Bind parameters must be array if namedPlaceholders parameter is not enabled'
  638. );
  639. }
  640. options.values.forEach((val) => {
  641. //If namedPlaceholder is not enabled and object is passed as bind parameters
  642. if (!Array.isArray(options.values)) {
  643. throw new TypeError(
  644. 'Bind parameters must be array if namedPlaceholders parameter is not enabled'
  645. );
  646. }
  647. if (val === undefined) {
  648. throw new TypeError(
  649. 'Bind parameters must not contain undefined. To pass SQL NULL specify JS null'
  650. );
  651. }
  652. if (typeof val === 'function') {
  653. throw new TypeError(
  654. 'Bind parameters must not contain function(s). To pass the body of a function as a string call .toString() first'
  655. );
  656. }
  657. });
  658. }
  659. const executeCommand = new Commands.Execute(options, cb);
  660. const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
  661. if (err) {
  662. // skip execute command if prepare failed, we have main
  663. // combined callback here
  664. executeCommand.start = function () {
  665. return null;
  666. };
  667. if (cb) {
  668. cb(err);
  669. } else {
  670. executeCommand.emit('error', err);
  671. }
  672. executeCommand.emit('end');
  673. return;
  674. }
  675. executeCommand.statement = stmt;
  676. });
  677. this.addCommand(prepareCommand);
  678. this.addCommand(executeCommand);
  679. return executeCommand;
  680. }
  681. changeUser(options, callback) {
  682. if (!callback && typeof options === 'function') {
  683. callback = options;
  684. options = {};
  685. }
  686. const charsetNumber = options.charset
  687. ? ConnectionConfig.getCharsetNumber(options.charset)
  688. : this.config.charsetNumber;
  689. return this.addCommand(
  690. new Commands.ChangeUser(
  691. {
  692. user: options.user || this.config.user,
  693. // for the purpose of multi-factor authentication, or not, the main
  694. // password (used for the 1st authentication factor) can also be
  695. // provided via the "password1" option
  696. password:
  697. options.password ||
  698. options.password1 ||
  699. this.config.password ||
  700. this.config.password1,
  701. password2: options.password2 || this.config.password2,
  702. password3: options.password3 || this.config.password3,
  703. passwordSha1: options.passwordSha1 || this.config.passwordSha1,
  704. database: options.database || this.config.database,
  705. timeout: options.timeout,
  706. charsetNumber: charsetNumber,
  707. currentConfig: this.config,
  708. },
  709. (err) => {
  710. if (err) {
  711. err.fatal = true;
  712. }
  713. if (callback) {
  714. callback(err);
  715. }
  716. }
  717. )
  718. );
  719. }
  720. // transaction helpers
  721. beginTransaction(cb) {
  722. return this.query('START TRANSACTION', cb);
  723. }
  724. commit(cb) {
  725. return this.query('COMMIT', cb);
  726. }
  727. rollback(cb) {
  728. return this.query('ROLLBACK', cb);
  729. }
  730. ping(cb) {
  731. return this.addCommand(new Commands.Ping(cb));
  732. }
  733. _registerSlave(opts, cb) {
  734. return this.addCommand(new Commands.RegisterSlave(opts, cb));
  735. }
  736. _binlogDump(opts, cb) {
  737. return this.addCommand(new Commands.BinlogDump(opts, cb));
  738. }
  739. // currently just alias to close
  740. destroy() {
  741. this.close();
  742. }
  743. close() {
  744. if (this.connectTimeout) {
  745. Timers.clearTimeout(this.connectTimeout);
  746. this.connectTimeout = null;
  747. }
  748. this._closing = true;
  749. this.stream.end();
  750. this.addCommand = this._addCommandClosedState;
  751. }
  752. createBinlogStream(opts) {
  753. // TODO: create proper stream class
  754. // TODO: use through2
  755. let test = 1;
  756. const stream = new Readable({ objectMode: true });
  757. stream._read = function () {
  758. return {
  759. data: test++,
  760. };
  761. };
  762. this._registerSlave(opts, () => {
  763. const dumpCmd = this._binlogDump(opts);
  764. dumpCmd.on('event', (ev) => {
  765. stream.push(ev);
  766. });
  767. dumpCmd.on('eof', () => {
  768. stream.push(null);
  769. // if non-blocking, then close stream to prevent errors
  770. if (opts.flags && opts.flags & 0x01) {
  771. this.close();
  772. }
  773. });
  774. // TODO: pipe errors as well
  775. });
  776. return stream;
  777. }
  778. connect(cb) {
  779. if (!cb) {
  780. return;
  781. }
  782. if (this._fatalError || this._protocolError) {
  783. return cb(this._fatalError || this._protocolError);
  784. }
  785. if (this._handshakePacket) {
  786. return cb(null, this);
  787. }
  788. /* eslint-disable prefer-const */
  789. let onError, onConnect;
  790. onError = (param) => {
  791. this.removeListener('connect', onConnect);
  792. cb(param);
  793. };
  794. onConnect = (param) => {
  795. this.removeListener('error', onError);
  796. cb(null, param);
  797. };
  798. /* eslint-enable prefer-const */
  799. this.once('error', onError);
  800. this.once('connect', onConnect);
  801. }
  802. // ===================================
  803. // outgoing server connection methods
  804. // ===================================
  805. writeColumns(columns) {
  806. this.writePacket(Packets.ResultSetHeader.toPacket(columns.length));
  807. columns.forEach((column) => {
  808. this.writePacket(
  809. Packets.ColumnDefinition.toPacket(column, this.serverConfig.encoding)
  810. );
  811. });
  812. this.writeEof();
  813. }
  814. // row is array of columns, not hash
  815. writeTextRow(column) {
  816. this.writePacket(
  817. Packets.TextRow.toPacket(column, this.serverConfig.encoding)
  818. );
  819. }
  820. writeBinaryRow(column) {
  821. this.writePacket(
  822. Packets.BinaryRow.toPacket(column, this.serverConfig.encoding)
  823. );
  824. }
  825. writeTextResult(rows, columns, binary = false) {
  826. this.writeColumns(columns);
  827. rows.forEach((row) => {
  828. const arrayRow = new Array(columns.length);
  829. columns.forEach((column) => {
  830. arrayRow.push(row[column.name]);
  831. });
  832. if (binary) {
  833. this.writeBinaryRow(arrayRow);
  834. } else this.writeTextRow(arrayRow);
  835. });
  836. this.writeEof();
  837. }
  838. writeEof(warnings, statusFlags) {
  839. this.writePacket(Packets.EOF.toPacket(warnings, statusFlags));
  840. }
  841. writeOk(args) {
  842. if (!args) {
  843. args = { affectedRows: 0 };
  844. }
  845. this.writePacket(Packets.OK.toPacket(args, this.serverConfig.encoding));
  846. }
  847. writeError(args) {
  848. // if we want to send error before initial hello was sent, use default encoding
  849. const encoding = this.serverConfig ? this.serverConfig.encoding : 'cesu8';
  850. this.writePacket(Packets.Error.toPacket(args, encoding));
  851. }
  852. serverHandshake(args) {
  853. this.serverConfig = args;
  854. this.serverConfig.encoding =
  855. CharsetToEncoding[this.serverConfig.characterSet];
  856. return this.addCommand(new Commands.ServerHandshake(args));
  857. }
  858. [Symbol.dispose]() {
  859. if (!this._closing) {
  860. this.end();
  861. }
  862. }
  863. // ===============================================================
  864. end(callback) {
  865. if (this.config.isServer) {
  866. this._closing = true;
  867. const quitCmd = new EventEmitter();
  868. setImmediate(() => {
  869. this.stream.end();
  870. quitCmd.emit('end');
  871. });
  872. return quitCmd;
  873. }
  874. // trigger error if more commands enqueued after end command
  875. const quitCmd = this.addCommand(new Commands.Quit(callback));
  876. this.addCommand = this._addCommandClosedState;
  877. return quitCmd;
  878. }
  879. static createQuery(sql, values, cb, config) {
  880. let options = {
  881. rowsAsArray: config.rowsAsArray,
  882. infileStreamFactory: config.infileStreamFactory,
  883. };
  884. if (typeof sql === 'object') {
  885. // query(options, cb)
  886. options = {
  887. ...options,
  888. ...sql,
  889. sql: sql.sql,
  890. values: sql.values,
  891. };
  892. if (typeof values === 'function') {
  893. cb = values;
  894. } else if (values !== undefined) {
  895. options.values = values;
  896. }
  897. } else if (typeof values === 'function') {
  898. // query(sql, cb)
  899. cb = values;
  900. options.sql = sql;
  901. options.values = undefined;
  902. } else {
  903. // query(sql, values, cb)
  904. options.sql = sql;
  905. options.values = values;
  906. }
  907. return new Commands.Query(options, cb);
  908. }
  909. static statementKey(options) {
  910. return `${typeof options.nestTables}/${options.nestTables}/${options.rowsAsArray}${options.sql}`;
  911. }
  912. }
  913. module.exports = BaseConnection;