diff --git a/mmwave/dataloader/adc.py b/mmwave/dataloader/adc.py index f7d607dd..e7591a6d 100644 --- a/mmwave/dataloader/adc.py +++ b/mmwave/dataloader/adc.py @@ -16,6 +16,7 @@ from enum import Enum import numpy as np +import time class CMD(Enum): @@ -54,11 +55,8 @@ def __str__(self): # DYNAMIC BYTES_IN_FRAME = (ADC_PARAMS['chirps'] * ADC_PARAMS['rx'] * ADC_PARAMS['tx'] * ADC_PARAMS['IQ'] * ADC_PARAMS['samples'] * ADC_PARAMS['bytes']) -BYTES_IN_FRAME_CLIPPED = (BYTES_IN_FRAME // BYTES_IN_PACKET) * BYTES_IN_PACKET -PACKETS_IN_FRAME = BYTES_IN_FRAME / BYTES_IN_PACKET -PACKETS_IN_FRAME_CLIPPED = BYTES_IN_FRAME // BYTES_IN_PACKET -UINT16_IN_PACKET = BYTES_IN_PACKET // 2 UINT16_IN_FRAME = BYTES_IN_FRAME // 2 +DELETE_INCOMPLETE_FRAMES_AFTER_SECONDS = 1.0 class DCA1000: @@ -116,7 +114,7 @@ def __init__(self, static_ip='192.168.33.30', adc_ip='192.168.33.180', self.packet_count = [] self.byte_count = [] - self.frame_buff = [] + self.frame_buff = {} self.curr_buff = None self.last_frame = None @@ -157,7 +155,7 @@ def close(self): self.config_socket.close() def read(self, timeout=1): - """ Read in a single packet via UDP + """ Read in a single frame via UDP Args: timeout (float): Time to wait for packet before moving on @@ -169,34 +167,25 @@ def read(self, timeout=1): # Configure self.data_socket.settimeout(timeout) - # Frame buffer - ret_frame = np.zeros(UINT16_IN_FRAME, dtype=np.uint16) - - # Wait for start of next frame - while True: - packet_num, byte_count, packet_data = self._read_data_packet() - if byte_count % BYTES_IN_FRAME_CLIPPED == 0: - packets_read = 1 - ret_frame[0:UINT16_IN_PACKET] = packet_data - break - - # Read in the rest of the frame + # Read packets until a full frame is read while True: + # Read UDP packet packet_num, byte_count, packet_data = self._read_data_packet() - packets_read += 1 - if byte_count % BYTES_IN_FRAME_CLIPPED == 0: - self.lost_packets = PACKETS_IN_FRAME_CLIPPED - packets_read - return ret_frame - - curr_idx = ((packet_num - 1) % PACKETS_IN_FRAME_CLIPPED) - try: - ret_frame[curr_idx * UINT16_IN_PACKET:(curr_idx + 1) * UINT16_IN_PACKET] = packet_data - except: - pass - - if packets_read > PACKETS_IN_FRAME_CLIPPED: - packets_read = 0 + # Place data from UDP packet in frame buffer + frame_num, frame_data = self._place_data_packet_in_frame_buffer( + byte_count=byte_count, + payload=packet_data + ) + + if frame_data is not None: + # Remove incomplete frames from frame buffer which exceed a timeout + dropped_frames = self._delete_incomplete_frames(timeout_seconds=DELETE_INCOMPLETE_FRAMES_AFTER_SECONDS) + if dropped_frames: + ids = ", ".join(str(f) for f in dropped_frames) + print(f"WARNING: Dropped Frame(s) {ids} since they weren't complete.") + # Return the complete frame + return frame_data def _send_command(self, cmd, length='0000', body='', timeout=1): """Helper function to send a single commmand to the FPGA @@ -236,6 +225,82 @@ def _read_data_packet(self): byte_count = struct.unpack('>Q', b'\x00\x00' + data[4:10][::-1])[0] packet_data = np.frombuffer(data[10:], dtype=np.uint16) return packet_num, byte_count, packet_data + + def _place_data_packet_in_frame_buffer(self, byte_count: int, payload: np.ndarray): + """Helper function to place one UDP packet at the correct position in the frame buffer + + Args: + byte_count (int): cumulative Bytes before this payload (from DCA1000 header) + payload (np.ndarray): uint16 from the UDP packet + + Returns: + (int, np.ndarray): Complete frame as a tuple of (frame_num, frame_data), + (None, None) if no frame is complete yet + """ + + offset = byte_count // 2 # Absolute position in UDP packet stream + idx = 0 # Read-index of payload + remaining = payload.size # Number of uint16 to process + completed = (None, None) # Tuple of (frame_id, frame_data) for complete captured frame + + while remaining > 0: + # Determine which frame_id this data chunk belongs to + frame_id = offset // UINT16_IN_FRAME + # Determine which packet number this is within the frame + packet_num_within_frame = offset % UINT16_IN_FRAME + n_uint16_to_frame_end = UINT16_IN_FRAME - packet_num_within_frame + + # Determine the size chunk of the data which is written to buffer + # (detect if the frame border is within this packet or not) + chunk_size = min(remaining, n_uint16_to_frame_end) + + # Create buffer within frame_buff obj for this frame if neccessary + buf = self.frame_buff.setdefault( + frame_id, + { + 'data': np.empty(UINT16_IN_FRAME, dtype=np.uint16), + 'filled': np.zeros(UINT16_IN_FRAME, dtype=bool), + 'first_seen': time.time() + } + ) + + # Write chunk to appropriate position in the frame's buffer + start = packet_num_within_frame + end = packet_num_within_frame + chunk_size + buf['data'][start:end] = payload[idx:idx+chunk_size] + buf['filled'][start:end] = True + + # If all packets for the frame have been read, add it to completed tuple + # (but do not return yet, as otherwise the rest of the packet data is lost) + if buf['filled'].all(): + completed = (frame_id, buf['data'].copy()) + del self.frame_buff[frame_id] + + # Persist in helper vars that chunk has been read + offset += chunk_size + idx += chunk_size + remaining -= chunk_size + + return completed + + def _delete_incomplete_frames(self, timeout_seconds: float=0.2): + """Helper function to delete incomplete frames from frame buffer which exceed a given timeout + + Args: + timeout_seconds (float): Time after which incomplete frames are deleted + + Returns: + List[int]: List of frame numbers which were deleted (can be empty) + """ + now = time.time() + to_delete = [] + for frame_number, buf in self.frame_buff.items(): + if now - buf['first_seen'] > timeout_seconds: + to_delete.append(frame_number) + for frame_number in to_delete: + del self.frame_buff[frame_number] + + return to_delete def _listen_for_error(self): """Helper function to try and read in for an error message from the FPGA