colmi_r02_client.cli

A python client for connecting to the Colmi R02 Smart ring

  1"""
  2A python client for connecting to the Colmi R02 Smart ring
  3"""
  4
  5import csv
  6import dataclasses
  7from datetime import datetime, timezone, timedelta
  8from io import StringIO
  9from pathlib import Path
 10import logging
 11import time
 12
 13import asyncclick as click
 14from bleak import BleakScanner
 15
 16from colmi_r02_client.client import Client
 17from colmi_r02_client import steps, pretty_print, db, date_utils, hr, real_time
 18
 19logging.basicConfig(level=logging.WARNING, format="%(name)s: %(message)s")
 20
 21logger = logging.getLogger(__name__)
 22
 23
 24@click.group()
 25@click.option("--debug/--no-debug", default=False)
 26@click.option(
 27    "--record/--no-record",
 28    default=False,
 29    help="Write all received packets to a file",
 30)
 31@click.option("--address", required=False, help="Bluetooth address")
 32@click.option("--name", required=False, help="Bluetooth name of the device, slower but will work on macOS")
 33@click.pass_context
 34async def cli_client(context: click.Context, debug: bool, record: bool, address: str | None, name: str | None) -> None:
 35    if (address is None and name is None) or (address is not None and name is not None):
 36        context.fail("You must pass either the address option(preferred) or the name option, but not both")
 37
 38    if debug:
 39        logging.getLogger().setLevel(logging.DEBUG)
 40        logging.getLogger("bleak").setLevel(logging.INFO)
 41
 42    record_to = None
 43    if record:
 44        now = int(time.time())
 45        captures = Path("captures")
 46        captures.mkdir(exist_ok=True)
 47        record_to = captures / Path(f"colmi_response_capture_{now}.bin")
 48        logger.info(f"Recording responses to {record_to}")
 49
 50    if name is not None:
 51        devices = await BleakScanner.discover()
 52        found = next((x for x in devices if x.name == name), None)
 53        if found is None:
 54            context.fail("No device found with given name")
 55        address = found.address
 56
 57    assert address
 58
 59    client = Client(address, record_to=record_to)
 60
 61    context.obj = client
 62
 63
 64@cli_client.command()
 65@click.pass_obj
 66async def info(client: Client) -> None:
 67    """Get device info and battery level"""
 68
 69    async with client:
 70        print("device info", await client.get_device_info())
 71        print("battery:", await client.get_battery())
 72
 73
 74@cli_client.command()
 75@click.option(
 76    "--target",
 77    type=click.DateTime(),
 78    required=True,
 79    help="The date you want logs for",
 80)
 81@click.pass_obj
 82async def get_heart_rate_log(client: Client, target: datetime) -> None:
 83    """Get heart rate for given date"""
 84
 85    async with client:
 86        log = await client.get_heart_rate_log(target)
 87        print("Data:", log)
 88        if isinstance(log, hr.HeartRateLog):
 89            for reading, ts in log.heart_rates_with_times():
 90                if reading != 0:
 91                    print(f"{ts.strftime('%H:%M')}, {reading}")
 92
 93
 94@cli_client.command()
 95@click.option(
 96    "--when",
 97    type=click.DateTime(),
 98    required=False,
 99    help="The date and time you want to set the ring to",
100)
101@click.pass_obj
102async def set_time(client: Client, when: datetime | None) -> None:
103    """
104    Set the time on the ring, required if you want to be able to interpret any of the logged data
105    """
106
107    if when is None:
108        when = datetime.now(tz=timezone.utc)
109    async with client:
110        await client.set_time(when)
111    click.echo("Time set successfully")
112    click.echo("Please ignore the unexpected packet. It's expectedly unexpected")
113
114
115@cli_client.command()
116@click.pass_obj
117async def get_heart_rate_log_settings(client: Client) -> None:
118    """Get heart rate log settings"""
119
120    async with client:
121        click.echo("heart rate log settings:")
122        click.echo(await client.get_heart_rate_log_settings())
123
124
125@cli_client.command()
126@click.option("--enable/--disable", default=True, show_default=True, help="Logging status")
127@click.option(
128    "--interval",
129    type=click.IntRange(0, 255),
130    help="Interval in minutes to measure heart rate",
131    default=60,
132    show_default=True,
133)
134@click.pass_obj
135async def set_heart_rate_log_settings(client: Client, enable: bool, interval: int) -> None:
136    """Get heart rate log settings"""
137
138    async with client:
139        click.echo("Changing heart rate log settings")
140        await client.set_heart_rate_log_settings(enable, interval)
141        click.echo(await client.get_heart_rate_log_settings())
142        click.echo("Done")
143
144
145@cli_client.command()
146@click.pass_obj
147@click.argument("reading", nargs=1, type=click.Choice(list(real_time.REAL_TIME_MAPPING.keys())))
148async def get_real_time(client: Client, reading: str) -> None:
149    """Get any real time measurement (like heart rate or SPO2)"""
150    async with client:
151        click.echo("Starting reading, please wait.")
152        reading_type = real_time.REAL_TIME_MAPPING[reading]
153        result = await client.get_realtime_reading(reading_type)
154        if result:
155            click.echo(result)
156        else:
157            click.echo(f"Error, no {reading.replace('-', ' ')} detected. Is the ring being worn?")
158
159
160@cli_client.command()
161@click.pass_obj
162@click.option(
163    "--when",
164    type=click.DateTime(),
165    required=False,
166    help="The date you want steps for",
167)
168@click.option("--as-csv", is_flag=True, help="Print as CSV", default=False)
169async def get_steps(client: Client, when: datetime | None = None, as_csv: bool = False) -> None:
170    """Get step data"""
171
172    if when is None:
173        when = datetime.now(tz=timezone.utc)
174    async with client:
175        result = await client.get_steps(when)
176        if isinstance(result, steps.NoData):
177            click.echo("No results for day")
178            return
179
180        if not as_csv:
181            click.echo(pretty_print.print_dataclasses(result))
182        else:
183            out = StringIO()
184            writer = csv.DictWriter(out, fieldnames=[f.name for f in dataclasses.fields(steps.SportDetail)])
185            writer.writeheader()
186            for r in result:
187                writer.writerow(dataclasses.asdict(r))
188            click.echo(out.getvalue())
189
190
191@cli_client.command()
192@click.pass_obj
193async def reboot(client: Client) -> None:
194    """Reboot the ring"""
195
196    async with client:
197        await client.reboot()
198        click.echo("Ring rebooted")
199
200
201@cli_client.command()
202@click.pass_obj
203@click.option(
204    "--command",
205    type=click.IntRange(min=0, max=255),
206    help="Raw command",
207)
208@click.option(
209    "--subdata",
210    type=str,
211    help="Hex encoded subdata array, will be parsed into a bytearray",
212)
213@click.option("--replies", type=click.IntRange(min=0), default=0, help="How many reply packets to wait for")
214async def raw(client: Client, command: int, subdata: str | None, replies: int) -> None:
215    """Send the ring a raw command"""
216
217    p_subdata = bytearray.fromhex(subdata) if subdata is not None else bytearray()
218
219    async with client:
220        results = await client.raw(command, p_subdata, replies)
221        click.echo(results)
222
223
224@cli_client.command()
225@click.pass_obj
226@click.option(
227    "--db",
228    "db_path",
229    type=click.Path(writable=True, path_type=Path),
230    help="Path to a directory or file to use as the database. If dir, then filename will be ring_data.sqlite",
231)
232@click.option(
233    "--start",
234    type=click.DateTime(),
235    required=False,
236    help="The date you want to start grabbing data from",
237)
238@click.option(
239    "--end",
240    type=click.DateTime(),
241    required=False,
242    help="The date you want to start grabbing data to",
243)
244async def sync(client: Client, db_path: Path | None, start: datetime | None, end: datetime | None) -> None:
245    """
246    Sync all data from the ring to a sqlite database
247
248    Currently grabs:
249        - heart rates
250    """
251
252    if db_path is None:
253        db_path = Path.cwd()
254    if db_path.is_dir():
255        db_path /= Path("ring_data.sqlite")
256
257    click.echo(f"Writing to {db_path}")
258    with db.get_db_session(db_path) as session:
259        if start is None:
260            start = db.get_last_sync(session)
261        if start is None:
262            start = date_utils.now() - timedelta(days=7)
263        if end is None:
264            end = date_utils.now()
265
266        click.echo(f"Syncing from {start} to {end}")
267
268        async with client:
269            fd = await client.get_full_data(start, end)
270            db.sync(session, fd)
271            when = datetime.now(tz=timezone.utc)
272            click.echo("Ignore unexpect packet")
273            await client.set_time(when)
274
275    click.echo("Done")
276
277
278DEVICE_NAME_PREFIXES = [
279    "R01",
280    "R02",
281    "R03",
282    "R04",
283    "R05",
284    "R06",
285    "R07",
286    "R10",
287    "COLMI",
288    "VK-5098",
289    "MERLIN",
290    "Hello Ring",
291    "RING1",
292    "boAtring",
293    "TR-R02",
294    "SE",
295    "EVOLVEO",
296    "GL-SR2",
297    "Blaupunkt",
298    "KSIX RING",
299]
300
301
302@click.group()
303async def util():
304    """Generic utilities for the R02 that don't need an address."""
305
306
307@util.command()
308@click.option("--all", is_flag=True, help="Print all devices, no name filtering", default=False)
309async def scan(all: bool) -> None:
310    """Scan for possible devices based on known prefixes and print the bluetooth address."""
311
312    # TODO maybe bluetooth specific stuff like this should be in another package?
313    devices = await BleakScanner.discover()
314
315    if len(devices) > 0:
316        click.echo("Found device(s)")
317        click.echo(f"{'Name':>20}  | Address")
318        click.echo("-" * 44)
319        for d in devices:
320            name = d.name
321            if name and (all or any(name for p in DEVICE_NAME_PREFIXES if name.startswith(p))):
322                click.echo(f"{name:>20}  |  {d.address}")
323    else:
324        click.echo("No devices found. Try moving the ring closer to computer")
logger = <Logger colmi_r02_client.cli (WARNING)>
DEVICE_NAME_PREFIXES = ['R01', 'R02', 'R03', 'R04', 'R05', 'R06', 'R07', 'R10', 'COLMI', 'VK-5098', 'MERLIN', 'Hello Ring', 'RING1', 'boAtring', 'TR-R02', 'SE', 'EVOLVEO', 'GL-SR2', 'Blaupunkt', 'KSIX RING']