colmi_r02_client.real_time

Stream real time data from the ring.

Currently heart rate and SPO2 seem reasonable.

HRV, ECG, blood pressure and blood sugar seem unlikely to be something you can correct

"""
Stream real time data from the ring.

Currently heart rate and SPO2 seem reasonable.

HRV, ECG, blood pressure and blood sugar seem unlikely to be something you
can correct
"""
 9
from dataclasses import dataclass
from enum import IntEnum

from colmi_r02_client.packet import make_packet
14
15
class Action(IntEnum):
    """Action codes placed in the payload of a real-time request.

    ``get_start_packet`` sends ``START`` and ``get_continue_packet`` sends
    ``CONTINUE`` as the second payload byte. ``PAUSE`` and ``STOP`` are
    defined but not referenced by the packet builders in this listing.
    """

    START = 1
    PAUSE = 2
    CONTINUE = 3
    STOP = 4
21
22
class RealTimeReading(IntEnum):
    """
    Kinds of real-time reading that can be requested.

    Taken from https://colmi.puxtril.com/commands/#data-request
    """

    HEART_RATE = 1
    BLOOD_PRESSURE = 2
    SPO2 = 3
    FATIGUE = 4
    HEALTH_CHECK = 5
    # leaving this out as it's redundant
    # REAL_TIME_HEART_RATE = 6
    ECG = 7
    PRESSURE = 8
    BLOOD_SUGAR = 9
    HRV = 10
39
40
# Lookup from hyphenated lower-case names to reading kinds.
# NOTE(review): presumably these are the names accepted from users (e.g. on a
# command line) -- confirm against the callers.
REAL_TIME_MAPPING: dict[str, RealTimeReading] = {
    "heart-rate": RealTimeReading.HEART_RATE,
    "blood-pressure": RealTimeReading.BLOOD_PRESSURE,
    "spo2": RealTimeReading.SPO2,
    "fatigue": RealTimeReading.FATIGUE,
    "health-check": RealTimeReading.HEALTH_CHECK,
    "ecg": RealTimeReading.ECG,
    "pressure": RealTimeReading.PRESSURE,
    "blood-sugar": RealTimeReading.BLOOD_SUGAR,
    "hrv": RealTimeReading.HRV,
}
52
# Command byte used by get_start_packet / get_continue_packet; it is also the
# first byte parse_real_time_reading expects in a response.
CMD_START_REAL_TIME = 105
# Command byte used by get_stop_packet.
CMD_STOP_REAL_TIME = 106
55
CMD_REAL_TIME_HEART_RATE = 30
# Pre-built packet for CMD_REAL_TIME_HEART_RATE with a one-byte payload.
# NOTE(review): b"3" is the ASCII character "3" (byte 0x33 / 51), not the
# integer 3 used by Action.CONTINUE -- confirm which value the ring expects
# before changing it.
CONTINUE_HEART_RATE_PACKET = make_packet(CMD_REAL_TIME_HEART_RATE, bytearray(b"3"))
58
59
@dataclass
class Reading:
    """A successful real-time reading, as produced by ``parse_real_time_reading``.

    Attributes:
        kind: which measurement the value belongs to.
        value: the raw value byte taken from the response packet.
    """

    kind: RealTimeReading
    value: int
64
65
@dataclass
class ReadingError:
    """A failed real-time reading, as produced by ``parse_real_time_reading``.

    Attributes:
        kind: which measurement was being read.
        code: the non-zero error byte taken from the response packet.
    """

    kind: RealTimeReading
    code: int
70
71
def get_start_packet(reading_type: RealTimeReading) -> bytearray:
    """Build the packet that starts a real-time reading of ``reading_type``.

    The payload is ``[reading_type, Action.START]``, sent with
    ``CMD_START_REAL_TIME``.
    """
    return make_packet(CMD_START_REAL_TIME, bytearray([reading_type, Action.START]))
74
75
def get_continue_packet(reading_type: RealTimeReading) -> bytearray:
    """Build the packet that continues a real-time reading of ``reading_type``.

    The payload is ``[reading_type, Action.CONTINUE]``, sent with
    ``CMD_START_REAL_TIME`` (the same command as the start packet).
    """
    return make_packet(CMD_START_REAL_TIME, bytearray([reading_type, Action.CONTINUE]))
78
79
def get_stop_packet(reading_type: RealTimeReading) -> bytearray:
    """Build the packet that stops a real-time reading of ``reading_type``.

    The payload is ``[reading_type, 0, 0]``, sent with ``CMD_STOP_REAL_TIME``.
    """
    return make_packet(CMD_STOP_REAL_TIME, bytearray([reading_type, 0, 0]))
82
83
def parse_real_time_reading(packet: bytearray) -> Reading | ReadingError:
    """Decode a real-time response packet.

    Layout as read here: byte 0 is the command, byte 1 the reading kind,
    byte 2 an error code (0 means success) and byte 3 the value.

    Returns:
        A ``ReadingError`` when the error code is non-zero, otherwise a
        ``Reading`` carrying byte 3 as its value.

    Raises:
        AssertionError: if byte 0 is not ``CMD_START_REAL_TIME``.
        ValueError: if byte 1 is not a member of ``RealTimeReading``.
    """
    # Explicit raise instead of ``assert`` so the check survives ``python -O``;
    # AssertionError is kept so callers see the same exception type as before.
    if packet[0] != CMD_START_REAL_TIME:
        raise AssertionError(
            f"unexpected command byte {packet[0]}, expected {CMD_START_REAL_TIME}"
        )

    kind = RealTimeReading(packet[1])
    error_code = packet[2]
    if error_code != 0:
        return ReadingError(kind=kind, code=error_code)

    return Reading(kind=kind, value=packet[3])
class Action(IntEnum):
    """Action codes placed in the payload of a real-time request.

    ``get_start_packet`` sends ``START`` and ``get_continue_packet`` sends
    ``CONTINUE`` as the second payload byte. ``PAUSE`` and ``STOP`` are
    defined but not referenced by the packet builders in this listing.
    """

    START = 1
    PAUSE = 2
    CONTINUE = 3
    STOP = 4
START = <Action.START: 1>
PAUSE = <Action.PAUSE: 2>
CONTINUE = <Action.CONTINUE: 3>
STOP = <Action.STOP: 4>
Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
bit_count
to_bytes
from_bytes
as_integer_ratio
real
imag
numerator
denominator
class RealTimeReading(IntEnum):
    """
    Kinds of real-time reading that can be requested.

    Taken from https://colmi.puxtril.com/commands/#data-request
    """

    HEART_RATE = 1
    BLOOD_PRESSURE = 2
    SPO2 = 3
    FATIGUE = 4
    HEALTH_CHECK = 5
    # leaving this out as it's redundant
    # REAL_TIME_HEART_RATE = 6
    ECG = 7
    PRESSURE = 8
    BLOOD_SUGAR = 9
    HRV = 10
HEART_RATE = <RealTimeReading.HEART_RATE: 1>
BLOOD_PRESSURE = <RealTimeReading.BLOOD_PRESSURE: 2>
SPO2 = <RealTimeReading.SPO2: 3>
FATIGUE = <RealTimeReading.FATIGUE: 4>
HEALTH_CHECK = <RealTimeReading.HEALTH_CHECK: 5>
ECG = <RealTimeReading.ECG: 7>
PRESSURE = <RealTimeReading.PRESSURE: 8>
BLOOD_SUGAR = <RealTimeReading.BLOOD_SUGAR: 9>
HRV = <RealTimeReading.HRV: 10>
Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
bit_count
to_bytes
from_bytes
as_integer_ratio
real
imag
numerator
denominator
REAL_TIME_MAPPING: dict[str, RealTimeReading] = {'heart-rate': <RealTimeReading.HEART_RATE: 1>, 'blood-pressure': <RealTimeReading.BLOOD_PRESSURE: 2>, 'spo2': <RealTimeReading.SPO2: 3>, 'fatigue': <RealTimeReading.FATIGUE: 4>, 'health-check': <RealTimeReading.HEALTH_CHECK: 5>, 'ecg': <RealTimeReading.ECG: 7>, 'pressure': <RealTimeReading.PRESSURE: 8>, 'blood-sugar': <RealTimeReading.BLOOD_SUGAR: 9>, 'hrv': <RealTimeReading.HRV: 10>}
CMD_START_REAL_TIME = 105
CMD_STOP_REAL_TIME = 106
CMD_REAL_TIME_HEART_RATE = 30
CONTINUE_HEART_RATE_PACKET = bytearray(b'\x1e3\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00Q')
@dataclass
class Reading:
    """A successful real-time reading, as produced by ``parse_real_time_reading``.

    Attributes:
        kind: which measurement the value belongs to.
        value: the raw value byte taken from the response packet.
    """

    kind: RealTimeReading
    value: int
Reading(kind: RealTimeReading, value: int)
value: int
@dataclass
class ReadingError:
    """A failed real-time reading, as produced by ``parse_real_time_reading``.

    Attributes:
        kind: which measurement was being read.
        code: the non-zero error byte taken from the response packet.
    """

    kind: RealTimeReading
    code: int
ReadingError(kind: RealTimeReading, code: int)
code: int
def get_start_packet(reading_type: RealTimeReading) -> bytearray:
    """Build the packet that starts a real-time reading of ``reading_type``.

    The payload is ``[reading_type, Action.START]``, sent with
    ``CMD_START_REAL_TIME``.
    """
    return make_packet(CMD_START_REAL_TIME, bytearray([reading_type, Action.START]))
def get_continue_packet(reading_type: RealTimeReading) -> bytearray:
    """Build the packet that continues a real-time reading of ``reading_type``.

    The payload is ``[reading_type, Action.CONTINUE]``, sent with
    ``CMD_START_REAL_TIME`` (the same command as the start packet).
    """
    return make_packet(CMD_START_REAL_TIME, bytearray([reading_type, Action.CONTINUE]))
def get_stop_packet(reading_type: RealTimeReading) -> bytearray:
    """Build the packet that stops a real-time reading of ``reading_type``.

    The payload is ``[reading_type, 0, 0]``, sent with ``CMD_STOP_REAL_TIME``.
    """
    return make_packet(CMD_STOP_REAL_TIME, bytearray([reading_type, 0, 0]))
def parse_real_time_reading(packet: bytearray) -> Reading | ReadingError:
    """Decode a real-time response packet.

    Layout as read here: byte 0 is the command, byte 1 the reading kind,
    byte 2 an error code (0 means success) and byte 3 the value.

    Returns:
        A ``ReadingError`` when the error code is non-zero, otherwise a
        ``Reading`` carrying byte 3 as its value.

    Raises:
        AssertionError: if byte 0 is not ``CMD_START_REAL_TIME``.
        ValueError: if byte 1 is not a member of ``RealTimeReading``.
    """
    # Explicit raise instead of ``assert`` so the check survives ``python -O``;
    # AssertionError is kept so callers see the same exception type as before.
    if packet[0] != CMD_START_REAL_TIME:
        raise AssertionError(
            f"unexpected command byte {packet[0]}, expected {CMD_START_REAL_TIME}"
        )

    kind = RealTimeReading(packet[1])
    error_code = packet[2]
    if error_code != 0:
        return ReadingError(kind=kind, code=error_code)

    return Reading(kind=kind, value=packet[3])