【蓝牙】一文掌握Bluez BLE数据交互:Python实战GATT通信

📅 发布时间:2026/7/4 18:09:29 👁️ 浏览次数:
【蓝牙】一文掌握Bluez BLE数据交互:Python实战GATT通信
1. 从零开始搭建你的Python BLE通信环境如果你对物联网开发感兴趣或者想自己动手做一个能和手机App“对话”的小硬件那么蓝牙低功耗BLE绝对是一个绕不开的技术。它省电、连接快非常适合像智能手环、温湿度计这类需要长时间运行并间歇性传输数据的设备。今天我就带你用Python和Linux下的Bluez蓝牙协议栈亲手搭建一个完整的BLE数据双向通信实例。整个过程就像搭积木我会把每一步都拆开揉碎了讲保证你跟着做就能跑起来。我们今天的实战目标是用一台运行Linux的电脑比如树莓派或者装了Ubuntu的笔记本模拟一个“心率传感器”然后用手机上的nRF Connect这个App作为“接收端”实现两者之间的实时数据收发。听起来是不是挺有意思别担心哪怕你之前没怎么接触过蓝牙开发只要会一点Python基础就能轻松上手。我当年第一次搞这个的时候也是查了无数资料踩了不少坑今天就把这些经验都分享给你让你少走弯路。首先你得准备好战场。最核心的就是Bluez。它不是什么新潮的框架而是Linux系统里老牌的、官方的蓝牙协议栈我们用的蓝牙命令行工具bluetoothctl就是它的一部分。我们的Python程序最终就是要通过D-Bus这个“中间人”去和Bluez打交道告诉它“嗨我要创建一个服务里面有个特征值可以收发数据”。所以第一步就是确保你的系统里有正确版本的Bluez以及Python的D-Bus绑定库。打开你的终端我们一步步来。如果你的系统是Ubuntu或者Debian系的安装依赖非常方便sudo apt update sudo apt install bluez bluez-tools libbluetooth-dev libdbus-1-dev python3-dev python3-pip这里bluez是协议栈本身bluez-tools包含了一些有用的工具libbluetooth-dev是开发头文件libdbus-1-dev是D-Bus的开发库。装好系统包之后再来安装Python的库pip3 install dbus-python pygobject这里有个小坑我踩过dbus-python这个库有时候通过pip安装可能会遇到编译问题。如果安装不顺利可以试试先用系统包管理器安装sudo apt install python3-dbus。确保安装完成后在Python里能import dbus和import dbus.service不报错我们的基础环境就差不多了。接下来我们需要确认Bluez的版本并且让它跑起来。在终端输入bluetoothd --version看看版本。我推荐使用5.50以上的版本对BLE的支持比较完善。然后启动蓝牙服务sudo systemctl start bluetooth sudo systemctl enable bluetooth为了让我们的Python程序能无障碍地和Bluez通信最好把当前用户加入到bluetooth组这样就不用每次都sudo了sudo usermod -a -G bluetooth $USER执行完这个命令后记得要注销当前用户再重新登录或者重启一下电脑这个组权限变更才会生效。不然你可能还是会遇到权限拒绝的错误。环境准备好之后我们就可以进入最核心的部分了理解GATT通信的模型并开始动手写代码。2. 庖丁解牛理解GATT通信的核心模型在真正写代码之前我们得先搞清楚BLE设备之间是怎么“说话”的。这就像两个人打电话得先有个电话号码服务然后决定是听Notify还是说Write。BLE的通信模型基于GATT通用属性协议它定义了一种结构化的数据交换方式。你可以把一个BLE设备我们称为“服务器”或“外围设备”想象成一个提供服务的超市。这个超市设备里有好几个服务每个服务就像超市里的一个商品分区比如“生鲜区”、“日用百货区”。每个服务都有一个唯一的UUID来标识。在我们的例子里我们会创建一个“心率服务”它的标准UUID是0000180d-0000-1000-8000-00805f9b34fb。每个服务分区里又有好几个特征值。特征值才是真正存放数据、进行读写操作的地方它就像分区里的具体商品比如“苹果”、“香蕉”。一个特征值同样有唯一的UUID。例如“心率测量”特征值的UUID是00002a37-0000-1000-8000-00805f9b34fb。每个特征值有一组属性这决定了你能对它做什么操作比如read手机可以读取这个值。write手机可以写入数据到这个值。notify设备可以主动向手机“通知”数据手机需要先订阅。indicate和notify类似但需要手机确认收到。我们的实战将聚焦于notify和write这两个最常用的操作实现双向通信。那么手机客户端和我们模拟的设备服务器具体怎么互动呢关键在于三个动作订阅通知、发送通知、处理写入。当手机上的nRF Connect连接上我们的设备并点开那个特征值详情时如果它支持notifyApp上就会显示一个“Enable Notifications”的开关。用户点击这个开关就触发了一个“订阅”动作。这个动作通过D-Bus传给我们Python程序里的StartNotify()方法。我们在StartNotify()里设置一个标志位比如self.notifying True然后就可以启动一个定时器或者循环不断地通过PropertiesChanged信号把模拟的心率数据“推”给手机。反过来当手机想发送一个指令给设备比如设置报警阈值它就会向特征值执行一个“Write”操作。这个操作会触发我们Python程序里定义的WriteValue()方法。我们在这个方法里就能接收到手机发来的原始字节数据然后解析它执行相应的逻辑。整个过程中Bluez和D-Bus扮演了“翻译官”和“邮差”的角色。我们的Python程序定义好服务框架超市的货架Bluez负责把这个框架通过蓝牙广播出去。手机连接后Bluez把手机的请求翻译成D-Bus方法调用比如调用我们的StartNotify又把我们通过D-Bus信号发出的数据PropertiesChanged翻译成蓝牙协议包发给手机。理解了这个数据流写代码的时候就不会迷糊了。接下来我们就进入实战环节从创建一个最基础的服务骨架开始。3. 实战第一步用Python注册你的第一个GATT服务现在我们开始动手写代码。我不会让你从头造轮子Bluez源码里其实藏着一个非常好的例子。我们到Bluez的源码目录下找到test/example-gatt-server.py这个文件。如果你没有下载源码也没关系我会把核心部分摘出来并详细解释。我们基于这个官方例子进行改造让它更清晰更适合我们“心率传感器”的场景。首先创建一个新的Python文件比如叫my_ble_server.py。开头我们需要导入必要的模块import dbus import dbus.service import dbus.mainloop.glib from gi.repository import GLib import random import struct import time # 使用GLib作为D-Bus的主事件循环 dbus.mainloop.glib.DBusGMainLoop(set_as_defaultTrue) # 定义一些Bluez D-Bus接口的常量直接抄过来就行 BUS_NAME org.bluez ADAPTER_INTERFACE org.bluez.Adapter1 DEVICE_INTERFACE org.bluez.Device1 GATT_MANAGER_IFACE org.bluez.GattManager1 GATT_SERVICE_IFACE org.bluez.GattService1 GATT_CHRC_IFACE org.bluez.GattCharacteristic1 GATT_DESC_IFACE org.bluez.GattDescriptor1 # 对象路径的前缀可以自定义 OBJECT_PATH_BASE /com/example/myble接下来我们要定义三个核心类它们对应GATT模型的三个层级服务、特征值、描述符。我们先从最基础的Descriptor和Characteristic类开始这些类继承自dbus.service.Object并使用dbus.service.method或dbus.service.signal装饰器来暴露D-Bus方法或信号。我们先看特征值类这是数据交互的核心。我们创建一个HeartRateMeasurementChrc类它代表心率测量特征class HeartRateMeasurementChrc(dbus.service.Object): 心率测量特征值属性为 notify用于设备主动向手机发送心率数据。 HR_MSRMT_UUID 00002a37-0000-1000-8000-00805f9b34fb def __init__(self, bus, index, service): # 父类初始化需要提供对象路径 self.path service.path /hr_msrmt_ str(index) super().__init__(bus, self.path) # 特征值的UUID和属性 self.uuid self.HR_MSRMT_UUID self.service service self.notifying False # 标记是否正在通知 self.hr_value 60 # 初始心率值 # 特征值的属性列表这里我们只定义notify self.properties { GATT_CHRC_IFACE: { Service: self.service.path, UUID: self.uuid, Flags: [notify], # 关键声明此特征支持通知 } } def get_properties(self): return dbus.Dictionary(self.properties, signaturesv) def get_path(self): return dbus.ObjectPath(self.path) dbus.service.method(GATT_CHRC_IFACE, in_signature, out_signature) def StartNotify(self): 手机端启用通知时Bluez会调用此方法 if self.notifying: print([特征值] 已经在发送通知忽略请求) return print([特征值] 收到StartNotify请求开始模拟发送心率数据) self.notifying True # 启动一个定时模拟任务 self._simulate_heart_rate() dbus.service.method(GATT_CHRC_IFACE, in_signature, out_signature) def StopNotify(self): 手机端禁用通知时Bluez会调用此方法 if not self.notifying: print([特征值] 未在发送通知忽略请求) return print([特征值] 收到StopNotify请求停止发送数据) self.notifying False def _simulate_heart_rate(self): 模拟心率变化并通过PropertiesChanged信号发送数据 if not self.notifying: return # 模拟心率在60-100之间随机波动 self.hr_value random.randint(60, 100) print(f[特征值] 模拟心率值: {self.hr_value} bpm) # 构建发送的数据包。BLE心率测量数据有固定格式第一个字节是标志位。 # 这里我们简单模拟标志位0x01表示8位心率值后跟心率值。 flags 0x01 value [dbus.Byte(flags), dbus.Byte(self.hr_value)] # 最关键的一步发出PropertiesChanged信号Bluez会将其转为BLE通知发给手机 self.PropertiesChanged(GATT_CHRC_IFACE, {Value: value}, []) # 每隔2秒发送一次 GLib.timeout_add_seconds(2, self._simulate_heart_rate)代码有点长我解释几个关键点。StartNotify和StopNotify这两个方法是被dbus.service.method装饰的这意味着它们会被暴露为D-Bus方法。当手机App点击“启用通知”时Bluez就会通过D-Bus远程调用我们的StartNotify()。我们在里面把self.notifying设为True并启动一个定时函数_simulate_heart_rate。在_simulate_heart_rate函数里我们生成了一个模拟的心率值然后把它包装成一个字节列表value。最后调用self.PropertiesChanged方法。这个方法是父类dbus.service.Object提供的用于发射一个D-Bus信号。这个信号会被Bluez捕获Bluez就知道“哦这个特征值有新数据了”然后它自动把数据通过BLE协议打包发送给已经订阅了通知的手机客户端。这就是设备主动发数据的完整流程。有了特征值我们还需要一个服务来容纳它。创建一个HeartRateService类class HeartRateService(dbus.service.Object): 心率服务包含一个心率测量特征值。 HR_SVC_UUID 0000180d-0000-1000-8000-00805f9b34fb def __init__(self, bus, index): self.path OBJECT_PATH_BASE /service str(index) super().__init__(bus, self.path) self.uuid self.HR_SVC_UUID self.characteristics [] self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self)) def get_properties(self): return { GATT_SERVICE_IFACE: { UUID: self.uuid, Primary: True, Characteristics: dbus.Array( self.get_characteristic_paths(), signatureo ) } } def get_path(self): return dbus.ObjectPath(self.path) def add_characteristic(self, characteristic): self.characteristics.append(characteristic) def get_characteristic_paths(self): return [c.get_path() for c in self.characteristics]服务类相对简单主要管理它包含的特征值列表。最后我们需要一个应用类来注册这个服务到系统的GATT管理器并启动事件循环class Application(dbus.service.Object): def __init__(self, bus): self.path OBJECT_PATH_BASE super().__init__(bus, self.path) self.services [] self.add_service(HeartRateService(bus, 0)) def get_path(self): return dbus.ObjectPath(self.path) def add_service(self, service): self.services.append(service) dbus.service.method(dbus.PROPERTIES_IFACE, in_signatures, out_signaturea{sv}) def GetAll(self, interface): if interface org.freedesktop.DBus.ObjectManager: # 返回所有对象服务和特征值的路径 objects {} for s in self.services: objects[s.path] s.get_properties() for c in s.characteristics: objects[c.path] c.get_properties() return objects else: raise dbus.exceptions.DBusException( org.freedesktop.DBus.UnknownInterface, f此对象不支持接口: {interface} ) def register_app(): bus dbus.SystemBus() adapter_path /org/bluez/hci0 # 默认蓝牙适配器 adapter dbus.Interface(bus.get_object(BUS_NAME, adapter_path), GATT_MANAGER_IFACE) app Application(bus) app_path app.get_path() print(f正在注册GATT应用路径: {app_path}) adapter.RegisterApplication(app_path, {}) print(GATT应用注册成功) return app if __name__ __main__: app register_app() print(BLE心率服务器已启动。请用手机蓝牙扫描并连接。) print(在nRF Connect中找到心率服务(180D)和特征值(2A37)启用通知。) loop GLib.MainLoop() try: loop.run() except KeyboardInterrupt: print(\n服务器关闭。) loop.quit()Application类实现了GetAll方法这是为了满足D-Bus对象管理器的要求让Bluez能发现我们创建的所有服务和特征值。register_app函数是注册的入口它获取系统总线找到蓝牙适配器然后调用适配器的RegisterApplication方法把我们整个应用对象树注册进去。最后启动一个GLib事件循环让程序保持运行等待D-Bus的调用和发送信号。现在你可以运行这个程序了python3 my_ble_server.py。如果一切正常你会看到注册成功的提示。接下来就该请出我们的测试伙伴——手机App了。4. 双向通信闭环处理手机写入与数据流观测我们的服务器已经能主动发送心率数据了但这只是单向通信。一个完整的交互还需要能接收来自手机的指令。比如手机App想设置一个心率报警上限或者发送一个校准命令。这就需要用上特征值的Write属性。让我们升级一下之前的心率测量特征值。我们新增一个特征值专门用于接收手机的写入数据。同时为了更直观地观测整个数据流我还会介绍两个非常实用的调试工具bluetoothctl和dbus-monitor。首先在HeartRateService里增加一个新的特征值类ControlPointChrc这个特征值具有write属性class ControlPointChrc(dbus.service.Object): 控制点特征值属性为 write用于接收手机发送的指令。 CTRL_PT_UUID 00002a39-0000-1000-8000-00805f9b34fb # 示例UUID可自定义 def __init__(self, bus, index, service): self.path service.path /ctrl_pt_ str(index) super().__init__(bus, self.path) self.uuid self.CTRL_PT_UUID self.service service self.properties { GATT_CHRC_IFACE: { Service: self.service.path, UUID: self.uuid, Flags: [write], # 关键声明此特征支持写入 } } def get_properties(self): return dbus.Dictionary(self.properties, signaturesv) def get_path(self): return dbus.ObjectPath(self.path) dbus.service.method(GATT_CHRC_IFACE, in_signatureaya{sv}, out_signature) def WriteValue(self, value, options): 手机端写入数据时Bluez会调用此方法。 value: 一个字节数组dbus.Array of dbus.Byte即手机发来的原始数据。 options: 一个字典可能包含写入类型如request或command等信息。 print(f[控制点] 收到写入请求) print(f 原始数据(bytes): {list(value)}) print(f 写入选项: {options}) # 尝试将字节数据解码为字符串假设手机发送的是文本指令 try: # 将dbus.Byte数组转换为普通bytes byte_list [bytes([v]) for v in value] data_bytes b.join(byte_list) command data_bytes.decode(utf-8).strip() print(f 解码指令: {command}) # 根据指令执行不同操作 if command.lower() get_log: print( 执行获取设备日志) # 这里可以触发一个通知将日志发回手机 elif command.lower() reset: print( 执行重置设备参数) # 这里可以重置内部状态 elif command.startswith(set_alarm:): _, threshold command.split(:) print(f 执行设置报警阈值为 {threshold}) # 这里可以更新报警阈值变量 else: print(f 未知指令: {command}) except Exception as e: print(f 解析指令时出错: {e}) # 即使出错也最好返回一个错误响应通过另一个特征值通知回去重点看WriteValue方法。它的参数value就是手机App发过来的原始字节数据。我们在这里面可以自由地解析这些字节把它转换成数字、字符串或者任何我们约定的格式然后执行相应的业务逻辑。这样双向通信的闭环就完成了设备通过Notify主动上报数据手机通过Write下发控制命令。代码写好了我们怎么验证它是否正常工作呢光靠打印日志还不够直观。这里我分享两个我调试时最爱用的“神器”。第一个是bluetoothctl。在另一个终端窗口运行它你可以用它来管理蓝牙、查看服务甚至手动读写特征值。bluetoothctl [bluetooth]# power on [bluetooth]# scan on ... (这里会扫描到你的设备记下它的MAC地址比如 AA:BB:CC:DD:EE:FF) [bluetooth]# scan off [bluetooth]# connect AA:BB:CC:DD:EE:FF [bluetooth]# menu gatt [bluetooth]# list-attributeslist-attributes会列出设备的所有服务和特征值路径。你可以用select-attribute 路径选择一个特征值然后用read或write命令手动操作它这对于测试Write功能非常方便。第二个是dbus-monitor。它是观察D-Bus通信的“显微镜”。我们的Python程序和Bluez的所有交互都是通过D-Bus进行的。打开一个终端运行dbus-monitor --system interfaceorg.bluez.GattCharacteristic1这个命令会过滤并显示所有与GATT特征值相关的D-Bus信号和方法调用。当你用手机App点击“Enable Notifications”时你会在终端里看到StartNotify方法被调用的记录。当你的程序调用PropertiesChanged发送数据时你也能看到对应的信号发出。这能让你清晰地确认数据流是否按照预期在D-Bus层流动。把新增的控制点特征值添加到服务中然后重新运行服务器。用手机nRF Connect连接后你应该能看到两个特征值一个用于订阅通知心率数据一个用于写入控制点。尝试在控制点特征值里写入一段字符串比如“get_log”看看服务器的控制台是否打印出了相应的解析信息。同时观察心率测量特征值是否每隔2秒就有新的通知数据发出。通过这一节的实践你已经构建了一个具备完整双向通信能力的BLE服务器框架。从环境搭建、模型理解、服务注册到双向数据流处理和调试我们走完了一个完整的开发闭环。这个框架虽然模拟的是心率传感器但其骨架完全适用于智能开关、环境传感器、防丢器等任何BLE设备。你可以在此基础上增加更多的服务和特征值定义更复杂的数据协议把它变成你想象中的物联网设备。