colmi_r02_client.cli
A python client for connecting to the Colmi R02 Smart ring
"""
A python client for connecting to the Colmi R02 Smart ring
"""

import csv
import dataclasses
from datetime import datetime, timezone, timedelta
from io import StringIO
from pathlib import Path
import logging
import time

import asyncclick as click
from bleak import BleakScanner

from colmi_r02_client.client import Client
from colmi_r02_client import steps, pretty_print, db, date_utils, hr, real_time

logging.basicConfig(level=logging.WARNING, format="%(name)s: %(message)s")

logger = logging.getLogger(__name__)


@click.group()
@click.option("--debug/--no-debug", default=False)
@click.option(
    "--record/--no-record",
    default=False,
    help="Write all received packets to a file",
)
@click.option("--address", required=False, help="Bluetooth address")
@click.option("--name", required=False, help="Bluetooth name of the device, slower but will work on macOS")
@click.pass_context
async def cli_client(context: click.Context, debug: bool, record: bool, address: str | None, name: str | None) -> None:
    """Commands that talk to a ring, selected with either --address or --name."""

    # Exactly one of --address / --name must be supplied.
    if (address is None) == (name is None):
        context.fail("You must pass either the address option(preferred) or the name option, but not both")

    if debug:
        logging.getLogger().setLevel(logging.DEBUG)
        # Keep bleak one level quieter than our own debug output.
        logging.getLogger("bleak").setLevel(logging.INFO)

    record_to = None
    if record:
        now = int(time.time())
        captures = Path("captures")
        captures.mkdir(exist_ok=True)
        record_to = captures / Path(f"colmi_response_capture_{now}.bin")
        logger.info("Recording responses to %s", record_to)

    if name is not None:
        # Resolve the name to an address by scanning and taking the first exact match.
        devices = await BleakScanner.discover()
        found = next((x for x in devices if x.name == name), None)
        if found is None:
            context.fail("No device found with given name")
        address = found.address

    # Explicit check instead of `assert`: asserts are stripped under `python -O`
    # and an empty --address should be reported as a usage error.
    if not address:
        context.fail("A non-empty Bluetooth address is required")

    # Sub-commands receive the client through @click.pass_obj.
    context.obj = Client(address, record_to=record_to)


@cli_client.command()
@click.pass_obj
async def info(client: Client) -> None:
    """Get device info and battery level"""

    async with client:
        click.echo(f"device info {await client.get_device_info()!s}")
        click.echo(f"battery: {await client.get_battery()!s}")


@cli_client.command()
@click.option(
    "--target",
    type=click.DateTime(),
    required=True,
    help="The date you want logs for",
)
@click.pass_obj
async def get_heart_rate_log(client: Client, target: datetime) -> None:
    """Get heart rate for given date"""

    async with client:
        log = await client.get_heart_rate_log(target)
        click.echo(f"Data: {log!s}")
        # Only a HeartRateLog carries readings; anything else was already echoed above.
        if isinstance(log, hr.HeartRateLog):
            for reading, ts in log.heart_rates_with_times():
                # Readings of 0 are skipped.
                if reading != 0:
                    click.echo(f"{ts.strftime('%H:%M')}, {reading}")


@cli_client.command()
@click.option(
    "--when",
    type=click.DateTime(),
    required=False,
    help="The date and time you want to set the ring to",
)
@click.pass_obj
async def set_time(client: Client, when: datetime | None) -> None:
    """
    Set the time on the ring, required if you want to be able to interpret any of the logged data
    """

    # Default to the current UTC time when --when is not given.
    if when is None:
        when = datetime.now(tz=timezone.utc)
    async with client:
        await client.set_time(when)
        click.echo("Time set successfully")
        click.echo("Please ignore the unexpected packet. It's expectedly unexpected")


@cli_client.command()
@click.pass_obj
async def get_heart_rate_log_settings(client: Client) -> None:
    """Get heart rate log settings"""

    async with client:
        click.echo("heart rate log settings:")
        click.echo(await client.get_heart_rate_log_settings())


@cli_client.command()
@click.option("--enable/--disable", default=True, show_default=True, help="Logging status")
@click.option(
    "--interval",
    type=click.IntRange(0, 255),
    help="Interval in minutes to measure heart rate",
    default=60,
    show_default=True,
)
@click.pass_obj
async def set_heart_rate_log_settings(client: Client, enable: bool, interval: int) -> None:
    """Set heart rate log settings"""

    async with client:
        click.echo("Changing heart rate log settings")
        await client.set_heart_rate_log_settings(enable, interval)
        # Read the settings back so the user can see what the ring now reports.
        click.echo(await client.get_heart_rate_log_settings())
        click.echo("Done")


@cli_client.command()
@click.pass_obj
@click.argument("reading", nargs=1, type=click.Choice(list(real_time.REAL_TIME_MAPPING.keys())))
async def get_real_time(client: Client, reading: str) -> None:
    """Get any real time measurement (like heart rate or SPO2)"""
    async with client:
        click.echo("Starting reading, please wait.")
        reading_type = real_time.REAL_TIME_MAPPING[reading]
        result = await client.get_realtime_reading(reading_type)
        if result:
            click.echo(result)
        else:
            click.echo(f"Error, no {reading.replace('-', ' ')} detected. Is the ring being worn?")


@cli_client.command()
@click.pass_obj
@click.option(
    "--when",
    type=click.DateTime(),
    required=False,
    help="The date you want steps for",
)
@click.option("--as-csv", is_flag=True, help="Print as CSV", default=False)
async def get_steps(client: Client, when: datetime | None = None, as_csv: bool = False) -> None:
    """Get step data"""

    # Default to the current UTC time when --when is not given.
    if when is None:
        when = datetime.now(tz=timezone.utc)
    async with client:
        result = await client.get_steps(when)
        if isinstance(result, steps.NoData):
            click.echo("No results for day")
            return

        if not as_csv:
            click.echo(pretty_print.print_dataclasses(result))
            return

        # CSV output: one column per SportDetail field, one row per entry.
        out = StringIO()
        writer = csv.DictWriter(out, fieldnames=[f.name for f in dataclasses.fields(steps.SportDetail)])
        writer.writeheader()
        writer.writerows(dataclasses.asdict(r) for r in result)
        click.echo(out.getvalue())


@cli_client.command()
@click.pass_obj
async def reboot(client: Client) -> None:
    """Reboot the ring"""

    async with client:
        await client.reboot()
        click.echo("Ring rebooted")


@cli_client.command()
@click.pass_obj
@click.option(
    "--command",
    type=click.IntRange(min=0, max=255),
    required=True,
    help="Raw command",
)
@click.option(
    "--subdata",
    type=str,
    help="Hex encoded subdata array, will be parsed into a bytearray",
)
@click.option("--replies", type=click.IntRange(min=0), default=0, help="How many reply packets to wait for")
async def raw(client: Client, command: int, subdata: str | None, replies: int) -> None:
    """Send the ring a raw command"""

    # Report malformed hex as a usage error rather than an unhandled ValueError.
    try:
        p_subdata = bytearray.fromhex(subdata) if subdata is not None else bytearray()
    except ValueError as err:
        raise click.BadParameter(f"not valid hex: {err}", param_hint="--subdata") from err

    async with client:
        results = await client.raw(command, p_subdata, replies)
        click.echo(results)


@cli_client.command()
@click.pass_obj
@click.option(
    "--db",
    "db_path",
    type=click.Path(writable=True, path_type=Path),
    help="Path to a directory or file to use as the database. If dir, then filename will be ring_data.sqlite",
)
@click.option(
    "--start",
    type=click.DateTime(),
    required=False,
    help="The date you want to start grabbing data from",
)
@click.option(
    "--end",
    type=click.DateTime(),
    required=False,
    help="The date you want to stop grabbing data at",
)
async def sync(client: Client, db_path: Path | None, start: datetime | None, end: datetime | None) -> None:
    """
    Sync all data from the ring to a sqlite database

    Currently grabs:
        - heart rates
    """

    # Default to the current directory; a directory gets the default file name appended.
    if db_path is None:
        db_path = Path.cwd()
    if db_path.is_dir():
        db_path /= Path("ring_data.sqlite")

    click.echo(f"Writing to {db_path}")
    with db.get_db_session(db_path) as session:
        # Start precedence: explicit --start, then the last recorded sync, then 7 days ago.
        if start is None:
            start = db.get_last_sync(session)
        if start is None:
            start = date_utils.now() - timedelta(days=7)
        if end is None:
            end = date_utils.now()

        click.echo(f"Syncing from {start} to {end}")

        async with client:
            fd = await client.get_full_data(start, end)
            db.sync(session, fd)
            # The ring's clock is reset to the current UTC time after every sync.
            when = datetime.now(tz=timezone.utc)
            click.echo("Ignore unexpect packet")
            await client.set_time(when)

    click.echo("Done")


# Advertised-name prefixes that `scan` treats as possible rings.
DEVICE_NAME_PREFIXES = [
    "R01",
    "R02",
    "R03",
    "R04",
    "R05",
    "R06",
    "R07",
    "R10",
    "COLMI",
    "VK-5098",
    "MERLIN",
    "Hello Ring",
    "RING1",
    "boAtring",
    "TR-R02",
    "SE",
    "EVOLVEO",
    "GL-SR2",
    "Blaupunkt",
    "KSIX RING",
]


@click.group()
async def util():
    """Generic utilities for the R02 that don't need an address."""


@util.command()
@click.option("--all", is_flag=True, help="Print all devices, no name filtering", default=False)
async def scan(all: bool) -> None:  # noqa: A002 - parameter name is dictated by the --all option
    """Scan for possible devices based on known prefixes and print the bluetooth address."""

    # TODO maybe bluetooth specific stuff like this should be in another package?
    devices = await BleakScanner.discover()

    # Devices without a name are always skipped; with --all every named device is
    # listed, otherwise only those whose name starts with a known prefix.
    known_prefixes = tuple(DEVICE_NAME_PREFIXES)
    matches = [d for d in devices if d.name and (all or d.name.startswith(known_prefixes))]

    # Decide on the filtered list so the table header is never printed with no rows.
    if not matches:
        click.echo("No devices found. Try moving the ring closer to computer")
        return

    click.echo("Found device(s)")
    click.echo(f"{'Name':>20} | Address")
    click.echo("-" * 44)
    for d in matches:
        click.echo(f"{d.name:>20} | {d.address}")
logger =
<Logger colmi_r02_client.cli (WARNING)>
DEVICE_NAME_PREFIXES =
['R01', 'R02', 'R03', 'R04', 'R05', 'R06', 'R07', 'R10', 'COLMI', 'VK-5098', 'MERLIN', 'Hello Ring', 'RING1', 'boAtring', 'TR-R02', 'SE', 'EVOLVEO', 'GL-SR2', 'Blaupunkt', 'KSIX RING']