colmi_r02_client.client
1import asyncio 2from collections.abc import Callable 3from datetime import datetime, timezone 4from dataclasses import dataclass 5import logging 6from pathlib import Path 7from types import TracebackType 8from typing import Any 9 10from bleak import BleakClient 11from bleak.backends.characteristic import BleakGATTCharacteristic 12 13from colmi_r02_client import battery, date_utils, steps, set_time, blink_twice, hr, hr_settings, packet, reboot, real_time 14 15UART_SERVICE_UUID = "6E40FFF0-B5A3-F393-E0A9-E50E24DCCA9E" 16UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E" 17UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E" 18 19DEVICE_INFO_UUID = "0000180A-0000-1000-8000-00805F9B34FB" 20DEVICE_HW_UUID = "00002A27-0000-1000-8000-00805F9B34FB" 21DEVICE_FW_UUID = "00002A26-0000-1000-8000-00805F9B34FB" 22 23logger = logging.getLogger(__name__) 24 25 26def empty_parse(_packet: bytearray) -> None: 27 """Used for commands that we expect a response, but there's nothing in the response""" 28 return None 29 30 31def log_packet(packet: bytearray) -> None: 32 print("received: ", packet) 33 34 35# TODO move this maybe? 36@dataclass 37class FullData: 38 address: str 39 heart_rates: list[hr.HeartRateLog | hr.NoData] 40 41 42COMMAND_HANDLERS: dict[int, Callable[[bytearray], Any]] = { 43 battery.CMD_BATTERY: battery.parse_battery, 44 real_time.CMD_START_REAL_TIME: real_time.parse_real_time_reading, 45 real_time.CMD_STOP_REAL_TIME: empty_parse, 46 steps.CMD_GET_STEP_SOMEDAY: steps.SportDetailParser().parse, 47 hr.CMD_READ_HEART_RATE: hr.HeartRateLogParser().parse, 48 set_time.CMD_SET_TIME: empty_parse, 49 hr_settings.CMD_HEART_RATE_LOG_SETTINGS: hr_settings.parse_heart_rate_log_settings, 50} 51""" 52TODO put these somewhere nice 53 54These are commands that we expect to have a response returned for 55they must accept a packet as bytearray and then return a value to be put 56in the queue for that command type 57NOTE: if the value returned is None, it is not added to the queue, this is to support 58multi packet messages where the parser has state 59""" 60 61 62class Client: 63 def __init__(self, address: str, record_to: Path | None = None): 64 self.address = address 65 self.bleak_client = BleakClient(self.address) 66 self.queues: dict[int, asyncio.Queue] = {cmd: asyncio.Queue() for cmd in COMMAND_HANDLERS} 67 self.record_to = record_to 68 69 async def __aenter__(self) -> "Client": 70 logger.info(f"Connecting to {self.address}") 71 await self.connect() 72 logger.info("Connected!") 73 return self 74 75 async def __aexit__( 76 self, 77 exc_type: type[BaseException] | None, 78 exc_val: BaseException | None, 79 exc_tb: TracebackType | None, 80 ) -> None: 81 logger.info("Disconnecting") 82 if exc_val is not None: 83 logger.error("had an error") 84 await self.disconnect() 85 86 async def connect(self): 87 await self.bleak_client.connect() 88 89 nrf_uart_service = self.bleak_client.services.get_service(UART_SERVICE_UUID) 90 assert nrf_uart_service 91 rx_char = nrf_uart_service.get_characteristic(UART_RX_CHAR_UUID) 92 assert rx_char 93 self.rx_char = rx_char 94 95 await self.bleak_client.start_notify(UART_TX_CHAR_UUID, self._handle_tx) 96 97 async def disconnect(self): 98 await self.bleak_client.disconnect() 99 100 def _handle_tx(self, _: BleakGATTCharacteristic, packet: bytearray) -> None: 101 """Bleak callback that handles new packets from the ring.""" 102 103 logger.info(f"Received packet {packet}") 104 105 assert len(packet) == 16, f"Packet is the wrong length {packet}" 106 packet_type = packet[0] 107 assert packet_type < 127, f"Packet has error bit set {packet}" 108 109 if packet_type in COMMAND_HANDLERS: 110 result = COMMAND_HANDLERS[packet_type](packet) 111 if result is not None: 112 self.queues[packet_type].put_nowait(result) 113 else: 114 logger.debug(f"No result returned from parser for {packet_type}") 115 else: 116 logger.warning(f"Did not expect this packet: {packet}") 117 118 if self.record_to is not None: 119 with self.record_to.open("ab") as f: 120 f.write(packet) 121 f.write(b"\n") 122 123 async def send_packet(self, packet: bytearray) -> None: 124 logger.debug(f"Sending packet: {packet}") 125 await self.bleak_client.write_gatt_char(self.rx_char, packet, response=False) 126 127 async def get_battery(self) -> battery.BatteryInfo: 128 await self.send_packet(battery.BATTERY_PACKET) 129 result = await self.queues[battery.CMD_BATTERY].get() 130 assert isinstance(result, battery.BatteryInfo) 131 return result 132 133 async def _poll_real_time_reading(self, reading_type: real_time.RealTimeReading) -> list[int] | None: 134 start_packet = real_time.get_start_packet(reading_type) 135 stop_packet = real_time.get_stop_packet(reading_type) 136 137 await self.send_packet(start_packet) 138 139 valid_readings: list[int] = [] 140 error = False 141 tries = 0 142 while len(valid_readings) < 6 and tries < 20: 143 try: 144 data: real_time.Reading | real_time.ReadingError = await asyncio.wait_for( 145 self.queues[real_time.CMD_START_REAL_TIME].get(), 146 timeout=2, 147 ) 148 if isinstance(data, real_time.ReadingError): 149 error = True 150 break 151 if data.value != 0: 152 valid_readings.append(data.value) 153 except TimeoutError: 154 tries += 1 155 156 await self.send_packet(stop_packet) 157 if error: 158 return None 159 return valid_readings 160 161 async def get_realtime_reading(self, reading_type: real_time.RealTimeReading) -> list[int] | None: 162 return await self._poll_real_time_reading(reading_type) 163 164 async def set_time(self, ts: datetime) -> None: 165 await self.send_packet(set_time.set_time_packet(ts)) 166 167 async def blink_twice(self) -> None: 168 await self.send_packet(blink_twice.BLINK_TWICE_PACKET) 169 170 async def get_device_info(self) -> dict[str, str]: 171 client = self.bleak_client 172 data = {} 173 device_info_service = client.services.get_service(DEVICE_INFO_UUID) 174 assert device_info_service 175 176 hw_info_char = device_info_service.get_characteristic(DEVICE_HW_UUID) 177 assert hw_info_char 178 hw_version = await client.read_gatt_char(hw_info_char) 179 data["hw_version"] = hw_version.decode("utf-8") 180 181 fw_info_char = device_info_service.get_characteristic(DEVICE_FW_UUID) 182 assert fw_info_char 183 fw_version = await client.read_gatt_char(fw_info_char) 184 data["fw_version"] = fw_version.decode("utf-8") 185 186 return data 187 188 async def get_heart_rate_log(self, target: datetime | None = None) -> hr.HeartRateLog | hr.NoData: 189 if target is None: 190 target = date_utils.start_of_day(date_utils.now()) 191 await self.send_packet(hr.read_heart_rate_packet(target)) 192 return await asyncio.wait_for( 193 self.queues[hr.CMD_READ_HEART_RATE].get(), 194 timeout=2, 195 ) 196 197 async def get_heart_rate_log_settings(self) -> hr_settings.HeartRateLogSettings: 198 await self.send_packet(hr_settings.READ_HEART_RATE_LOG_SETTINGS_PACKET) 199 return await asyncio.wait_for( 200 self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(), 201 timeout=2, 202 ) 203 204 async def set_heart_rate_log_settings(self, enabled: bool, interval: int) -> None: 205 await self.send_packet(hr_settings.hr_log_settings_packet(hr_settings.HeartRateLogSettings(enabled, interval))) 206 207 # clear response from queue as it's unused and wrong 208 await asyncio.wait_for( 209 self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(), 210 timeout=2, 211 ) 212 213 async def get_steps(self, target: datetime, today: datetime | None = None) -> list[steps.SportDetail] | steps.NoData: 214 if today is None: 215 today = datetime.now(timezone.utc) 216 217 if target.tzinfo != timezone.utc: 218 logger.info("Converting target time to utc") 219 target = target.astimezone(tz=timezone.utc) 220 221 days = (today.date() - target.date()).days 222 logger.debug(f"Looking back {days} days") 223 224 await self.send_packet(steps.read_steps_packet(days)) 225 return await asyncio.wait_for( 226 self.queues[steps.CMD_GET_STEP_SOMEDAY].get(), 227 timeout=2, 228 ) 229 230 async def reboot(self) -> None: 231 await self.send_packet(reboot.REBOOT_PACKET) 232 233 async def raw(self, command: int, subdata: bytearray, replies: int = 0) -> list[bytearray]: 234 p = packet.make_packet(command, subdata) 235 await self.send_packet(p) 236 237 results = [] 238 while replies > 0: 239 data: bytearray = await asyncio.wait_for( 240 self.queues[command].get(), 241 timeout=2, 242 ) 243 results.append(data) 244 replies -= 1 245 246 return results 247 248 async def get_full_data(self, start: datetime, end: datetime) -> FullData: 249 """ 250 Fetches all data from the ring between start and end. Useful for syncing. 251 """ 252 253 logs = [] 254 for d in date_utils.dates_between(start, end): 255 logs.append(await self.get_heart_rate_log(d)) 256 257 return FullData(self.address, logs)
UART_SERVICE_UUID =
'6E40FFF0-B5A3-F393-E0A9-E50E24DCCA9E'
UART_RX_CHAR_UUID =
'6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
UART_TX_CHAR_UUID =
'6E400003-B5A3-F393-E0A9-E50E24DCCA9E'
DEVICE_INFO_UUID =
'0000180A-0000-1000-8000-00805F9B34FB'
DEVICE_HW_UUID =
'00002A27-0000-1000-8000-00805F9B34FB'
DEVICE_FW_UUID =
'00002A26-0000-1000-8000-00805F9B34FB'
logger =
<Logger colmi_r02_client.client (WARNING)>
def
empty_parse(_packet: bytearray) -> None:
27def empty_parse(_packet: bytearray) -> None: 28 """Used for commands that we expect a response, but there's nothing in the response""" 29 return None
Used for commands that we expect a response, but there's nothing in the response
def
log_packet(packet: bytearray) -> None:
@dataclass
class
FullData:
FullData( address: str, heart_rates: list[colmi_r02_client.hr.HeartRateLog | colmi_r02_client.hr.NoData])
heart_rates: list[colmi_r02_client.hr.HeartRateLog | colmi_r02_client.hr.NoData]
COMMAND_HANDLERS: dict[int, Callable[[bytearray], typing.Any]] =
{3: <function parse_battery>, 105: <function parse_real_time_reading>, 106: <function empty_parse>, 67: <bound method SportDetailParser.parse of <colmi_r02_client.steps.SportDetailParser object>>, 21: <bound method HeartRateLogParser.parse of <colmi_r02_client.hr.HeartRateLogParser object>>, 1: <function empty_parse>, 22: <function parse_heart_rate_log_settings>}
TODO put these somewhere nice
These are commands that we expect to have a response returned for they must accept a packet as bytearray and then return a value to be put in the queue for that command type NOTE: if the value returned is None, it is not added to the queue, this is to support multi packet messages where the parser has state
class
Client:
63class Client: 64 def __init__(self, address: str, record_to: Path | None = None): 65 self.address = address 66 self.bleak_client = BleakClient(self.address) 67 self.queues: dict[int, asyncio.Queue] = {cmd: asyncio.Queue() for cmd in COMMAND_HANDLERS} 68 self.record_to = record_to 69 70 async def __aenter__(self) -> "Client": 71 logger.info(f"Connecting to {self.address}") 72 await self.connect() 73 logger.info("Connected!") 74 return self 75 76 async def __aexit__( 77 self, 78 exc_type: type[BaseException] | None, 79 exc_val: BaseException | None, 80 exc_tb: TracebackType | None, 81 ) -> None: 82 logger.info("Disconnecting") 83 if exc_val is not None: 84 logger.error("had an error") 85 await self.disconnect() 86 87 async def connect(self): 88 await self.bleak_client.connect() 89 90 nrf_uart_service = self.bleak_client.services.get_service(UART_SERVICE_UUID) 91 assert nrf_uart_service 92 rx_char = nrf_uart_service.get_characteristic(UART_RX_CHAR_UUID) 93 assert rx_char 94 self.rx_char = rx_char 95 96 await self.bleak_client.start_notify(UART_TX_CHAR_UUID, self._handle_tx) 97 98 async def disconnect(self): 99 await self.bleak_client.disconnect() 100 101 def _handle_tx(self, _: BleakGATTCharacteristic, packet: bytearray) -> None: 102 """Bleak callback that handles new packets from the ring.""" 103 104 logger.info(f"Received packet {packet}") 105 106 assert len(packet) == 16, f"Packet is the wrong length {packet}" 107 packet_type = packet[0] 108 assert packet_type < 127, f"Packet has error bit set {packet}" 109 110 if packet_type in COMMAND_HANDLERS: 111 result = COMMAND_HANDLERS[packet_type](packet) 112 if result is not None: 113 self.queues[packet_type].put_nowait(result) 114 else: 115 logger.debug(f"No result returned from parser for {packet_type}") 116 else: 117 logger.warning(f"Did not expect this packet: {packet}") 118 119 if self.record_to is not None: 120 with self.record_to.open("ab") as f: 121 f.write(packet) 122 f.write(b"\n") 123 124 async def send_packet(self, packet: bytearray) -> None: 125 logger.debug(f"Sending packet: {packet}") 126 await self.bleak_client.write_gatt_char(self.rx_char, packet, response=False) 127 128 async def get_battery(self) -> battery.BatteryInfo: 129 await self.send_packet(battery.BATTERY_PACKET) 130 result = await self.queues[battery.CMD_BATTERY].get() 131 assert isinstance(result, battery.BatteryInfo) 132 return result 133 134 async def _poll_real_time_reading(self, reading_type: real_time.RealTimeReading) -> list[int] | None: 135 start_packet = real_time.get_start_packet(reading_type) 136 stop_packet = real_time.get_stop_packet(reading_type) 137 138 await self.send_packet(start_packet) 139 140 valid_readings: list[int] = [] 141 error = False 142 tries = 0 143 while len(valid_readings) < 6 and tries < 20: 144 try: 145 data: real_time.Reading | real_time.ReadingError = await asyncio.wait_for( 146 self.queues[real_time.CMD_START_REAL_TIME].get(), 147 timeout=2, 148 ) 149 if isinstance(data, real_time.ReadingError): 150 error = True 151 break 152 if data.value != 0: 153 valid_readings.append(data.value) 154 except TimeoutError: 155 tries += 1 156 157 await self.send_packet(stop_packet) 158 if error: 159 return None 160 return valid_readings 161 162 async def get_realtime_reading(self, reading_type: real_time.RealTimeReading) -> list[int] | None: 163 return await self._poll_real_time_reading(reading_type) 164 165 async def set_time(self, ts: datetime) -> None: 166 await self.send_packet(set_time.set_time_packet(ts)) 167 168 async def blink_twice(self) -> None: 169 await self.send_packet(blink_twice.BLINK_TWICE_PACKET) 170 171 async def get_device_info(self) -> dict[str, str]: 172 client = self.bleak_client 173 data = {} 174 device_info_service = client.services.get_service(DEVICE_INFO_UUID) 175 assert device_info_service 176 177 hw_info_char = device_info_service.get_characteristic(DEVICE_HW_UUID) 178 assert hw_info_char 179 hw_version = await client.read_gatt_char(hw_info_char) 180 data["hw_version"] = hw_version.decode("utf-8") 181 182 fw_info_char = device_info_service.get_characteristic(DEVICE_FW_UUID) 183 assert fw_info_char 184 fw_version = await client.read_gatt_char(fw_info_char) 185 data["fw_version"] = fw_version.decode("utf-8") 186 187 return data 188 189 async def get_heart_rate_log(self, target: datetime | None = None) -> hr.HeartRateLog | hr.NoData: 190 if target is None: 191 target = date_utils.start_of_day(date_utils.now()) 192 await self.send_packet(hr.read_heart_rate_packet(target)) 193 return await asyncio.wait_for( 194 self.queues[hr.CMD_READ_HEART_RATE].get(), 195 timeout=2, 196 ) 197 198 async def get_heart_rate_log_settings(self) -> hr_settings.HeartRateLogSettings: 199 await self.send_packet(hr_settings.READ_HEART_RATE_LOG_SETTINGS_PACKET) 200 return await asyncio.wait_for( 201 self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(), 202 timeout=2, 203 ) 204 205 async def set_heart_rate_log_settings(self, enabled: bool, interval: int) -> None: 206 await self.send_packet(hr_settings.hr_log_settings_packet(hr_settings.HeartRateLogSettings(enabled, interval))) 207 208 # clear response from queue as it's unused and wrong 209 await asyncio.wait_for( 210 self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(), 211 timeout=2, 212 ) 213 214 async def get_steps(self, target: datetime, today: datetime | None = None) -> list[steps.SportDetail] | steps.NoData: 215 if today is None: 216 today = datetime.now(timezone.utc) 217 218 if target.tzinfo != timezone.utc: 219 logger.info("Converting target time to utc") 220 target = target.astimezone(tz=timezone.utc) 221 222 days = (today.date() - target.date()).days 223 logger.debug(f"Looking back {days} days") 224 225 await self.send_packet(steps.read_steps_packet(days)) 226 return await asyncio.wait_for( 227 self.queues[steps.CMD_GET_STEP_SOMEDAY].get(), 228 timeout=2, 229 ) 230 231 async def reboot(self) -> None: 232 await self.send_packet(reboot.REBOOT_PACKET) 233 234 async def raw(self, command: int, subdata: bytearray, replies: int = 0) -> list[bytearray]: 235 p = packet.make_packet(command, subdata) 236 await self.send_packet(p) 237 238 results = [] 239 while replies > 0: 240 data: bytearray = await asyncio.wait_for( 241 self.queues[command].get(), 242 timeout=2, 243 ) 244 results.append(data) 245 replies -= 1 246 247 return results 248 249 async def get_full_data(self, start: datetime, end: datetime) -> FullData: 250 """ 251 Fetches all data from the ring between start and end. Useful for syncing. 252 """ 253 254 logs = [] 255 for d in date_utils.dates_between(start, end): 256 logs.append(await self.get_heart_rate_log(d)) 257 258 return FullData(self.address, logs)
async def
connect(self):
87 async def connect(self): 88 await self.bleak_client.connect() 89 90 nrf_uart_service = self.bleak_client.services.get_service(UART_SERVICE_UUID) 91 assert nrf_uart_service 92 rx_char = nrf_uart_service.get_characteristic(UART_RX_CHAR_UUID) 93 assert rx_char 94 self.rx_char = rx_char 95 96 await self.bleak_client.start_notify(UART_TX_CHAR_UUID, self._handle_tx)
async def
get_realtime_reading( self, reading_type: colmi_r02_client.real_time.RealTimeReading) -> list[int] | None:
async def
get_device_info(self) -> dict[str, str]:
171 async def get_device_info(self) -> dict[str, str]: 172 client = self.bleak_client 173 data = {} 174 device_info_service = client.services.get_service(DEVICE_INFO_UUID) 175 assert device_info_service 176 177 hw_info_char = device_info_service.get_characteristic(DEVICE_HW_UUID) 178 assert hw_info_char 179 hw_version = await client.read_gatt_char(hw_info_char) 180 data["hw_version"] = hw_version.decode("utf-8") 181 182 fw_info_char = device_info_service.get_characteristic(DEVICE_FW_UUID) 183 assert fw_info_char 184 fw_version = await client.read_gatt_char(fw_info_char) 185 data["fw_version"] = fw_version.decode("utf-8") 186 187 return data
async def
get_heart_rate_log( self, target: datetime.datetime | None = None) -> colmi_r02_client.hr.HeartRateLog | colmi_r02_client.hr.NoData:
189 async def get_heart_rate_log(self, target: datetime | None = None) -> hr.HeartRateLog | hr.NoData: 190 if target is None: 191 target = date_utils.start_of_day(date_utils.now()) 192 await self.send_packet(hr.read_heart_rate_packet(target)) 193 return await asyncio.wait_for( 194 self.queues[hr.CMD_READ_HEART_RATE].get(), 195 timeout=2, 196 )
async def
set_heart_rate_log_settings(self, enabled: bool, interval: int) -> None:
205 async def set_heart_rate_log_settings(self, enabled: bool, interval: int) -> None: 206 await self.send_packet(hr_settings.hr_log_settings_packet(hr_settings.HeartRateLogSettings(enabled, interval))) 207 208 # clear response from queue as it's unused and wrong 209 await asyncio.wait_for( 210 self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(), 211 timeout=2, 212 )
async def
get_steps( self, target: datetime.datetime, today: datetime.datetime | None = None) -> list[colmi_r02_client.steps.SportDetail] | colmi_r02_client.steps.NoData:
214 async def get_steps(self, target: datetime, today: datetime | None = None) -> list[steps.SportDetail] | steps.NoData: 215 if today is None: 216 today = datetime.now(timezone.utc) 217 218 if target.tzinfo != timezone.utc: 219 logger.info("Converting target time to utc") 220 target = target.astimezone(tz=timezone.utc) 221 222 days = (today.date() - target.date()).days 223 logger.debug(f"Looking back {days} days") 224 225 await self.send_packet(steps.read_steps_packet(days)) 226 return await asyncio.wait_for( 227 self.queues[steps.CMD_GET_STEP_SOMEDAY].get(), 228 timeout=2, 229 )
async def
raw( self, command: int, subdata: bytearray, replies: int = 0) -> list[bytearray]:
234 async def raw(self, command: int, subdata: bytearray, replies: int = 0) -> list[bytearray]: 235 p = packet.make_packet(command, subdata) 236 await self.send_packet(p) 237 238 results = [] 239 while replies > 0: 240 data: bytearray = await asyncio.wait_for( 241 self.queues[command].get(), 242 timeout=2, 243 ) 244 results.append(data) 245 replies -= 1 246 247 return results
249 async def get_full_data(self, start: datetime, end: datetime) -> FullData: 250 """ 251 Fetches all data from the ring between start and end. Useful for syncing. 252 """ 253 254 logs = [] 255 for d in date_utils.dates_between(start, end): 256 logs.append(await self.get_heart_rate_log(d)) 257 258 return FullData(self.address, logs)
Fetches all data from the ring between start and end. Useful for syncing.