colmi_r02_client.client

  1import asyncio
  2from collections.abc import Callable
  3from datetime import datetime, timezone
  4import logging
  5from pathlib import Path
  6from types import TracebackType
  7from typing import Any
  8
  9from bleak import BleakClient
 10from bleak.backends.characteristic import BleakGATTCharacteristic
 11
 12from colmi_r02_client import (
 13    battery,
 14    real_time_hr,
 15    steps,
 16    set_time,
 17    blink_twice,
 18    hr,
 19    hr_settings,
 20    packet,
 21    reboot,
 22)
 23
 24UART_SERVICE_UUID = "6E40FFF0-B5A3-F393-E0A9-E50E24DCCA9E"
 25UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
 26UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
 27
 28DEVICE_INFO_UUID = "0000180A-0000-1000-8000-00805F9B34FB"
 29DEVICE_HW_UUID = "00002A27-0000-1000-8000-00805F9B34FB"
 30DEVICE_FW_UUID = "00002A26-0000-1000-8000-00805F9B34FB"
 31
 32logger = logging.getLogger(__name__)
 33
 34
 35def empty_parse(_packet: bytearray) -> None:
 36    """Used for commands that we expect a response, but there's nothing in the response"""
 37    return None
 38
 39
 40def log_packet(packet: bytearray) -> None:
 41    print("received: ", packet)
 42
 43
 44COMMAND_HANDLERS: dict[int, Callable[[bytearray], Any]] = {
 45    battery.CMD_BATTERY: battery.parse_battery,
 46    real_time_hr.CMD_START_HEART_RATE: real_time_hr.parse_heart_rate,
 47    real_time_hr.CMD_STOP_HEART_RATE: empty_parse,
 48    steps.CMD_GET_STEP_SOMEDAY: steps.SportDetailParser().parse,
 49    hr.CMD_READ_HEART_RATE: hr.HeartRateLogParser().parse,
 50    set_time.CMD_SET_TIME: empty_parse,
 51    hr_settings.CMD_HEART_RATE_LOG_SETTINGS: hr_settings.parse_heart_rate_log_settings,
 52}
 53"""
 54TODO put these somewhere nice
 55
 56These are commands that we expect to have a response returned for
 57they must accept a packet as bytearray and then return a value to be put
 58in the queue for that command type
 59NOTE: if the value returned is None, it is not added to the queue, this is to support
 60multi packet messages where the parser has state
 61"""
 62
 63
 64class Client:
 65    def __init__(self, address: str, record_to: Path | None = None):
 66        self.address = address
 67        self.bleak_client = BleakClient(self.address)
 68        self.queues: dict[int, asyncio.Queue] = {cmd: asyncio.Queue() for cmd in COMMAND_HANDLERS}
 69        self.record_to = record_to
 70
 71    async def __aenter__(self) -> "Client":
 72        logger.info(f"Connecting to {self.address}")
 73        await self.connect()
 74        logger.info("Connected!")
 75        return self
 76
 77    async def __aexit__(
 78        self,
 79        exc_type: type[BaseException] | None,
 80        exc_val: BaseException | None,
 81        exc_tb: TracebackType | None,
 82    ) -> None:
 83        await self.disconnect()
 84
 85    async def connect(self):
 86        await self.bleak_client.connect()
 87
 88        nrf_uart_service = self.bleak_client.services.get_service(UART_SERVICE_UUID)
 89        assert nrf_uart_service
 90        rx_char = nrf_uart_service.get_characteristic(UART_RX_CHAR_UUID)
 91        assert rx_char
 92        self.rx_char = rx_char
 93
 94        await self.bleak_client.start_notify(UART_TX_CHAR_UUID, self._handle_tx)
 95
 96    async def disconnect(self):
 97        await self.bleak_client.disconnect()
 98
 99    def _handle_tx(self, _: BleakGATTCharacteristic, packet: bytearray) -> None:
100        """Bleak callback that handles new packets from the ring."""
101
102        logger.info(f"Received packet {packet}")
103
104        assert len(packet) == 16, f"Packet is the wrong length {packet}"
105        packet_type = packet[0]
106        assert packet_type < 127, f"Packet has error bit set {packet}"
107
108        if packet_type in COMMAND_HANDLERS:
109            result = COMMAND_HANDLERS[packet_type](packet)
110            if result is not None:
111                self.queues[packet_type].put_nowait(result)
112            else:
113                logger.debug(f"No result returned from parser for {packet_type}")
114        else:
115            logger.warning(f"Did not expect this packet: {packet}")
116
117        if self.record_to is not None:
118            with self.record_to.open("ab") as f:
119                f.write(packet)
120                f.write(b"\n")
121
122    async def send_packet(self, packet: bytearray) -> None:
123        logger.debug(f"Sending packet: {packet}")
124        await self.bleak_client.write_gatt_char(self.rx_char, packet, response=False)
125
126    async def get_battery(self) -> battery.BatteryInfo:
127        await self.send_packet(battery.BATTERY_PACKET)
128        result = await self.queues[battery.CMD_BATTERY].get()
129        assert isinstance(result, battery.BatteryInfo)
130        return result
131
132    async def get_realtime_heart_rate(self) -> list[int] | None:
133        return await self._poll_real_time_reading(real_time_hr.START_HEART_RATE_PACKET)
134
135    async def _poll_real_time_reading(self, start_packet: bytearray) -> list[int] | None:
136        await self.send_packet(start_packet)
137
138        valid_readings: list[int] = []
139        error = False
140        tries = 0
141        while len(valid_readings) < 6 and tries < 20:
142            try:
143                data: real_time_hr.Reading | real_time_hr.ReadingError = await asyncio.wait_for(
144                    self.queues[real_time_hr.CMD_START_HEART_RATE].get(),
145                    timeout=2,
146                )
147                if isinstance(data, real_time_hr.ReadingError):
148                    error = True
149                    break
150                if data.value != 0:
151                    valid_readings.append(data.value)
152            except TimeoutError:
153                tries += 1
154                await self.send_packet(real_time_hr.CONTINUE_HEART_RATE_PACKET)
155
156        await self.send_packet(
157            real_time_hr.STOP_HEART_RATE_PACKET,
158        )
159        if error:
160            return None
161        return valid_readings
162
163    async def get_realtime_spo2(self) -> list[int] | None:
164        return await self._poll_real_time_reading(real_time_hr.START_SPO2_PACKET)
165
166    async def set_time(self, ts: datetime) -> None:
167        await self.send_packet(set_time.set_time_packet(ts))
168
169    async def blink_twice(self) -> None:
170        await self.send_packet(blink_twice.BLINK_TWICE_PACKET)
171
172    async def get_device_info(self) -> dict[str, str]:
173        client = self.bleak_client
174        data = {}
175        device_info_service = client.services.get_service(DEVICE_INFO_UUID)
176        assert device_info_service
177
178        hw_info_char = device_info_service.get_characteristic(DEVICE_HW_UUID)
179        assert hw_info_char
180        hw_version = await client.read_gatt_char(hw_info_char)
181        data["hw_version"] = hw_version.decode("utf-8")
182
183        fw_info_char = device_info_service.get_characteristic(DEVICE_FW_UUID)
184        assert fw_info_char
185        fw_version = await client.read_gatt_char(fw_info_char)
186        data["fw_version"] = fw_version.decode("utf-8")
187
188        return data
189
190    async def get_heart_rate_log(self, target: datetime | None = None) -> hr.HeartRateLog | hr.NoData:
191        if target is None:
192            target = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0)
193        await self.send_packet(hr.read_heart_rate_packet(target))
194        return await asyncio.wait_for(
195            self.queues[hr.CMD_READ_HEART_RATE].get(),
196            timeout=2,
197        )
198
199    async def get_heart_rate_log_settings(self) -> hr_settings.HeartRateLogSettings:
200        await self.send_packet(hr_settings.READ_HEART_RATE_LOG_SETTINGS_PACKET)
201        return await asyncio.wait_for(
202            self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(),
203            timeout=2,
204        )
205
206    async def set_heart_rate_log_settings(self, enabled: bool, interval: int) -> None:
207        await self.send_packet(hr_settings.hr_log_settings_packet(hr_settings.HeartRateLogSettings(enabled, interval)))
208
209        # clear response from queue as it's unused and wrong
210        await asyncio.wait_for(
211            self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(),
212            timeout=2,
213        )
214
215    async def get_steps(self, target: datetime, today: datetime | None = None) -> list[steps.SportDetail] | steps.NoData:
216        if today is None:
217            today = datetime.now(timezone.utc)
218
219        if target.tzinfo != timezone.utc:
220            logger.info("Converting target time to utc")
221            target = target.astimezone(tz=timezone.utc)
222
223        days = (today.date() - target.date()).days
224        logger.debug(f"Looking back {days} days")
225
226        await self.send_packet(steps.read_steps_packet(days))
227        return await asyncio.wait_for(
228            self.queues[steps.CMD_GET_STEP_SOMEDAY].get(),
229            timeout=2,
230        )
231
232    async def reboot(self) -> None:
233        await self.send_packet(reboot.REBOOT_PACKET)
234
235    async def raw(self, command: int, subdata: bytearray, replies: int = 0) -> list[bytearray]:
236        p = packet.make_packet(command, subdata)
237        await self.send_packet(p)
238
239        results = []
240        while replies > 0:
241            data: bytearray = await asyncio.wait_for(
242                self.queues[command].get(),
243                timeout=2,
244            )
245            results.append(data)
246            replies -= 1
247
248        return results
UART_SERVICE_UUID = '6E40FFF0-B5A3-F393-E0A9-E50E24DCCA9E'
UART_RX_CHAR_UUID = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
UART_TX_CHAR_UUID = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'
DEVICE_INFO_UUID = '0000180A-0000-1000-8000-00805F9B34FB'
DEVICE_HW_UUID = '00002A27-0000-1000-8000-00805F9B34FB'
DEVICE_FW_UUID = '00002A26-0000-1000-8000-00805F9B34FB'
logger = <Logger colmi_r02_client.client (WARNING)>
def empty_parse(_packet: bytearray) -> None:
36def empty_parse(_packet: bytearray) -> None:
37    """Used for commands that we expect a response, but there's nothing in the response"""
38    return None

Used for commands that we expect a response, but there's nothing in the response

def log_packet(packet: bytearray) -> None:
41def log_packet(packet: bytearray) -> None:
42    print("received: ", packet)
COMMAND_HANDLERS: dict[int, collections.abc.Callable[[bytearray], typing.Any]] = {3: <function parse_battery>, 105: <function parse_heart_rate>, 106: <function empty_parse>, 67: <bound method SportDetailParser.parse of <colmi_r02_client.steps.SportDetailParser object>>, 21: <bound method HeartRateLogParser.parse of <colmi_r02_client.hr.HeartRateLogParser object>>, 1: <function empty_parse>, 22: <function parse_heart_rate_log_settings>}

TODO put these somewhere nice

These are commands that we expect to have a response returned for they must accept a packet as bytearray and then return a value to be put in the queue for that command type NOTE: if the value returned is None, it is not added to the queue, this is to support multi packet messages where the parser has state

class Client:
 65class Client:
 66    def __init__(self, address: str, record_to: Path | None = None):
 67        self.address = address
 68        self.bleak_client = BleakClient(self.address)
 69        self.queues: dict[int, asyncio.Queue] = {cmd: asyncio.Queue() for cmd in COMMAND_HANDLERS}
 70        self.record_to = record_to
 71
 72    async def __aenter__(self) -> "Client":
 73        logger.info(f"Connecting to {self.address}")
 74        await self.connect()
 75        logger.info("Connected!")
 76        return self
 77
 78    async def __aexit__(
 79        self,
 80        exc_type: type[BaseException] | None,
 81        exc_val: BaseException | None,
 82        exc_tb: TracebackType | None,
 83    ) -> None:
 84        await self.disconnect()
 85
 86    async def connect(self):
 87        await self.bleak_client.connect()
 88
 89        nrf_uart_service = self.bleak_client.services.get_service(UART_SERVICE_UUID)
 90        assert nrf_uart_service
 91        rx_char = nrf_uart_service.get_characteristic(UART_RX_CHAR_UUID)
 92        assert rx_char
 93        self.rx_char = rx_char
 94
 95        await self.bleak_client.start_notify(UART_TX_CHAR_UUID, self._handle_tx)
 96
 97    async def disconnect(self):
 98        await self.bleak_client.disconnect()
 99
100    def _handle_tx(self, _: BleakGATTCharacteristic, packet: bytearray) -> None:
101        """Bleak callback that handles new packets from the ring."""
102
103        logger.info(f"Received packet {packet}")
104
105        assert len(packet) == 16, f"Packet is the wrong length {packet}"
106        packet_type = packet[0]
107        assert packet_type < 127, f"Packet has error bit set {packet}"
108
109        if packet_type in COMMAND_HANDLERS:
110            result = COMMAND_HANDLERS[packet_type](packet)
111            if result is not None:
112                self.queues[packet_type].put_nowait(result)
113            else:
114                logger.debug(f"No result returned from parser for {packet_type}")
115        else:
116            logger.warning(f"Did not expect this packet: {packet}")
117
118        if self.record_to is not None:
119            with self.record_to.open("ab") as f:
120                f.write(packet)
121                f.write(b"\n")
122
123    async def send_packet(self, packet: bytearray) -> None:
124        logger.debug(f"Sending packet: {packet}")
125        await self.bleak_client.write_gatt_char(self.rx_char, packet, response=False)
126
127    async def get_battery(self) -> battery.BatteryInfo:
128        await self.send_packet(battery.BATTERY_PACKET)
129        result = await self.queues[battery.CMD_BATTERY].get()
130        assert isinstance(result, battery.BatteryInfo)
131        return result
132
133    async def get_realtime_heart_rate(self) -> list[int] | None:
134        return await self._poll_real_time_reading(real_time_hr.START_HEART_RATE_PACKET)
135
136    async def _poll_real_time_reading(self, start_packet: bytearray) -> list[int] | None:
137        await self.send_packet(start_packet)
138
139        valid_readings: list[int] = []
140        error = False
141        tries = 0
142        while len(valid_readings) < 6 and tries < 20:
143            try:
144                data: real_time_hr.Reading | real_time_hr.ReadingError = await asyncio.wait_for(
145                    self.queues[real_time_hr.CMD_START_HEART_RATE].get(),
146                    timeout=2,
147                )
148                if isinstance(data, real_time_hr.ReadingError):
149                    error = True
150                    break
151                if data.value != 0:
152                    valid_readings.append(data.value)
153            except TimeoutError:
154                tries += 1
155                await self.send_packet(real_time_hr.CONTINUE_HEART_RATE_PACKET)
156
157        await self.send_packet(
158            real_time_hr.STOP_HEART_RATE_PACKET,
159        )
160        if error:
161            return None
162        return valid_readings
163
164    async def get_realtime_spo2(self) -> list[int] | None:
165        return await self._poll_real_time_reading(real_time_hr.START_SPO2_PACKET)
166
167    async def set_time(self, ts: datetime) -> None:
168        await self.send_packet(set_time.set_time_packet(ts))
169
170    async def blink_twice(self) -> None:
171        await self.send_packet(blink_twice.BLINK_TWICE_PACKET)
172
173    async def get_device_info(self) -> dict[str, str]:
174        client = self.bleak_client
175        data = {}
176        device_info_service = client.services.get_service(DEVICE_INFO_UUID)
177        assert device_info_service
178
179        hw_info_char = device_info_service.get_characteristic(DEVICE_HW_UUID)
180        assert hw_info_char
181        hw_version = await client.read_gatt_char(hw_info_char)
182        data["hw_version"] = hw_version.decode("utf-8")
183
184        fw_info_char = device_info_service.get_characteristic(DEVICE_FW_UUID)
185        assert fw_info_char
186        fw_version = await client.read_gatt_char(fw_info_char)
187        data["fw_version"] = fw_version.decode("utf-8")
188
189        return data
190
191    async def get_heart_rate_log(self, target: datetime | None = None) -> hr.HeartRateLog | hr.NoData:
192        if target is None:
193            target = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0)
194        await self.send_packet(hr.read_heart_rate_packet(target))
195        return await asyncio.wait_for(
196            self.queues[hr.CMD_READ_HEART_RATE].get(),
197            timeout=2,
198        )
199
200    async def get_heart_rate_log_settings(self) -> hr_settings.HeartRateLogSettings:
201        await self.send_packet(hr_settings.READ_HEART_RATE_LOG_SETTINGS_PACKET)
202        return await asyncio.wait_for(
203            self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(),
204            timeout=2,
205        )
206
207    async def set_heart_rate_log_settings(self, enabled: bool, interval: int) -> None:
208        await self.send_packet(hr_settings.hr_log_settings_packet(hr_settings.HeartRateLogSettings(enabled, interval)))
209
210        # clear response from queue as it's unused and wrong
211        await asyncio.wait_for(
212            self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(),
213            timeout=2,
214        )
215
216    async def get_steps(self, target: datetime, today: datetime | None = None) -> list[steps.SportDetail] | steps.NoData:
217        if today is None:
218            today = datetime.now(timezone.utc)
219
220        if target.tzinfo != timezone.utc:
221            logger.info("Converting target time to utc")
222            target = target.astimezone(tz=timezone.utc)
223
224        days = (today.date() - target.date()).days
225        logger.debug(f"Looking back {days} days")
226
227        await self.send_packet(steps.read_steps_packet(days))
228        return await asyncio.wait_for(
229            self.queues[steps.CMD_GET_STEP_SOMEDAY].get(),
230            timeout=2,
231        )
232
233    async def reboot(self) -> None:
234        await self.send_packet(reboot.REBOOT_PACKET)
235
236    async def raw(self, command: int, subdata: bytearray, replies: int = 0) -> list[bytearray]:
237        p = packet.make_packet(command, subdata)
238        await self.send_packet(p)
239
240        results = []
241        while replies > 0:
242            data: bytearray = await asyncio.wait_for(
243                self.queues[command].get(),
244                timeout=2,
245            )
246            results.append(data)
247            replies -= 1
248
249        return results
Client(address: str, record_to: pathlib.Path | None = None)
66    def __init__(self, address: str, record_to: Path | None = None):
67        self.address = address
68        self.bleak_client = BleakClient(self.address)
69        self.queues: dict[int, asyncio.Queue] = {cmd: asyncio.Queue() for cmd in COMMAND_HANDLERS}
70        self.record_to = record_to
address
bleak_client
queues: dict[int, asyncio.queues.Queue]
record_to
async def connect(self):
86    async def connect(self):
87        await self.bleak_client.connect()
88
89        nrf_uart_service = self.bleak_client.services.get_service(UART_SERVICE_UUID)
90        assert nrf_uart_service
91        rx_char = nrf_uart_service.get_characteristic(UART_RX_CHAR_UUID)
92        assert rx_char
93        self.rx_char = rx_char
94
95        await self.bleak_client.start_notify(UART_TX_CHAR_UUID, self._handle_tx)
async def disconnect(self):
97    async def disconnect(self):
98        await self.bleak_client.disconnect()
async def send_packet(self, packet: bytearray) -> None:
123    async def send_packet(self, packet: bytearray) -> None:
124        logger.debug(f"Sending packet: {packet}")
125        await self.bleak_client.write_gatt_char(self.rx_char, packet, response=False)
async def get_battery(self) -> colmi_r02_client.battery.BatteryInfo:
127    async def get_battery(self) -> battery.BatteryInfo:
128        await self.send_packet(battery.BATTERY_PACKET)
129        result = await self.queues[battery.CMD_BATTERY].get()
130        assert isinstance(result, battery.BatteryInfo)
131        return result
async def get_realtime_heart_rate(self) -> list[int] | None:
133    async def get_realtime_heart_rate(self) -> list[int] | None:
134        return await self._poll_real_time_reading(real_time_hr.START_HEART_RATE_PACKET)
async def get_realtime_spo2(self) -> list[int] | None:
164    async def get_realtime_spo2(self) -> list[int] | None:
165        return await self._poll_real_time_reading(real_time_hr.START_SPO2_PACKET)
async def set_time(self, ts: datetime.datetime) -> None:
167    async def set_time(self, ts: datetime) -> None:
168        await self.send_packet(set_time.set_time_packet(ts))
async def get_device_info(self) -> dict[str, str]:
173    async def get_device_info(self) -> dict[str, str]:
174        client = self.bleak_client
175        data = {}
176        device_info_service = client.services.get_service(DEVICE_INFO_UUID)
177        assert device_info_service
178
179        hw_info_char = device_info_service.get_characteristic(DEVICE_HW_UUID)
180        assert hw_info_char
181        hw_version = await client.read_gatt_char(hw_info_char)
182        data["hw_version"] = hw_version.decode("utf-8")
183
184        fw_info_char = device_info_service.get_characteristic(DEVICE_FW_UUID)
185        assert fw_info_char
186        fw_version = await client.read_gatt_char(fw_info_char)
187        data["fw_version"] = fw_version.decode("utf-8")
188
189        return data
async def get_heart_rate_log( self, target: datetime.datetime | None = None) -> colmi_r02_client.hr.HeartRateLog | colmi_r02_client.hr.NoData:
191    async def get_heart_rate_log(self, target: datetime | None = None) -> hr.HeartRateLog | hr.NoData:
192        if target is None:
193            target = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0)
194        await self.send_packet(hr.read_heart_rate_packet(target))
195        return await asyncio.wait_for(
196            self.queues[hr.CMD_READ_HEART_RATE].get(),
197            timeout=2,
198        )
async def get_heart_rate_log_settings(self) -> colmi_r02_client.hr_settings.HeartRateLogSettings:
200    async def get_heart_rate_log_settings(self) -> hr_settings.HeartRateLogSettings:
201        await self.send_packet(hr_settings.READ_HEART_RATE_LOG_SETTINGS_PACKET)
202        return await asyncio.wait_for(
203            self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(),
204            timeout=2,
205        )
async def set_heart_rate_log_settings(self, enabled: bool, interval: int) -> None:
207    async def set_heart_rate_log_settings(self, enabled: bool, interval: int) -> None:
208        await self.send_packet(hr_settings.hr_log_settings_packet(hr_settings.HeartRateLogSettings(enabled, interval)))
209
210        # clear response from queue as it's unused and wrong
211        await asyncio.wait_for(
212            self.queues[hr_settings.CMD_HEART_RATE_LOG_SETTINGS].get(),
213            timeout=2,
214        )
async def get_steps( self, target: datetime.datetime, today: datetime.datetime | None = None) -> list[colmi_r02_client.steps.SportDetail] | colmi_r02_client.steps.NoData:
216    async def get_steps(self, target: datetime, today: datetime | None = None) -> list[steps.SportDetail] | steps.NoData:
217        if today is None:
218            today = datetime.now(timezone.utc)
219
220        if target.tzinfo != timezone.utc:
221            logger.info("Converting target time to utc")
222            target = target.astimezone(tz=timezone.utc)
223
224        days = (today.date() - target.date()).days
225        logger.debug(f"Looking back {days} days")
226
227        await self.send_packet(steps.read_steps_packet(days))
228        return await asyncio.wait_for(
229            self.queues[steps.CMD_GET_STEP_SOMEDAY].get(),
230            timeout=2,
231        )
async def reboot(self) -> None:
233    async def reboot(self) -> None:
234        await self.send_packet(reboot.REBOOT_PACKET)
async def raw( self, command: int, subdata: bytearray, replies: int = 0) -> list[bytearray]:
236    async def raw(self, command: int, subdata: bytearray, replies: int = 0) -> list[bytearray]:
237        p = packet.make_packet(command, subdata)
238        await self.send_packet(p)
239
240        results = []
241        while replies > 0:
242            data: bytearray = await asyncio.wait_for(
243                self.queues[command].get(),
244                timeout=2,
245            )
246            results.append(data)
247            replies -= 1
248
249        return results