From 756a6d29f75c7beb4039e0b72da6b11fb87d6366 Mon Sep 17 00:00:00 2001 From: guanana Date: Fri, 30 Jan 2026 00:17:09 +0000 Subject: [PATCH 1/2] feat: Add MJPEG stream support and generalize video input handling by replacing `RTSPReader` with a new `VideoReader`. --- .env.example | 2 +- .gitignore | 2 +- config/settings.py | 7 +- docker-compose.yml | 21 +- requirements.txt | 4 +- services/streaming/streaming_service.py | 75 ++++--- services/streaming/video_reader.py | 255 ++++++++++++++++++++++++ 7 files changed, 325 insertions(+), 41 deletions(-) create mode 100644 services/streaming/video_reader.py diff --git a/.env.example b/.env.example index 1b6954c..07377af 100644 --- a/.env.example +++ b/.env.example @@ -1,5 +1,5 @@ -RTSP_URL=rtsp://username:password@192.168.1.3:554/stream1 +STREAM_URL=rtsp://username:password@192.168.1.3:554/stream1 CAMERA_HOST=192.168.1.100 CAMERA_USERNAME=your_tapo_email@example.com diff --git a/.gitignore b/.gitignore index b7faf40..e8fece7 100644 --- a/.gitignore +++ b/.gitignore @@ -173,7 +173,7 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ +.idea/ # Abstra # Abstra is an AI-powered process automation framework. diff --git a/config/settings.py b/config/settings.py index 93dcc43..55a2275 100644 --- a/config/settings.py +++ b/config/settings.py @@ -10,8 +10,9 @@ class BabyMonitorSettings: """Main configuration class for RTSP Recorder""" - # ==================== RTSP Settings ==================== - RTSP_URL = os.getenv("RTSP_URL") + # ==================== Video Input Settings ==================== + STREAM_URL = os.getenv("STREAM_URL", os.getenv("RTSP_URL")) # Can be RTSP URI or HTTP/HTTPS MJPEG stream URL + STREAM_SELF_SIGNED_CERT = os.getenv("STREAM_SELF_SIGNED_CERT", "false").lower() == "true" RTSP_TIMEOUT = int(os.getenv("RTSP_TIMEOUT", 10)) YOLO_MODEL_NAME = os.getenv("MODEL_NAME", "yolov8n.pt") @@ -33,7 +34,7 @@ class BabyMonitorSettings: # Use Docker-compatible paths if running in container CONFIDENCE_THRESHOLD = 0.4 # detection confidence - TARGET_FPS = 30.0 # reduced fps for CPU processing + TARGET_FPS = 30.0 # reduced fps for CPU processing. Higher values starve the web server. DEBUG_VIDEO = True # enable extra video debugging output # GPU usage flag diff --git a/docker-compose.yml b/docker-compose.yml index cb67ed2..afa0fb2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,26 +1,35 @@ services: ai-baby-monitor: - build: + build: context: . dockerfile: Dockerfile container_name: AI-Baby-Monitor restart: unless-stopped ports: - "8847:8847" - + # Environment variables environment: - - RTSP_URL=${RTSP_URL} + - STREAM_URL=${STREAM_URL} + - STREAM_SELF_SIGNED_CERT=${STREAM_SELF_SIGNED_CERT} - FLASK_ENV=production - PYTHONUNBUFFERED=1 - + # Volume mappings volumes: # Map recordings directory (MONITOR_RECORDINGS_DIR) - ${HOME}/baby-monitor:/app/baby-monitor - # Health check override + # Mount source code for development/debugging + #- ./services:/app/services + #- ./templates:/app/templates + #- ./static:/app/static + #- ./api:/app/api + #- ./config:/app/config + #- ./utils:/app/utils + #- ./app.py:/app/app.py + # Health check override healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8847/health"] + test: [ "CMD", "curl", "-f", "http://localhost:8847/health" ] interval: 30s timeout: 10s retries: 3 diff --git a/requirements.txt b/requirements.txt index 597ed07..586b376 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,4 +17,6 @@ flask-wtf==1.2.2 wtforms==3.2.1 werkzeug==3.1.3 bcrypt==4.3.0 -pytapo==3.3.49 \ No newline at end of file +pytapo==3.3.49 +requests==2.32.5 +email-validator==2.3.0 \ No newline at end of file diff --git a/services/streaming/streaming_service.py b/services/streaming/streaming_service.py index e1d1ee0..75553e9 100644 --- a/services/streaming/streaming_service.py +++ b/services/streaming/streaming_service.py @@ -13,7 +13,7 @@ from services.tracking.deepsort_tracker import DeepSortTracker from services.monitoring.monitors import SleepMonitor, SafetyMonitor from services.visualization.visualizer import Visualizer -from services.streaming.rtsp_reader import RTSPReader +from services.streaming.video_reader import VideoReader class FrameMemoryPool: @@ -49,6 +49,7 @@ def __init__(self, socketio): self.adaptive_quality = 80 self.adaptive_fps = 25 self.daemon = True + self.emitted_frame_count = 0 def add_frame(self, frame): """Add frame to streaming queue (called by AI thread)""" @@ -105,34 +106,22 @@ def update_client_count(self, count): else: self.adaptive_fps = 25 self.adaptive_quality = 80 + + print(f"[WebStreamManager] Client count updated: {count}. Mode: FPS={self.adaptive_fps}, Quality={self.adaptive_quality}") def run(self): - """WebSocket streaming loop""" + """WebStreamManager loop - Reduced to a low-frequency stats update since MJPEG is used""" while self.running: - frame_data, quality = self.get_latest_frame() - if frame_data is not None: - try: - # Encode frame with adaptive quality - quality_val = int(quality) if quality is not None else 80 - encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality_val] - ret, buffer = cv2.imencode('.jpg', frame_data, encode_params) - - if ret: - # Convert to base64 for WebSocket transmission - frame_b64 = base64.b64encode(buffer).decode('utf-8') - - # Send to only users with streaming permissions via WebSocket room - self.socketio.emit('video_frame', {'frame': frame_b64}, room='streaming_enabled') - - # Memory cleanup - del buffer, frame_b64 - - except Exception as e: - print(f"WebSocket streaming error: {e}") + # MJPEG is now used for streaming directly via the /video_feed route. + # This thread no longer needs to encode and emit frames via WebSockets. + # We keep it alive for stats tracking if needed, but remove the heavy cv2.imencode. - # Adaptive sleep based on client load - sleep_time = 1.0 / self.adaptive_fps - time.sleep(sleep_time) + # self.emitted_frame_count += 1 + # if self.emitted_frame_count % 100 == 0: + # print(f"[WebStreamManager] Streaming heartbeat.") + + # Long sleep as we don't need real-time loops here anymore + time.sleep(1.0) def stop(self): self.running = False @@ -148,7 +137,7 @@ def __init__(self, web_stream_manager): self.tracker = DeepSortTracker() self.sleep_monitor = SleepMonitor() self.safety_monitor = SafetyMonitor() - self.reader = RTSPReader(config.RTSP_URL) + self.reader = VideoReader(config.STREAM_URL) # Initialize frame dimensions frame = None @@ -240,7 +229,7 @@ def run(self): # Send to web streaming thread self.web_stream_manager.add_frame(annotated) - # Periodic garbage collection + # Periodic garbage collection self.frame_count += 1 if self.frame_count % self.gc_interval == 0: gc.collect() @@ -249,7 +238,14 @@ def run(self): print(f"AI Pipeline error: {e}") continue - time.sleep(1.0 / config.TARGET_FPS) + # Dynamic sleep based on active viewers (save CPU if no one is watching) + client_count = self.web_stream_manager.client_connections + if client_count == 0: + # No one watching? Slow down to 2 FPS to reduce idle CPU usage + time.sleep(0.5) + else: + # Someone is watching? Process at TARGET_FPS (max 10 FPS on CPU) + time.sleep(1.0 / config.TARGET_FPS) def stop(self): self.running = False @@ -313,7 +309,7 @@ def stop(self): def get_metrics(self): """Get system and streaming metrics""" # System metrics - cpu = psutil.cpu_percent(interval=0.1) + cpu = psutil.cpu_percent(interval=None) memory = psutil.virtual_memory().percent net_io = psutil.net_io_counters() network = min(100, (net_io.bytes_sent + net_io.bytes_recv) / 1e7) @@ -325,8 +321,27 @@ def get_metrics(self): # Sleep metrics (only if streamer is available) if self.ai_streamer is not None: sleep_state, sleep_time = self.ai_streamer.get_sleep_metrics() + # Child status + child_id = self.ai_streamer.tracker.child_id + if child_id is not None: + child_status = { + 'selected': True, + 'child_id': child_id, + 'confidence': self.ai_streamer.tracker.track_confidences.get(child_id, 0) + } + else: + child_status = {'selected': False} else: sleep_state, sleep_time = 'Offline', '0m' + child_status = {'selected': False} + + # Notification count + from models.notification import Notification + try: + # We wrap this in try-except because it requires app context + notification_count = Notification.query.count() + except Exception: + notification_count = 0 # Streaming metrics active_client_count = len(self.active_clients) @@ -341,6 +356,8 @@ def get_metrics(self): 'sleep_state': sleep_state, 'room_temp': room_temp, 'sleep_time': sleep_time, + 'child_status': child_status, + 'notification_count': notification_count, 'streaming': { 'active_clients': active_client_count, 'adaptive_fps': streaming_fps, diff --git a/services/streaming/video_reader.py b/services/streaming/video_reader.py new file mode 100644 index 0000000..e685b9f --- /dev/null +++ b/services/streaming/video_reader.py @@ -0,0 +1,255 @@ +""" +Video streaming module for camera connectivity (RTSP & MJPEG) +""" +import cv2 +import time +import threading +import numpy as np +import requests +from config.settings import config + + +class MJPEGClient: + """ + Custom MJPEG stream reader using requests. + Used primarily when specific SSL/TLS control is needed (e.g. self-signed certs). + Mimics a subset of cv2.VideoCapture interface. + """ + def __init__(self, url, verify_ssl=True): + self.url = url + self.verify_ssl = verify_ssl + self.stream_response = None + self.bytes_buffer = bytes() + self._is_opened = False + self._connect() + + def _connect(self): + try: + # Open the stream with requests + # timeout is for the connection, not the stream duration + print(f"[MJPEGClient] Connecting to {self.url} with verify_ssl={self.verify_ssl}...") + self.stream_response = requests.get( + self.url, + stream=True, + verify=self.verify_ssl, + timeout=10 + ) + print(f"[MJPEGClient] Connection status: {self.stream_response.status_code}") + if self.stream_response.status_code == 200: + self._is_opened = True + # We need to iterate over the content + self.iterator = self.stream_response.iter_content(chunk_size=1024) + else: + self._is_opened = False + except Exception as e: + print(f"[MJPEGClient] Connection error: {e}") + self._is_opened = False + + def isOpened(self): + return self._is_opened + + def read(self): + """ + Reads the next frame from the MJPEG stream. + Returns (ret, frame) similar to cv2.VideoCapture. + """ + if not self._is_opened: + return False, None + + # Basic MJPEG parsing logic: look for JPEG start/end markers + # SOI (Start of Image): 0xFF 0xD8 + # EOI (End of Image): 0xFF 0xD9 + + # We read chunks until we find a full frame + # This is a blocking operation, but in a thread usually. + # For simplicity and robustness, we buffer slightly efficiently. + + try: + while True: + # Search for start + a = self.bytes_buffer.find(b'\xff\xd8') + # Search for end + b = self.bytes_buffer.find(b'\xff\xd9') + + if a != -1 and b != -1: + # We have a candidate frame + jpg = self.bytes_buffer[a:b+2] + # Shift buffer + self.bytes_buffer = self.bytes_buffer[b+2:] + + # Decode + frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR) + if frame is not None: + return True, frame + # If decode fails, we might have had garbage bytes looking like markers, continue + continue + + # If we don't have a full frame, read more + chunk = next(self.iterator) + self.bytes_buffer += chunk + + # Safety break for buffer size to prevent OOM on broken streams + if len(self.bytes_buffer) > 10 * 1024 * 1024: # 10MB limit + print(f"[MJPEGClient] Buffer size limit exceeded ({len(self.bytes_buffer)} bytes). Resetting buffer.") + self.bytes_buffer = bytes() + return False, None + + except StopIteration: + self._is_opened = False + return False, None + except Exception as e: + # print(f"[MJPEGClient] Read error: {e}") + self._is_opened = False + return False, None + + def grab(self): + # Requests stream is linear, we can't easily skip without reading. + # So grab() acts like read() but discards result? + # Or we just return True to simulate that a frame "could" be read. + # For compatibility with the update loop which calls grab() then read(), + # we can make grab() a no-op that returns True if opened. + return self._is_opened + + def release(self): + self._is_opened = False + if self.stream_response: + self.stream_response.close() + + def set(self, prop, val): + pass # ignore properties + + +class VideoReader: + """Threaded Video stream reader with auto-reconnection (Supports RTSP & MJPEG)""" + + def __init__(self, url): + """Initialize Video reader""" + self.url = url + self.cap = None + self.connect() + + self.lock = threading.Lock() + self.latest_frame = None + self.stopped = False + self.connection_lost = False + + # Start reading thread + t = threading.Thread(target=self.update, daemon=True) + t.start() + + def connect(self): + """Try to connect to video stream (RTSP or MJPEG) with retries""" + for attempt in range(config.MAX_RETRIES): + try: + print(f"[VIDEO] Attempting connection (attempt {attempt + 1}/{config.MAX_RETRIES})...") + + # Check if it's an HTTP/MJPEG stream + if self.url.lower().startswith(('http://', 'https://')): + print(f"[VIDEO] Detected HTTP/MJPEG stream: {self.url}") + + if config.STREAM_SELF_SIGNED_CERT: + print("[VIDEO] Allowing self-signed certificates (using MJPEGClient)") + # Use custom client if we need to ignore SSL certs + self.cap = MJPEGClient(self.url, verify_ssl=False) + else: + # Use OpenCV default if standard + self.cap = cv2.VideoCapture(self.url) + else: + # Assume RTSP or other ffmpeg supported stream + print(f"[VIDEO] Detected RTSP/Video stream: {self.url}") + self.cap = cv2.VideoCapture(self.url, cv2.CAP_FFMPEG) + # Set additional properties for better RTSP connection + self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + self.cap.set(cv2.CAP_PROP_FPS, 25) + + if self.cap.isOpened(): + # Test if we can actually read a frame + ret, frame = self.cap.read() + if ret and frame is not None: + print(f"[SUCCESS] Video connection successful!") + return + else: + self.cap.release() + print(f"[WARNING] Could not read from stream...") + else: + print(f"[WARNING] Could not open stream with {self.url}") + + print(f"[ERROR] Connection attempt {attempt + 1} failed, retrying in {config.RETRY_DELAY} seconds...") + time.sleep(config.RETRY_DELAY) + + except Exception as e: + print(f"[ERROR] Connection error on attempt {attempt + 1}: {e}") + time.sleep(config.RETRY_DELAY) + + raise RuntimeError(self._get_connection_error_message()) + + def _get_connection_error_message(self): + """Generate detailed error message for connection failure""" + camera_ip = 'unknown' + if '@' in self.url: + camera_ip = self.url.split('@')[1].split('/')[0] + elif '://' in self.url: + camera_ip = self.url.split('://')[1].split('/')[0] + + return (f"[ERROR] Cannot open Video stream after {config.MAX_RETRIES} attempts. Please check:\n" + f"1. Camera IP address: {camera_ip}\n" + f"2. Username/password credentials\n" + f"3. Network connectivity\n" + f"4. Camera is powered on and accessible\n" + f"5. URL is correct (RTSP or HTTP/MJPEG)") + + def update(self): + """Background thread for reading frames""" + consecutive_failures = 0 + + while not self.stopped: + if self.cap is None or not self.cap.isOpened(): + time.sleep(0.1) + continue + + # Note: For OpenCV, grab() separates decoding from reading. + # For our MJPEGClient, grab() is a no-op returning True. + grabbed = self.cap.grab() + if not grabbed: + consecutive_failures += 1 + if consecutive_failures >= config.MAX_FAILURES: + print("[WARNING] Too many consecutive failures, attempting to reconnect...") + try: + self.cap.release() + self.connect() + consecutive_failures = 0 + self.connection_lost = False + except Exception as e: + print(f"[ERROR] Reconnection failed: {e}") + self.connection_lost = True + time.sleep(5) # Wait longer before next attempt + else: + time.sleep(0.01) + continue + + ret, frame = self.cap.read() + if not ret or frame is None: + consecutive_failures += 1 + time.sleep(0.01) + continue + + # Reset failure counter on successful read + consecutive_failures = 0 + self.connection_lost = False + + with self.lock: + self.latest_frame = frame + + def read(self): + """Get the latest frame""" + with self.lock: + return self.latest_frame.copy() if self.latest_frame is not None else None + + def stop(self): + """Stop the reader and release resources""" + self.stopped = True + try: + if self.cap: + self.cap.release() + except Exception: + pass From ca600c0b260985e18667385e77de8885f627ab73 Mon Sep 17 00:00:00 2001 From: guanana Date: Fri, 30 Jan 2026 00:18:55 +0000 Subject: [PATCH 2/2] feat: Implement camera service caching and background initialization, add streaming API, and introduce WebSocket client tests. --- Dockerfile | 2 +- README.md | 1 + api/camera_api.py | 10 +- api/metrics_route.py | 2 +- api/stream_route.py | 36 + app.py | 2 + services/controller/camera_service.py | 161 ++- services/streaming/rtsp_reader.py | 128 -- services/streaming/video_reader.py | 2 +- templates/scripts/script_components.html | 1568 +++++++++++----------- tests/test_mjpeg_client.py | 77 ++ tests/test_websocket_client.py | 52 + 12 files changed, 1078 insertions(+), 963 deletions(-) create mode 100644 api/stream_route.py delete mode 100644 services/streaming/rtsp_reader.py create mode 100644 tests/test_mjpeg_client.py create mode 100644 tests/test_websocket_client.py diff --git a/Dockerfile b/Dockerfile index 22afe08..d8ee054 100644 --- a/Dockerfile +++ b/Dockerfile @@ -84,4 +84,4 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ CMD curl -f http://localhost:8847/ || exit 1 # Default command - run Flask in production mode with Gunicorn -CMD ["gunicorn", "--bind", "0.0.0.0:8847", "--timeout", "120", "app:app"] +CMD ["gunicorn", "--bind", "0.0.0.0:8847", "--workers", "1", "--threads", "4", "--worker-class", "gthread", "--timeout", "120", "app:app"] diff --git a/README.md b/README.md index 4c9a0c1..b2001d6 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ ### 🔄 **Smart Connectivity** - **RTSP Stream Support**: Compatible with IP cameras supporting RTSP protocol +- **MJPEG Stream Support**: Compatible with variety of devices that stream this protocol - **Auto-Reconnection**: Intelligent reconnection on network interruptions - **Buffering Optimization**: Low-latency streaming with minimal delay - **Connection Status**: Visual indicators for stream health diff --git a/api/camera_api.py b/api/camera_api.py index 001fc27..5d3bbd3 100644 --- a/api/camera_api.py +++ b/api/camera_api.py @@ -17,18 +17,22 @@ def get_camera_info(): """Get camera basic information, connection status, and presets.""" try: - # Get camera status + # Get camera status (now handles background init and caching) status = camera_service.get_status() - # Get camera presets + # Get camera presets (now handles background init and caching) presets = camera_service.get_presets() + # We return success: True if we at least got a valid JSON response from the service + # even if the camera itself is offline or initializing. return jsonify({ - 'success': status['available'], + 'success': True, + 'is_available': status['available'], 'device_model': status['device_model'], 'privacy_mode': status['privacy_mode'], 'connection_status': status['connection_status'], 'presets': presets, + 'reason': status.get('reason'), 'error': status.get('error') if not status['available'] else None }) except Exception as e: diff --git a/api/metrics_route.py b/api/metrics_route.py index ec9d5cd..dd0b15c 100644 --- a/api/metrics_route.py +++ b/api/metrics_route.py @@ -20,7 +20,7 @@ def get_metrics(): # Return basic metrics when streaming is not available import psutil return jsonify({ - 'cpu': psutil.cpu_percent(interval=0.1), + 'cpu': psutil.cpu_percent(interval=None), 'memory': psutil.virtual_memory().percent, 'network': 0, 'detection_rate': 0, diff --git a/api/stream_route.py b/api/stream_route.py new file mode 100644 index 0000000..d6852e5 --- /dev/null +++ b/api/stream_route.py @@ -0,0 +1,36 @@ +""" +MJPEG Streaming Routes +""" +from flask import Blueprint, Response +from flask_login import login_required +from services.streaming.streaming_service import get_streaming_service +import cv2 +import time + +stream_bp = Blueprint('stream', __name__) + +def gen_frames(): + """Frame generator for MJPEG stream""" + streaming_service = get_streaming_service() + if not streaming_service: + return + + while True: + if streaming_service.ai_streamer: + frame = streaming_service.ai_streamer.get_latest_frame() + if frame is not None: + # Use slightly lower quality for MJPEG to save bandwidth + ret, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 70]) + if ret: + yield (b'--frame\r\n' + b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n') + + # Limit to ~20 FPS for the web preview + time.sleep(0.05) + +@stream_bp.route('/video_feed') +@login_required +def video_feed(): + """Video streaming route. Put this in the src attribute of an img tag.""" + return Response(gen_frames(), + mimetype='multipart/x-mixed-replace; boundary=frame') diff --git a/app.py b/app.py index 7e79c42..6b2f9fb 100644 --- a/app.py +++ b/app.py @@ -12,6 +12,7 @@ from api.error_handlers import errors_bp from api.websocket_handlers import register_socketio_events from api.camera_api import camera_info_bp, camera_control_bp +from api.stream_route import stream_bp from services.streaming.streaming_service import initialize_streaming_service app = Flask(__name__) @@ -51,6 +52,7 @@ def load_user(user_id): # Import and register active users blueprint from api.active_users_route import active_users_bp app.register_blueprint(active_users_bp) +app.register_blueprint(stream_bp) # Initialize databases (notification manager first, then auth) notification_manager.init_app(app) diff --git a/services/controller/camera_service.py b/services/controller/camera_service.py index 64e9833..46f2bbf 100644 --- a/services/controller/camera_service.py +++ b/services/controller/camera_service.py @@ -2,6 +2,8 @@ Camera Service Layer - Handles camera operations and abstracts camera controller logic """ import logging +import threading +import time from typing import Dict, List, Optional, Any from services.controller.tapo_camera import TapoCameraController from config.settings import config @@ -17,37 +19,68 @@ def __init__(self): self._host = config.CAMERA_HOST self._username = config.CAMERA_USERNAME self._password = config.CAMERA_PASSWORD - + + # Caching and locking + self._cache_lock = threading.Lock() + self._last_status = None + self._last_presets = None + self._last_update_time = 0 + self._cache_duration = 30 # Cache for 30 seconds + + # Thread for background initialization + self._init_thread = None + self._is_initializing = False + def _get_controller(self): """Get or create camera controller instance.""" if not self._is_enabled: return None if self._controller is None: + if self._is_initializing: + return None + try: # Ensure we have valid credentials if not self._host or not self._username or not self._password: self.logger.error("Camera credentials not properly configured") return None - - self._controller = TapoCameraController( - host=self._host, - username=self._username, - password=self._password, - debug=False - ) - self.logger.info("Camera controller initialized successfully") + + # Start initialization in background to not block the web thread + self._is_initializing = True + self._init_thread = threading.Thread(target=self._background_init, daemon=True) + self._init_thread.start() + return None except Exception as e: - self.logger.error(f"Failed to initialize camera controller: {e}") + self.logger.error(f"Failed to start camera controller initialization: {e}") + self._is_initializing = False return None + return self._controller - + + def _background_init(self): + """Perform camera initialization in background.""" + try: + self.logger.info(f"Starting background camera initialization for {self._host}...") + controller = TapoCameraController( + host=self._host, + username=self._username, + password=self._password, + debug=False + ) + self._controller = controller + self.logger.info("Camera controller initialized successfully via background thread") + except Exception as e: + self.logger.error(f"Background camera initialization failed: {e}") + finally: + self._is_initializing = False + def is_available(self) -> bool: """Check if camera service is available.""" - return self._is_enabled and self._get_controller() is not None + return self._is_enabled and self._controller is not None def get_status(self) -> Dict[str, Any]: - """Get camera status and basic information.""" + """Get camera status with caching.""" if not self._is_enabled: return { 'available': False, @@ -57,17 +90,25 @@ def get_status(self) -> Dict[str, Any]: 'connection_status': 'disabled' } + # Check cache + with self._cache_lock: + current_time = time.time() + if self._last_status and (current_time - self._last_update_time < self._cache_duration): + return self._last_status + controller = self._get_controller() if not controller: return { 'available': False, - 'reason': 'offline', - 'device_model': 'Offline', + 'reason': 'initializing' if self._is_initializing else 'offline', + 'device_model': 'Initializing...' if self._is_initializing else 'Offline', 'privacy_mode': False, - 'connection_status': 'offline' + 'connection_status': 'initializing' if self._is_initializing else 'offline' } try: + # Add timeout protection specifically for these calls if the underlying lib allows it, + # otherwise trust threading to keep the app responsive. basic_info = controller.get_basic_info() privacy_mode = controller.get_privacy_mode() @@ -75,22 +116,29 @@ def get_status(self) -> Dict[str, Any]: if privacy_mode is None: privacy_mode = False elif not isinstance(privacy_mode, bool): - # Convert to boolean if it's not already privacy_mode = bool(privacy_mode) - # Extract device model from various possible fields + # Extract device model device_model = 'Unknown' if isinstance(basic_info, dict): - device_model = basic_info.get('device_info').get('basic_info').get('device_alias') - self.logger.info(f"Camera status: available=True, privacy_mode={privacy_mode}, device_model={device_model}") + try: + device_model = basic_info.get('device_info', {}).get('basic_info', {}).get('device_alias', 'Unknown') + except AttributeError: + device_model = 'Tapo Camera' - return { + status = { 'available': True, 'reason': 'connected', 'device_model': device_model, 'privacy_mode': privacy_mode, 'connection_status': 'online', } + + with self._cache_lock: + self._last_status = status + self._last_update_time = time.time() + + return status except Exception as e: self.logger.error(f"Error getting camera status: {e}") return { @@ -103,26 +151,34 @@ def get_status(self) -> Dict[str, Any]: } def get_presets(self) -> List[Dict[str, Any]]: - """Get formatted list of camera presets.""" + """Get camera presets with caching.""" + # Use default presets if disabled or not yet connected + default_presets = [ + {'id': 1, 'name': 'Home Position'}, + {'id': 2, 'name': 'Sleep Area'}, + {'id': 3, 'name': 'Play Area'} + ] + + if not self._is_enabled: + return default_presets + + # Check cache + with self._cache_lock: + current_time = time.time() + if self._last_presets and (current_time - self._last_update_time < self._cache_duration): + return self._last_presets + controller = self._get_controller() if not controller: - # Return default presets when camera is not available - return [ - {'id': 1, 'name': 'Home Position'}, - {'id': 2, 'name': 'Sleep Area'}, - {'id': 3, 'name': 'Play Area'} - ] + return default_presets try: presets = controller.get_presets() - self.logger.debug(f"Raw presets from controller: {presets}") - formatted_presets = [] + if isinstance(presets, list) and len(presets) > 0: - # Handle the case where presets is a list with objects containing id-name pairs for preset_data in presets: if isinstance(preset_data, dict): - # Check if it's the format: {"1": "Bed", "2": "Gate", "3": "back"} for preset_id, preset_name in preset_data.items(): try: formatted_presets.append({ @@ -130,40 +186,32 @@ def get_presets(self) -> List[Dict[str, Any]]: 'name': preset_name }) except (ValueError, TypeError): - # Fallback if preset_id is not a number formatted_presets.append({ 'id': preset_id, 'name': preset_name }) else: - # Handle other formats formatted_presets.append({ 'id': len(formatted_presets) + 1, 'name': f'Preset {len(formatted_presets) + 1}' }) - # If no presets were found or parsed, use default presets if not formatted_presets: - formatted_presets = [ - {'id': 1, 'name': 'Home Position'}, - {'id': 2, 'name': 'Sleep Area'}, - {'id': 3, 'name': 'Play Area'} - ] + formatted_presets = default_presets + + with self._cache_lock: + self._last_presets = formatted_presets + # Note: last_update_time is shared with status return formatted_presets except Exception as e: self.logger.error(f"Error getting presets: {e}") - # Return default presets on error - return [ - {'id': 1, 'name': 'Home Position'}, - {'id': 2, 'name': 'Sleep Area'}, - {'id': 3, 'name': 'Play Area'} - ] + return default_presets class CameraControlService: - """Service layer for camera control operations (movements, settings).""" + """Service layer for camera control operations.""" def __init__(self, camera_service: CameraService): self.camera_service = camera_service @@ -171,9 +219,10 @@ def __init__(self, camera_service: CameraService): def set_preset(self, preset_id: int) -> Dict[str, Any]: """Set camera to specific preset position.""" - controller = self.camera_service._get_controller() - if not controller: - return {'success': False, 'error': 'Camera controller not available'} + if not self.camera_service.is_available(): + return {'success': False, 'error': 'Camera is not ready yet'} + + controller = self.camera_service._controller try: success = controller.set_preset(preset_id) @@ -191,13 +240,19 @@ def set_preset(self, preset_id: int) -> Dict[str, Any]: def set_privacy_mode(self, enabled: bool) -> Dict[str, Any]: """Toggle camera privacy mode.""" - controller = self.camera_service._get_controller() - if not controller: - return {'success': False, 'error': 'Camera controller not available'} + if not self.camera_service.is_available(): + return {'success': False, 'error': 'Camera is not ready yet'} + + controller = self.camera_service._controller try: success = controller.set_privacy_mode(enabled) if success: + # Update cache immediately on manual change + with self.camera_service._cache_lock: + if self.camera_service._last_status: + self.camera_service._last_status['privacy_mode'] = enabled + return { 'success': True, 'privacy_mode': enabled, diff --git a/services/streaming/rtsp_reader.py b/services/streaming/rtsp_reader.py deleted file mode 100644 index da86494..0000000 --- a/services/streaming/rtsp_reader.py +++ /dev/null @@ -1,128 +0,0 @@ -""" -RTSP streaming module for camera connectivity -""" -import cv2 -import time -import threading -from config.settings import config - - -class RTSPReader: - """Threaded RTSP stream reader with auto-reconnection""" - - def __init__(self, url): - """Initialize RTSP reader""" - self.url = url - self.cap = None - self.connect() - - self.lock = threading.Lock() - self.latest_frame = None - self.stopped = False - self.connection_lost = False - - # Start reading thread - t = threading.Thread(target=self.update, daemon=True) - t.start() - - def connect(self): - """Try to connect to RTSP stream with retries""" - for attempt in range(config.MAX_RETRIES): - try: - print(f"[RTSP] Attempting connection (attempt {attempt + 1}/{config.MAX_RETRIES})...") - - # Try different RTSP transport methods - transports = [ - f"{self.url}", # Default - f"{self.url.replace('rtsp://', 'rtsp://')}" # Ensure rtsp prefix - ] - - for transport_url in transports: - self.cap = cv2.VideoCapture(transport_url, cv2.CAP_FFMPEG) - # Set additional properties for better connection - self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) - self.cap.set(cv2.CAP_PROP_FPS, 25) - - if self.cap.isOpened(): - # Test if we can actually read a frame - ret, frame = self.cap.read() - if ret and frame is not None: - print(f"[SUCCESS] RTSP connection successful!") - return - else: - self.cap.release() - print(f"[WARNING] Could not read from stream, trying next method...") - else: - print(f"[WARNING] Could not open stream with {transport_url}") - - print(f"[ERROR] Connection attempt {attempt + 1} failed, retrying in {config.RETRY_DELAY} seconds...") - time.sleep(config.RETRY_DELAY) - - except Exception as e: - print(f"[ERROR] Connection error on attempt {attempt + 1}: {e}") - time.sleep(config.RETRY_DELAY) - - raise RuntimeError(self._get_connection_error_message()) - - def _get_connection_error_message(self): - """Generate detailed error message for connection failure""" - camera_ip = self.url.split('@')[1].split('/')[0] if '@' in self.url else 'unknown' - return (f"[ERROR] Cannot open RTSP stream after {config.MAX_RETRIES} attempts. Please check:\n" - f"1. Camera IP address: {camera_ip}\n" - f"2. Username/password credentials\n" - f"3. Network connectivity\n" - f"4. Camera is powered on and accessible") - - def update(self): - """Background thread for reading frames""" - consecutive_failures = 0 - - while not self.stopped: - if self.cap is None or not self.cap.isOpened(): - time.sleep(0.1) - continue - - grabbed = self.cap.grab() - if not grabbed: - consecutive_failures += 1 - if consecutive_failures >= config.MAX_FAILURES: - print("[WARNING] Too many consecutive failures, attempting to reconnect...") - try: - self.cap.release() - self.connect() - consecutive_failures = 0 - self.connection_lost = False - except Exception as e: - print(f"[ERROR] Reconnection failed: {e}") - self.connection_lost = True - time.sleep(5) # Wait longer before next attempt - else: - time.sleep(0.01) - continue - - ret, frame = self.cap.read() - if not ret or frame is None: - consecutive_failures += 1 - time.sleep(0.01) - continue - - # Reset failure counter on successful read - consecutive_failures = 0 - self.connection_lost = False - - with self.lock: - self.latest_frame = frame - - def read(self): - """Get the latest frame""" - with self.lock: - return self.latest_frame.copy() if self.latest_frame is not None else None - - def stop(self): - """Stop the reader and release resources""" - self.stopped = True - try: - if self.cap: - self.cap.release() - except Exception: - pass diff --git a/services/streaming/video_reader.py b/services/streaming/video_reader.py index e685b9f..5221275 100644 --- a/services/streaming/video_reader.py +++ b/services/streaming/video_reader.py @@ -156,7 +156,7 @@ def connect(self): self.cap = cv2.VideoCapture(self.url) else: # Assume RTSP or other ffmpeg supported stream - print(f"[VIDEO] Detected RTSP/Video stream: {self.url}") + print(f"[VIDEO] Fallback to RTSP/Video stream: {self.url}") self.cap = cv2.VideoCapture(self.url, cv2.CAP_FFMPEG) # Set additional properties for better RTSP connection self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) diff --git a/templates/scripts/script_components.html b/templates/scripts/script_components.html index 383bfeb..05f1ed2 100644 --- a/templates/scripts/script_components.html +++ b/templates/scripts/script_components.html @@ -1,732 +1,748 @@ - + + - - \ No newline at end of file + } + + // Auto-adjust quality based on performance + function autoAdjustQuality() { + if (performanceStats.frameDrops > 5) { + const currentQual = parseInt(qualitySlider.value); + if (currentQual > 30) { + qualitySlider.value = Math.max(30, currentQual - 10); + qualityValue.textContent = qualitySlider.value + '%'; + applyQuality.click(); + showNotification('Quality auto-adjusted due to performance', 'info'); + performanceStats.frameDrops = 0; + } + } + } + + // Check performance every 30 seconds + setInterval(autoAdjustQuality, 30000); + + // Enhanced video stream error handling + videoStream.addEventListener('error', function (e) { + console.error('Video stream error:', e); + showNotification('Video stream error occurred', 'error'); + }); + + videoStream.addEventListener('loadstart', function () { + console.log('Video stream loading started'); + }); + + videoStream.addEventListener('loadeddata', function () { + console.log('Video stream data loaded'); + }); + + // Keyboard shortcut hints + const shortcutHints = { + 's': 'Take Snapshot', + 'p': 'Pause/Resume', + 'f': 'Fullscreen', + 'q': 'Toggle Quality', + 'c': 'Clear Child Selection', + 'r': 'Recording Mode', + 'z': 'Sleep Detection', + 'h': 'Help' + }; + + // Show random shortcut hint occasionally + function showRandomShortcutHint() { + const shortcuts = Object.keys(shortcutHints); + const randomKey = shortcuts[Math.floor(Math.random() * shortcuts.length)]; + const hint = shortcutHints[randomKey]; + showNotification(`💡 Tip: Press "${randomKey.toUpperCase()}" to ${hint}`, 'info'); + } + + // Show hint every 5 minutes + setInterval(showRandomShortcutHint, 300000); + + console.log('🚀 AI Baby Monitor initialized successfully!'); + console.log('📹 Video stream ready'); + console.log('🔌 WebSocket connection established'); + console.log('⌨️ Press H for keyboard shortcuts'); + + \ No newline at end of file diff --git a/tests/test_mjpeg_client.py b/tests/test_mjpeg_client.py new file mode 100644 index 0000000..8fd1a8e --- /dev/null +++ b/tests/test_mjpeg_client.py @@ -0,0 +1,77 @@ +import time +import sys +import os +import cv2 + +# Add project root to path +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +try: + from services.streaming.video_reader import MJPEGClient +except ImportError: + # Handle case where imports might fail if not fully set up + print("Could not import MJPEGClient. Ensure you are running from project root or have paths set.") + sys.exit(1) + +def test_mjpeg_client(url): + print(f"Testing MJPEG Client with URL: {url}") + + client = None + try: + # Simple heuristic to choose client, similar to video_reader.py logic + if url.lower().startswith(('http://', 'https://')): + print("Using MJPEGClient (forcing for test)") + client = MJPEGClient(url, verify_ssl=False) + else: + print("Using OpenCV VideoCapture") + client = cv2.VideoCapture(url) + except Exception as e: + print(f"Failed to initialize client: {e}") + return + + is_cv2 = isinstance(client, cv2.VideoCapture) + + if is_cv2: + if not client.isOpened(): + print("Failed to open stream (OpenCV)") + return + elif not client.isOpened(): + print("Failed to open stream (MJPEGClient)") + return + + print("Stream opened successfully. Reading frames...") + + start_time = time.time() + frame_count = 0 + errors = 0 + + try: + while time.time() - start_time < 10: # Read for 10 seconds + if is_cv2: + ret, frame = client.read() + else: + ret, frame = client.read() + + if ret and frame is not None: + frame_count += 1 + if frame_count % 10 == 0: + print(f"Read {frame_count} frames. Last frame shape: {frame.shape}") + else: + errors += 1 + if errors % 10 == 0: + print(f"Failed to read frame (Total errors: {errors})") + time.sleep(0.01) + + except KeyboardInterrupt: + pass + + print(f"Test finished. Total frames: {frame_count}, Total errors: {errors}") + client.release() + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("Usage: python3 test_mjpeg_client.py ") + sys.exit(1) + + url = sys.argv[1] + test_mjpeg_client(url) diff --git a/tests/test_websocket_client.py b/tests/test_websocket_client.py new file mode 100644 index 0000000..3dc2d0e --- /dev/null +++ b/tests/test_websocket_client.py @@ -0,0 +1,52 @@ +import socketio +import time +import sys + +# Create a Socket.IO client +sio = socketio.Client() + +frame_count = 0 +start_time = 0 + +@sio.event +def connect(): + print("Connected to WebSocket server") + # Join the streaming room? The server code shows: + # socket.emit('video_frame', ..., room='streaming_enabled') + # Use standard mechanisms if the server expects authentication or specific events. + # But usually joining a room is server-side logic based on auth. + # Let's see if we receive anything just by connecting. + +@sio.event +def connect_error(data): + print(f"Connection failed: {data}") + +@sio.event +def disconnect(): + print("Disconnected from server") + +@sio.on('video_frame') +def on_video_frame(data): + global frame_count, start_time + if frame_count == 0: + start_time = time.time() + frame_count += 1 + if frame_count % 10 == 0: + elapsed = time.time() - start_time + fps = frame_count / elapsed + print(f"Received {frame_count} frames. Average FPS: {fps:.2f}") + +def main(): + url = 'http://localhost:8847' + print(f"Connecting to {url}...") + try: + sio.connect(url, wait_timeout=10) + sio.wait() + except Exception as e: + print(f"Error: {e}") + except KeyboardInterrupt: + print("Stopping...") + sio.disconnect() + +if __name__ == '__main__': + main()