colmi_r02_client.hr
This is called the DailyHeartRate in Java.
1"""This is called the DailyHeartRate in Java.""" 2 3from datetime import datetime, timezone, timedelta 4from dataclasses import dataclass 5import logging 6import struct 7 8from colmi_r02_client.packet import make_packet 9from colmi_r02_client import date_utils 10 11CMD_READ_HEART_RATE = 21 # 0x15 12 13logger = logging.getLogger(__name__) 14 15 16def read_heart_rate_packet(target: datetime) -> bytearray: 17 """target datetime should be at midnight for the day of interest""" 18 data = bytearray(struct.pack("<L", int(target.timestamp()))) 19 20 return make_packet(CMD_READ_HEART_RATE, data) 21 22 23def _add_times(heart_rates: list[int], ts: datetime) -> list[tuple[int, datetime]]: 24 assert len(heart_rates) == 288, "Need exactly 288 points at 5 minute intervals" 25 result = [] 26 m = datetime(ts.year, ts.month, ts.day, tzinfo=ts.tzinfo) 27 five_min = timedelta(minutes=5) 28 for hr in heart_rates: 29 result.append((hr, m)) 30 m += five_min 31 32 return result 33 34 35@dataclass 36class HeartRateLog: 37 heart_rates: list[int] 38 timestamp: datetime 39 size: int 40 index: int 41 range: int 42 43 def heart_rates_with_times(self): 44 return _add_times(self.heart_rates, self.timestamp) 45 46 47class NoData: 48 """Returned when there's no heart rate data""" 49 50 51class HeartRateLogParser: 52 def __init__(self): 53 self.reset() 54 55 def reset(self) -> None: 56 self._raw_heart_rates: list[int] = [] 57 self.timestamp: datetime | None = None 58 self.size = 0 59 self.index = 0 60 self.end = False 61 self.range = 5 62 63 def is_today(self) -> bool: 64 d = self.timestamp 65 if d is None: 66 return False 67 return date_utils.is_today(d) 68 69 def parse(self, packet: bytearray) -> HeartRateLog | NoData | None: 70 r""" 71 first byte of packet should always be CMD_READ_HEART_RATE (21) 72 second byte is the sub_type 73 74 sub_type 0 contains the lengths of things 75 byte 2 is the number of expected packets after this. 76 77 example: bytearray(b'\x15\x00\x18\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002') 78 """ 79 80 sub_type = packet[1] 81 if sub_type == 255: 82 logger.info("error response from heart rate log request") 83 self.reset() 84 return NoData() 85 if self.is_today() and sub_type == 23: 86 assert self.timestamp 87 result = HeartRateLog( 88 heart_rates=self.heart_rates, 89 timestamp=self.timestamp, 90 size=self.size, 91 range=self.range, 92 index=self.index, 93 ) 94 self.reset() 95 return result 96 if sub_type == 0: 97 self.end = False 98 self.size = packet[2] # number of expected packets 99 self.range = packet[3] 100 self._raw_heart_rates = [-1] * (self.size * 13) 101 return None 102 elif sub_type == 1: 103 # next 4 bytes are a timestamp 104 ts = struct.unpack_from("<l", packet, offset=2)[0] 105 self.timestamp = datetime.fromtimestamp(ts, timezone.utc) 106 # TODO timezone? 107 108 # remaining 16 - type - subtype - 4 - crc = 9 109 self._raw_heart_rates[0:9] = list(packet[6:-1]) 110 self.index += 9 111 return None 112 else: 113 self._raw_heart_rates[self.index : self.index + 13] = list(packet[2:15]) 114 self.index += 13 115 if sub_type == self.size - 1: 116 assert self.timestamp 117 result = HeartRateLog( 118 heart_rates=self.heart_rates, 119 timestamp=self.timestamp, 120 size=self.size, 121 range=self.range, 122 index=self.index, 123 ) 124 self.reset() 125 return result 126 else: 127 return None 128 129 @property 130 def heart_rates(self) -> list[int]: 131 """ 132 Normalize and clean heart rate logs 133 134 I don't really understand why it's implemented this way. 135 I think to handle cases where there's a bit more or less data than expected 136 and if there's bad values in time slots that shouldn't exist yet because those 137 slots are in the future. 138 """ 139 140 hr = self._raw_heart_rates.copy() 141 142 if len(self._raw_heart_rates) > 288: 143 hr = hr[0:288] 144 elif len(self._raw_heart_rates) < 288: 145 hr.extend([0] * (288 - len(hr))) 146 147 # TODO see if we can remove this 148 # need a good reason why parsing should depend on the day 149 # index might be good enough to indicate how much "valid" data we've gotten 150 if self.is_today(): 151 m = date_utils.minutes_so_far(datetime.now(tz=timezone.utc)) // 5 152 hr[m:] = [0] * len(hr[m:]) 153 154 return hr
CMD_READ_HEART_RATE =
21
logger =
<Logger colmi_r02_client.hr (WARNING)>
def
read_heart_rate_packet(target: datetime.datetime) -> bytearray:
17def read_heart_rate_packet(target: datetime) -> bytearray: 18 """target datetime should be at midnight for the day of interest""" 19 data = bytearray(struct.pack("<L", int(target.timestamp()))) 20 21 return make_packet(CMD_READ_HEART_RATE, data)
target datetime should be at midnight for the day of interest
@dataclass
class
HeartRateLog:
36@dataclass 37class HeartRateLog: 38 heart_rates: list[int] 39 timestamp: datetime 40 size: int 41 index: int 42 range: int 43 44 def heart_rates_with_times(self): 45 return _add_times(self.heart_rates, self.timestamp)
class
NoData:
Returned when there's no heart rate data
class
HeartRateLogParser:
52class HeartRateLogParser: 53 def __init__(self): 54 self.reset() 55 56 def reset(self) -> None: 57 self._raw_heart_rates: list[int] = [] 58 self.timestamp: datetime | None = None 59 self.size = 0 60 self.index = 0 61 self.end = False 62 self.range = 5 63 64 def is_today(self) -> bool: 65 d = self.timestamp 66 if d is None: 67 return False 68 return date_utils.is_today(d) 69 70 def parse(self, packet: bytearray) -> HeartRateLog | NoData | None: 71 r""" 72 first byte of packet should always be CMD_READ_HEART_RATE (21) 73 second byte is the sub_type 74 75 sub_type 0 contains the lengths of things 76 byte 2 is the number of expected packets after this. 77 78 example: bytearray(b'\x15\x00\x18\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002') 79 """ 80 81 sub_type = packet[1] 82 if sub_type == 255: 83 logger.info("error response from heart rate log request") 84 self.reset() 85 return NoData() 86 if self.is_today() and sub_type == 23: 87 assert self.timestamp 88 result = HeartRateLog( 89 heart_rates=self.heart_rates, 90 timestamp=self.timestamp, 91 size=self.size, 92 range=self.range, 93 index=self.index, 94 ) 95 self.reset() 96 return result 97 if sub_type == 0: 98 self.end = False 99 self.size = packet[2] # number of expected packets 100 self.range = packet[3] 101 self._raw_heart_rates = [-1] * (self.size * 13) 102 return None 103 elif sub_type == 1: 104 # next 4 bytes are a timestamp 105 ts = struct.unpack_from("<l", packet, offset=2)[0] 106 self.timestamp = datetime.fromtimestamp(ts, timezone.utc) 107 # TODO timezone? 108 109 # remaining 16 - type - subtype - 4 - crc = 9 110 self._raw_heart_rates[0:9] = list(packet[6:-1]) 111 self.index += 9 112 return None 113 else: 114 self._raw_heart_rates[self.index : self.index + 13] = list(packet[2:15]) 115 self.index += 13 116 if sub_type == self.size - 1: 117 assert self.timestamp 118 result = HeartRateLog( 119 heart_rates=self.heart_rates, 120 timestamp=self.timestamp, 121 size=self.size, 122 range=self.range, 123 index=self.index, 124 ) 125 self.reset() 126 return result 127 else: 128 return None 129 130 @property 131 def heart_rates(self) -> list[int]: 132 """ 133 Normalize and clean heart rate logs 134 135 I don't really understand why it's implemented this way. 136 I think to handle cases where there's a bit more or less data than expected 137 and if there's bad values in time slots that shouldn't exist yet because those 138 slots are in the future. 139 """ 140 141 hr = self._raw_heart_rates.copy() 142 143 if len(self._raw_heart_rates) > 288: 144 hr = hr[0:288] 145 elif len(self._raw_heart_rates) < 288: 146 hr.extend([0] * (288 - len(hr))) 147 148 # TODO see if we can remove this 149 # need a good reason why parsing should depend on the day 150 # index might be good enough to indicate how much "valid" data we've gotten 151 if self.is_today(): 152 m = date_utils.minutes_so_far(datetime.now(tz=timezone.utc)) // 5 153 hr[m:] = [0] * len(hr[m:]) 154 155 return hr
70 def parse(self, packet: bytearray) -> HeartRateLog | NoData | None: 71 r""" 72 first byte of packet should always be CMD_READ_HEART_RATE (21) 73 second byte is the sub_type 74 75 sub_type 0 contains the lengths of things 76 byte 2 is the number of expected packets after this. 77 78 example: bytearray(b'\x15\x00\x18\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002') 79 """ 80 81 sub_type = packet[1] 82 if sub_type == 255: 83 logger.info("error response from heart rate log request") 84 self.reset() 85 return NoData() 86 if self.is_today() and sub_type == 23: 87 assert self.timestamp 88 result = HeartRateLog( 89 heart_rates=self.heart_rates, 90 timestamp=self.timestamp, 91 size=self.size, 92 range=self.range, 93 index=self.index, 94 ) 95 self.reset() 96 return result 97 if sub_type == 0: 98 self.end = False 99 self.size = packet[2] # number of expected packets 100 self.range = packet[3] 101 self._raw_heart_rates = [-1] * (self.size * 13) 102 return None 103 elif sub_type == 1: 104 # next 4 bytes are a timestamp 105 ts = struct.unpack_from("<l", packet, offset=2)[0] 106 self.timestamp = datetime.fromtimestamp(ts, timezone.utc) 107 # TODO timezone? 108 109 # remaining 16 - type - subtype - 4 - crc = 9 110 self._raw_heart_rates[0:9] = list(packet[6:-1]) 111 self.index += 9 112 return None 113 else: 114 self._raw_heart_rates[self.index : self.index + 13] = list(packet[2:15]) 115 self.index += 13 116 if sub_type == self.size - 1: 117 assert self.timestamp 118 result = HeartRateLog( 119 heart_rates=self.heart_rates, 120 timestamp=self.timestamp, 121 size=self.size, 122 range=self.range, 123 index=self.index, 124 ) 125 self.reset() 126 return result 127 else: 128 return None
first byte of packet should always be CMD_READ_HEART_RATE (21) second byte is the sub_type
sub_type 0 contains the lengths of things byte 2 is the number of expected packets after this.
example: bytearray(b'\x15\x00\x18\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002')
heart_rates: list[int]
130 @property 131 def heart_rates(self) -> list[int]: 132 """ 133 Normalize and clean heart rate logs 134 135 I don't really understand why it's implemented this way. 136 I think to handle cases where there's a bit more or less data than expected 137 and if there's bad values in time slots that shouldn't exist yet because those 138 slots are in the future. 139 """ 140 141 hr = self._raw_heart_rates.copy() 142 143 if len(self._raw_heart_rates) > 288: 144 hr = hr[0:288] 145 elif len(self._raw_heart_rates) < 288: 146 hr.extend([0] * (288 - len(hr))) 147 148 # TODO see if we can remove this 149 # need a good reason why parsing should depend on the day 150 # index might be good enough to indicate how much "valid" data we've gotten 151 if self.is_today(): 152 m = date_utils.minutes_so_far(datetime.now(tz=timezone.utc)) // 5 153 hr[m:] = [0] * len(hr[m:]) 154 155 return hr
Normalize and clean heart rate logs
I don't really understand why it's implemented this way. I think to handle cases where there's a bit more or less data than expected and if there's bad values in time slots that shouldn't exist yet because those slots are in the future.