colmi_r02_client.client
1import asyncio 2from collections.abc import Callable 3from datetime import datetime, timezone 4import logging 5from pathlib import Path 6from types import TracebackType 7from typing import Any 8 9from bleak import BleakClient 10from bleak.backends.characteristic import BleakGATTCharacteristic 11 12from colmi_r02_client import ( 13 battery, 14 real_time_hr, 15 steps, 16 set_time, 17 blink_twice, 18 hr, 19 hr_settings, 20 packet, 21 reboot, 22) 23 24UART_SERVICE_UUID = "6E40FFF0-B5A3-F393-E0A9-E50E24DCCA9E" 25UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E" 26UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E" 27 28DEVICE_INFO_UUID = "0000180A-0000-1000-8000-00805F9B34FB" 29DEVICE_HW_UUID = "00002A27-0000-1000-8000-00805F9B34FB" 30DEVICE_FW_UUID = "00002A26-0000-1000-8000-00805F9B34FB" 31 32logger = logging.getLogger(__name__) 33 34 35def empty_parse(_packet: bytearray) -> None: 36 """Used for commands that we expect a response, but there's nothing in the response""" 37 return None 38 39 40def log_packet(packet: bytearray) -> None: 41 print("received: ", packet) 42 43 44COMMAND_HANDLERS: dict[int, Callable[[bytearray], Any]] = { 45 battery.CMD_BATTERY: battery.parse_battery, 46 real_time_hr.CMD_START_HEART_RATE: real_time_hr.parse_heart_rate, 47 real_time_hr.CMD_STOP_HEART_RATE: empty_parse, 48 steps.CMD_GET_STEP_SOMEDAY: steps.SportDetailParser().parse, 49 hr.CMD_READ_HEART_RATE: hr.HeartRateLogParser().parse, 50 set_time.CMD_SET_TIME: empty_parse, 51 hr_settings.CMD_HEART_RATE_LOG_SETTINGS: hr_settings.parse_heart_rate_log_settings, 52} 53""" 54TODO put these somewhere nice 55 56These are commands that we expect to have a response returned for 57they must accept a packet as bytearray and then return a value to be put 58in the queue for that command type 59NOTE: if the value returned is None, it is not added to the queue, this is to support 60multi packet messages where the parser has state 61""" 62 63 64class Client: 65 def __init__(self, address: str, record_to: Path | None = None): 66 self.address = address 67 self.bleak_client = BleakClient(self.address) 68 self.queues: dict[int, asyncio.Queue] = {cmd: asyncio.Queue() for cmd in COMMAND_HANDLERS} 69 self.record_to = record_to 70 71 async def __aenter__(self) -> "Client": 72 logger.info(f"Connecting to {self.address}") 73 await self.connect() 74 logger.info("Connected!") 75 return self 76 77 async def __aexit__( 78 self, 79 exc_type: type[BaseException] | None, 80 exc_val: BaseException | None, 81 exc_tb: TracebackType | None, 82 ) -> None: 83 await self.disconnect() 84 85 async def connect(self): 86 await self.bleak_client.connect() 87 88 nrf_uart_service = self.bleak_client.services.get_service(UART_SERVICE_UUID) 89 assert nrf_uart_service 90 rx_char = nrf_uart_service.get_characteristic(UART_RX_CHAR_UUID) 91 assert rx_char 92 self.rx_char = rx_char 93 94 await self.bleak_client.start_notify(UART_TX_CHAR_UUID, self._handle_tx) 95 96 async def disconnect(self): 97 await self.bleak_client.disconnect() 98 99 def _handle_tx(self, _: BleakGATTCharacteristic, packet: bytearray) -> None: 100 """Bleak callback that handles new packets from the ring.""" 101 102 logger.info(f"Received packet {packet}") 103 104 assert len(packet) == 16, f"Packet is the wrong length {packet}" 105 packet_type = packet[0] 106 assert packet_type < 127, f"Packet has error bit set {packet}" 107 108 if packet_type in COMMAND_HANDLERS: 109 result = COMMAND_HANDLERS[packet_type](packet) 110 if result is not None: 111 self.queues[packet_type].put_nowait(result) 112 else: 113 logger.debug(f"No result returned from parser for {packet_type}") 114 else: 115 logger.warning(f"Did not expect this packet: {packet}") 116 117 if self.record_to is not None: 118 with self.record_to.open("ab") as f: 119 f.write(packet) 120 f.write(b"\n") 121 122 async def send_packet(self, packet: bytearray) -> None: 123 logger.debug(f"Sending packet: {packet}") 124 await self.bleak_client.write_gatt_char(self.rx_char, packet, response=False) 125 126 async def get_battery(self) -> battery.BatteryInfo: 127 await self.send_packet(battery.BATTERY_PACKET) 128 result = await self.queues[battery.CMD_BATTERY].get() 129 assert isinstance(result, battery.BatteryInfo) 130 return result 131 132 async def get_realtime_heart_rate(self) -> list[int] | None: 133 return await self._poll_real_time_reading(real_time_hr.START_HEART_RATE_PACKET) 134 135 async def _poll_real_time_reading(self, start_packet: bytearray) -> list[int] | None: 136 await self.send_packet(start_packet) 137 138 valid_readings: list[int] = [] 139 error = False 140 tries = 0 141 while len(valid_readings) < 6 and tries < 20: 142 try: 143 data: real_time_hr.Reading | real_time_hr.ReadingError = await asyncio.wait_for( 144 self.queues[real_time_hr.CMD_START_HEART_RATE].get(), 145 timeout=2, 146 ) 147 if isinstance(data, real_time_hr.ReadingError): 148 error = True 149 break 150 if data.value != 0: 151 valid_readings.append(data.value) 152 except TimeoutError: 153 tries += 1 154 await self.send_packet(real_time_hr.CONTINUE_HEART_RATE_PACKET) 155 156 await self.send_packet( 157 real_time_hr.STOP_HEART_RATE_PACKET, 158 ) 159 if error: 160 return None 161 return valid_readings 162 163 async def get_realtime_spo2(self) -> list[int] | None: 164 return await self._poll_real_time_reading(real_time_hr.START_SPO2_PACKET) 165 166 async def set_time(self, ts: datetime) -> None: 167 await self.send_packet(set_time.set_time_packet(ts)) 168 169 async def blink_twice(self) -> None: 170 await self.send_packet(blink_twice.BLINK_TWICE_PACKET) 171 172 async def get_device_info(self) -> dict[str, str]: 173 client = self.bleak_client 174 data = {} 175 device_info_service = client.services.get_service(DEVICE_INFO_UUID) 176 assert device_info_service 177 178 hw_info_char = device_info_service.get_characteristic(DEVICE_HW_UUID) 179 assert hw_info_char 180 hw_version = await client.read_gatt_char(hw_info_char) 181 data["hw_version"] = hw_version.decode("utf-8") 182 183 fw_info_char = device_info_service.get_characteristic(DEVICE_FW_UUID) 184 assert fw_info_char 185 fw_version = await client.read_gatt_char(fw_info_char) 186 data["fw_version"] = fw_version.decode("utf-8") 187 188 return data 189 190 async def get_heart_rate_log(self, target: datetime | None = None) -> hr.HeartRateLog | hr.NoData: 191 if target is None: 192 target = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0) 193 await self.send_packet(hr.read_heart_rate_packet(target)) 194 return await asyncio.wait_for( 195 self.queues[hr.CMD_READ_HEART_RATE].get(), 196 timeout=2, 197 ) 198 199 async def get_heart_rate_log_settings(self) -> hr_settings.HeartRateLogSettings: 200 await self.send_packet(hr_settings.READ_HEART_RATE_LOG_SETTINGS_PACKET) 201 return await asyncio.wait_for( 202 self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(), 203 timeout=2, 204 ) 205 206 async def set_heart_rate_log_settings(self, enabled: bool, interval: int) -> None: 207 await self.send_packet(hr_settings.hr_log_settings_packet(hr_settings.HeartRateLogSettings(enabled, interval))) 208 209 # clear response from queue as it's unused and wrong 210 await asyncio.wait_for( 211 self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(), 212 timeout=2, 213 ) 214 215 async def get_steps(self, target: datetime, today: datetime | None = None) -> list[steps.SportDetail] | steps.NoData: 216 if today is None: 217 today = datetime.now(timezone.utc) 218 219 if target.tzinfo != timezone.utc: 220 logger.info("Converting target time to utc") 221 target = target.astimezone(tz=timezone.utc) 222 223 days = (today.date() - target.date()).days 224 logger.debug(f"Looking back {days} days") 225 226 await self.send_packet(steps.read_steps_packet(days)) 227 return await asyncio.wait_for( 228 self.queues[steps.CMD_GET_STEP_SOMEDAY].get(), 229 timeout=2, 230 ) 231 232 async def reboot(self) -> None: 233 await self.send_packet(reboot.REBOOT_PACKET) 234 235 async def raw(self, command: int, subdata: bytearray, replies: int = 0) -> list[bytearray]: 236 p = packet.make_packet(command, subdata) 237 await self.send_packet(p) 238 239 results = [] 240 while replies > 0: 241 data: bytearray = await asyncio.wait_for( 242 self.queues[command].get(), 243 timeout=2, 244 ) 245 results.append(data) 246 replies -= 1 247 248 return results
UART_SERVICE_UUID =
'6E40FFF0-B5A3-F393-E0A9-E50E24DCCA9E'
UART_RX_CHAR_UUID =
'6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
UART_TX_CHAR_UUID =
'6E400003-B5A3-F393-E0A9-E50E24DCCA9E'
DEVICE_INFO_UUID =
'0000180A-0000-1000-8000-00805F9B34FB'
DEVICE_HW_UUID =
'00002A27-0000-1000-8000-00805F9B34FB'
DEVICE_FW_UUID =
'00002A26-0000-1000-8000-00805F9B34FB'
logger =
<Logger colmi_r02_client.client (WARNING)>
def
empty_parse(_packet: bytearray) -> None:
36def empty_parse(_packet: bytearray) -> None: 37 """Used for commands that we expect a response, but there's nothing in the response""" 38 return None
Used for commands that we expect a response, but there's nothing in the response
def
log_packet(packet: bytearray) -> None:
COMMAND_HANDLERS: dict[int, collections.abc.Callable[[bytearray], typing.Any]] =
{3: <function parse_battery>, 105: <function parse_heart_rate>, 106: <function empty_parse>, 67: <bound method SportDetailParser.parse of <colmi_r02_client.steps.SportDetailParser object>>, 21: <bound method HeartRateLogParser.parse of <colmi_r02_client.hr.HeartRateLogParser object>>, 1: <function empty_parse>, 22: <function parse_heart_rate_log_settings>}
TODO put these somewhere nice
These are commands that we expect to have a response returned for they must accept a packet as bytearray and then return a value to be put in the queue for that command type NOTE: if the value returned is None, it is not added to the queue, this is to support multi packet messages where the parser has state
class
Client:
65class Client: 66 def __init__(self, address: str, record_to: Path | None = None): 67 self.address = address 68 self.bleak_client = BleakClient(self.address) 69 self.queues: dict[int, asyncio.Queue] = {cmd: asyncio.Queue() for cmd in COMMAND_HANDLERS} 70 self.record_to = record_to 71 72 async def __aenter__(self) -> "Client": 73 logger.info(f"Connecting to {self.address}") 74 await self.connect() 75 logger.info("Connected!") 76 return self 77 78 async def __aexit__( 79 self, 80 exc_type: type[BaseException] | None, 81 exc_val: BaseException | None, 82 exc_tb: TracebackType | None, 83 ) -> None: 84 await self.disconnect() 85 86 async def connect(self): 87 await self.bleak_client.connect() 88 89 nrf_uart_service = self.bleak_client.services.get_service(UART_SERVICE_UUID) 90 assert nrf_uart_service 91 rx_char = nrf_uart_service.get_characteristic(UART_RX_CHAR_UUID) 92 assert rx_char 93 self.rx_char = rx_char 94 95 await self.bleak_client.start_notify(UART_TX_CHAR_UUID, self._handle_tx) 96 97 async def disconnect(self): 98 await self.bleak_client.disconnect() 99 100 def _handle_tx(self, _: BleakGATTCharacteristic, packet: bytearray) -> None: 101 """Bleak callback that handles new packets from the ring.""" 102 103 logger.info(f"Received packet {packet}") 104 105 assert len(packet) == 16, f"Packet is the wrong length {packet}" 106 packet_type = packet[0] 107 assert packet_type < 127, f"Packet has error bit set {packet}" 108 109 if packet_type in COMMAND_HANDLERS: 110 result = COMMAND_HANDLERS[packet_type](packet) 111 if result is not None: 112 self.queues[packet_type].put_nowait(result) 113 else: 114 logger.debug(f"No result returned from parser for {packet_type}") 115 else: 116 logger.warning(f"Did not expect this packet: {packet}") 117 118 if self.record_to is not None: 119 with self.record_to.open("ab") as f: 120 f.write(packet) 121 f.write(b"\n") 122 123 async def send_packet(self, packet: bytearray) -> None: 124 logger.debug(f"Sending packet: {packet}") 125 await self.bleak_client.write_gatt_char(self.rx_char, packet, response=False) 126 127 async def get_battery(self) -> battery.BatteryInfo: 128 await self.send_packet(battery.BATTERY_PACKET) 129 result = await self.queues[battery.CMD_BATTERY].get() 130 assert isinstance(result, battery.BatteryInfo) 131 return result 132 133 async def get_realtime_heart_rate(self) -> list[int] | None: 134 return await self._poll_real_time_reading(real_time_hr.START_HEART_RATE_PACKET) 135 136 async def _poll_real_time_reading(self, start_packet: bytearray) -> list[int] | None: 137 await self.send_packet(start_packet) 138 139 valid_readings: list[int] = [] 140 error = False 141 tries = 0 142 while len(valid_readings) < 6 and tries < 20: 143 try: 144 data: real_time_hr.Reading | real_time_hr.ReadingError = await asyncio.wait_for( 145 self.queues[real_time_hr.CMD_START_HEART_RATE].get(), 146 timeout=2, 147 ) 148 if isinstance(data, real_time_hr.ReadingError): 149 error = True 150 break 151 if data.value != 0: 152 valid_readings.append(data.value) 153 except TimeoutError: 154 tries += 1 155 await self.send_packet(real_time_hr.CONTINUE_HEART_RATE_PACKET) 156 157 await self.send_packet( 158 real_time_hr.STOP_HEART_RATE_PACKET, 159 ) 160 if error: 161 return None 162 return valid_readings 163 164 async def get_realtime_spo2(self) -> list[int] | None: 165 return await self._poll_real_time_reading(real_time_hr.START_SPO2_PACKET) 166 167 async def set_time(self, ts: datetime) -> None: 168 await self.send_packet(set_time.set_time_packet(ts)) 169 170 async def blink_twice(self) -> None: 171 await self.send_packet(blink_twice.BLINK_TWICE_PACKET) 172 173 async def get_device_info(self) -> dict[str, str]: 174 client = self.bleak_client 175 data = {} 176 device_info_service = client.services.get_service(DEVICE_INFO_UUID) 177 assert device_info_service 178 179 hw_info_char = device_info_service.get_characteristic(DEVICE_HW_UUID) 180 assert hw_info_char 181 hw_version = await client.read_gatt_char(hw_info_char) 182 data["hw_version"] = hw_version.decode("utf-8") 183 184 fw_info_char = device_info_service.get_characteristic(DEVICE_FW_UUID) 185 assert fw_info_char 186 fw_version = await client.read_gatt_char(fw_info_char) 187 data["fw_version"] = fw_version.decode("utf-8") 188 189 return data 190 191 async def get_heart_rate_log(self, target: datetime | None = None) -> hr.HeartRateLog | hr.NoData: 192 if target is None: 193 target = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0) 194 await self.send_packet(hr.read_heart_rate_packet(target)) 195 return await asyncio.wait_for( 196 self.queues[hr.CMD_READ_HEART_RATE].get(), 197 timeout=2, 198 ) 199 200 async def get_heart_rate_log_settings(self) -> hr_settings.HeartRateLogSettings: 201 await self.send_packet(hr_settings.READ_HEART_RATE_LOG_SETTINGS_PACKET) 202 return await asyncio.wait_for( 203 self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(), 204 timeout=2, 205 ) 206 207 async def set_heart_rate_log_settings(self, enabled: bool, interval: int) -> None: 208 await self.send_packet(hr_settings.hr_log_settings_packet(hr_settings.HeartRateLogSettings(enabled, interval))) 209 210 # clear response from queue as it's unused and wrong 211 await asyncio.wait_for( 212 self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(), 213 timeout=2, 214 ) 215 216 async def get_steps(self, target: datetime, today: datetime | None = None) -> list[steps.SportDetail] | steps.NoData: 217 if today is None: 218 today = datetime.now(timezone.utc) 219 220 if target.tzinfo != timezone.utc: 221 logger.info("Converting target time to utc") 222 target = target.astimezone(tz=timezone.utc) 223 224 days = (today.date() - target.date()).days 225 logger.debug(f"Looking back {days} days") 226 227 await self.send_packet(steps.read_steps_packet(days)) 228 return await asyncio.wait_for( 229 self.queues[steps.CMD_GET_STEP_SOMEDAY].get(), 230 timeout=2, 231 ) 232 233 async def reboot(self) -> None: 234 await self.send_packet(reboot.REBOOT_PACKET) 235 236 async def raw(self, command: int, subdata: bytearray, replies: int = 0) -> list[bytearray]: 237 p = packet.make_packet(command, subdata) 238 await self.send_packet(p) 239 240 results = [] 241 while replies > 0: 242 data: bytearray = await asyncio.wait_for( 243 self.queues[command].get(), 244 timeout=2, 245 ) 246 results.append(data) 247 replies -= 1 248 249 return results
async def
connect(self):
86 async def connect(self): 87 await self.bleak_client.connect() 88 89 nrf_uart_service = self.bleak_client.services.get_service(UART_SERVICE_UUID) 90 assert nrf_uart_service 91 rx_char = nrf_uart_service.get_characteristic(UART_RX_CHAR_UUID) 92 assert rx_char 93 self.rx_char = rx_char 94 95 await self.bleak_client.start_notify(UART_TX_CHAR_UUID, self._handle_tx)
async def
get_device_info(self) -> dict[str, str]:
173 async def get_device_info(self) -> dict[str, str]: 174 client = self.bleak_client 175 data = {} 176 device_info_service = client.services.get_service(DEVICE_INFO_UUID) 177 assert device_info_service 178 179 hw_info_char = device_info_service.get_characteristic(DEVICE_HW_UUID) 180 assert hw_info_char 181 hw_version = await client.read_gatt_char(hw_info_char) 182 data["hw_version"] = hw_version.decode("utf-8") 183 184 fw_info_char = device_info_service.get_characteristic(DEVICE_FW_UUID) 185 assert fw_info_char 186 fw_version = await client.read_gatt_char(fw_info_char) 187 data["fw_version"] = fw_version.decode("utf-8") 188 189 return data
async def
get_heart_rate_log( self, target: datetime.datetime | None = None) -> colmi_r02_client.hr.HeartRateLog | colmi_r02_client.hr.NoData:
191 async def get_heart_rate_log(self, target: datetime | None = None) -> hr.HeartRateLog | hr.NoData: 192 if target is None: 193 target = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0) 194 await self.send_packet(hr.read_heart_rate_packet(target)) 195 return await asyncio.wait_for( 196 self.queues[hr.CMD_READ_HEART_RATE].get(), 197 timeout=2, 198 )
async def
set_heart_rate_log_settings(self, enabled: bool, interval: int) -> None:
207 async def set_heart_rate_log_settings(self, enabled: bool, interval: int) -> None: 208 await self.send_packet(hr_settings.hr_log_settings_packet(hr_settings.HeartRateLogSettings(enabled, interval))) 209 210 # clear response from queue as it's unused and wrong 211 await asyncio.wait_for( 212 self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(), 213 timeout=2, 214 )
async def
get_steps( self, target: datetime.datetime, today: datetime.datetime | None = None) -> list[colmi_r02_client.steps.SportDetail] | colmi_r02_client.steps.NoData:
216 async def get_steps(self, target: datetime, today: datetime | None = None) -> list[steps.SportDetail] | steps.NoData: 217 if today is None: 218 today = datetime.now(timezone.utc) 219 220 if target.tzinfo != timezone.utc: 221 logger.info("Converting target time to utc") 222 target = target.astimezone(tz=timezone.utc) 223 224 days = (today.date() - target.date()).days 225 logger.debug(f"Looking back {days} days") 226 227 await self.send_packet(steps.read_steps_packet(days)) 228 return await asyncio.wait_for( 229 self.queues[steps.CMD_GET_STEP_SOMEDAY].get(), 230 timeout=2, 231 )
async def
raw( self, command: int, subdata: bytearray, replies: int = 0) -> list[bytearray]:
236 async def raw(self, command: int, subdata: bytearray, replies: int = 0) -> list[bytearray]: 237 p = packet.make_packet(command, subdata) 238 await self.send_packet(p) 239 240 results = [] 241 while replies > 0: 242 data: bytearray = await asyncio.wait_for( 243 self.queues[command].get(), 244 timeout=2, 245 ) 246 results.append(data) 247 replies -= 1 248 249 return results