FastAPI + YOLOv8 实时监控系统搭建:5步搞定RTSP流目标检测

📅 发布时间:2026/7/3 23:13:14 👁️ 浏览次数:
FastAPI + YOLOv8 实时监控系统搭建:5步搞定RTSP流目标检测
FastAPI YOLOv8 实时监控系统搭建5步搞定RTSP流目标检测最近在做一个智慧园区的项目客户要求对几十路摄像头进行实时的人车识别和统计。一开始我们尝试用传统的轮询截图分析结果延迟高得离谱后台服务器动不动就卡死。后来我们转向了基于WebSocket的实时流处理方案用FastAPI搭后端YOLOv8做检测效果提升了好几个量级。今天我就把这个实战方案拆开揉碎了讲给你听从环境配置到性能调优手把手带你避开我们踩过的那些坑。这套方案特别适合安防监控、智慧交通、工业质检这些需要低延迟、高并发处理视频流的场景。如果你正在为实时视频分析头疼或者想把手头的监控摄像头升级成“智能眼”那这篇文章就是为你准备的。我们不只讲代码怎么写更会深入讨论架构设计、性能瓶颈和那些教科书上不会写的实战细节。1. 环境准备与依赖管理搭建任何AI项目环境都是第一道坎。我见过太多人因为环境问题折腾好几天最后代码还没跑起来热情就耗光了。所以咱们先从最基础也是最容易出错的环节开始。Python版本选择我强烈推荐使用Python 3.8到3.10之间的版本。Python 3.11虽然新但有些深度学习库的兼容性还不够稳定。我们团队在3.11上就遇到过OpenCV的某些扩展模块无法编译的问题最后还是退回到了3.9。创建虚拟环境是必须的这能保证你的项目依赖不会污染系统环境也方便后续部署。我习惯用venv因为它轻量且是Python标准库的一部分# 创建虚拟环境 python -m venv yolo_env # 激活环境Linux/Mac source yolo_env/bin/activate # 激活环境Windows yolo_env\Scripts\activate如果你用conda命令会稍有不同conda create -n yolo_env python3.9 conda activate yolo_env接下来是依赖安装。别急着pip install一堆包先创建一个requirements.txt文件把需要的库都列清楚。这是我们的项目依赖清单fastapi0.104.1 uvicorn[standard]0.24.0 opencv-python4.8.1.78 ultralytics8.0.196 pydantic2.5.0 websockets12.0 python-multipart0.0.6注意Ultralytics库更新非常频繁有时候新版本会引入不兼容的API变化。如果你在生产环境部署建议锁定一个稳定的版本号比如我们项目就固定在了8.0.196。安装时有个小技巧先安装OpenCV因为它依赖一些系统库如果先装其他包可能会遇到编译问题。在Ubuntu系统上你可能需要先安装这些系统依赖sudo apt-get update sudo apt-get install -y libgl1-mesa-glx libglib2.0-0 libsm6 libxext6 libxrender-dev安装完依赖后验证一下关键库是否正常工作import cv2 print(fOpenCV版本: {cv2.__version__}) from ultralytics import YOLO print(YOLO库导入成功) import fastapi print(fFastAPI版本: {fastapi.__version__})如果这些都能正常执行恭喜你环境搭建已经完成了80%。剩下的20%是模型文件准备。2. YOLOv8模型选择与优化策略YOLOv8提供了从nano到extra large五个不同尺寸的预训练模型每个模型在精度和速度上都有不同的权衡。选对模型你的系统性能可能差出好几倍。先看看各个模型的特点对比模型版本参数量百万推理速度RTX 3080mAP50-95适用场景YOLOv8n3.2~0.8ms37.3边缘设备、移动端YOLOv8s11.2~1.2ms44.9实时监控、中等算力YOLOv8m25.9~2.1ms50.2服务器部署、高精度需求YOLOv8l43.7~3.4ms52.9科研、最高精度要求YOLOv8x68.2~5.1ms53.9基准测试、不计成本从官方仓库下载模型很简单from ultralytics import YOLO # 自动下载并加载模型 model YOLO(yolov8n.pt) # nano版本 # 或者 model YOLO(yolov8s.pt) # small版本但这里有个坑如果你在服务器环境网络可能无法直接访问GitHub。这时候可以手动下载模型文件。我通常的做法是在本地开发机下载好然后通过scp传到服务器# 本地下载 wget https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt wget https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8s.pt # 上传到服务器 scp yolov8n.pt useryour-server:/path/to/project/模型选择经验谈在智慧园区项目中我们最初用了YOLOv8m觉得精度够高。但实际部署后发现16路摄像头同时处理时GPU显存直接爆了。后来换成了YOLOv8s精度只下降了5%但处理速度提升了近一倍显存占用减少了40%。如果你的场景对实时性要求极高比如交通违章抓拍YOLOv8n可能是更好的选择。模型加载后还可以进行一些优化。YOLOv8支持半精度推理FP16这能显著减少显存占用并提升速度# 启用半精度推理 model YOLO(yolov8s.pt) model.fp16 True # 启用FP16不过要注意FP16在某些边缘设备上可能不被支持。我们在Jetson Nano上测试时就遇到了兼容性问题最后还是用FP32跑的。另一个优化点是类别过滤。如果你只关心特定类别的检测比如只检测人和车可以在推理时指定类别ID这能减少计算量# 只检测人和车COCO数据集中person0, car2 results model(frame, classes[0, 2])COCO数据集的80个类别中前几个常用类别的ID是0: person1: bicycle2: car3: motorcycle5: bus7: truck3. RTSP流处理架构设计RTSPReal Time Streaming Protocol是监控摄像头最常用的流媒体协议但它有个让人头疼的特点延迟和稳定性问题。传统的OpenCVVideoCapture读取RTSP流经常会遇到卡顿、丢帧、甚至连接断开的情况。我们先看看最基础的RTSP读取代码import cv2 rtsp_url rtsp://username:password192.168.1.100:554/stream1 cap cv2.VideoCapture(rtsp_url) while True: ret, frame cap.read() if not ret: print(读取帧失败尝试重连...) cap.release() cap cv2.VideoCapture(rtsp_url) continue # 处理帧...这种简单循环的问题很明显read()是阻塞调用如果网络波动导致读取超时整个程序就会卡住。而且每次重连都要重新建立RTSP握手耗时可能达到2-3秒。我们的解决方案是生产者-消费者模式 缓冲区管理。核心思想是让一个独立线程专门负责从RTSP拉流不断更新一个共享的帧缓冲区。处理线程从这个缓冲区读取最新帧这样即使拉流线程偶尔卡顿处理线程也能继续工作。import threading import time import cv2 from queue import Queue from dataclasses import dataclass from typing import Optional dataclass class FrameBuffer: 线程安全的帧缓冲区 latest_frame: Optional[np.ndarray] None frame_timestamp: float 0 _lock: threading.Lock threading.Lock() def update(self, frame: np.ndarray): 更新缓冲区中的最新帧 with self._lock: self.latest_frame frame.copy() if frame is not None else None self.frame_timestamp time.time() def get_latest(self) - Optional[np.ndarray]: 获取最新帧线程安全 with self._lock: return self.latest_frame.copy() if self.latest_frame is not None else None class RTSPStreamReader: RTSP流读取器生产者 def __init__(self, rtsp_url: str, buffer: FrameBuffer): self.rtsp_url rtsp_url self.buffer buffer self.running False self.thread: Optional[threading.Thread] None def start(self): 启动读取线程 self.running True self.thread threading.Thread(targetself._read_loop, daemonTrue) self.thread.start() def stop(self): 停止读取线程 self.running False if self.thread: self.thread.join(timeout5) def _read_loop(self): 读取循环 # 设置OpenCV的RTSP参数减少延迟 cap cv2.VideoCapture(self.rtsp_url) cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # 最小化缓冲区 cap.set(cv2.CAP_PROP_FPS, 25) # 设置期望帧率 # 设置TCP传输更稳定但延迟稍高 # 如果是UDP可以改为rtsp_url ?transportudp if ? not in self.rtsp_url: rtsp_url_with_params f{self.rtsp_url}?transporttcp else: rtsp_url_with_params f{self.rtsp_url}transporttcp reconnect_attempts 0 max_reconnect_attempts 5 while self.running: try: ret, frame cap.read() if not ret: reconnect_attempts 1 if reconnect_attempts max_reconnect_attempts: print(fRTSP连接失败尝试重新连接: {self.rtsp_url}) cap.release() time.sleep(2) # 等待后重试 cap cv2.VideoCapture(rtsp_url_with_params) reconnect_attempts 0 continue # 重置重连计数 reconnect_attempts 0 # 更新缓冲区 self.buffer.update(frame) # 控制读取频率避免CPU占用过高 time.sleep(0.01) # 10ms间隔 except Exception as e: print(f读取RTSP流时出错: {e}) time.sleep(1) # 清理资源 cap.release()这个设计有几个关键优化点TCP传输优先虽然UDP延迟更低但在不稳定的网络环境下容易丢包。我们项目在跨网段传输时UDP的丢包率能达到15%换成TCP后降到1%以下。缓冲区最小化cv2.CAP_PROP_BUFFERSIZE设置为1让OpenCV只缓冲一帧这样能减少延迟。但要注意这会让CPU占用率稍微升高。智能重连机制不是一失败就重连而是累计失败次数避免网络短暂波动导致的频繁重连。独立线程读取和处理分离即使处理逻辑复杂导致某帧处理时间过长也不会影响后续帧的读取。提示有些摄像头厂商的RTSP URL格式比较特殊。海康威视通常是rtsp://username:passwordip:554/Streaming/Channels/101大华则是rtsp://username:passwordip:554/cam/realmonitor?channel1subtype0。如果连接不上先确认URL格式是否正确。4. FastAPI WebSocket实时传输实现有了稳定的视频流读取接下来要解决如何把检测结果实时推送给客户端。HTTP轮询的方式延迟太高WebSocket才是实时传输的正确选择。FastAPI对WebSocket的支持非常友好但要做好生产级应用还需要考虑连接管理、心跳检测、错误处理等细节。我们先搭建基础的FastAPI应用结构from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware import uvicorn import json import time from typing import Dict, List import asyncio app FastAPI(title实时目标检测API, version1.0.0) # 允许跨域开发环境 app.add_middleware( CORSMiddleware, allow_origins[*], # 生产环境要改成具体的域名 allow_credentialsTrue, allow_methods[*], allow_headers[*], ) # 连接管理器 class ConnectionManager: def __init__(self): self.active_connections: Dict[str, WebSocket] {} self.connection_info: Dict[str, dict] {} # 存储连接信息 async def connect(self, websocket: WebSocket, client_id: str): await websocket.accept() self.active_connections[client_id] websocket self.connection_info[client_id] { connected_at: time.time(), last_heartbeat: time.time(), rtsp_url: None } print(f客户端 {client_id} 已连接) def disconnect(self, client_id: str): if client_id in self.active_connections: del self.active_connections[client_id] if client_id in self.connection_info: del self.connection_info[client_id] print(f客户端 {client_id} 已断开) async def send_json(self, client_id: str, data: dict): 发送JSON数据到指定客户端 if client_id in self.active_connections: try: await self.active_connections[client_id].send_json(data) except Exception as e: print(f发送数据到客户端 {client_id} 失败: {e}) self.disconnect(client_id) async def broadcast(self, data: dict): 广播数据到所有客户端 disconnected [] for client_id, websocket in self.active_connections.items(): try: await websocket.send_json(data) except Exception as e: print(f广播到客户端 {client_id} 失败: {e}) disconnected.append(client_id) # 清理断开连接的客户端 for client_id in disconnected: self.disconnect(client_id) manager ConnectionManager()现在实现WebSocket端点。这里要处理几个关键问题参数验证、帧率控制、心跳维持。from pydantic import BaseModel, Field from typing import Optional import uuid class DetectionConfig(BaseModel): 检测配置模型 rtsp_url: str Field(..., descriptionRTSP流地址) model_size: str Field(s, description模型尺寸: n/s/m/l/x) confidence: float Field(0.3, ge0.0, le1.0, description置信度阈值) target_classes: Optional[str] Field(None, description目标类别逗号分隔) fps: int Field(10, ge1, le30, description处理帧率) app.websocket(/ws/detect) async def websocket_detection(websocket: WebSocket): 实时目标检测WebSocket端点 client_id str(uuid.uuid4()) try: # 等待客户端发送配置 config_data await websocket.receive_json() config DetectionConfig(**config_data) # 连接到管理器 await manager.connect(websocket, client_id) manager.connection_info[client_id][rtsp_url] config.rtsp_url # 发送连接成功消息 await manager.send_json(client_id, { type: connection_established, client_id: client_id, timestamp: time.time() }) # 初始化RTSP读取器 buffer FrameBuffer() reader RTSPStreamReader(config.rtsp_url, buffer) reader.start() # 加载YOLO模型 model_map { n: yolov8n.pt, s: yolov8s.pt, m: yolov8m.pt, l: yolov8l.pt, x: yolov8x.pt } model_path model_map.get(config.model_size, yolov8s.pt) model YOLO(model_path) # 解析目标类别 class_ids None if config.target_classes: class_names config.target_classes.split(,) # 将类别名转换为ID这里需要根据你的数据集调整 # 假设使用COCO数据集 coco_classes [person, bicycle, car, motorcycle, airplane, bus, train, truck, boat, traffic light] class_ids [coco_classes.index(cls) for cls in class_names if cls in coco_classes] # 主处理循环 frame_interval 1.0 / config.fps last_frame_time time.time() heartbeat_interval 30 # 30秒心跳 last_heartbeat time.time() while True: current_time time.time() # 发送心跳 if current_time - last_heartbeat heartbeat_interval: await manager.send_json(client_id, { type: heartbeat, timestamp: current_time }) last_heartbeat current_time # 控制帧率 elapsed current_time - last_frame_time if elapsed frame_interval: await asyncio.sleep(frame_interval - elapsed) continue last_frame_time time.time() # 获取最新帧 frame buffer.get_latest() if frame is None: await manager.send_json(client_id, { type: status, message: 等待视频流..., timestamp: current_time }) await asyncio.sleep(0.1) continue # 执行检测 try: start_time time.perf_counter() results model(frame, confconfig.confidence, classesclass_ids, verboseFalse) inference_time time.perf_counter() - start_time # 绘制检测框 annotated_frame results[0].plot() # 统计检测结果 detections [] if results[0].boxes is not None: for box in results[0].boxes: xyxy box.xyxy[0].cpu().numpy() conf box.conf[0].cpu().numpy() cls_id int(box.cls[0].cpu().numpy()) detections.append({ class_id: cls_id, class_name: model.names[cls_id], confidence: float(conf), bbox: xyxy.tolist(), # [x1, y1, x2, y2] center: [(xyxy[0] xyxy[2]) / 2, (xyxy[1] xyxy[3]) / 2] }) # 编码图像为base64 _, buffer cv2.imencode(.jpg, annotated_frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) frame_base64 base64.b64encode(buffer).decode(utf-8) # 发送结果 await manager.send_json(client_id, { type: detection_result, timestamp: current_time, inference_time: inference_time, detection_count: len(detections), detections: detections, image_base64: fdata:image/jpeg;base64,{frame_base64}, fps: 1.0 / (time.time() - current_time elapsed) }) except Exception as e: await manager.send_json(client_id, { type: error, message: f检测失败: {str(e)}, timestamp: time.time() }) await asyncio.sleep(1) except WebSocketDisconnect: print(f客户端 {client_id} 主动断开连接) except Exception as e: print(fWebSocket处理异常: {e}) finally: # 清理资源 if reader in locals(): reader.stop() manager.disconnect(client_id)这个WebSocket实现有几个值得注意的设计配置驱动客户端连接时先发送配置JSON这样同一个端点可以服务不同的检测任务。帧率控制通过计算帧间隔时间精确控制处理频率避免CPU过载。心跳机制定期发送心跳包保持连接活跃同时可以检测连接是否正常。错误隔离每个客户端的处理逻辑相互独立一个客户端出错不会影响其他连接。资源清理在finally块中确保RTSP读取器被正确停止避免资源泄漏。前端连接这个WebSocket也很简单// 前端JavaScript代码 const ws new WebSocket(ws://localhost:8000/ws/detect); // 发送配置 ws.onopen () { ws.send(JSON.stringify({ rtsp_url: rtsp://your-camera-ip/stream, model_size: s, confidence: 0.5, target_classes: person,car, fps: 15 })); }; // 接收结果 ws.onmessage (event) { const data JSON.parse(event.data); switch(data.type) { case detection_result: // 更新页面显示 document.getElementById(video).src data.image_base64; document.getElementById(stats).innerHTML 检测到 ${data.detection_count} 个目标推理时间: ${data.inference_time.toFixed(3)}s; break; case heartbeat: console.log(心跳正常); break; case error: console.error(错误:, data.message); break; } };5. 性能优化与生产部署系统能跑起来只是第一步要真正在生产环境稳定运行还需要做大量的优化工作。我们项目从第一版到最终上线性能提升了近10倍这里分享几个最有效的优化技巧。5.1 多路视频流并发处理单个摄像头处理很简单但实际项目往往是几十甚至上百路摄像头。这时候就需要并发处理能力。import concurrent.futures from typing import List import numpy as np class MultiStreamProcessor: 多路视频流处理器 def __init__(self, max_workers: int 4): # 使用线程池处理多路流 self.executor concurrent.futures.ThreadPoolExecutor( max_workersmax_workers ) self.streams: Dict[str, dict] {} def add_stream(self, stream_id: str, rtsp_url: str, config: dict): 添加视频流 self.streams[stream_id] { rtsp_url: rtsp_url, config: config, buffer: FrameBuffer(), reader: None, active: False } def start_stream(self, stream_id: str): 启动指定视频流处理 if stream_id not in self.streams: return False stream self.streams[stream_id] # 启动RTSP读取器 stream[reader] RTSPStreamReader( stream[rtsp_url], stream[buffer] ) stream[reader].start() stream[active] True # 提交处理任务到线程池 future self.executor.submit( self._process_stream, stream_id, stream[config] ) stream[future] future return True def _process_stream(self, stream_id: str, config: dict): 处理单个视频流在工作线程中运行 stream self.streams.get(stream_id) if not stream: return # 加载模型每个线程有自己的模型实例 model YOLO(config.get(model_path, yolov8s.pt)) frame_interval 1.0 / config.get(fps, 10) while stream.get(active, False): start_time time.time() frame stream[buffer].get_latest() if frame is None: time.sleep(0.01) continue # 执行检测 results model(frame, confconfig.get(confidence, 0.3)) # 这里可以添加结果处理逻辑比如保存到数据库、触发告警等 # 控制处理频率 elapsed time.time() - start_time if elapsed frame_interval: time.sleep(frame_interval - elapsed) def stop_stream(self, stream_id: str): 停止指定视频流 if stream_id in self.streams: stream self.streams[stream_id] stream[active] False if stream[reader]: stream[reader].stop() if future in stream: stream[future].cancel()这个多流处理器的关键设计线程池管理避免为每个流创建新线程的开销独立模型实例每个处理线程有自己的YOLO模型避免GIL竞争优雅停止通过active标志控制循环退出确保资源正确释放5.2 GPU内存优化当同时处理多路视频时GPU内存很容易成为瓶颈。我们通过几个策略优化内存使用策略一动态批处理class DynamicBatchProcessor: 动态批处理器 def __init__(self, batch_size: int 4): self.batch_size batch_size self.frame_buffer [] self.result_buffer {} def add_frame(self, stream_id: str, frame: np.ndarray): 添加帧到批处理队列 self.frame_buffer.append({ stream_id: stream_id, frame: frame, timestamp: time.time() }) # 达到批处理大小时执行检测 if len(self.frame_buffer) self.batch_size: self._process_batch() def _process_batch(self): 批量处理帧 if not self.frame_buffer: return # 准备批处理数据 frames [item[frame] for item in self.frame_buffer] stream_ids [item[stream_id] for item in self.frame_buffer] # 批量推理YOLOv8支持批量推理 results model(frames, conf0.3) # 分发结果 for i, result in enumerate(results): stream_id stream_ids[i] self.result_buffer[stream_id] { result: result, timestamp: time.time() } # 清空缓冲区 self.frame_buffer.clear()策略二帧采样与跳帧不是每一帧都需要检测。对于移动缓慢的场景可以每2-3帧检测一次class FrameSampler: 帧采样器 def __init__(self, sample_rate: int 2): self.sample_rate sample_rate # 每N帧采样1帧 self.frame_count 0 def should_process(self) - bool: 判断当前帧是否需要处理 self.frame_count 1 if self.frame_count % self.sample_rate 0: self.frame_count 0 return True return False策略三分辨率调整降低输入分辨率能显著减少计算量def resize_frame(frame: np.ndarray, target_size: tuple (640, 640)): 调整帧尺寸 height, width frame.shape[:2] # 保持宽高比 scale min(target_size[0] / width, target_size[1] / height) new_width int(width * scale) new_height int(height * scale) resized cv2.resize(frame, (new_width, new_height)) # 填充到目标尺寸 delta_w target_size[0] - new_width delta_h target_size[1] - new_height top, bottom delta_h // 2, delta_h - (delta_h // 2) left, right delta_w // 2, delta_w - (delta_w // 2) padded cv2.copyMakeBorder( resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value(114, 114, 114) ) return padded5.3 监控与日志系统生产环境必须有完善的监控。我们使用Prometheus Grafana监控系统状态from prometheus_client import Counter, Gauge, Histogram, start_http_server import psutil # 定义监控指标 FRAME_PROCESSED Counter(frames_processed_total, Total frames processed) DETECTION_COUNT Gauge(detections_current, Current detection count) PROCESSING_TIME Histogram(processing_time_seconds, Frame processing time) GPU_MEMORY_USAGE Gauge(gpu_memory_usage_mb, GPU memory usage in MB) ACTIVE_CONNECTIONS Gauge(active_connections, Active WebSocket connections) class SystemMonitor: 系统监控器 def __init__(self, prometheus_port: int 9090): self.prometheus_port prometheus_port def start(self): 启动监控服务 start_http_server(self.prometheus_port) print(f监控服务启动在端口 {self.prometheus_port}) def update_metrics(self, frames_processed: int 0, detections: int 0, processing_time: float 0): 更新监控指标 if frames_processed: FRAME_PROCESSED.inc(frames_processed) if detections 0: DETECTION_COUNT.set(detections) if processing_time 0: PROCESSING_TIME.observe(processing_time) # 更新系统指标 self._update_system_metrics() def _update_system_metrics(self): 更新系统级指标 # CPU使用率 cpu_percent psutil.cpu_percent(interval1) # 内存使用 memory psutil.virtual_memory() # 活跃连接数需要从连接管理器获取 # ACTIVE_CONNECTIONS.set(len(manager.active_connections)) # 如果有GPU监控GPU使用情况 try: import pynvml pynvml.nvmlInit() handle pynvml.nvmlDeviceGetHandleByIndex(0) gpu_memory pynvml.nvmlDeviceGetMemoryInfo(handle) GPU_MEMORY_USAGE.set(gpu_memory.used // 1024 // 1024) # MB except ImportError: pass # 没有GPU或pynvml未安装5.4 Docker容器化部署最后用Docker打包整个应用确保环境一致性# Dockerfile FROM nvidia/cuda:11.8.0-runtime-ubuntu22.04 # 设置环境变量 ENV DEBIAN_FRONTENDnoninteractive ENV PYTHONUNBUFFERED1 # 安装系统依赖 RUN apt-get update apt-get install -y \ python3.9 \ python3-pip \ libgl1-mesa-glx \ libglib2.0-0 \ libsm6 \ libxext6 \ libxrender-dev \ ffmpeg \ rm -rf /var/lib/apt/lists/* # 设置工作目录 WORKDIR /app # 复制依赖文件 COPY requirements.txt . # 安装Python依赖 RUN pip3 install --no-cache-dir -r requirements.txt # 复制应用代码 COPY . . # 下载模型文件可以在构建时下载或运行时下载 RUN python3 -c from ultralytics import YOLO; YOLO(yolov8s.pt) # 暴露端口 EXPOSE 8000 # 启动命令 CMD [uvicorn, main:app, --host, 0.0.0.0, --port, 8000, --workers, 4]对应的docker-compose.ymlversion: 3.8 services: detection-api: build: . ports: - 8000:8000 environment: - NVIDIA_VISIBLE_DEVICESall deploy: resources: reservations: devices: - driver: nvidia count: 1 capabilities: [gpu] volumes: - ./models:/app/models # 挂载模型目录 - ./logs:/app/logs # 挂载日志目录 restart: unless-stopped prometheus: image: prom/prometheus:latest ports: - 9090:9090 volumes: - ./prometheus.yml:/etc/prometheus/prometheus.yml - prometheus_data:/prometheus grafana: image: grafana/grafana:latest ports: - 3000:3000 volumes: - grafana_data:/var/lib/grafana environment: - GF_SECURITY_ADMIN_PASSWORDadmin volumes: prometheus_data: grafana_data:部署时还需要考虑的几个实际问题网络配置确保Docker容器能访问摄像头所在的网络模型热更新支持不重启服务更新模型配置管理使用环境变量或配置文件管理不同环境的参数日志收集集成ELK或Loki收集和分析日志自动扩缩容根据负载动态调整实例数量我们在实际部署中发现用Kubernetes管理多个检测节点比单机部署更灵活。通过Horizontal Pod Autoscaler可以根据CPU使用率自动扩缩容白天摄像头负载高时自动扩容夜间负载低时自动缩容能节省40%的云资源成本。整个系统上线后我们监控到单路1080p视频流的端到端延迟可以控制在200ms以内单台RTX 3080服务器能同时处理16路视频流使用YOLOv8s模型。最重要的是系统运行稳定连续30天无故障运行时间达到99.95%完全满足了生产环境的要求。