rusty_common/websocket/
heartbeat.rs1use log::{debug, warn};
7use parking_lot::RwLock;
8use quanta::{Clock, Instant};
9use std::sync::Arc;
10use std::time::Duration;
11
12use super::client::ConnectionState;
13use super::stats::ConnectionStats;
14
/// Health verdict produced by [`HeartbeatMonitor::check_heartbeat`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HeartbeatStatus {
    /// No heartbeat is currently counted as missed (or monitoring is disabled).
    Healthy,
    /// At least one heartbeat has been missed, but fewer than the configured maximum.
    Unhealthy,
    /// The missed-heartbeat count has reached the configured maximum.
    Dead,
}
25
26#[derive(Debug, Clone)]
28pub struct HeartbeatMonitor {
29 last_message_time: Arc<RwLock<Instant>>,
31 last_check_time: Arc<RwLock<Instant>>,
33 heartbeat_interval_milliseconds: u64,
35 heartbeat_timeout_milliseconds: u64,
37 max_missed_heartbeats: u32,
39 missed_heartbeats: Arc<RwLock<u32>>,
41 connection_status: Arc<RwLock<ConnectionState>>,
43 stats: Arc<RwLock<ConnectionStats>>,
45 clock: Clock,
47}
48
49impl HeartbeatMonitor {
50 #[must_use]
52 pub fn new(
53 heartbeat_interval_milliseconds: u64,
54 heartbeat_timeout_milliseconds: u64,
55 max_missed_heartbeats: u32,
56 connection_status: Arc<RwLock<ConnectionState>>,
57 stats: Arc<RwLock<ConnectionStats>>,
58 ) -> Self {
59 let clock = Clock::new();
60 let now = clock.now();
61 Self {
62 last_message_time: Arc::new(RwLock::new(now)),
63 last_check_time: Arc::new(RwLock::new(now)),
64 heartbeat_interval_milliseconds,
65 heartbeat_timeout_milliseconds,
66 max_missed_heartbeats,
67 missed_heartbeats: Arc::new(RwLock::new(0)),
68 connection_status,
69 stats,
70 clock,
71 }
72 }
73
74 pub fn update_last_message_time(&self) {
76 *self.last_message_time.write() = self.clock.now();
77 *self.missed_heartbeats.write() = 0;
78 }
79
80 pub fn check_heartbeat(&self) -> HeartbeatStatus {
82 if self.heartbeat_interval_milliseconds == 0 {
84 return HeartbeatStatus::Healthy;
85 }
86
87 let now = self.clock.now();
88 let last_message_time = *self.last_message_time.read();
89 let last_check_time = *self.last_check_time.read();
90 let elapsed_since_last_message = now.duration_since(last_message_time);
91 let elapsed_since_last_check = now.duration_since(last_check_time);
92 let interval = Duration::from_millis(self.heartbeat_interval_milliseconds);
93 let timeout = Duration::from_millis(self.heartbeat_timeout_milliseconds);
94
95 if elapsed_since_last_check < interval {
97 let missed = *self.missed_heartbeats.read();
99 return if missed >= self.max_missed_heartbeats {
100 HeartbeatStatus::Dead
101 } else if missed > 0 {
102 HeartbeatStatus::Unhealthy
103 } else {
104 HeartbeatStatus::Healthy
105 };
106 }
107
108 *self.last_check_time.write() = now;
110
111 if elapsed_since_last_message > timeout {
113 let mut missed_heartbeats = self.missed_heartbeats.write();
115 *missed_heartbeats += 1;
116
117 debug!(
119 "Missed heartbeat: {} of {} (timeout: {}ms, elapsed: {}ms)",
120 *missed_heartbeats,
121 self.max_missed_heartbeats,
122 self.heartbeat_timeout_milliseconds,
123 elapsed_since_last_message.as_millis()
124 );
125
126 if *missed_heartbeats >= self.max_missed_heartbeats {
128 warn!(
129 "Connection dead: missed {} heartbeats (max: {})",
130 *missed_heartbeats, self.max_missed_heartbeats
131 );
132 *self.connection_status.write() = ConnectionState::Error;
133 self.stats.write().errors += 1;
134 return HeartbeatStatus::Dead;
135 }
136
137 return HeartbeatStatus::Unhealthy;
139 }
140
141 *self.missed_heartbeats.write() = 0;
143 HeartbeatStatus::Healthy
144 }
145
146 pub fn reset(&self) {
148 let now = self.clock.now();
149 *self.last_message_time.write() = now;
150 *self.last_check_time.write() = now;
151 *self.missed_heartbeats.write() = 0;
152 }
153
154 pub fn missed_heartbeats(&self) -> u32 {
156 *self.missed_heartbeats.read()
157 }
158
159 pub fn time_since_last_message(&self) -> Duration {
161 self.clock
162 .now()
163 .duration_since(*self.last_message_time.read())
164 }
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Drives a monitor (100 ms interval, 200 ms timeout, 3 allowed misses) from healthy,
    /// through two missed heartbeats, to dead, then checks that `reset` revives it.
    #[test]
    fn test_heartbeat_monitor() {
        let state = Arc::new(RwLock::new(ConnectionState::Connected));
        let counters = Arc::new(RwLock::new(ConnectionStats::default()));
        let heartbeat =
            HeartbeatMonitor::new(100, 200, 3, Arc::clone(&state), Arc::clone(&counters));

        // Longer than both the interval and the timeout, so every check that follows a
        // pause of this length records one miss.
        let silence = Duration::from_millis(300);

        // Freshly constructed: healthy.
        assert_eq!(heartbeat.check_heartbeat(), HeartbeatStatus::Healthy);

        // Still healthy right after a message is recorded.
        heartbeat.update_last_message_time();
        assert_eq!(heartbeat.check_heartbeat(), HeartbeatStatus::Healthy);

        // First and second miss: unhealthy, counter climbs by one each time.
        for expected_missed in 1..=2 {
            thread::sleep(silence);
            assert_eq!(heartbeat.check_heartbeat(), HeartbeatStatus::Unhealthy);
            assert_eq!(heartbeat.missed_heartbeats(), expected_missed);
        }

        // Third miss reaches the maximum: dead, and the shared state is flagged.
        thread::sleep(silence);
        assert_eq!(heartbeat.check_heartbeat(), HeartbeatStatus::Dead);
        assert_eq!(heartbeat.missed_heartbeats(), 3);
        assert_eq!(*state.read(), ConnectionState::Error);

        // Reset clears the counter and the monitor reports healthy again.
        heartbeat.reset();
        assert_eq!(heartbeat.missed_heartbeats(), 0);
        assert_eq!(heartbeat.check_heartbeat(), HeartbeatStatus::Healthy);
    }
}