"""BioMonitor 5.0: webcam heart-rate and respiration-rate monitor.

Heart rate is estimated from the mean colour of three facial regions
(two projections of the RGB traces are analysed and blended by their
spectral peak quality).  Respiration rate is estimated from the vertical
position of a point below the shoulders.  Both are plotted in a
scrollable OpenCV window.

Keys: ``q`` quit, ``p`` pause/resume, ``a``/``d`` scroll back/forward
through the history while paused.
"""

import time
import traceback
import warnings
from collections import deque

import cv2
import mediapipe as mp
import numpy as np
import scipy.fftpack as fftpack
import scipy.signal as signal
from scipy.signal import savgol_filter

warnings.filterwarnings("ignore")

# --- CONFIGURATION ---
BUFFER_SIZE = 300     # Math buffer (samples fed to the spectral analysis)
HISTORY_SIZE = 3000   # Display buffer (3000 samples = 100 s at nominal FPS)
# NOTE(review): every frequency computed below assumes the capture loop
# really delivers FPS frames per second; the loop rate is never measured.
FPS = 30
MIN_HR, MAX_HR = 45, 140
MIN_RR, MAX_RR = 6, 30
DOT_INTERVAL = 45     # Frames between dots

# Visual Style
GRAPH_W = 900
GRAPH_H = 480         # Includes the 30 px strip reserved for the scroll bar
BG_COLOR = (15, 15, 15)
GRID_COLOR = (40, 40, 40)
HR_COLOR = (0, 100, 255)        # Orange/Red (BGR)
RR_COLOR = (0, 255, 128)        # Spring Green (BGR)
BOX_COLOR_FACE = (0, 255, 255)
BOX_COLOR_BODY = (255, 200, 0)
SCROLL_BAR_COLOR = (100, 100, 100)
SCROLL_HANDLE_COLOR = (200, 200, 200)

# --- INDICES ---
# Face-mesh landmark indices outlining each sampled region.
# NOTE(review): apart from the leading 151, FOREHEAD looks like the full
# face-oval contour rather than a forehead patch -- confirm intent.
FOREHEAD = [151, 10, 338, 297, 332, 284, 251, 389, 356, 454, 323, 361, 288,
            397, 365, 379, 378, 400, 377, 152, 148, 176, 149, 150, 136, 172,
            58, 132, 93, 234, 127, 162, 21, 54, 103, 67, 109]
L_CHEEK = [329, 348, 347, 280, 425, 427, 437, 426, 411, 280, 330, 329]
R_CHEEK = [100, 119, 118, 50, 205, 207, 217, 206, 187, 50, 101, 100]


class BioMonitor:
    """Capture loop, signal processing and graph rendering in one object."""

    def __init__(self):
        """Allocate buffers, create the Holistic tracker and open camera 0."""
        # Signal buffers (pre-filled with zeros so they are always full).
        self.raw_r = deque([0] * BUFFER_SIZE, maxlen=BUFFER_SIZE)
        self.raw_g = deque([0] * BUFFER_SIZE, maxlen=BUFFER_SIZE)
        self.raw_b = deque([0] * BUFFER_SIZE, maxlen=BUFFER_SIZE)
        self.motion_y = deque([0] * BUFFER_SIZE, maxlen=BUFFER_SIZE)

        # Display history (one smoothed value per processed frame).
        self.hr_history = deque([0] * HISTORY_SIZE, maxlen=HISTORY_SIZE)
        self.rr_history = deque([0] * HISTORY_SIZE, maxlen=HISTORY_SIZE)

        # Value markers: dicts {'val': reading, 'age': frames since added}.
        self.hr_dots = []
        self.rr_dots = []
        self.frame_count = 0

        # State
        self.bpm_display = 0.0
        self.rpm_display = 0.0
        self.snr_confidence = 0.0
        self.algo_status = "INIT"

        # Time management for the runtime stopwatch.
        self.total_paused_time = 0
        self.pause_start_timestamp = 0
        self.start_time = time.time()

        # Controls
        self.paused = False
        self.scroll_offset = 0  # 0 = live (right edge), >0 = back in history

        # MediaPipe
        self.mp_holistic = mp.solutions.holistic
        self.holistic = self.mp_holistic.Holistic(
            model_complexity=1,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            refine_face_landmarks=False,
        )

        # NOTE(review): CAP_DSHOW is the DirectShow backend, so this line
        # presumably targets Windows -- confirm before running elsewhere.
        self.cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        self.running = True

    def get_roi_data(self, frame, landmarks, indices):
        """Return the mean colour and bounding box of a landmark polygon.

        Args:
            frame: BGR image the colour is sampled from.
            landmarks: indexable landmarks with normalised ``x``/``y``.
            indices: landmark indices forming the polygon outline.

        Returns:
            ``(r, g, b, (x, y, w, h))``; ``(0, 0, 0, None)`` when
            ``indices`` is empty.
        """
        if len(indices) == 0:
            return 0, 0, 0, None
        h, w, _ = frame.shape
        # OpenCV polygon functions require 32-bit integer points; the
        # platform default integer dtype may be 64-bit.
        pts = np.array(
            [(int(landmarks[i].x * w), int(landmarks[i].y * h))
             for i in indices],
            dtype=np.int32,
        )
        mask = np.zeros((h, w), dtype=np.uint8)
        cv2.fillConvexPoly(mask, pts, 255)
        mean = cv2.mean(frame, mask=mask)
        x, y, bw, bh = cv2.boundingRect(pts)
        # frame is BGR, so channel 2 is red.
        return mean[2], mean[1], mean[0], (x, y, bw, bh)

    def calculate_omit(self, r, g, b):
        """Return the Q factor of a QR decomposition of standardised RGB.

        The three traces are stacked as columns, standardised per column
        and decomposed; on failure an all-zero array of the same shape is
        returned.
        """
        X = np.nan_to_num(np.array([r, g, b]).T)
        X_norm = (X - np.mean(X, axis=0)) / (np.std(X, axis=0) + 1e-6)
        try:
            Q, _ = np.linalg.qr(X_norm)
        except (ValueError, np.linalg.LinAlgError):
            return np.zeros_like(X)
        return Q

    def calculate_chrom(self, r, g, b):
        """Return a chrominance-based pulse signal as long as the input.

        The projection ``Xs - alpha * Ys`` is evaluated on 1.6 s windows
        and the windows are recombined by Hann-weighted overlap-add.  The
        previous version returned only the final window (48 samples),
        which ``get_fft`` always rejected as too short, so this estimate
        never contributed to the heart rate.

        Returns zeros when fewer samples than one window are available.
        """
        win = int(FPS * 1.6)
        n = len(r)
        if n < win:
            return np.zeros(n)

        rgb = np.asarray([r, g, b], dtype=float)
        taper = np.hanning(win)
        step = max(1, win // 2)
        starts = list(range(0, n - win + 1, step))
        if starts[-1] != n - win:
            starts.append(n - win)  # make sure the newest samples are covered

        out = np.zeros(n)
        for s in starts:
            r_win, g_win, b_win = rgb[:, s:s + win]
            Xs = 3 * r_win - 2 * g_win
            Ys = 1.5 * r_win + g_win - 1.5 * b_win
            alpha = np.std(Xs) / (np.std(Ys) + 1e-6)
            segment = Xs - alpha * Ys
            # Remove the window mean so the taper does not imprint a
            # periodic bump at the window hop rate.
            out[s:s + win] += (segment - np.mean(segment)) * taper
        return out

    def get_fft(self, signal_data, min_hz, max_hz):
        """Locate the dominant spectral peak inside ``[min_hz, max_hz]``.

        The signal is smoothed, detrended, band-passed and Hamming-windowed
        before the FFT; smoothing and band-pass are best-effort and are
        skipped if SciPy rejects the input.

        Returns:
            ``(rate_per_minute, peak / mean_in_band)``, or ``(0, 0)`` when
            the input is shorter than two seconds or no bin is in band.
        """
        if len(signal_data) < FPS * 2:
            return 0, 0

        data = np.nan_to_num(signal_data)
        try:
            data = savgol_filter(data, 7, 2)
        except ValueError:
            pass  # keep the unsmoothed data

        detrended = signal.detrend(data)
        try:
            nyquist = FPS / 2
            b, a = signal.butter(
                2, [min_hz / nyquist, max_hz / nyquist], btype='band')
            filtered = signal.filtfilt(b, a, detrended)
        except ValueError:
            filtered = detrended

        windowed = filtered * np.hamming(len(filtered))
        fft_out = np.abs(fftpack.fft(windowed))
        freqs = fftpack.fftfreq(len(windowed), d=1.0 / FPS)

        idx = np.where((freqs >= min_hz) & (freqs <= max_hz))[0]
        if len(idx) == 0:
            return 0, 0

        band = fft_out[idx]
        peak_idx = np.argmax(band)
        return (freqs[idx][peak_idx] * 60,
                band[peak_idx] / (np.mean(band) + 1e-6))

    def update_signals(self):
        """Recompute HR/RR from the buffers and advance the display state.

        Does nothing while paused.  Otherwise updates ``bpm_display``,
        ``rpm_display``, ``snr_confidence`` and ``algo_status``, appends
        to both histories, ages the dots and adds a new pair of dots every
        ``DOT_INTERVAL`` frames.
        """
        if self.paused:
            return

        r = list(self.raw_r)
        g = list(self.raw_g)
        b = list(self.raw_b)
        m = list(self.motion_y)

        if len(r) >= BUFFER_SIZE:
            # Candidate 1: best of the 2nd/3rd QR components.
            Q = self.calculate_omit(r, g, b)
            bpm_o = 0
            snr_o = 0
            if Q.shape[1] >= 3:
                for c in (1, 2):
                    val, snr = self.get_fft(Q[:, c], MIN_HR / 60, MAX_HR / 60)
                    if snr > snr_o:
                        snr_o = snr
                        bpm_o = val

            # Candidate 2: chrominance projection.
            chrom = self.calculate_chrom(r, g, b)
            bpm_c, snr_c = self.get_fft(chrom, MIN_HR / 60, MAX_HR / 60)

            total_snr = snr_o + snr_c
            if total_snr > 0.1:
                # Quality-weighted blend, then exponential smoothing that
                # reacts faster when the better peak is pronounced.
                final_bpm = (bpm_o * snr_o + bpm_c * snr_c) / total_snr
                self.algo_status = "OMIT" if snr_o > snr_c else "CHROM"
                best_snr = max(snr_o, snr_c)
                alpha = 0.2 if best_snr > 1.5 else 0.05
                self.bpm_display = (alpha * final_bpm
                                    + (1 - alpha) * self.bpm_display)
                self.snr_confidence = min(100, int(best_snr * 20))

            rpm, _ = self.get_fft(m, MIN_RR / 60, MAX_RR / 60)
            self.rpm_display = (0.15 * rpm) + (0.85 * self.rpm_display)

            self.hr_history.append(self.bpm_display)
            self.rr_history.append(self.rpm_display)

        # Age the markers and drop those that scrolled out of the history.
        for dot in self.hr_dots:
            dot['age'] += 1
        for dot in self.rr_dots:
            dot['age'] += 1
        self.hr_dots = [d for d in self.hr_dots if d['age'] < HISTORY_SIZE]
        self.rr_dots = [d for d in self.rr_dots if d['age'] < HISTORY_SIZE]

        self.frame_count += 1
        if self.frame_count % DOT_INTERVAL == 0:
            self.hr_dots.append({'val': self.bpm_display, 'age': 0})
            self.rr_dots.append({'val': self.rpm_display, 'age': 0})

    @staticmethod
    def _smooth_for_display(data):
        """Return ``data`` Savitzky-Golay smoothed, or unchanged on failure."""
        try:
            return savgol_filter(data, 45, 3)
        except ValueError:
            return data

    def draw_graph(self):
        """Render the HR/RR traces, scroll bar and info overlays.

        Returns:
            A ``GRAPH_H x GRAPH_W`` BGR image.
        """
        canvas = np.full((GRAPH_H, GRAPH_W, 3), BG_COLOR, dtype=np.uint8)
        graph_h_actual = GRAPH_H - 30  # bottom 30 px belong to the scroll bar
        half_h = graph_h_actual // 2

        # Grid
        for gx in range(0, GRAPH_W, 50):
            cv2.line(canvas, (gx, 0), (gx, graph_h_actual), GRID_COLOR, 1)
        for i in range(1, 4):
            gy = int(i * (graph_h_actual / 4))
            cv2.line(canvas, (0, gy), (GRAPH_W, gy), GRID_COLOR, 1)

        # --- VIEWPORT ---
        # scroll_offset 0 shows the newest samples; larger values look back.
        total_len = len(self.hr_history)
        view_w = GRAPH_W
        end_idx = max(0, total_len - self.scroll_offset)
        start_idx = max(0, end_idx - view_w)

        h_data = list(self.hr_history)[start_idx:end_idx]
        r_data = list(self.rr_history)[start_idx:end_idx]

        if len(h_data) > 45:
            h_data = self._smooth_for_display(h_data)
            r_data = self._smooth_for_display(r_data)

        def draw_layer(data, dots_list, y_base, y_h, color, min_v, max_v):
            """Draw one filled trace plus its value dots into ``canvas``."""
            if len(data) < 2:
                return

            # Right-align a partially filled live view.
            if self.scroll_offset == 0 and len(data) < view_w:
                align_offset = view_w - len(data)
            else:
                align_offset = 0

            pts = []
            for i, val in enumerate(data):
                norm = (val - min_v) / (max_v - min_v + 1e-6)
                py = int((y_base + y_h) - (np.clip(norm, 0, 1) * y_h))
                pts.append((i + align_offset, py))

            baseline = y_base + y_h
            fill_pts = np.array(
                pts + [(pts[-1][0], baseline), (pts[0][0], baseline)],
                dtype=np.int32,  # required by cv2.fillPoly
            )
            overlay = canvas.copy()
            cv2.fillPoly(overlay, [fill_pts], color)
            cv2.addWeighted(overlay, 0.25, canvas, 0.75, 0, canvas)
            cv2.polylines(canvas, [np.array(pts, np.int32)], False, color, 2,
                          cv2.LINE_AA)

            # A dot of age A sits at history index total_len - 1 - A.
            for dot in dots_list:
                dot_global_idx = total_len - 1 - dot['age']
                if not start_idx <= dot_global_idx < end_idx:
                    continue
                local_idx = dot_global_idx - start_idx
                if 0 <= local_idx < len(pts):
                    px, py = pts[local_idx]
                    cv2.circle(canvas, (px, py), 4, (255, 255, 255), -1)
                    cv2.circle(canvas, (px, py), 6, color, 1)
                    cv2.putText(canvas, f"{int(dot['val'])}",
                                (px - 10, py - 15),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.4,
                                (255, 255, 255), 1)

        draw_layer(h_data, self.hr_dots, 0, half_h - 20,
                   HR_COLOR, MIN_HR, MAX_HR)
        draw_layer(r_data, self.rr_dots, half_h, half_h - 20,
                   RR_COLOR, 5, 35)
        cv2.line(canvas, (0, half_h), (GRAPH_W, half_h), (100, 100, 100), 2)

        # --- SCROLL BAR ---
        cv2.rectangle(canvas, (0, GRAPH_H - 25), (GRAPH_W, GRAPH_H - 5),
                      SCROLL_BAR_COLOR, -1)
        if HISTORY_SIZE > 0:
            ratio_visible = min(1.0, view_w / HISTORY_SIZE)
            handle_w = max(40, int(GRAPH_W * ratio_visible))

            max_scroll = max(1, total_len - view_w)
            current_scroll = min(self.scroll_offset, max_scroll)
            scroll_fraction = current_scroll / max_scroll

            # Offset 0 is the newest data, so the handle rests at the right.
            handle_x = int((1.0 - scroll_fraction) * (GRAPH_W - handle_w))

            # Everything fits on screen: the handle spans the whole bar.
            if total_len < view_w:
                handle_x = 0
                handle_w = GRAPH_W

            cv2.rectangle(canvas, (handle_x, GRAPH_H - 25),
                          (handle_x + handle_w, GRAPH_H - 5),
                          SCROLL_HANDLE_COLOR, -1)

        # --- INFO OVERLAYS ---
        # The stopwatch freezes at the moment of pausing.
        reference = self.pause_start_timestamp if self.paused else time.time()
        elapsed = reference - self.start_time - self.total_paused_time
        mins, secs = divmod(int(elapsed), 60)
        hours, mins = divmod(mins, 60)

        cv2.rectangle(canvas, (GRAPH_W - 220, 0), (GRAPH_W, 90), (0, 0, 0), -1)
        cv2.putText(canvas, f"RUNTIME: {hours:02}:{mins:02}:{secs:02}",
                    (GRAPH_W - 210, 20), cv2.FONT_HERSHEY_PLAIN, 1,
                    (0, 255, 255), 1)

        status_txt = "PAUSED" if self.paused else f"CONF: {self.snr_confidence}%"
        status_col = (50, 50, 255) if self.paused else (0, 255, 0)
        cv2.putText(canvas, status_txt, (GRAPH_W - 210, 45),
                    cv2.FONT_HERSHEY_PLAIN, 1, status_col, 1)

        cv2.putText(canvas, f"HR: {self.bpm_display:.1f}", (10, 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, HR_COLOR, 2)
        cv2.putText(canvas, f"RR: {self.rpm_display:.1f}",
                    (10, half_h + 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, RR_COLOR, 2)

        return canvas

    def _handle_key(self, key):
        """Apply one key press: quit, pause toggle, or scroll while paused."""
        if key == ord('q'):
            self.running = False

        if key == ord('p'):
            self.paused = not self.paused
            if self.paused:
                self.pause_start_timestamp = time.time()
            else:
                # Exclude the paused span from the stopwatch and jump back
                # to the live view; the signal buffers are kept as they are.
                self.total_paused_time += (
                    time.time() - self.pause_start_timestamp)
                self.scroll_offset = 0

        if self.paused:
            if key == ord('a'):  # back in time
                max_back = max(0, len(self.hr_history) - GRAPH_W)
                self.scroll_offset = min(self.scroll_offset + 40, max_back)
            if key == ord('d'):  # towards live
                self.scroll_offset = max(self.scroll_offset - 40, 0)

    @staticmethod
    def _annotate_box(frame, box, label, color):
        """Draw a labelled ``(x, y, w, h)`` rectangle onto ``frame``."""
        x, y, bw, bh = box
        cv2.rectangle(frame, (x, y), (x + bw, y + bh), color, 1)
        cv2.putText(frame, label, (x, y - 5), cv2.FONT_HERSHEY_PLAIN, 0.8,
                    color, 1)

    def _sample_face(self, frame, face_landmarks):
        """Append the averaged face colour to the RGB buffers.

        All three regions are sampled before any overlay is drawn so the
        annotations cannot leak into the colour means.

        Returns:
            True when all regions yielded a sample, else False.
        """
        lms = face_landmarks.landmark
        regions = (("FOREHEAD", FOREHEAD),
                   ("L-CHEEK", L_CHEEK),
                   ("R-CHEEK", R_CHEEK))
        samples = [(label, self.get_roi_data(frame, lms, indices))
                   for label, indices in regions]

        for label, (_, _, _, box) in samples:
            if box:
                self._annotate_box(frame, box, label, BOX_COLOR_FACE)

        if not all(box for _, (_, _, _, box) in samples):
            return False

        self.raw_r.append(sum(s[0] for _, s in samples) / 3)
        self.raw_g.append(sum(s[1] for _, s in samples) / 3)
        self.raw_b.append(sum(s[2] for _, s in samples) / 3)
        return True

    def _track_chest(self, frame, pose_landmarks):
        """Annotate shoulders/chest and return the chest y pixel (0 if unseen)."""
        h, w, _ = frame.shape
        pose = pose_landmarks.landmark
        l_sh, r_sh = pose[11], pose[12]
        if not (l_sh.visibility > 0.5 and r_sh.visibility > 0.5):
            return 0

        sx1, sy1 = int(l_sh.x * w), int(l_sh.y * h)
        sx2, sy2 = int(r_sh.x * w), int(r_sh.y * h)

        sh_x1, sh_x2 = min(sx1, sx2) - 20, max(sx1, sx2) + 20
        sh_y1, sh_y2 = min(sy1, sy2) - 30, max(sy1, sy2) + 30
        cv2.rectangle(frame, (sh_x1, sh_y1), (sh_x2, sh_y2),
                      BOX_COLOR_BODY, 1)
        cv2.putText(frame, "SHOULDERS", (sh_x1, sh_y1 - 5),
                    cv2.FONT_HERSHEY_PLAIN, 0.8, BOX_COLOR_BODY, 1)

        # Chest point: 50 px below the shoulder midpoint.
        cx, cy = int((sx1 + sx2) / 2), int((sy1 + sy2) / 2) + 50
        cv2.rectangle(frame, (cx - 30, cy - 30), (cx + 30, cy + 30),
                      BOX_COLOR_BODY, 1)
        cv2.putText(frame, "CHEST", (cx - 30, cy - 5),
                    cv2.FONT_HERSHEY_PLAIN, 0.8, BOX_COLOR_BODY, 1)
        return cy

    def run(self):
        """Run the capture/display loop until ``q`` or a failed frame read.

        Any exception is printed with its traceback; the camera, the
        tracker and the windows are released in every case.
        """
        try:
            while self.running:
                key = cv2.waitKey(1) & 0xFF
                self._handle_key(key)

                if self.paused:
                    # No capture while paused; only the graph is refreshed.
                    cv2.imshow("Bio-Metric Data", self.draw_graph())
                    continue

                # --- LIVE CAMERA ---
                ret, frame = self.cap.read()
                if not ret:
                    break

                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = self.holistic.process(rgb)
                h, w, _ = frame.shape

                valid = False
                if results.face_landmarks:
                    valid = self._sample_face(frame, results.face_landmarks)

                chest_y = 0
                if results.pose_landmarks:
                    chest_y = self._track_chest(frame, results.pose_landmarks)

                # Hold the last value when a measurement is missing so the
                # buffers keep advancing one sample per frame.
                if chest_y > 0:
                    self.motion_y.append(chest_y)
                elif len(self.motion_y) > 0:
                    self.motion_y.append(self.motion_y[-1])

                if not valid:
                    if len(self.raw_r) > 0:
                        self.raw_r.append(self.raw_r[-1])
                        self.raw_g.append(self.raw_g[-1])
                        self.raw_b.append(self.raw_b[-1])
                    cv2.putText(frame, "SEARCHING...", (w // 2 - 50, h // 2),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

                self.update_signals()
                cv2.imshow("Bio-Metric Data", self.draw_graph())
                cv2.imshow("ROI Tracker", frame)

        except Exception:
            print(traceback.format_exc())
        finally:
            # Also reached on KeyboardInterrupt, which the handler above
            # deliberately does not catch.
            self.cap.release()
            self.holistic.close()
            cv2.destroyAllWindows()


if __name__ == "__main__":
    BioMonitor().run()