diff --git a/.env.example b/.env.example
index 1b6954c..07377af 100644
--- a/.env.example
+++ b/.env.example
@@ -1,5 +1,5 @@
-RTSP_URL=rtsp://username:password@192.168.1.3:554/stream1
+STREAM_URL=rtsp://username:password@192.168.1.3:554/stream1
CAMERA_HOST=192.168.1.100
CAMERA_USERNAME=your_tapo_email@example.com
diff --git a/.gitignore b/.gitignore
index b7faf40..e8fece7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -173,7 +173,7 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
-#.idea/
+.idea/
# Abstra
# Abstra is an AI-powered process automation framework.
diff --git a/Dockerfile b/Dockerfile
index 22afe08..d8ee054 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -84,4 +84,4 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8847/ || exit 1
# Default command - run Flask in production mode with Gunicorn
-CMD ["gunicorn", "--bind", "0.0.0.0:8847", "--timeout", "120", "app:app"]
+CMD ["gunicorn", "--bind", "0.0.0.0:8847", "--workers", "1", "--threads", "4", "--worker-class", "gthread", "--timeout", "120", "app:app"]
diff --git a/README.md b/README.md
index 4c9a0c1..b2001d6 100644
--- a/README.md
+++ b/README.md
@@ -43,6 +43,7 @@
### 🔄 **Smart Connectivity**
- **RTSP Stream Support**: Compatible with IP cameras supporting RTSP protocol
+- **MJPEG Stream Support**: Compatible with variety of devices that stream this protocol
- **Auto-Reconnection**: Intelligent reconnection on network interruptions
- **Buffering Optimization**: Low-latency streaming with minimal delay
- **Connection Status**: Visual indicators for stream health
diff --git a/api/camera_api.py b/api/camera_api.py
index 001fc27..5d3bbd3 100644
--- a/api/camera_api.py
+++ b/api/camera_api.py
@@ -17,18 +17,22 @@
def get_camera_info():
"""Get camera basic information, connection status, and presets."""
try:
- # Get camera status
+ # Get camera status (now handles background init and caching)
status = camera_service.get_status()
- # Get camera presets
+ # Get camera presets (now handles background init and caching)
presets = camera_service.get_presets()
+ # We return success: True if we at least got a valid JSON response from the service
+ # even if the camera itself is offline or initializing.
return jsonify({
- 'success': status['available'],
+ 'success': True,
+ 'is_available': status['available'],
'device_model': status['device_model'],
'privacy_mode': status['privacy_mode'],
'connection_status': status['connection_status'],
'presets': presets,
+ 'reason': status.get('reason'),
'error': status.get('error') if not status['available'] else None
})
except Exception as e:
diff --git a/api/metrics_route.py b/api/metrics_route.py
index ec9d5cd..dd0b15c 100644
--- a/api/metrics_route.py
+++ b/api/metrics_route.py
@@ -20,7 +20,7 @@ def get_metrics():
# Return basic metrics when streaming is not available
import psutil
return jsonify({
- 'cpu': psutil.cpu_percent(interval=0.1),
+ 'cpu': psutil.cpu_percent(interval=None),
'memory': psutil.virtual_memory().percent,
'network': 0,
'detection_rate': 0,
diff --git a/api/stream_route.py b/api/stream_route.py
new file mode 100644
index 0000000..d6852e5
--- /dev/null
+++ b/api/stream_route.py
@@ -0,0 +1,36 @@
+"""
+MJPEG Streaming Routes
+"""
+from flask import Blueprint, Response
+from flask_login import login_required
+from services.streaming.streaming_service import get_streaming_service
+import cv2
+import time
+
+stream_bp = Blueprint('stream', __name__)
+
+def gen_frames():
+ """Frame generator for MJPEG stream"""
+ streaming_service = get_streaming_service()
+ if not streaming_service:
+ return
+
+ while True:
+ if streaming_service.ai_streamer:
+ frame = streaming_service.ai_streamer.get_latest_frame()
+ if frame is not None:
+ # Use slightly lower quality for MJPEG to save bandwidth
+ ret, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 70])
+ if ret:
+ yield (b'--frame\r\n'
+ b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
+
+ # Limit to ~20 FPS for the web preview
+ time.sleep(0.05)
+
+@stream_bp.route('/video_feed')
+@login_required
+def video_feed():
+ """Video streaming route. Put this in the src attribute of an img tag."""
+ return Response(gen_frames(),
+ mimetype='multipart/x-mixed-replace; boundary=frame')
diff --git a/app.py b/app.py
index 7e79c42..6b2f9fb 100644
--- a/app.py
+++ b/app.py
@@ -12,6 +12,7 @@
from api.error_handlers import errors_bp
from api.websocket_handlers import register_socketio_events
from api.camera_api import camera_info_bp, camera_control_bp
+from api.stream_route import stream_bp
from services.streaming.streaming_service import initialize_streaming_service
app = Flask(__name__)
@@ -51,6 +52,7 @@ def load_user(user_id):
# Import and register active users blueprint
from api.active_users_route import active_users_bp
app.register_blueprint(active_users_bp)
+app.register_blueprint(stream_bp)
# Initialize databases (notification manager first, then auth)
notification_manager.init_app(app)
diff --git a/config/settings.py b/config/settings.py
index 93dcc43..55a2275 100644
--- a/config/settings.py
+++ b/config/settings.py
@@ -10,8 +10,9 @@
class BabyMonitorSettings:
"""Main configuration class for RTSP Recorder"""
- # ==================== RTSP Settings ====================
- RTSP_URL = os.getenv("RTSP_URL")
+ # ==================== Video Input Settings ====================
+ STREAM_URL = os.getenv("STREAM_URL", os.getenv("RTSP_URL")) # Can be RTSP URI or HTTP/HTTPS MJPEG stream URL
+ STREAM_SELF_SIGNED_CERT = os.getenv("STREAM_SELF_SIGNED_CERT", "false").lower() == "true"
RTSP_TIMEOUT = int(os.getenv("RTSP_TIMEOUT", 10))
YOLO_MODEL_NAME = os.getenv("MODEL_NAME", "yolov8n.pt")
@@ -33,7 +34,7 @@ class BabyMonitorSettings:
# Use Docker-compatible paths if running in container
CONFIDENCE_THRESHOLD = 0.4 # detection confidence
- TARGET_FPS = 30.0 # reduced fps for CPU processing
+ TARGET_FPS = 30.0 # reduced fps for CPU processing. Higher values starve the web server.
DEBUG_VIDEO = True # enable extra video debugging output
# GPU usage flag
diff --git a/docker-compose.yml b/docker-compose.yml
index cb67ed2..afa0fb2 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,26 +1,35 @@
services:
ai-baby-monitor:
- build:
+ build:
context: .
dockerfile: Dockerfile
container_name: AI-Baby-Monitor
restart: unless-stopped
ports:
- "8847:8847"
-
+
# Environment variables
environment:
- - RTSP_URL=${RTSP_URL}
+ - STREAM_URL=${STREAM_URL}
+ - STREAM_SELF_SIGNED_CERT=${STREAM_SELF_SIGNED_CERT}
- FLASK_ENV=production
- PYTHONUNBUFFERED=1
-
+
# Volume mappings
volumes:
# Map recordings directory (MONITOR_RECORDINGS_DIR)
- ${HOME}/baby-monitor:/app/baby-monitor
- # Health check override
+ # Mount source code for development/debugging
+ #- ./services:/app/services
+ #- ./templates:/app/templates
+ #- ./static:/app/static
+ #- ./api:/app/api
+ #- ./config:/app/config
+ #- ./utils:/app/utils
+ #- ./app.py:/app/app.py
+ # Health check override
healthcheck:
- test: ["CMD", "curl", "-f", "http://localhost:8847/health"]
+ test: [ "CMD", "curl", "-f", "http://localhost:8847/health" ]
interval: 30s
timeout: 10s
retries: 3
diff --git a/requirements.txt b/requirements.txt
index 597ed07..586b376 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -17,4 +17,6 @@ flask-wtf==1.2.2
wtforms==3.2.1
werkzeug==3.1.3
bcrypt==4.3.0
-pytapo==3.3.49
\ No newline at end of file
+pytapo==3.3.49
+requests==2.32.5
+email-validator==2.3.0
\ No newline at end of file
diff --git a/services/controller/camera_service.py b/services/controller/camera_service.py
index 64e9833..46f2bbf 100644
--- a/services/controller/camera_service.py
+++ b/services/controller/camera_service.py
@@ -2,6 +2,8 @@
Camera Service Layer - Handles camera operations and abstracts camera controller logic
"""
import logging
+import threading
+import time
from typing import Dict, List, Optional, Any
from services.controller.tapo_camera import TapoCameraController
from config.settings import config
@@ -17,37 +19,68 @@ def __init__(self):
self._host = config.CAMERA_HOST
self._username = config.CAMERA_USERNAME
self._password = config.CAMERA_PASSWORD
-
+
+ # Caching and locking
+ self._cache_lock = threading.Lock()
+ self._last_status = None
+ self._last_presets = None
+ self._last_update_time = 0
+ self._cache_duration = 30 # Cache for 30 seconds
+
+ # Thread for background initialization
+ self._init_thread = None
+ self._is_initializing = False
+
def _get_controller(self):
"""Get or create camera controller instance."""
if not self._is_enabled:
return None
if self._controller is None:
+ if self._is_initializing:
+ return None
+
try:
# Ensure we have valid credentials
if not self._host or not self._username or not self._password:
self.logger.error("Camera credentials not properly configured")
return None
-
- self._controller = TapoCameraController(
- host=self._host,
- username=self._username,
- password=self._password,
- debug=False
- )
- self.logger.info("Camera controller initialized successfully")
+
+ # Start initialization in background to not block the web thread
+ self._is_initializing = True
+ self._init_thread = threading.Thread(target=self._background_init, daemon=True)
+ self._init_thread.start()
+ return None
except Exception as e:
- self.logger.error(f"Failed to initialize camera controller: {e}")
+ self.logger.error(f"Failed to start camera controller initialization: {e}")
+ self._is_initializing = False
return None
+
return self._controller
-
+
+ def _background_init(self):
+ """Perform camera initialization in background."""
+ try:
+ self.logger.info(f"Starting background camera initialization for {self._host}...")
+ controller = TapoCameraController(
+ host=self._host,
+ username=self._username,
+ password=self._password,
+ debug=False
+ )
+ self._controller = controller
+ self.logger.info("Camera controller initialized successfully via background thread")
+ except Exception as e:
+ self.logger.error(f"Background camera initialization failed: {e}")
+ finally:
+ self._is_initializing = False
+
def is_available(self) -> bool:
"""Check if camera service is available."""
- return self._is_enabled and self._get_controller() is not None
+ return self._is_enabled and self._controller is not None
def get_status(self) -> Dict[str, Any]:
- """Get camera status and basic information."""
+ """Get camera status with caching."""
if not self._is_enabled:
return {
'available': False,
@@ -57,17 +90,25 @@ def get_status(self) -> Dict[str, Any]:
'connection_status': 'disabled'
}
+ # Check cache
+ with self._cache_lock:
+ current_time = time.time()
+ if self._last_status and (current_time - self._last_update_time < self._cache_duration):
+ return self._last_status
+
controller = self._get_controller()
if not controller:
return {
'available': False,
- 'reason': 'offline',
- 'device_model': 'Offline',
+ 'reason': 'initializing' if self._is_initializing else 'offline',
+ 'device_model': 'Initializing...' if self._is_initializing else 'Offline',
'privacy_mode': False,
- 'connection_status': 'offline'
+ 'connection_status': 'initializing' if self._is_initializing else 'offline'
}
try:
+ # Add timeout protection specifically for these calls if the underlying lib allows it,
+ # otherwise trust threading to keep the app responsive.
basic_info = controller.get_basic_info()
privacy_mode = controller.get_privacy_mode()
@@ -75,22 +116,29 @@ def get_status(self) -> Dict[str, Any]:
if privacy_mode is None:
privacy_mode = False
elif not isinstance(privacy_mode, bool):
- # Convert to boolean if it's not already
privacy_mode = bool(privacy_mode)
- # Extract device model from various possible fields
+ # Extract device model
device_model = 'Unknown'
if isinstance(basic_info, dict):
- device_model = basic_info.get('device_info').get('basic_info').get('device_alias')
- self.logger.info(f"Camera status: available=True, privacy_mode={privacy_mode}, device_model={device_model}")
+ try:
+ device_model = basic_info.get('device_info', {}).get('basic_info', {}).get('device_alias', 'Unknown')
+ except AttributeError:
+ device_model = 'Tapo Camera'
- return {
+ status = {
'available': True,
'reason': 'connected',
'device_model': device_model,
'privacy_mode': privacy_mode,
'connection_status': 'online',
}
+
+ with self._cache_lock:
+ self._last_status = status
+ self._last_update_time = time.time()
+
+ return status
except Exception as e:
self.logger.error(f"Error getting camera status: {e}")
return {
@@ -103,26 +151,34 @@ def get_status(self) -> Dict[str, Any]:
}
def get_presets(self) -> List[Dict[str, Any]]:
- """Get formatted list of camera presets."""
+ """Get camera presets with caching."""
+ # Use default presets if disabled or not yet connected
+ default_presets = [
+ {'id': 1, 'name': 'Home Position'},
+ {'id': 2, 'name': 'Sleep Area'},
+ {'id': 3, 'name': 'Play Area'}
+ ]
+
+ if not self._is_enabled:
+ return default_presets
+
+ # Check cache
+ with self._cache_lock:
+ current_time = time.time()
+ if self._last_presets and (current_time - self._last_update_time < self._cache_duration):
+ return self._last_presets
+
controller = self._get_controller()
if not controller:
- # Return default presets when camera is not available
- return [
- {'id': 1, 'name': 'Home Position'},
- {'id': 2, 'name': 'Sleep Area'},
- {'id': 3, 'name': 'Play Area'}
- ]
+ return default_presets
try:
presets = controller.get_presets()
- self.logger.debug(f"Raw presets from controller: {presets}")
-
formatted_presets = []
+
if isinstance(presets, list) and len(presets) > 0:
- # Handle the case where presets is a list with objects containing id-name pairs
for preset_data in presets:
if isinstance(preset_data, dict):
- # Check if it's the format: {"1": "Bed", "2": "Gate", "3": "back"}
for preset_id, preset_name in preset_data.items():
try:
formatted_presets.append({
@@ -130,40 +186,32 @@ def get_presets(self) -> List[Dict[str, Any]]:
'name': preset_name
})
except (ValueError, TypeError):
- # Fallback if preset_id is not a number
formatted_presets.append({
'id': preset_id,
'name': preset_name
})
else:
- # Handle other formats
formatted_presets.append({
'id': len(formatted_presets) + 1,
'name': f'Preset {len(formatted_presets) + 1}'
})
- # If no presets were found or parsed, use default presets
if not formatted_presets:
- formatted_presets = [
- {'id': 1, 'name': 'Home Position'},
- {'id': 2, 'name': 'Sleep Area'},
- {'id': 3, 'name': 'Play Area'}
- ]
+ formatted_presets = default_presets
+
+ with self._cache_lock:
+ self._last_presets = formatted_presets
+ # Note: last_update_time is shared with status
return formatted_presets
except Exception as e:
self.logger.error(f"Error getting presets: {e}")
- # Return default presets on error
- return [
- {'id': 1, 'name': 'Home Position'},
- {'id': 2, 'name': 'Sleep Area'},
- {'id': 3, 'name': 'Play Area'}
- ]
+ return default_presets
class CameraControlService:
- """Service layer for camera control operations (movements, settings)."""
+ """Service layer for camera control operations."""
def __init__(self, camera_service: CameraService):
self.camera_service = camera_service
@@ -171,9 +219,10 @@ def __init__(self, camera_service: CameraService):
def set_preset(self, preset_id: int) -> Dict[str, Any]:
"""Set camera to specific preset position."""
- controller = self.camera_service._get_controller()
- if not controller:
- return {'success': False, 'error': 'Camera controller not available'}
+ if not self.camera_service.is_available():
+ return {'success': False, 'error': 'Camera is not ready yet'}
+
+ controller = self.camera_service._controller
try:
success = controller.set_preset(preset_id)
@@ -191,13 +240,19 @@ def set_preset(self, preset_id: int) -> Dict[str, Any]:
def set_privacy_mode(self, enabled: bool) -> Dict[str, Any]:
"""Toggle camera privacy mode."""
- controller = self.camera_service._get_controller()
- if not controller:
- return {'success': False, 'error': 'Camera controller not available'}
+ if not self.camera_service.is_available():
+ return {'success': False, 'error': 'Camera is not ready yet'}
+
+ controller = self.camera_service._controller
try:
success = controller.set_privacy_mode(enabled)
if success:
+ # Update cache immediately on manual change
+ with self.camera_service._cache_lock:
+ if self.camera_service._last_status:
+ self.camera_service._last_status['privacy_mode'] = enabled
+
return {
'success': True,
'privacy_mode': enabled,
diff --git a/services/streaming/rtsp_reader.py b/services/streaming/rtsp_reader.py
deleted file mode 100644
index da86494..0000000
--- a/services/streaming/rtsp_reader.py
+++ /dev/null
@@ -1,128 +0,0 @@
-"""
-RTSP streaming module for camera connectivity
-"""
-import cv2
-import time
-import threading
-from config.settings import config
-
-
-class RTSPReader:
- """Threaded RTSP stream reader with auto-reconnection"""
-
- def __init__(self, url):
- """Initialize RTSP reader"""
- self.url = url
- self.cap = None
- self.connect()
-
- self.lock = threading.Lock()
- self.latest_frame = None
- self.stopped = False
- self.connection_lost = False
-
- # Start reading thread
- t = threading.Thread(target=self.update, daemon=True)
- t.start()
-
- def connect(self):
- """Try to connect to RTSP stream with retries"""
- for attempt in range(config.MAX_RETRIES):
- try:
- print(f"[RTSP] Attempting connection (attempt {attempt + 1}/{config.MAX_RETRIES})...")
-
- # Try different RTSP transport methods
- transports = [
- f"{self.url}", # Default
- f"{self.url.replace('rtsp://', 'rtsp://')}" # Ensure rtsp prefix
- ]
-
- for transport_url in transports:
- self.cap = cv2.VideoCapture(transport_url, cv2.CAP_FFMPEG)
- # Set additional properties for better connection
- self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
- self.cap.set(cv2.CAP_PROP_FPS, 25)
-
- if self.cap.isOpened():
- # Test if we can actually read a frame
- ret, frame = self.cap.read()
- if ret and frame is not None:
- print(f"[SUCCESS] RTSP connection successful!")
- return
- else:
- self.cap.release()
- print(f"[WARNING] Could not read from stream, trying next method...")
- else:
- print(f"[WARNING] Could not open stream with {transport_url}")
-
- print(f"[ERROR] Connection attempt {attempt + 1} failed, retrying in {config.RETRY_DELAY} seconds...")
- time.sleep(config.RETRY_DELAY)
-
- except Exception as e:
- print(f"[ERROR] Connection error on attempt {attempt + 1}: {e}")
- time.sleep(config.RETRY_DELAY)
-
- raise RuntimeError(self._get_connection_error_message())
-
- def _get_connection_error_message(self):
- """Generate detailed error message for connection failure"""
- camera_ip = self.url.split('@')[1].split('/')[0] if '@' in self.url else 'unknown'
- return (f"[ERROR] Cannot open RTSP stream after {config.MAX_RETRIES} attempts. Please check:\n"
- f"1. Camera IP address: {camera_ip}\n"
- f"2. Username/password credentials\n"
- f"3. Network connectivity\n"
- f"4. Camera is powered on and accessible")
-
- def update(self):
- """Background thread for reading frames"""
- consecutive_failures = 0
-
- while not self.stopped:
- if self.cap is None or not self.cap.isOpened():
- time.sleep(0.1)
- continue
-
- grabbed = self.cap.grab()
- if not grabbed:
- consecutive_failures += 1
- if consecutive_failures >= config.MAX_FAILURES:
- print("[WARNING] Too many consecutive failures, attempting to reconnect...")
- try:
- self.cap.release()
- self.connect()
- consecutive_failures = 0
- self.connection_lost = False
- except Exception as e:
- print(f"[ERROR] Reconnection failed: {e}")
- self.connection_lost = True
- time.sleep(5) # Wait longer before next attempt
- else:
- time.sleep(0.01)
- continue
-
- ret, frame = self.cap.read()
- if not ret or frame is None:
- consecutive_failures += 1
- time.sleep(0.01)
- continue
-
- # Reset failure counter on successful read
- consecutive_failures = 0
- self.connection_lost = False
-
- with self.lock:
- self.latest_frame = frame
-
- def read(self):
- """Get the latest frame"""
- with self.lock:
- return self.latest_frame.copy() if self.latest_frame is not None else None
-
- def stop(self):
- """Stop the reader and release resources"""
- self.stopped = True
- try:
- if self.cap:
- self.cap.release()
- except Exception:
- pass
diff --git a/services/streaming/streaming_service.py b/services/streaming/streaming_service.py
index e1d1ee0..75553e9 100644
--- a/services/streaming/streaming_service.py
+++ b/services/streaming/streaming_service.py
@@ -13,7 +13,7 @@
from services.tracking.deepsort_tracker import DeepSortTracker
from services.monitoring.monitors import SleepMonitor, SafetyMonitor
from services.visualization.visualizer import Visualizer
-from services.streaming.rtsp_reader import RTSPReader
+from services.streaming.video_reader import VideoReader
class FrameMemoryPool:
@@ -49,6 +49,7 @@ def __init__(self, socketio):
self.adaptive_quality = 80
self.adaptive_fps = 25
self.daemon = True
+ self.emitted_frame_count = 0
def add_frame(self, frame):
"""Add frame to streaming queue (called by AI thread)"""
@@ -105,34 +106,22 @@ def update_client_count(self, count):
else:
self.adaptive_fps = 25
self.adaptive_quality = 80
+
+ print(f"[WebStreamManager] Client count updated: {count}. Mode: FPS={self.adaptive_fps}, Quality={self.adaptive_quality}")
def run(self):
- """WebSocket streaming loop"""
+ """WebStreamManager loop - Reduced to a low-frequency stats update since MJPEG is used"""
while self.running:
- frame_data, quality = self.get_latest_frame()
- if frame_data is not None:
- try:
- # Encode frame with adaptive quality
- quality_val = int(quality) if quality is not None else 80
- encode_params = [cv2.IMWRITE_JPEG_QUALITY, quality_val]
- ret, buffer = cv2.imencode('.jpg', frame_data, encode_params)
-
- if ret:
- # Convert to base64 for WebSocket transmission
- frame_b64 = base64.b64encode(buffer).decode('utf-8')
-
- # Send to only users with streaming permissions via WebSocket room
- self.socketio.emit('video_frame', {'frame': frame_b64}, room='streaming_enabled')
-
- # Memory cleanup
- del buffer, frame_b64
-
- except Exception as e:
- print(f"WebSocket streaming error: {e}")
+ # MJPEG is now used for streaming directly via the /video_feed route.
+ # This thread no longer needs to encode and emit frames via WebSockets.
+ # We keep it alive for stats tracking if needed, but remove the heavy cv2.imencode.
- # Adaptive sleep based on client load
- sleep_time = 1.0 / self.adaptive_fps
- time.sleep(sleep_time)
+ # self.emitted_frame_count += 1
+ # if self.emitted_frame_count % 100 == 0:
+ # print(f"[WebStreamManager] Streaming heartbeat.")
+
+ # Long sleep as we don't need real-time loops here anymore
+ time.sleep(1.0)
def stop(self):
self.running = False
@@ -148,7 +137,7 @@ def __init__(self, web_stream_manager):
self.tracker = DeepSortTracker()
self.sleep_monitor = SleepMonitor()
self.safety_monitor = SafetyMonitor()
- self.reader = RTSPReader(config.RTSP_URL)
+ self.reader = VideoReader(config.STREAM_URL)
# Initialize frame dimensions
frame = None
@@ -240,7 +229,7 @@ def run(self):
# Send to web streaming thread
self.web_stream_manager.add_frame(annotated)
- # Periodic garbage collection
+ # Periodic garbage collection
self.frame_count += 1
if self.frame_count % self.gc_interval == 0:
gc.collect()
@@ -249,7 +238,14 @@ def run(self):
print(f"AI Pipeline error: {e}")
continue
- time.sleep(1.0 / config.TARGET_FPS)
+ # Dynamic sleep based on active viewers (save CPU if no one is watching)
+ client_count = self.web_stream_manager.client_connections
+ if client_count == 0:
+ # No one watching? Slow down to 2 FPS to reduce idle CPU usage
+ time.sleep(0.5)
+ else:
+ # Someone is watching? Process at TARGET_FPS (max 10 FPS on CPU)
+ time.sleep(1.0 / config.TARGET_FPS)
def stop(self):
self.running = False
@@ -313,7 +309,7 @@ def stop(self):
def get_metrics(self):
"""Get system and streaming metrics"""
# System metrics
- cpu = psutil.cpu_percent(interval=0.1)
+ cpu = psutil.cpu_percent(interval=None)
memory = psutil.virtual_memory().percent
net_io = psutil.net_io_counters()
network = min(100, (net_io.bytes_sent + net_io.bytes_recv) / 1e7)
@@ -325,8 +321,27 @@ def get_metrics(self):
# Sleep metrics (only if streamer is available)
if self.ai_streamer is not None:
sleep_state, sleep_time = self.ai_streamer.get_sleep_metrics()
+ # Child status
+ child_id = self.ai_streamer.tracker.child_id
+ if child_id is not None:
+ child_status = {
+ 'selected': True,
+ 'child_id': child_id,
+ 'confidence': self.ai_streamer.tracker.track_confidences.get(child_id, 0)
+ }
+ else:
+ child_status = {'selected': False}
else:
sleep_state, sleep_time = 'Offline', '0m'
+ child_status = {'selected': False}
+
+ # Notification count
+ from models.notification import Notification
+ try:
+ # We wrap this in try-except because it requires app context
+ notification_count = Notification.query.count()
+ except Exception:
+ notification_count = 0
# Streaming metrics
active_client_count = len(self.active_clients)
@@ -341,6 +356,8 @@ def get_metrics(self):
'sleep_state': sleep_state,
'room_temp': room_temp,
'sleep_time': sleep_time,
+ 'child_status': child_status,
+ 'notification_count': notification_count,
'streaming': {
'active_clients': active_client_count,
'adaptive_fps': streaming_fps,
diff --git a/services/streaming/video_reader.py b/services/streaming/video_reader.py
new file mode 100644
index 0000000..5221275
--- /dev/null
+++ b/services/streaming/video_reader.py
@@ -0,0 +1,255 @@
+"""
+Video streaming module for camera connectivity (RTSP & MJPEG)
+"""
+import cv2
+import time
+import threading
+import numpy as np
+import requests
+from config.settings import config
+
+
+class MJPEGClient:
+ """
+ Custom MJPEG stream reader using requests.
+ Used primarily when specific SSL/TLS control is needed (e.g. self-signed certs).
+ Mimics a subset of cv2.VideoCapture interface.
+ """
+ def __init__(self, url, verify_ssl=True):
+ self.url = url
+ self.verify_ssl = verify_ssl
+ self.stream_response = None
+ self.bytes_buffer = bytes()
+ self._is_opened = False
+ self._connect()
+
+ def _connect(self):
+ try:
+ # Open the stream with requests
+ # timeout is for the connection, not the stream duration
+ print(f"[MJPEGClient] Connecting to {self.url} with verify_ssl={self.verify_ssl}...")
+ self.stream_response = requests.get(
+ self.url,
+ stream=True,
+ verify=self.verify_ssl,
+ timeout=10
+ )
+ print(f"[MJPEGClient] Connection status: {self.stream_response.status_code}")
+ if self.stream_response.status_code == 200:
+ self._is_opened = True
+ # We need to iterate over the content
+ self.iterator = self.stream_response.iter_content(chunk_size=1024)
+ else:
+ self._is_opened = False
+ except Exception as e:
+ print(f"[MJPEGClient] Connection error: {e}")
+ self._is_opened = False
+
+ def isOpened(self):
+ return self._is_opened
+
+ def read(self):
+ """
+ Reads the next frame from the MJPEG stream.
+ Returns (ret, frame) similar to cv2.VideoCapture.
+ """
+ if not self._is_opened:
+ return False, None
+
+ # Basic MJPEG parsing logic: look for JPEG start/end markers
+ # SOI (Start of Image): 0xFF 0xD8
+ # EOI (End of Image): 0xFF 0xD9
+
+ # We read chunks until we find a full frame
+ # This is a blocking operation, but in a thread usually.
+ # For simplicity and robustness, we buffer slightly efficiently.
+
+ try:
+ while True:
+ # Search for start
+ a = self.bytes_buffer.find(b'\xff\xd8')
+ # Search for end
+ b = self.bytes_buffer.find(b'\xff\xd9')
+
+ if a != -1 and b != -1:
+ # We have a candidate frame
+ jpg = self.bytes_buffer[a:b+2]
+ # Shift buffer
+ self.bytes_buffer = self.bytes_buffer[b+2:]
+
+ # Decode
+ frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
+ if frame is not None:
+ return True, frame
+ # If decode fails, we might have had garbage bytes looking like markers, continue
+ continue
+
+ # If we don't have a full frame, read more
+ chunk = next(self.iterator)
+ self.bytes_buffer += chunk
+
+ # Safety break for buffer size to prevent OOM on broken streams
+ if len(self.bytes_buffer) > 10 * 1024 * 1024: # 10MB limit
+ print(f"[MJPEGClient] Buffer size limit exceeded ({len(self.bytes_buffer)} bytes). Resetting buffer.")
+ self.bytes_buffer = bytes()
+ return False, None
+
+ except StopIteration:
+ self._is_opened = False
+ return False, None
+ except Exception as e:
+ # print(f"[MJPEGClient] Read error: {e}")
+ self._is_opened = False
+ return False, None
+
+ def grab(self):
+ # Requests stream is linear, we can't easily skip without reading.
+ # So grab() acts like read() but discards result?
+ # Or we just return True to simulate that a frame "could" be read.
+ # For compatibility with the update loop which calls grab() then read(),
+ # we can make grab() a no-op that returns True if opened.
+ return self._is_opened
+
+ def release(self):
+ self._is_opened = False
+ if self.stream_response:
+ self.stream_response.close()
+
+ def set(self, prop, val):
+ pass # ignore properties
+
+
+class VideoReader:
+ """Threaded Video stream reader with auto-reconnection (Supports RTSP & MJPEG)"""
+
+ def __init__(self, url):
+ """Initialize Video reader"""
+ self.url = url
+ self.cap = None
+ self.connect()
+
+ self.lock = threading.Lock()
+ self.latest_frame = None
+ self.stopped = False
+ self.connection_lost = False
+
+ # Start reading thread
+ t = threading.Thread(target=self.update, daemon=True)
+ t.start()
+
+ def connect(self):
+ """Try to connect to video stream (RTSP or MJPEG) with retries"""
+ for attempt in range(config.MAX_RETRIES):
+ try:
+ print(f"[VIDEO] Attempting connection (attempt {attempt + 1}/{config.MAX_RETRIES})...")
+
+ # Check if it's an HTTP/MJPEG stream
+ if self.url.lower().startswith(('http://', 'https://')):
+ print(f"[VIDEO] Detected HTTP/MJPEG stream: {self.url}")
+
+ if config.STREAM_SELF_SIGNED_CERT:
+ print("[VIDEO] Allowing self-signed certificates (using MJPEGClient)")
+ # Use custom client if we need to ignore SSL certs
+ self.cap = MJPEGClient(self.url, verify_ssl=False)
+ else:
+ # Use OpenCV default if standard
+ self.cap = cv2.VideoCapture(self.url)
+ else:
+ # Assume RTSP or other ffmpeg supported stream
+ print(f"[VIDEO] Fallback to RTSP/Video stream: {self.url}")
+ self.cap = cv2.VideoCapture(self.url, cv2.CAP_FFMPEG)
+ # Set additional properties for better RTSP connection
+ self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
+ self.cap.set(cv2.CAP_PROP_FPS, 25)
+
+ if self.cap.isOpened():
+ # Test if we can actually read a frame
+ ret, frame = self.cap.read()
+ if ret and frame is not None:
+ print(f"[SUCCESS] Video connection successful!")
+ return
+ else:
+ self.cap.release()
+ print(f"[WARNING] Could not read from stream...")
+ else:
+ print(f"[WARNING] Could not open stream with {self.url}")
+
+ print(f"[ERROR] Connection attempt {attempt + 1} failed, retrying in {config.RETRY_DELAY} seconds...")
+ time.sleep(config.RETRY_DELAY)
+
+ except Exception as e:
+ print(f"[ERROR] Connection error on attempt {attempt + 1}: {e}")
+ time.sleep(config.RETRY_DELAY)
+
+ raise RuntimeError(self._get_connection_error_message())
+
+ def _get_connection_error_message(self):
+ """Generate detailed error message for connection failure"""
+ camera_ip = 'unknown'
+ if '@' in self.url:
+ camera_ip = self.url.split('@')[1].split('/')[0]
+ elif '://' in self.url:
+ camera_ip = self.url.split('://')[1].split('/')[0]
+
+ return (f"[ERROR] Cannot open Video stream after {config.MAX_RETRIES} attempts. Please check:\n"
+ f"1. Camera IP address: {camera_ip}\n"
+ f"2. Username/password credentials\n"
+ f"3. Network connectivity\n"
+ f"4. Camera is powered on and accessible\n"
+ f"5. URL is correct (RTSP or HTTP/MJPEG)")
+
+ def update(self):
+ """Background thread for reading frames"""
+ consecutive_failures = 0
+
+ while not self.stopped:
+ if self.cap is None or not self.cap.isOpened():
+ time.sleep(0.1)
+ continue
+
+ # Note: For OpenCV, grab() separates decoding from reading.
+ # For our MJPEGClient, grab() is a no-op returning True.
+ grabbed = self.cap.grab()
+ if not grabbed:
+ consecutive_failures += 1
+ if consecutive_failures >= config.MAX_FAILURES:
+ print("[WARNING] Too many consecutive failures, attempting to reconnect...")
+ try:
+ self.cap.release()
+ self.connect()
+ consecutive_failures = 0
+ self.connection_lost = False
+ except Exception as e:
+ print(f"[ERROR] Reconnection failed: {e}")
+ self.connection_lost = True
+ time.sleep(5) # Wait longer before next attempt
+ else:
+ time.sleep(0.01)
+ continue
+
+ ret, frame = self.cap.read()
+ if not ret or frame is None:
+ consecutive_failures += 1
+ time.sleep(0.01)
+ continue
+
+ # Reset failure counter on successful read
+ consecutive_failures = 0
+ self.connection_lost = False
+
+ with self.lock:
+ self.latest_frame = frame
+
+ def read(self):
+ """Get the latest frame"""
+ with self.lock:
+ return self.latest_frame.copy() if self.latest_frame is not None else None
+
+ def stop(self):
+ """Stop the reader and release resources"""
+ self.stopped = True
+ try:
+ if self.cap:
+ self.cap.release()
+ except Exception:
+ pass
diff --git a/templates/scripts/script_components.html b/templates/scripts/script_components.html
index 383bfeb..05f1ed2 100644
--- a/templates/scripts/script_components.html
+++ b/templates/scripts/script_components.html
@@ -1,732 +1,748 @@
-
+
+
-
-
\ No newline at end of file
+ }
+
+ // Auto-adjust quality based on performance
+ function autoAdjustQuality() {
+ if (performanceStats.frameDrops > 5) {
+ const currentQual = parseInt(qualitySlider.value);
+ if (currentQual > 30) {
+ qualitySlider.value = Math.max(30, currentQual - 10);
+ qualityValue.textContent = qualitySlider.value + '%';
+ applyQuality.click();
+ showNotification('Quality auto-adjusted due to performance', 'info');
+ performanceStats.frameDrops = 0;
+ }
+ }
+ }
+
+ // Check performance every 30 seconds
+ setInterval(autoAdjustQuality, 30000);
+
+ // Enhanced video stream error handling
+ videoStream.addEventListener('error', function (e) {
+ console.error('Video stream error:', e);
+ showNotification('Video stream error occurred', 'error');
+ });
+
+ videoStream.addEventListener('loadstart', function () {
+ console.log('Video stream loading started');
+ });
+
+ videoStream.addEventListener('loadeddata', function () {
+ console.log('Video stream data loaded');
+ });
+
+ // Keyboard shortcut hints
+ const shortcutHints = {
+ 's': 'Take Snapshot',
+ 'p': 'Pause/Resume',
+ 'f': 'Fullscreen',
+ 'q': 'Toggle Quality',
+ 'c': 'Clear Child Selection',
+ 'r': 'Recording Mode',
+ 'z': 'Sleep Detection',
+ 'h': 'Help'
+ };
+
+ // Show random shortcut hint occasionally
+ function showRandomShortcutHint() {
+ const shortcuts = Object.keys(shortcutHints);
+ const randomKey = shortcuts[Math.floor(Math.random() * shortcuts.length)];
+ const hint = shortcutHints[randomKey];
+ showNotification(`💡 Tip: Press "${randomKey.toUpperCase()}" to ${hint}`, 'info');
+ }
+
+ // Show hint every 5 minutes
+ setInterval(showRandomShortcutHint, 300000);
+
+ console.log('🚀 AI Baby Monitor initialized successfully!');
+ console.log('📹 Video stream ready');
+ console.log('🔌 WebSocket connection established');
+ console.log('⌨️ Press H for keyboard shortcuts');
+
+
\ No newline at end of file
diff --git a/tests/test_mjpeg_client.py b/tests/test_mjpeg_client.py
new file mode 100644
index 0000000..8fd1a8e
--- /dev/null
+++ b/tests/test_mjpeg_client.py
@@ -0,0 +1,77 @@
+import time
+import sys
+import os
+import cv2
+
+# Add project root to path
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
+
+try:
+ from services.streaming.video_reader import MJPEGClient
+except ImportError:
+ # Handle case where imports might fail if not fully set up
+ print("Could not import MJPEGClient. Ensure you are running from project root or have paths set.")
+ sys.exit(1)
+
+def test_mjpeg_client(url):
+ print(f"Testing MJPEG Client with URL: {url}")
+
+ client = None
+ try:
+ # Simple heuristic to choose client, similar to video_reader.py logic
+ if url.lower().startswith(('http://', 'https://')):
+ print("Using MJPEGClient (forcing for test)")
+ client = MJPEGClient(url, verify_ssl=False)
+ else:
+ print("Using OpenCV VideoCapture")
+ client = cv2.VideoCapture(url)
+ except Exception as e:
+ print(f"Failed to initialize client: {e}")
+ return
+
+ is_cv2 = isinstance(client, cv2.VideoCapture)
+
+ if is_cv2:
+ if not client.isOpened():
+ print("Failed to open stream (OpenCV)")
+ return
+ elif not client.isOpened():
+ print("Failed to open stream (MJPEGClient)")
+ return
+
+ print("Stream opened successfully. Reading frames...")
+
+ start_time = time.time()
+ frame_count = 0
+ errors = 0
+
+ try:
+ while time.time() - start_time < 10: # Read for 10 seconds
+ if is_cv2:
+ ret, frame = client.read()
+ else:
+ ret, frame = client.read()
+
+ if ret and frame is not None:
+ frame_count += 1
+ if frame_count % 10 == 0:
+ print(f"Read {frame_count} frames. Last frame shape: {frame.shape}")
+ else:
+ errors += 1
+ if errors % 10 == 0:
+ print(f"Failed to read frame (Total errors: {errors})")
+ time.sleep(0.01)
+
+ except KeyboardInterrupt:
+ pass
+
+ print(f"Test finished. Total frames: {frame_count}, Total errors: {errors}")
+ client.release()
+
+if __name__ == "__main__":
+ if len(sys.argv) < 2:
+ print("Usage: python3 test_mjpeg_client.py ")
+ sys.exit(1)
+
+ url = sys.argv[1]
+ test_mjpeg_client(url)
diff --git a/tests/test_websocket_client.py b/tests/test_websocket_client.py
new file mode 100644
index 0000000..3dc2d0e
--- /dev/null
+++ b/tests/test_websocket_client.py
@@ -0,0 +1,52 @@
+import socketio
+import time
+import sys
+
+# Create a Socket.IO client
+sio = socketio.Client()
+
+frame_count = 0
+start_time = 0
+
+@sio.event
+def connect():
+ print("Connected to WebSocket server")
+ # Join the streaming room? The server code shows:
+ # socket.emit('video_frame', ..., room='streaming_enabled')
+ # Use standard mechanisms if the server expects authentication or specific events.
+ # But usually joining a room is server-side logic based on auth.
+ # Let's see if we receive anything just by connecting.
+
+@sio.event
+def connect_error(data):
+ print(f"Connection failed: {data}")
+
+@sio.event
+def disconnect():
+ print("Disconnected from server")
+
+@sio.on('video_frame')
+def on_video_frame(data):
+ global frame_count, start_time
+ if frame_count == 0:
+ start_time = time.time()
+ frame_count += 1
+ if frame_count % 10 == 0:
+ elapsed = time.time() - start_time
+ fps = frame_count / elapsed
+ print(f"Received {frame_count} frames. Average FPS: {fps:.2f}")
+
+def main():
+ url = 'http://localhost:8847'
+ print(f"Connecting to {url}...")
+ try:
+ sio.connect(url, wait_timeout=10)
+ sio.wait()
+ except Exception as e:
+ print(f"Error: {e}")
+ except KeyboardInterrupt:
+ print("Stopping...")
+ sio.disconnect()
+
+if __name__ == '__main__':
+ main()