@@ -3546,30 +3546,6 @@ class UserTypeDoesNotExist(Exception):
35463546 pass
35473547
35483548
3549- class _ControlReconnectionHandler (_ReconnectionHandler ):
3550- """
3551- Internal
3552- """
3553-
3554- def __init__ (self , control_connection , * args , ** kwargs ):
3555- _ReconnectionHandler .__init__ (self , * args , ** kwargs )
3556- self .control_connection = weakref .proxy (control_connection )
3557-
3558- def try_reconnect (self ):
3559- return self .control_connection ._reconnect_internal ()
3560-
3561- def on_reconnection (self , connection ):
3562- self .control_connection ._set_new_connection (connection )
3563-
3564- def on_exception (self , exc , next_delay ):
3565- # TODO only overridden to add logging, so add logging
3566- if isinstance (exc , AuthenticationFailed ):
3567- return False
3568- else :
3569- log .debug ("Error trying to reconnect control connection: %r" , exc )
3570- return True
3571-
3572-
35733549def _watch_callback (obj_weakref , method_name , * args , ** kwargs ):
35743550 """
35753551 A callback handler for the ControlConnection that tolerates
@@ -3662,6 +3638,7 @@ def __init__(self, cluster, timeout,
36623638
36633639 self ._reconnection_handler = None
36643640 self ._reconnection_lock = RLock ()
3641+ self ._reconnection_pending = False
36653642
36663643 self ._event_schedule_times = {}
36673644
@@ -3818,33 +3795,43 @@ def reconnect(self):
38183795 if self ._is_shutdown :
38193796 return
38203797
3798+ if self ._reconnection_pending :
3799+ return
3800+ self ._reconnection_pending = True
3801+
38213802 self ._submit (self ._reconnect )
38223803
3823- def _reconnect (self ):
3804+ def _reconnect (self , schedule = None ):
38243805 log .debug ("[control connection] Attempting to reconnect" )
3806+ if self ._is_shutdown :
3807+ return
3808+
38253809 try :
38263810 self ._set_new_connection (self ._reconnect_internal ())
3811+ self ._reconnection_pending = False
3812+ return
38273813 except NoHostAvailable :
3828- # make a retry schedule (which includes backoff)
3829- schedule = self ._cluster .reconnection_policy .new_schedule ()
3814+ log .debug ("[control connection] Reconnection plan is exhausted, scheduling new reconnection attempt" )
3815+ except Exception as ex :
3816+ log .debug ("[control connection] Unexpected exception during reconnect, scheduling new reconnection attempt: %s" , ex )
38303817
3831- with self ._reconnection_lock :
3818+ if schedule is None :
3819+ schedule = self ._cluster .reconnection_policy .new_schedule ()
38323820
3833- # cancel existing reconnection attempts
3834- if self ._reconnection_handler :
3835- self ._reconnection_handler .cancel ()
3821+ try :
3822+ next_delay = next (schedule )
3823+ except StopIteration :
3824+ # the schedule has been exhausted
3825+ schedule = self ._cluster .reconnection_policy .new_schedule ()
3826+ try :
3827+ next_delay = next (schedule )
3828+ except StopIteration :
3829+ next_delay = 0
38363830
3837- # when a connection is successfully made, _set_new_connection
3838- # will be called with the new connection and then our
3839- # _reconnection_handler will be cleared out
3840- self ._reconnection_handler = _ControlReconnectionHandler (
3841- self , self ._cluster .scheduler , schedule ,
3842- self ._get_and_set_reconnection_handler ,
3843- new_handler = None )
3844- self ._reconnection_handler .start ()
3845- except Exception :
3846- log .debug ("[control connection] error reconnecting" , exc_info = True )
3847- raise
3831+ if next_delay == 0 :
3832+ self ._submit (self ._reconnect )
3833+ else :
3834+ self ._cluster .scheduler .schedule (next_delay , partial (self ._reconnect , schedule ))
38483835
38493836 def _get_and_set_reconnection_handler (self , new_handler ):
38503837 """
0 commit comments