Compare commits

...

7 Commits

Author SHA1 Message Date
Erik Johnston
10a67f0d69 Don't spuriously create new listeners 2015-06-18 11:12:03 +01:00
Erik Johnston
9bc36b7f31 Don't reuse name 2015-06-18 10:57:42 +01:00
Erik Johnston
c59e904839 Mark as notified 2015-06-18 10:52:00 +01:00
Erik Johnston
e70d484e1c Keep track of previous listeners 2015-06-17 16:20:51 +01:00
Erik Johnston
6844bb8a6f Paranoia try..except 2015-06-17 15:38:46 +01:00
Erik Johnston
30b53812de Store timeout 2015-06-17 15:29:31 +01:00
Erik Johnston
bddacb6dd1 Add some things to help debug notifer leak 2015-06-17 15:15:51 +01:00

View File

@@ -21,6 +21,7 @@ from synapse.types import StreamToken
import synapse.metrics
import logging
import time
logger = logging.getLogger(__name__)
@@ -46,8 +47,11 @@ class _NotificationListener(object):
notify the handler it is sufficient to resolve the deferred.
"""
def __init__(self, deferred):
def __init__(self, deferred, timeout):
self.deferred = deferred
self.created = int(time.time() * 1000)
self.timeout = timeout
self.have_notified = False
def notified(self):
return self.deferred.called
@@ -55,6 +59,7 @@ class _NotificationListener(object):
def notify(self, token):
""" Inform whoever is listening about the new events.
"""
self.have_notified = True
try:
self.deferred.callback(token)
except defer.AlreadyCalledError:
@@ -96,7 +101,10 @@ class _NotifierUserStream(object):
listeners = self.listeners
self.listeners = set()
for listener in listeners:
listener.notify(self.current_token)
try:
listener.notify(self.current_token)
except:
logger.exception("Failed to notify listener")
def remove(self, notifier):
""" Remove this listener from all the indexes in the Notifier
@@ -308,10 +316,10 @@ class Notifier(object):
else:
current_token = user_stream.current_token
listener = [_NotificationListener(deferred)]
listeners = [_NotificationListener(deferred, timeout)]
if timeout and not current_token.is_after(from_token):
user_stream.listeners.add(listener[0])
user_stream.listeners.update(listeners)
if current_token.is_after(from_token):
result = yield callback(from_token, current_token)
@@ -321,7 +329,7 @@ class Notifier(object):
timer = [None]
if result:
user_stream.listeners.discard(listener[0])
user_stream.listeners.difference_update(listeners)
defer.returnValue(result)
return
@@ -331,8 +339,9 @@ class Notifier(object):
def _timeout_listener():
timed_out[0] = True
timer[0] = None
user_stream.listeners.discard(listener[0])
listener[0].notify(current_token)
user_stream.listeners.difference_update(listeners)
for listener in listeners:
listener.notify(current_token)
# We create multiple notification listeners so we have to manage
# canceling the timeout ourselves.
@@ -340,12 +349,16 @@ class Notifier(object):
while not result and not timed_out[0]:
new_token = yield deferred
deferred = defer.Deferred()
listener[0] = _NotificationListener(deferred)
user_stream.listeners.add(listener[0])
result = yield callback(current_token, new_token)
current_token = new_token
if not result:
deferred = defer.Deferred()
listener = _NotificationListener(deferred, timeout)
listeners.append(listener)
user_stream.listeners.add(listener)
if timer[0] is not None:
try:
self.clock.cancel_call_later(timer[0])