colmi_r02_client.hr

This is called the DailyHeartRate in Java.

  1"""This is called the DailyHeartRate in Java."""
  2
  3from datetime import datetime, timezone, timedelta
  4from dataclasses import dataclass
  5import logging
  6import struct
  7
  8from colmi_r02_client.packet import make_packet
  9from colmi_r02_client import date_utils
 10
 11CMD_READ_HEART_RATE = 21  # 0x15
 12
 13logger = logging.getLogger(__name__)
 14
 15
 16def read_heart_rate_packet(target: datetime) -> bytearray:
 17    """target datetime should be at midnight for the day of interest"""
 18    data = bytearray(struct.pack("<L", int(target.timestamp())))
 19
 20    return make_packet(CMD_READ_HEART_RATE, data)
 21
 22
 23def _add_times(heart_rates: list[int], ts: datetime) -> list[tuple[int, datetime]]:
 24    assert len(heart_rates) == 288, "Need exactly 288 points at 5 minute intervals"
 25    result = []
 26    m = datetime(ts.year, ts.month, ts.day, tzinfo=ts.tzinfo)
 27    five_min = timedelta(minutes=5)
 28    for hr in heart_rates:
 29        result.append((hr, m))
 30        m += five_min
 31
 32    return result
 33
 34
 35@dataclass
 36class HeartRateLog:
 37    heart_rates: list[int]
 38    timestamp: datetime
 39    size: int
 40    index: int
 41    range: int
 42
 43    def heart_rates_with_times(self):
 44        return _add_times(self.heart_rates, self.timestamp)
 45
 46
 47class NoData:
 48    """Returned when there's no heart rate data"""
 49
 50
 51class HeartRateLogParser:
 52    def __init__(self):
 53        self.reset()
 54
 55    def reset(self) -> None:
 56        self._raw_heart_rates: list[int] = []
 57        self.timestamp: datetime | None = None
 58        self.size = 0
 59        self.index = 0
 60        self.end = False
 61        self.range = 5
 62
 63    def is_today(self) -> bool:
 64        d = self.timestamp
 65        if d is None:
 66            return False
 67        return date_utils.is_today(d)
 68
 69    def parse(self, packet: bytearray) -> HeartRateLog | NoData | None:
 70        r"""
 71        first byte of packet should always be CMD_READ_HEART_RATE (21)
 72        second byte is the sub_type
 73
 74        sub_type 0 contains the lengths of things
 75        byte 2 is the number of expected packets after this.
 76
 77        example: bytearray(b'\x15\x00\x18\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002')
 78        """
 79
 80        sub_type = packet[1]
 81        if sub_type == 255:
 82            logger.info("error response from heart rate log request")
 83            self.reset()
 84            return NoData()
 85        if self.is_today() and sub_type == 23:
 86            assert self.timestamp
 87            result = HeartRateLog(
 88                heart_rates=self.heart_rates,
 89                timestamp=self.timestamp,
 90                size=self.size,
 91                range=self.range,
 92                index=self.index,
 93            )
 94            self.reset()
 95            return result
 96        if sub_type == 0:
 97            self.end = False
 98            self.size = packet[2]  # number of expected packets
 99            self.range = packet[3]
100            self._raw_heart_rates = [-1] * (self.size * 13)
101            return None
102        elif sub_type == 1:
103            # next 4 bytes are a timestamp
104            ts = struct.unpack_from("<l", packet, offset=2)[0]
105            self.timestamp = datetime.fromtimestamp(ts, timezone.utc)
106            # TODO timezone?
107
108            # remaining 16 - type - subtype - 4 - crc = 9
109            self._raw_heart_rates[0:9] = list(packet[6:-1])
110            self.index += 9
111            return None
112        else:
113            self._raw_heart_rates[self.index : self.index + 13] = list(packet[2:15])
114            self.index += 13
115            if sub_type == self.size - 1:
116                assert self.timestamp
117                result = HeartRateLog(
118                    heart_rates=self.heart_rates,
119                    timestamp=self.timestamp,
120                    size=self.size,
121                    range=self.range,
122                    index=self.index,
123                )
124                self.reset()
125                return result
126            else:
127                return None
128
129    @property
130    def heart_rates(self) -> list[int]:
131        """
132        Normalize and clean heart rate logs
133
134        I don't really understand why it's implemented this way.
135        I think to handle cases where there's a bit more or less data than expected
136        and if there's bad values in time slots that shouldn't exist yet because those
137        slots are in the future.
138        """
139
140        hr = self._raw_heart_rates.copy()
141
142        if len(self._raw_heart_rates) > 288:
143            hr = hr[0:288]
144        elif len(self._raw_heart_rates) < 288:
145            hr.extend([0] * (288 - len(hr)))
146
147        # TODO see if we can remove this
148        # need a good reason why parsing should depend on the day
149        # index might be good enough to indicate how much "valid" data we've gotten
150        if self.is_today():
151            m = date_utils.minutes_so_far(datetime.now(tz=timezone.utc)) // 5
152            hr[m:] = [0] * len(hr[m:])
153
154        return hr
CMD_READ_HEART_RATE = 21
logger = <Logger colmi_r02_client.hr (WARNING)>
def read_heart_rate_packet(target: datetime.datetime) -> bytearray:
17def read_heart_rate_packet(target: datetime) -> bytearray:
18    """target datetime should be at midnight for the day of interest"""
19    data = bytearray(struct.pack("<L", int(target.timestamp())))
20
21    return make_packet(CMD_READ_HEART_RATE, data)

target datetime should be at midnight for the day of interest

@dataclass
class HeartRateLog:
36@dataclass
37class HeartRateLog:
38    heart_rates: list[int]
39    timestamp: datetime
40    size: int
41    index: int
42    range: int
43
44    def heart_rates_with_times(self):
45        return _add_times(self.heart_rates, self.timestamp)
HeartRateLog( heart_rates: list[int], timestamp: datetime.datetime, size: int, index: int, range: int)
heart_rates: list[int]
timestamp: datetime.datetime
size: int
index: int
range: int
def heart_rates_with_times(self):
44    def heart_rates_with_times(self):
45        return _add_times(self.heart_rates, self.timestamp)
class NoData:
48class NoData:
49    """Returned when there's no heart rate data"""

Returned when there's no heart rate data

class HeartRateLogParser:
 52class HeartRateLogParser:
 53    def __init__(self):
 54        self.reset()
 55
 56    def reset(self) -> None:
 57        self._raw_heart_rates: list[int] = []
 58        self.timestamp: datetime | None = None
 59        self.size = 0
 60        self.index = 0
 61        self.end = False
 62        self.range = 5
 63
 64    def is_today(self) -> bool:
 65        d = self.timestamp
 66        if d is None:
 67            return False
 68        return date_utils.is_today(d)
 69
 70    def parse(self, packet: bytearray) -> HeartRateLog | NoData | None:
 71        r"""
 72        first byte of packet should always be CMD_READ_HEART_RATE (21)
 73        second byte is the sub_type
 74
 75        sub_type 0 contains the lengths of things
 76        byte 2 is the number of expected packets after this.
 77
 78        example: bytearray(b'\x15\x00\x18\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002')
 79        """
 80
 81        sub_type = packet[1]
 82        if sub_type == 255:
 83            logger.info("error response from heart rate log request")
 84            self.reset()
 85            return NoData()
 86        if self.is_today() and sub_type == 23:
 87            assert self.timestamp
 88            result = HeartRateLog(
 89                heart_rates=self.heart_rates,
 90                timestamp=self.timestamp,
 91                size=self.size,
 92                range=self.range,
 93                index=self.index,
 94            )
 95            self.reset()
 96            return result
 97        if sub_type == 0:
 98            self.end = False
 99            self.size = packet[2]  # number of expected packets
100            self.range = packet[3]
101            self._raw_heart_rates = [-1] * (self.size * 13)
102            return None
103        elif sub_type == 1:
104            # next 4 bytes are a timestamp
105            ts = struct.unpack_from("<l", packet, offset=2)[0]
106            self.timestamp = datetime.fromtimestamp(ts, timezone.utc)
107            # TODO timezone?
108
109            # remaining 16 - type - subtype - 4 - crc = 9
110            self._raw_heart_rates[0:9] = list(packet[6:-1])
111            self.index += 9
112            return None
113        else:
114            self._raw_heart_rates[self.index : self.index + 13] = list(packet[2:15])
115            self.index += 13
116            if sub_type == self.size - 1:
117                assert self.timestamp
118                result = HeartRateLog(
119                    heart_rates=self.heart_rates,
120                    timestamp=self.timestamp,
121                    size=self.size,
122                    range=self.range,
123                    index=self.index,
124                )
125                self.reset()
126                return result
127            else:
128                return None
129
130    @property
131    def heart_rates(self) -> list[int]:
132        """
133        Normalize and clean heart rate logs
134
135        I don't really understand why it's implemented this way.
136        I think to handle cases where there's a bit more or less data than expected
137        and if there's bad values in time slots that shouldn't exist yet because those
138        slots are in the future.
139        """
140
141        hr = self._raw_heart_rates.copy()
142
143        if len(self._raw_heart_rates) > 288:
144            hr = hr[0:288]
145        elif len(self._raw_heart_rates) < 288:
146            hr.extend([0] * (288 - len(hr)))
147
148        # TODO see if we can remove this
149        # need a good reason why parsing should depend on the day
150        # index might be good enough to indicate how much "valid" data we've gotten
151        if self.is_today():
152            m = date_utils.minutes_so_far(datetime.now(tz=timezone.utc)) // 5
153            hr[m:] = [0] * len(hr[m:])
154
155        return hr
def reset(self) -> None:
56    def reset(self) -> None:
57        self._raw_heart_rates: list[int] = []
58        self.timestamp: datetime | None = None
59        self.size = 0
60        self.index = 0
61        self.end = False
62        self.range = 5
def is_today(self) -> bool:
64    def is_today(self) -> bool:
65        d = self.timestamp
66        if d is None:
67            return False
68        return date_utils.is_today(d)
def parse( self, packet: bytearray) -> HeartRateLog | NoData | None:
 70    def parse(self, packet: bytearray) -> HeartRateLog | NoData | None:
 71        r"""
 72        first byte of packet should always be CMD_READ_HEART_RATE (21)
 73        second byte is the sub_type
 74
 75        sub_type 0 contains the lengths of things
 76        byte 2 is the number of expected packets after this.
 77
 78        example: bytearray(b'\x15\x00\x18\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002')
 79        """
 80
 81        sub_type = packet[1]
 82        if sub_type == 255:
 83            logger.info("error response from heart rate log request")
 84            self.reset()
 85            return NoData()
 86        if self.is_today() and sub_type == 23:
 87            assert self.timestamp
 88            result = HeartRateLog(
 89                heart_rates=self.heart_rates,
 90                timestamp=self.timestamp,
 91                size=self.size,
 92                range=self.range,
 93                index=self.index,
 94            )
 95            self.reset()
 96            return result
 97        if sub_type == 0:
 98            self.end = False
 99            self.size = packet[2]  # number of expected packets
100            self.range = packet[3]
101            self._raw_heart_rates = [-1] * (self.size * 13)
102            return None
103        elif sub_type == 1:
104            # next 4 bytes are a timestamp
105            ts = struct.unpack_from("<l", packet, offset=2)[0]
106            self.timestamp = datetime.fromtimestamp(ts, timezone.utc)
107            # TODO timezone?
108
109            # remaining 16 - type - subtype - 4 - crc = 9
110            self._raw_heart_rates[0:9] = list(packet[6:-1])
111            self.index += 9
112            return None
113        else:
114            self._raw_heart_rates[self.index : self.index + 13] = list(packet[2:15])
115            self.index += 13
116            if sub_type == self.size - 1:
117                assert self.timestamp
118                result = HeartRateLog(
119                    heart_rates=self.heart_rates,
120                    timestamp=self.timestamp,
121                    size=self.size,
122                    range=self.range,
123                    index=self.index,
124                )
125                self.reset()
126                return result
127            else:
128                return None

first byte of packet should always be CMD_READ_HEART_RATE (21) second byte is the sub_type

sub_type 0 contains the lengths of things byte 2 is the number of expected packets after this.

example: bytearray(b'\x15\x00\x18\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002')

heart_rates: list[int]
130    @property
131    def heart_rates(self) -> list[int]:
132        """
133        Normalize and clean heart rate logs
134
135        I don't really understand why it's implemented this way.
136        I think to handle cases where there's a bit more or less data than expected
137        and if there's bad values in time slots that shouldn't exist yet because those
138        slots are in the future.
139        """
140
141        hr = self._raw_heart_rates.copy()
142
143        if len(self._raw_heart_rates) > 288:
144            hr = hr[0:288]
145        elif len(self._raw_heart_rates) < 288:
146            hr.extend([0] * (288 - len(hr)))
147
148        # TODO see if we can remove this
149        # need a good reason why parsing should depend on the day
150        # index might be good enough to indicate how much "valid" data we've gotten
151        if self.is_today():
152            m = date_utils.minutes_so_far(datetime.now(tz=timezone.utc)) // 5
153            hr[m:] = [0] * len(hr[m:])
154
155        return hr

Normalize and clean heart rate logs

I don't really understand why it's implemented this way. I think to handle cases where there's a bit more or less data than expected and if there's bad values in time slots that shouldn't exist yet because those slots are in the future.