11'use strict' ;
22
3+ const maybePromise = require ( '../utils' ) . maybePromise ;
34const MongoError = require ( '../core/error' ) . MongoError ;
45const Aspect = require ( './operation' ) . Aspect ;
56const OperationBase = require ( './operation' ) . OperationBase ;
@@ -21,7 +22,7 @@ const isUnifiedTopology = require('../core/utils').isUnifiedTopology;
2122 * @param {Operation } operation The operation to execute
2223 * @param {function } callback The command result callback
2324 */
24- function executeOperation ( topology , operation , callback ) {
25+ function executeOperation ( topology , operation , cb ) {
2526 if ( topology == null ) {
2627 throw new TypeError ( 'This method requires a valid topology instance' ) ;
2728 }
@@ -30,64 +31,57 @@ function executeOperation(topology, operation, callback) {
3031 throw new TypeError ( 'This method requires a valid operation instance' ) ;
3132 }
3233
33- if ( isUnifiedTopology ( topology ) && topology . shouldCheckForSessionSupport ( ) ) {
34- return selectServerForSessionSupport ( topology , operation , callback ) ;
35- }
36-
37- const Promise = topology . s . promiseLibrary ;
38-
39- // The driver sessions spec mandates that we implicitly create sessions for operations
40- // that are not explicitly provided with a session.
41- let session , owner ;
42- if ( topology . hasSessionSupport ( ) ) {
43- if ( operation . session == null ) {
44- owner = Symbol ( ) ;
45- session = topology . startSession ( { owner } ) ;
46- operation . session = session ;
47- } else if ( operation . session . hasEnded ) {
48- throw new MongoError ( 'Use of expired sessions is not permitted' ) ;
34+ return maybePromise ( topology , cb , callback => {
35+ if ( isUnifiedTopology ( topology ) && topology . shouldCheckForSessionSupport ( ) ) {
36+ // Recursive call to executeOperation after a server selection
37+ return selectServerForSessionSupport ( topology , operation , callback ) ;
4938 }
50- }
51-
52- let result ;
53- if ( typeof callback !== 'function' ) {
54- result = new Promise ( ( resolve , reject ) => {
55- callback = ( err , res ) => {
56- if ( err ) return reject ( err ) ;
57- resolve ( res ) ;
58- } ;
59- } ) ;
60- }
6139
62- function executeCallback ( err , result ) {
63- if ( session && session . owner === owner ) {
64- session . endSession ( ) ;
65- if ( operation . session === session ) {
66- operation . clearSession ( ) ;
40+ // The driver sessions spec mandates that we implicitly create sessions for operations
41+ // that are not explicitly provided with a session.
42+ let session , owner ;
43+ if ( topology . hasSessionSupport ( ) ) {
44+ if ( operation . session == null ) {
45+ owner = Symbol ( ) ;
46+ session = topology . startSession ( { owner } ) ;
47+ operation . session = session ;
48+ } else if ( operation . session . hasEnded ) {
49+ return callback ( new MongoError ( 'Use of expired sessions is not permitted' ) ) ;
6750 }
51+ } else if ( operation . session ) {
52+ // If the user passed an explicit session and we are still, after server selection,
53+ // trying to run against a topology that doesn't support sessions we error out.
54+ return callback ( new MongoError ( 'Current topology does not support sessions' ) ) ;
6855 }
6956
70- callback ( err , result ) ;
71- }
72-
73- try {
74- if ( operation . hasAspect ( Aspect . EXECUTE_WITH_SELECTION ) ) {
75- executeWithServerSelection ( topology , operation , executeCallback ) ;
76- } else {
77- operation . execute ( executeCallback ) ;
78- }
79- } catch ( e ) {
80- if ( session && session . owner === owner ) {
81- session . endSession ( ) ;
82- if ( operation . session === session ) {
83- operation . clearSession ( ) ;
57+ function executeCallback ( err , result ) {
58+ if ( session && session . owner === owner ) {
59+ session . endSession ( ) ;
60+ if ( operation . session === session ) {
61+ operation . clearSession ( ) ;
62+ }
8463 }
64+
65+ callback ( err , result ) ;
8566 }
8667
87- throw e ;
88- }
68+ try {
69+ if ( operation . hasAspect ( Aspect . EXECUTE_WITH_SELECTION ) ) {
70+ executeWithServerSelection ( topology , operation , executeCallback ) ;
71+ } else {
72+ operation . execute ( executeCallback ) ;
73+ }
74+ } catch ( error ) {
75+ if ( session && session . owner === owner ) {
76+ session . endSession ( ) ;
77+ if ( operation . session === session ) {
78+ operation . clearSession ( ) ;
79+ }
80+ }
8981
90- return result ;
82+ callback ( error ) ;
83+ }
84+ } ) ;
9185}
9286
9387function supportsRetryableReads ( server ) {
@@ -139,7 +133,6 @@ function executeWithServerSelection(topology, operation, callback) {
139133 callback ( err , null ) ;
140134 return ;
141135 }
142-
143136 const shouldRetryReads =
144137 topology . s . options . retryReads !== false &&
145138 operation . session &&
@@ -156,31 +149,16 @@ function executeWithServerSelection(topology, operation, callback) {
156149 } ) ;
157150}
158151
159- // TODO: This is only supported for unified topology, it should go away once
160- // we remove support for legacy topology types.
152+ // The Unified Topology runs serverSelection before executing every operation
153+ // Session support is determined by the result of a monitoring check triggered by this selection
161154function selectServerForSessionSupport ( topology , operation , callback ) {
162- const Promise = topology . s . promiseLibrary ;
163-
164- let result ;
165- if ( typeof callback !== 'function' ) {
166- result = new Promise ( ( resolve , reject ) => {
167- callback = ( err , result ) => {
168- if ( err ) return reject ( err ) ;
169- resolve ( result ) ;
170- } ;
171- } ) ;
172- }
173-
174155 topology . selectServer ( ReadPreference . primaryPreferred , err => {
175156 if ( err ) {
176- callback ( err ) ;
177- return ;
157+ return callback ( err ) ;
178158 }
179159
180160 executeOperation ( topology , operation , callback ) ;
181161 } ) ;
182-
183- return result ;
184162}
185163
186164module . exports = executeOperation ;
0 commit comments