colmi_r02_client.real_time
Stream real time data from the ring.
Currently heart rate and SPO2 seem reasonable.
HRV, ECG, blood pressure and blood sugar seem unlikely to be something you can correct
1""" 2Stream real time data from the ring. 3 4Currently heart rate and SPO2 seem reasonable. 5 6HRV, ECG, blood pressure and blood sugar seem unlikely to be something you 7can correct 8""" 9 10from dataclasses import dataclass 11from enum import IntEnum 12 13from colmi_r02_client.packet import make_packet 14 15 16class Action(IntEnum): 17 START = 1 18 PAUSE = 2 19 CONTINUE = 3 20 STOP = 4 21 22 23class RealTimeReading(IntEnum): 24 """ 25 Taken from https://colmi.puxtril.com/commands/#data-request 26 """ 27 28 HEART_RATE = 1 29 BLOOD_PRESSURE = 2 30 SPO2 = 3 31 FATIGUE = 4 32 HEALTH_CHECK = 5 33 # leaving this out as it's redundant 34 # REAL_TIME_HEART_RATE = 6 35 ECG = 7 36 PRESSURE = 8 37 BLOOD_SUGAR = 9 38 HRV = 10 39 40 41REAL_TIME_MAPPING: dict[str, RealTimeReading] = { 42 "heart-rate": RealTimeReading.HEART_RATE, 43 "blood-pressure": RealTimeReading.BLOOD_PRESSURE, 44 "spo2": RealTimeReading.SPO2, 45 "fatigue": RealTimeReading.FATIGUE, 46 "health-check": RealTimeReading.HEALTH_CHECK, 47 "ecg": RealTimeReading.ECG, 48 "pressure": RealTimeReading.PRESSURE, 49 "blood-sugar": RealTimeReading.BLOOD_SUGAR, 50 "hrv": RealTimeReading.HRV, 51} 52 53CMD_START_REAL_TIME = 105 54CMD_STOP_REAL_TIME = 106 55 56CMD_REAL_TIME_HEART_RATE = 30 57CONTINUE_HEART_RATE_PACKET = make_packet(CMD_REAL_TIME_HEART_RATE, bytearray(b"3")) 58 59 60@dataclass 61class Reading: 62 kind: RealTimeReading 63 value: int 64 65 66@dataclass 67class ReadingError: 68 kind: RealTimeReading 69 code: int 70 71 72def get_start_packet(reading_type: RealTimeReading) -> bytearray: 73 return make_packet(CMD_START_REAL_TIME, bytearray([reading_type, Action.START])) 74 75 76def get_continue_packet(reading_type: RealTimeReading) -> bytearray: 77 return make_packet(CMD_START_REAL_TIME, bytearray([reading_type, Action.CONTINUE])) 78 79 80def get_stop_packet(reading_type: RealTimeReading) -> bytearray: 81 return make_packet(CMD_STOP_REAL_TIME, bytearray([reading_type, 0, 0])) 82 83 84def parse_real_time_reading(packet: bytearray) -> Reading | ReadingError: 85 assert packet[0] == CMD_START_REAL_TIME 86 87 kind = RealTimeReading(packet[1]) 88 error_code = packet[2] 89 if error_code != 0: 90 return ReadingError(kind=kind, code=error_code) 91 92 return Reading(kind=kind, value=packet[3])
class
Action(enum.IntEnum):
START =
<Action.START: 1>
PAUSE =
<Action.PAUSE: 2>
CONTINUE =
<Action.CONTINUE: 3>
STOP =
<Action.STOP: 4>
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- real
- imag
- numerator
- denominator
class
RealTimeReading(enum.IntEnum):
24class RealTimeReading(IntEnum): 25 """ 26 Taken from https://colmi.puxtril.com/commands/#data-request 27 """ 28 29 HEART_RATE = 1 30 BLOOD_PRESSURE = 2 31 SPO2 = 3 32 FATIGUE = 4 33 HEALTH_CHECK = 5 34 # leaving this out as it's redundant 35 # REAL_TIME_HEART_RATE = 6 36 ECG = 7 37 PRESSURE = 8 38 BLOOD_SUGAR = 9 39 HRV = 10
HEART_RATE =
<RealTimeReading.HEART_RATE: 1>
BLOOD_PRESSURE =
<RealTimeReading.BLOOD_PRESSURE: 2>
SPO2 =
<RealTimeReading.SPO2: 3>
FATIGUE =
<RealTimeReading.FATIGUE: 4>
HEALTH_CHECK =
<RealTimeReading.HEALTH_CHECK: 5>
ECG =
<RealTimeReading.ECG: 7>
PRESSURE =
<RealTimeReading.PRESSURE: 8>
BLOOD_SUGAR =
<RealTimeReading.BLOOD_SUGAR: 9>
HRV =
<RealTimeReading.HRV: 10>
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- real
- imag
- numerator
- denominator
REAL_TIME_MAPPING: dict[str, RealTimeReading] =
{'heart-rate': <RealTimeReading.HEART_RATE: 1>, 'blood-pressure': <RealTimeReading.BLOOD_PRESSURE: 2>, 'spo2': <RealTimeReading.SPO2: 3>, 'fatigue': <RealTimeReading.FATIGUE: 4>, 'health-check': <RealTimeReading.HEALTH_CHECK: 5>, 'ecg': <RealTimeReading.ECG: 7>, 'pressure': <RealTimeReading.PRESSURE: 8>, 'blood-sugar': <RealTimeReading.BLOOD_SUGAR: 9>, 'hrv': <RealTimeReading.HRV: 10>}
CMD_START_REAL_TIME =
105
CMD_STOP_REAL_TIME =
106
CMD_REAL_TIME_HEART_RATE =
30
CONTINUE_HEART_RATE_PACKET =
bytearray(b'\x1e3\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00Q')
@dataclass
class
Reading:
Reading(kind: RealTimeReading, value: int)
kind: RealTimeReading
@dataclass
class
ReadingError:
ReadingError(kind: RealTimeReading, code: int)
kind: RealTimeReading
85def parse_real_time_reading(packet: bytearray) -> Reading | ReadingError: 86 assert packet[0] == CMD_START_REAL_TIME 87 88 kind = RealTimeReading(packet[1]) 89 error_code = packet[2] 90 if error_code != 0: 91 return ReadingError(kind=kind, code=error_code) 92 93 return Reading(kind=kind, value=packet[3])