Compare commits

...

2 Commits

Author SHA1 Message Date
Erik Johnston
1845e4ced6 Newsfile 2024-09-27 15:28:21 +01:00
Erik Johnston
b1f707a802 Fix perf when streams don't change often
There is a bug with the `StreamChangeCache` where it would incorrectly
return that all entities had changed if asked for entities changed
*since* the earliest stream position.
2024-09-27 15:25:14 +01:00
3 changed files with 19 additions and 14 deletions

1
changelog.d/17767.misc Normal file
View File

@@ -0,0 +1 @@
Fix perf when streams don't change often.

View File

@@ -142,9 +142,9 @@ class StreamChangeCache:
""" """
assert isinstance(stream_pos, int) assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so # _cache is not valid before the earliest known stream position, so
# return that the entity has changed. # return that the entity has changed.
if stream_pos <= self._earliest_known_stream_pos: if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses() self.metrics.inc_misses()
return True return True
@@ -186,7 +186,7 @@ class StreamChangeCache:
This will be all entities if the given stream position is at or earlier This will be all entities if the given stream position is at or earlier
than the earliest known stream position. than the earliest known stream position.
""" """
if not self._cache or stream_pos <= self._earliest_known_stream_pos: if not self._cache or stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses() self.metrics.inc_misses()
return set(entities) return set(entities)
@@ -238,9 +238,9 @@ class StreamChangeCache:
""" """
assert isinstance(stream_pos, int) assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so # _cache is not valid before the earliest known stream position, so
# return that an entity has changed. # return that an entity has changed.
if stream_pos <= self._earliest_known_stream_pos: if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses() self.metrics.inc_misses()
return True return True
@@ -270,9 +270,9 @@ class StreamChangeCache:
""" """
assert isinstance(stream_pos, int) assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so # _cache is not valid before the earliest known stream position, so
# return None to mark that it is unknown if an entity has changed. # return None to mark that it is unknown if an entity has changed.
if stream_pos <= self._earliest_known_stream_pos: if stream_pos < self._earliest_known_stream_pos:
return AllEntitiesChangedResult(None) return AllEntitiesChangedResult(None)
changed_entities: List[EntityType] = [] changed_entities: List[EntityType] = []

View File

@@ -53,8 +53,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
# return True, whether it's a known entity or not. # return True, whether it's a known entity or not.
self.assertTrue(cache.has_entity_changed("user@foo.com", 0)) self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
self.assertTrue(cache.has_entity_changed("not@here.website", 0)) self.assertTrue(cache.has_entity_changed("not@here.website", 0))
self.assertTrue(cache.has_entity_changed("user@foo.com", 3)) self.assertTrue(cache.has_entity_changed("user@foo.com", 2))
self.assertTrue(cache.has_entity_changed("not@here.website", 3)) self.assertTrue(cache.has_entity_changed("not@here.website", 2))
def test_entity_has_changed_pops_off_start(self) -> None: def test_entity_has_changed_pops_off_start(self) -> None:
""" """
@@ -76,9 +76,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
self.assertTrue("user@foo.com" not in cache._entity_to_key) self.assertTrue("user@foo.com" not in cache._entity_to_key)
self.assertEqual( self.assertEqual(
cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"] cache.get_all_entities_changed(2).entities,
["bar@baz.net", "user@elsewhere.org"],
) )
self.assertFalse(cache.get_all_entities_changed(2).hit) self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)
# If we update an existing entity, it keeps the two existing entities # If we update an existing entity, it keeps the two existing entities
cache.entity_has_changed("bar@baz.net", 5) cache.entity_has_changed("bar@baz.net", 5)
@@ -89,7 +91,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
cache.get_all_entities_changed(3).entities, cache.get_all_entities_changed(3).entities,
["user@elsewhere.org", "bar@baz.net"], ["user@elsewhere.org", "bar@baz.net"],
) )
self.assertFalse(cache.get_all_entities_changed(2).hit) self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)
def test_get_all_entities_changed(self) -> None: def test_get_all_entities_changed(self) -> None:
""" """
@@ -114,7 +117,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
self.assertEqual( self.assertEqual(
cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"] cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
) )
self.assertFalse(cache.get_all_entities_changed(1).hit) self.assertFalse(cache.get_all_entities_changed(0).hit)
self.assertTrue(cache.get_all_entities_changed(1).hit)
# ... later, things gest more updates # ... later, things gest more updates
cache.entity_has_changed("user@foo.com", 5) cache.entity_has_changed("user@foo.com", 5)
@@ -149,7 +153,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
# With no entities, it returns True for the past, present, and False for # With no entities, it returns True for the past, present, and False for
# the future. # the future.
self.assertTrue(cache.has_any_entity_changed(0)) self.assertTrue(cache.has_any_entity_changed(0))
self.assertTrue(cache.has_any_entity_changed(1)) self.assertFalse(cache.has_any_entity_changed(1))
self.assertFalse(cache.has_any_entity_changed(2)) self.assertFalse(cache.has_any_entity_changed(2))
# We add an entity # We add an entity