Compare commits

...

15 Commits

Author SHA1 Message Date
Devon Hudson
c9f212ab44 Revert "Fix LaterGauge metrics to collect from all servers (#18751)"
This reverts commit 076db0ab49.
2025-08-06 15:49:26 -06:00
Devon Hudson
85e3adba86 Revert "Temporarily disable problem test"
This reverts commit 4333eff1d5.
2025-08-06 15:49:03 -06:00
Devon Hudson
d3bdf8b091 Revert "Temporarily disable all tests that call generate_latest"
This reverts commit d8ab5434d5.
2025-08-06 15:48:36 -06:00
Devon Hudson
d8ab5434d5 Temporarily disable all tests that call generate_latest 2025-08-06 15:21:42 -06:00
Devon Hudson
4333eff1d5 Temporarily disable problem test 2025-08-06 14:39:35 -06:00
Devon Hudson
c9f04f3484 Re-enable parallel 2025-08-06 14:39:18 -06:00
Olivier 'reivilibre
a387d6ecf8 better monitor 2025-08-06 18:03:31 +01:00
Olivier 'reivilibre
9e473d9e38 fail slow 2025-08-06 18:02:30 +01:00
Olivier 'reivilibre
d2ea7e32f5 Revert "choose a test"
This reverts commit a256423553.
2025-08-06 18:02:11 +01:00
Olivier 'reivilibre
2db0f1e49b don't wait for lint 2025-08-06 17:58:14 +01:00
Olivier 'reivilibre
a256423553 choose a test 2025-08-06 17:56:54 +01:00
Olivier 'reivilibre
e91aa4fd2f debug diskspace 2025-08-06 17:12:49 +01:00
Olivier 'reivilibre
499c1631de no parallel? 2025-08-06 16:29:03 +01:00
Olivier 'reivilibre
4ecd9aba95 Newsfile
Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org>
2025-08-06 16:07:32 +01:00
Olivier 'reivilibre
4e947d05ab Remove stray @DEBUG 2025-08-06 16:07:07 +01:00
17 changed files with 163 additions and 245 deletions

View File

@@ -373,7 +373,7 @@ jobs:
   calculate-test-jobs:
     if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
-    needs: linting-done
+    # needs: linting-done
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
@@ -393,6 +393,7 @@ jobs:
       - changes
     runs-on: ubuntu-latest
     strategy:
+      fail-fast: false
       matrix:
         job: ${{ fromJson(needs.calculate-test-jobs.outputs.trial_test_matrix) }}
@@ -426,7 +427,24 @@ jobs:
         if: ${{ matrix.job.postgres-version }}
         timeout-minutes: 2
         run: until pg_isready -h localhost; do sleep 1; done
-      - run: poetry run trial --jobs=6 tests
+      - run: |
+          (
+            while true; do
+              echo "......."
+              date
+              df -h | grep root
+              free -m
+              sleep 10
+            done
+          ) &
+          MONITOR_PID=$!
+          poetry run trial --jobs=6 tests
+          STATUS=$?
+          kill $MONITOR_PID
+          exit $STATUS
         env:
          SYNAPSE_POSTGRES: ${{ matrix.job.database == 'postgres' || '' }}
          SYNAPSE_POSTGRES_HOST: /var/run/postgresql
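
The added step wraps `trial` in a lightweight resource monitor: a background subshell prints a timestamp, root-disk usage, and free memory every 10 seconds while the tests run, then the monitor is killed and trial's exit status is propagated. A minimal Python sketch of the same pattern (a hypothetical standalone script, not part of this diff; the command and interval are placeholders):

#!/usr/bin/env python3
import shutil
import subprocess
import sys
import threading

def monitor(stop: threading.Event, interval: float = 10.0) -> None:
    # Periodically report free disk space until asked to stop.
    while not stop.is_set():
        usage = shutil.disk_usage("/")
        print(f"free disk: {usage.free // 2**20} MiB of {usage.total // 2**20} MiB")
        stop.wait(interval)

def run_with_monitor(cmd: list[str]) -> int:
    # Start the monitor, run the real command, then stop the monitor and
    # propagate the command's exit status (mirroring MONITOR_PID/STATUS above).
    stop = threading.Event()
    thread = threading.Thread(target=monitor, args=(stop,), daemon=True)
    thread.start()
    try:
        return subprocess.run(cmd).returncode
    finally:
        stop.set()
        thread.join()

if __name__ == "__main__":
    sys.exit(run_with_monitor(sys.argv[1:] or ["true"]))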

View File

@@ -1 +0,0 @@
-Fix `LaterGauge` metrics to collect from all servers.

changelog.d/18787.misc Normal file
View File

@@ -0,0 +1 @@
+CI debugging.

View File

@@ -37,7 +37,6 @@ Events are replicated via a separate events stream.
 """

 import logging
-from enum import Enum
 from typing import (
     TYPE_CHECKING,
     Dict,
@@ -68,25 +67,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)

-class QueueNames(str, Enum):
-    PRESENCE_MAP = "presence_map"
-    KEYED_EDU = "keyed_edu"
-    KEYED_EDU_CHANGED = "keyed_edu_changed"
-    EDUS = "edus"
-    POS_TIME = "pos_time"
-    PRESENCE_DESTINATIONS = "presence_destinations"
-
-queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
-for queue_name in QueueNames:
-    queue_name_to_gauge_map[queue_name] = LaterGauge(
-        name=f"synapse_federation_send_queue_{queue_name.value}_size",
-        desc="",
-        labelnames=[SERVER_NAME_LABEL],
-    )
-
 class FederationRemoteSendQueue(AbstractFederationSender):
     """A drop in replacement for FederationSender"""
@@ -131,15 +111,23 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         # we use a helper function here so that the inner lambda binds to the
         # `queue` parameter rather than to the loop variable, which changes on
         # every iteration. ARGH.
-        def register(queue_name: QueueNames, queue: Sized) -> None:
-            queue_name_to_gauge_map[queue_name].register_hook(
-                lambda: {(self.server_name,): len(queue)}
-            )
+        def register(name: str, queue: Sized) -> None:
+            LaterGauge(
+                name="synapse_federation_send_queue_%s_size" % (queue_name,),
+                desc="",
+                labelnames=[SERVER_NAME_LABEL],
+                caller=lambda: {(self.server_name,): len(queue)},
+            )

-        for queue_name in QueueNames:
-            queue = getattr(self, queue_name.value)
-            assert isinstance(queue, Sized)
-            register(queue_name, queue=queue)
+        for queue_name in [
+            "presence_map",
+            "keyed_edu",
+            "keyed_edu_changed",
+            "edus",
+            "pos_time",
+            "presence_destinations",
+        ]:
+            register(queue_name, getattr(self, queue_name))

         self.clock.looping_call(self._clear_queue, 30 * 1000)
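
The restored `register` helper exists because of Python's late-binding closures, which is what the "ARGH" comment is complaining about. A minimal illustration (hypothetical, not from the repo):

queues = {"a": [1], "b": [1, 2], "c": [1, 2, 3]}
callbacks = []

for name, queue in queues.items():
    # Each lambda closes over the loop variable `queue` itself, not its
    # current value, so after the loop they all see the last queue.
    callbacks.append(lambda: len(queue))

print([cb() for cb in callbacks])  # [3, 3, 3]

def make_callback(queue):
    # The function parameter gives each lambda its own binding.
    return lambda: len(queue)

callbacks = [make_callback(q) for q in queues.values()]
print([cb() for cb in callbacks])  # [1, 2, 3]

A default argument (`lambda queue=queue: len(queue)`) would work too; the code above uses the helper-function form, as the diff does.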

View File

@@ -199,24 +199,6 @@ sent_pdus_destination_dist_total = Counter(
     labelnames=[SERVER_NAME_LABEL],
 )

-transaction_queue_pending_destinations_gauge = LaterGauge(
-    name="synapse_federation_transaction_queue_pending_destinations",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-transaction_queue_pending_pdus_gauge = LaterGauge(
-    name="synapse_federation_transaction_queue_pending_pdus",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-transaction_queue_pending_edus_gauge = LaterGauge(
-    name="synapse_federation_transaction_queue_pending_edus",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 # Time (in s) to wait before trying to wake up destinations that have
 # catch-up outstanding.
 # Please note that rate limiting still applies, so while the loop is
@@ -416,28 +398,38 @@ class FederationSender(AbstractFederationSender):
         # map from destination to PerDestinationQueue
         self._per_destination_queues: Dict[str, PerDestinationQueue] = {}

-        transaction_queue_pending_destinations_gauge.register_hook(
-            lambda: {
+        LaterGauge(
+            name="synapse_federation_transaction_queue_pending_destinations",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
                 (self.server_name,): sum(
                     1
                     for d in self._per_destination_queues.values()
                     if d.transmission_loop_running
                 )
-            }
+            },
         )

-        transaction_queue_pending_pdus_gauge.register_hook(
-            lambda: {
+        LaterGauge(
+            name="synapse_federation_transaction_queue_pending_pdus",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
                 (self.server_name,): sum(
                     d.pending_pdu_count() for d in self._per_destination_queues.values()
                 )
-            }
+            },
         )

-        transaction_queue_pending_edus_gauge.register_hook(
-            lambda: {
+        LaterGauge(
+            name="synapse_federation_transaction_queue_pending_edus",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
                 (self.server_name,): sum(
                     d.pending_edu_count() for d in self._per_destination_queues.values()
                 )
-            }
+            },
         )

         self._is_processing = False

View File

@@ -173,18 +173,6 @@ state_transition_counter = Counter(
     labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
 )

-presence_user_to_current_state_size_gauge = LaterGauge(
-    name="synapse_handlers_presence_user_to_current_state_size",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-presence_wheel_timer_size_gauge = LaterGauge(
-    name="synapse_handlers_presence_wheel_timer_size",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
 # "currently_active"
 LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -791,8 +779,11 @@ class PresenceHandler(BasePresenceHandler):
             EduTypes.PRESENCE, self.incoming_presence
         )

-        presence_user_to_current_state_size_gauge.register_hook(
-            lambda: {(self.server_name,): len(self.user_to_current_state)}
+        LaterGauge(
+            name="synapse_handlers_presence_user_to_current_state_size",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
         )

         # The per-device presence state, maps user to devices to per-device presence state.
@@ -891,8 +882,11 @@ class PresenceHandler(BasePresenceHandler):
             60 * 1000,
         )

-        presence_wheel_timer_size_gauge.register_hook(
-            lambda: {(self.server_name,): len(self.wheel_timer)}
+        LaterGauge(
+            name="synapse_handlers_presence_wheel_timer_size",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self.wheel_timer)},
         )

         # Used to handle sending of presence to newly joined users/servers

View File

@@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
     return counts

-in_flight_requests = LaterGauge(
+LaterGauge(
     name="synapse_http_server_in_flight_requests_count",
     desc="",
     labelnames=["method", "servlet", SERVER_NAME_LABEL],
+    caller=_get_in_flight_counts,
 )
-in_flight_requests.register_hook(_get_in_flight_counts)

 class RequestMetrics:

View File

@@ -31,7 +31,6 @@ from typing import (
     Dict,
     Generic,
     Iterable,
-    List,
     Mapping,
     Optional,
     Sequence,
@@ -74,6 +73,8 @@ logger = logging.getLogger(__name__)
 METRICS_PREFIX = "/_synapse/metrics"

+all_gauges: Dict[str, Collector] = {}
+
 HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

 SERVER_NAME_LABEL = "server_name"
@@ -162,47 +163,42 @@ class LaterGauge(Collector):
     name: str
     desc: str
     labelnames: Optional[StrSequence] = attr.ib(hash=False)

-    # List of callbacks: each callback should either return a value (if there are no
-    # labels for this metric), or dict mapping from a label tuple to a value
-    _hooks: List[
-        Callable[
-            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
-        ]
-    ] = attr.ib(factory=list, hash=False)
+    # callback: should either return a value (if there are no labels for this metric),
+    # or dict mapping from a label tuple to a value
+    caller: Callable[
+        [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
+    ]

     def collect(self) -> Iterable[Metric]:
         # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
         # (we don't enforce it here, one level up).
         g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames)  # type: ignore[missing-server-name-label]

-        for hook in self._hooks:
-            try:
-                hook_result = hook()
-            except Exception:
-                logger.exception(
-                    "Exception running callback for LaterGauge(%s)", self.name
-                )
-                yield g
-                return
-
-            if isinstance(hook_result, (int, float)):
-                g.add_metric([], hook_result)
-            else:
-                for k, v in hook_result.items():
-                    g.add_metric(k, v)
-
-        yield g
-
-    def register_hook(
-        self,
-        hook: Callable[
-            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
-        ],
-    ) -> None:
-        self._hooks.append(hook)
+        try:
+            calls = self.caller()
+        except Exception:
+            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
+            yield g
+            return
+
+        if isinstance(calls, (int, float)):
+            g.add_metric([], calls)
+        else:
+            for k, v in calls.items():
+                g.add_metric(k, v)
+
+        yield g
+
+    def __attrs_post_init__(self) -> None:
+        self._register()
+
+    def _register(self) -> None:
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering", self.name)
+            REGISTRY.unregister(all_gauges.pop(self.name))
+
+        REGISTRY.register(self)
+        all_gauges[self.name] = self

 # `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -254,7 +250,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
         # Protects access to _registrations
         self._lock = threading.Lock()

-        REGISTRY.register(self)
+        self._register_with_collector()

     def register(
         self,
@@ -345,6 +341,14 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
             gauge.add_metric(labels=key, value=getattr(metrics, name))
             yield gauge

+    def _register_with_collector(self) -> None:
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering", self.name)
+            REGISTRY.unregister(all_gauges.pop(self.name))
+
+        REGISTRY.register(self)
+        all_gauges[self.name] = self
+

 class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
     """

View File

@@ -86,24 +86,6 @@ users_woken_by_stream_counter = Counter(
     labelnames=["stream", SERVER_NAME_LABEL],
 )

-notifier_listeners_gauge = LaterGauge(
-    name="synapse_notifier_listeners",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-notifier_rooms_gauge = LaterGauge(
-    name="synapse_notifier_rooms",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-notifier_users_gauge = LaterGauge(
-    name="synapse_notifier_users",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 T = TypeVar("T")
@@ -299,16 +281,28 @@ class Notifier:
                 )
             }

-        notifier_listeners_gauge.register_hook(count_listeners)
-
-        notifier_rooms_gauge.register_hook(
-            lambda: {
+        LaterGauge(
+            name="synapse_notifier_listeners",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=count_listeners,
+        )
+
+        LaterGauge(
+            name="synapse_notifier_rooms",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
                 (self.server_name,): count(
                     bool, list(self.room_to_user_streams.values())
                 )
-            }
+            },
         )

-        notifier_users_gauge.register_hook(
-            lambda: {(self.server_name,): len(self.user_to_user_stream)}
+        LaterGauge(
+            name="synapse_notifier_users",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
         )

     def add_replication_callback(self, cb: Callable[[], None]) -> None:

View File

@@ -106,18 +106,6 @@ user_ip_cache_counter = Counter(
     "synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
 )

-tcp_resource_total_connections_gauge = LaterGauge(
-    name="synapse_replication_tcp_resource_total_connections",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-tcp_command_queue_gauge = LaterGauge(
-    name="synapse_replication_tcp_command_queue",
-    desc="Number of inbound RDATA/POSITION commands queued for processing",
-    labelnames=["stream_name", SERVER_NAME_LABEL],
-)
-
 # the type of the entries in _command_queues_by_stream
 _StreamCommandQueue = Deque[
@@ -255,8 +243,11 @@ class ReplicationCommandHandler:
         # outgoing replication commands to.)
         self._connections: List[IReplicationConnection] = []

-        tcp_resource_total_connections_gauge.register_hook(
-            lambda: {(self.server_name,): len(self._connections)}
+        LaterGauge(
+            name="synapse_replication_tcp_resource_total_connections",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self._connections)},
         )

         # When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -275,11 +266,14 @@ class ReplicationCommandHandler:
         # from that connection.
         self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}

-        tcp_command_queue_gauge.register_hook(
-            lambda: {
+        LaterGauge(
+            name="synapse_replication_tcp_command_queue",
+            desc="Number of inbound RDATA/POSITION commands queued for processing",
+            labelnames=["stream_name", SERVER_NAME_LABEL],
+            caller=lambda: {
                 (stream_name, self.server_name): len(queue)
                 for stream_name, queue in self._command_queues_by_stream.items()
-            }
+            },
         )

         self._is_master = hs.config.worker.worker_app is None

View File

@@ -527,11 +527,9 @@ pending_commands = LaterGauge(
     name="synapse_replication_tcp_protocol_pending_commands",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-)
-pending_commands.register_hook(
-    lambda: {
+    caller=lambda: {
         (p.name, p.server_name): len(p.pending_commands) for p in connected_connections
-    }
+    },
 )
@@ -546,11 +544,9 @@ transport_send_buffer = LaterGauge(
     name="synapse_replication_tcp_protocol_transport_send_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-)
-transport_send_buffer.register_hook(
-    lambda: {
+    caller=lambda: {
         (p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
-    }
+    },
 )
@@ -575,12 +571,10 @@ tcp_transport_kernel_send_buffer = LaterGauge(
     name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-)
-tcp_transport_kernel_send_buffer.register_hook(
-    lambda: {
+    caller=lambda: {
         (p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
         for p in connected_connections
-    }
+    },
 )
@@ -588,10 +582,8 @@ tcp_transport_kernel_read_buffer = LaterGauge(
     name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-)
-tcp_transport_kernel_read_buffer.register_hook(
-    lambda: {
+    caller=lambda: {
         (p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
         for p in connected_connections
-    }
+    },
 )

View File

@@ -100,12 +100,6 @@ sql_txn_duration = Counter(
     labelnames=["desc", SERVER_NAME_LABEL],
 )

-background_update_status = LaterGauge(
-    name="synapse_background_update_status",
-    desc="Background update status",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 # Unique indexes which have been added in background updates. Maps from table name
 # to the name of the background update which added the unique index to that table.
@@ -617,8 +611,11 @@ class DatabasePool:
         )

         self.updates = BackgroundUpdater(hs, self)

-        background_update_status.register_hook(
-            lambda: {(self.server_name,): self.updates.get_status()},
+        LaterGauge(
+            name="synapse_background_update_status",
+            desc="Background update status",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): self.updates.get_status()},
         )

         self._previous_txn_total_time = 0.0

View File

@@ -84,13 +84,6 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
 _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000

-federation_known_servers_gauge = LaterGauge(
-    name="synapse_federation_known_servers",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 @attr.s(frozen=True, slots=True, auto_attribs=True)
 class EventIdMembership:
     """Returned by `get_membership_from_event_ids`"""
@@ -123,8 +116,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             1,
             self._count_known_servers,
         )

-        federation_known_servers_gauge.register_hook(
-            lambda: {(self.server_name,): self._known_servers_count}
+        LaterGauge(
+            name="synapse_federation_known_servers",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): self._known_servers_count},
         )

     @wrap_as_background_process("_count_known_servers")

View File

@@ -131,31 +131,27 @@ def _get_counts_from_rate_limiter_instance(
 # We track the number of affected hosts per time-period so we can
 # differentiate one really noisy homeserver from a general
 # ratelimit tuning problem across the federation.
-sleep_affected_hosts_gauge = LaterGauge(
+LaterGauge(
     name="synapse_rate_limit_sleep_affected_hosts",
     desc="Number of hosts that had requests put to sleep",
     labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
-)
-sleep_affected_hosts_gauge.register_hook(
-    lambda: _get_counts_from_rate_limiter_instance(
+    caller=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_sleep()
             for ratelimiter in rate_limiter_instance.ratelimiters.values()
         )
-    )
+    ),
 )

-reject_affected_hosts_gauge = LaterGauge(
+LaterGauge(
     name="synapse_rate_limit_reject_affected_hosts",
     desc="Number of hosts that had requests rejected",
     labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
-)
-reject_affected_hosts_gauge.register_hook(
-    lambda: _get_counts_from_rate_limiter_instance(
+    caller=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_reject()
             for ratelimiter in rate_limiter_instance.ratelimiters.values()
         )
-    )
+    ),
 )

View File

@@ -44,13 +44,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)

-running_tasks_gauge = LaterGauge(
-    name="synapse_scheduler_running_tasks",
-    desc="The number of concurrent running tasks handled by the TaskScheduler",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 class TaskScheduler:
     """
     This is a simple task scheduler designed for resumable tasks. Normally,
@@ -137,8 +130,11 @@ class TaskScheduler:
             TaskScheduler.SCHEDULE_INTERVAL_MS,
         )

-        running_tasks_gauge.register_hook(
-            lambda: {(self.server_name,): len(self._running_tasks)}
+        LaterGauge(
+            name="synapse_scheduler_running_tasks",
+            desc="The number of concurrent running tasks handled by the TaskScheduler",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self._running_tasks)},
         )

     def register_action(
def register_action(

View File

@@ -22,7 +22,7 @@
 from synapse.api.constants import EduTypes

 from tests import unittest
-from tests.unittest import DEBUG, override_config
+from tests.unittest import override_config

 class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase):
@@ -48,7 +48,6 @@ class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase):
         )
         self.assertEqual(200, channel.code)

-    @DEBUG
     def test_edu_debugging_doesnt_explode(self) -> None:
         """Sanity check incoming federation succeeds with `synapse.debug_8631` enabled.

View File

@@ -22,13 +22,7 @@ from typing import Dict, Protocol, Tuple
 from prometheus_client.core import Sample

-from synapse.metrics import (
-    REGISTRY,
-    SERVER_NAME_LABEL,
-    InFlightGauge,
-    LaterGauge,
-    generate_latest,
-)
+from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
 from synapse.util.caches.deferred_cache import DeferredCache

 from tests import unittest
@@ -291,42 +285,6 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
         self.assertEqual(hs2_cache_max_size_metric_value, "777.0")

-class LaterGaugeTests(unittest.HomeserverTestCase):
-    def test_later_gauge_multiple_servers(self) -> None:
-        """
-        Test that LaterGauge metrics are reported correctly across multiple servers.
-        We will have a metrics entry for each homeserver, labeled with the
-        `server_name` label.
-        """
-        later_gauge = LaterGauge(
-            name="foo",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-        )
-        later_gauge.register_hook(lambda: {("hs1",): 1})
-        later_gauge.register_hook(lambda: {("hs2",): 2})
-
-        metrics_map = get_latest_metrics()
-
-        # Find the metrics for the caches from both homeservers
-        hs1_metric = 'foo{server_name="hs1"}'
-        hs1_metric_value = metrics_map.get(hs1_metric)
-        self.assertIsNotNone(
-            hs1_metric_value,
-            f"Missing metric {hs1_metric} in cache metrics {metrics_map}",
-        )
-        hs2_metric = 'foo{server_name="hs2"}'
-        hs2_metric_value = metrics_map.get(hs2_metric)
-        self.assertIsNotNone(
-            hs2_metric_value,
-            f"Missing metric {hs2_metric} in cache metrics {metrics_map}",
-        )
-
-        # Sanity check the metric values
-        self.assertEqual(hs1_metric_value, "1.0")
-        self.assertEqual(hs2_metric_value, "2.0")
-

 def get_latest_metrics() -> Dict[str, str]:
     """
     Collect the latest metrics from the registry and parse them into an easy to use map.
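
For reference, a rough sketch of what a helper like `get_latest_metrics` can look like (assuming the text exposition format produced by `generate_latest`; the real helper in this file may differ):

from typing import Dict

from prometheus_client import REGISTRY, generate_latest

def get_latest_metrics() -> Dict[str, str]:
    # Map each sample line, e.g. 'foo{server_name="hs1"} 1.0', from its
    # name-plus-labels part to its value string.
    metrics_map: Dict[str, str] = {}
    for line in generate_latest(REGISTRY).decode("utf-8").splitlines():
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        name_and_labels, _, value = line.rpartition(" ")
        metrics_map[name_and_labels] = value
    return metrics_map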