#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: Not titled yet
#

import os
import signal
import sys
import time
from argparse import ArgumentParser
from datetime import datetime

import json5 as json
import numpy as np
from gnuradio import blocks
from gnuradio import gr
from gnuradio.filter import firdes
from gnuradio.fft import window
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
from gnuradio import gr, blocks
from gnuradio import uhd
from loguru import logger

logger.add("logs/min_test_example.log", rotation="1 week")

# for printing time stamp
fmt = '%Y-%m-%d %H:%M:%S.%f'

# Example settings: one USRP recording two channels (subdev "A:A A:B").
dual_record_settings_json = """
{
    "Fc_hz": 437000000.0,
    "Fs_hz": 500000.0,
    "gain_dB": 45,
    "NID": 666,
    "file_name": "",
    "file_path": "/media/rf_data/recordings",
    "antenna": "RX2",
    "pps_present": true,
    "test_dev_null": false,
    "usrps": [
        {
            "serial": "3136D4B",
            "subdev": "A:A A:B",
            "args": "recv_frame_size=1032, num_recv_frames=5120",
            "antenna": "RX2"
        }
    ]
}
"""

# Example settings: one USRP recording a single channel (subdev "A:A").
single_record_settings_json = """
{
    "Fc_hz": 437000000.0,
    "Fs_hz": 500000.0,
    "gain_dB": 45,
    "NID": 666,
    "file_name": "",
    "file_path": "/media/rf_data/recordings",
    "antenna": "RX2",
    "pps_present": true,
    "test_dev_null": false,
    "usrps": [
        {
            "serial": "3136D4B",
            "subdev": "A:A",
            "args": "recv_frame_size=1032, num_recv_frames=5120",
            "antenna": "RX2"
        }
    ]
}
"""


class bulk_usrp_record(gr.top_block):
    """Flow graph that streams every channel of one or more USRPs to files.

    Each entry of ``self.uhd_usrp_sources`` is a dict with the keys
    ``'usrp'`` (the ``uhd.usrp_source`` block), ``'n_channels'``,
    ``'antenna'`` and ``'config'``; entries built from an explicit
    ``"usrps"`` list additionally carry ``'serial'``.
    """

    def __init__(self, cfg):
        """Build the flow graph from a settings dict.

        Args:
            cfg: Settings dict (see the example JSON strings in this file).
                ``Fs_hz`` and ``Fc_hz`` are required; everything else falls
                back to a default. The dict is mutated: the key
                ``'output_file_names'`` is (re)set to the list of file names
                that were created.
        """
        gr.top_block.__init__(self, "")

        # get default settings
        self.cfg = cfg
        self.samp_rate = int(cfg.get('Fs_hz'))
        self.Fc_hz = int(cfg.get('Fc_hz'))
        self.gain_dB = cfg.get('gain_dB', 0)
        self.antenna = cfg.get('antenna', 'RX2')  # default if not specified for each USRP
        self.subdev = cfg.get('subdev', 'A:A')  # default if not specified for each USRPS
        self.args = cfg.get('args', '')  # default if not specified for each USRPS
        # default to pps enabled. If no pps is present the script will hang
        self.pps_present = cfg.get('pps_present', True)
        # write output to dev/null just for testing purposes
        self.test_dev_null = cfg.get('test_dev_null', False)
        self.file_base_name = cfg.get('file_name')
        self.file_path = cfg.get('file_path')

        # check the USRPs
        self.uhd_usrp_sources = self._create_usrp_sources(cfg.get('usrps', []))
        logger.info(f'USRPS: {self.uhd_usrp_sources}')
        logger.debug(f'settings: {self.cfg}')

        # finish configuring the USRPS
        self._configure_usrps()

        # Create the file sinks and connect these
        self._connect_file_sinks(cfg)

        if self.pps_present:
            time.sleep(1)
            logger.warning('syncing time -- If this hangs, please verify that an external pps is configured')
            self.sync_clock()
            logger.info('verifying time')
            self.check_clock()
        else:
            logger.warning('skipping time sync, since pps is configured not to be present')

        # NOTE(review): placement reconstructed from a whitespace-mangled
        # source; this sleep is assumed to run on both branches -- confirm.
        time.sleep(1)
        logger.debug('tune the USRPs')
        self.tune_usrps()
        logger.debug('done')

    @staticmethod
    def _make_usrp_source(args, subdev):
        """Create one ``uhd.usrp_source`` with one stream channel per subdev entry.

        Args:
            args: UHD device argument string.
            subdev: Space-separated subdevice spec, e.g. ``"A:A A:B"``.

        Returns:
            Tuple ``(source_block, n_channels)``.
        """
        n_channels = len(subdev.split(' '))
        source = uhd.usrp_source(
            args,
            uhd.stream_args(
                cpu_format="fc32",
                args='',
                channels=list(range(0, n_channels)),
            ),
        )
        source.set_subdev_spec(subdev, 0)
        source.set_thread_priority(1)  # -1 lower priority, 0 normal, 1 higher priority
        return source, n_channels

    def _create_usrp_sources(self, usrp_configs):
        """Instantiate the USRP source blocks described by ``usrp_configs``.

        An empty list yields a single USRP built from the top-level defaults.
        Otherwise one source is created per entry; ``serial`` (preferred) or
        ``addr`` is appended to the device args to select the device.
        """
        if not usrp_configs:
            # empty list, initialize one USRP from the default subdev/args
            source, n_channels = self._make_usrp_source(self.args, self.subdev)
            return [
                {
                    'usrp': source,
                    'n_channels': n_channels,
                    'antenna': self.antenna,
                    'config': {},
                }
            ]

        # make multiple USRP devices. All devices must be fully specified
        sources = []
        for u in usrp_configs:
            subdev = u.get('subdev', self.subdev)
            args = u.get('args', self.args)
            serial = u.get('serial', '')
            addr = u.get('addr', '')
            if serial:
                args += f', serial={serial}'
            elif addr:
                args += f', addr={addr}'

            source, n_channels = self._make_usrp_source(args, subdev)
            sources.append(
                {
                    'usrp': source,
                    'n_channels': n_channels,
                    'antenna': u.get('antenna', self.antenna),
                    'serial': serial,
                    'config': u,
                }
            )
        return sources

    def _configure_usrps(self):
        """Set clock/time source, sample rate, antenna, AGC and gain on every USRP."""
        for usrp in self.uhd_usrp_sources:
            u = usrp['usrp']
            u_cfg = usrp['config']
            clock_source = u_cfg.get('clock_source', 'external')
            time_source = u_cfg.get('time_source', 'external')
            logger.debug(
                f'USRP {usrp}: Setting clock source to {clock_source} '
                f'and time source to {time_source}'
            )
            u.set_clock_source(clock_source, 0)
            u.set_time_source(time_source, 0)
            u.set_samp_rate(self.samp_rate)  # needs to be set immediately

            # check USRP lock status
            status_ref_locked = u.get_mboard_sensor('ref_locked').to_bool()
            logger.info(f'Clock ref locked {status_ref_locked}')

            for c in range(usrp['n_channels']):
                logger.debug(f'{c} antenna {usrp["antenna"]} gain {self.gain_dB}')
                u.set_antenna(usrp['antenna'], c)
                try:
                    u.set_rx_agc(False, c)
                except (NotImplementedError, RuntimeError):
                    # The default single-USRP entry has no 'serial' key, so
                    # use .get() to avoid a KeyError inside this handler.
                    logger.debug(f'Disabling AGC not supported for usrp {usrp.get("serial", "")}')
                # NOTE(review): placement reconstructed from a
                # whitespace-mangled source; the gain is assumed to be set
                # whether or not disabling AGC succeeded -- confirm.
                u.set_gain(self.gain_dB, c)

    def _connect_file_sinks(self, cfg):
        """Create one file meta sink per channel and connect it to its USRP output.

        Channel numbers in the file names run consecutively across all
        USRPs. The generated names are stored in ``cfg['output_file_names']``.
        """
        cfg['output_file_names'] = []
        if self.test_dev_null is True:
            # NOTE(review): despite the option name, this writes to files
            # under 'rec/', not to /dev/null -- confirm this is intended.
            file_base = 'rec/null'
            file_path = ''
            logger.info(f'Writing to {file_base}')
        else:
            file_date = int(time.time())
            file_path = f'{self.file_path}/'
            file_base = f'{file_date}_{cfg.get("NID","")}'

        file_offset = 0
        for usrp in self.uhd_usrp_sources:
            logger.debug(usrp['n_channels'])
            for c in range(usrp['n_channels']):
                if self.test_dev_null:
                    f_name = f'{file_base}_{c+file_offset}.bin'
                else:
                    f_name = f'{file_base}_ch{c+file_offset}.dat'
                cfg['output_file_names'].append(f_name)
                sink = blocks.file_meta_sink(
                    gr.sizeof_gr_complex*1,
                    file_path + f_name,
                    self.samp_rate,
                    1,
                    blocks.GR_FILE_FLOAT,
                    True,
                    1000000,
                    detached_header=True,
                )
                sink.set_unbuffered(False)
                self.connect((usrp['usrp'], c), (sink, 0))
            file_offset = file_offset + usrp['n_channels']

    def sync_clock(self):
        """Set the time on the USRPs.

        For each USRP, repeatedly schedules the next-PPS time to the host's
        next whole second until the USRP's last-PPS time matches the host's
        current whole second within ``time_tol``. There is no timeout: the
        loop keeps retrying for as long as the times disagree.
        """
        time_tol = 50e-9
        for i, usrp in enumerate(self.uhd_usrp_sources):
            usrp_last_pps_time = usrp['usrp'].get_time_last_pps().get_real_secs()
            dtstamp = datetime.fromtimestamp(usrp_last_pps_time)
            logger.debug(f'USRP{i} time {usrp_last_pps_time} {dtstamp.strftime(fmt)}')

            cur_time = time.time()  # GPS synced pc clock
            cnt = 0
            t_err = int(cur_time) - usrp_last_pps_time
            while abs(t_err) > time_tol:
                usrp['usrp'].set_time_next_pps(uhd.time_spec_t(int(time.time())+1))
                time.sleep(0.1)
                usrp_last_pps_time = usrp['usrp'].get_time_last_pps().get_real_secs()
                cur_time = time.time()
                t_err = int(cur_time) - usrp_last_pps_time
                cnt += 1
            logger.info(f'USRP {usrp.get("serial","")} -- Successfully synced USRP GPS time in {cnt} attempts')

    def check_clock(self):
        """Check whether the USRPs are in sync (same second).

        Waits until the first USRP reports a new last-PPS time, then reads
        the last-PPS time of every USRP and logs whether they are all equal.
        """
        u = self.uhd_usrp_sources[0]['usrp']
        last_time = u.get_time_last_pps().get_real_secs()

        # wait for clock edge on one USRP
        while u.get_time_last_pps().get_real_secs() - last_time == 0:
            time.sleep(0.001)

        times = []
        for i, usrp in enumerate(self.uhd_usrp_sources):
            times.append(usrp['usrp'].get_time_last_pps().get_real_secs())
            dtstamp = datetime.fromtimestamp(times[-1])
            logger.info(f'USRP{i} time {times[-1]} {dtstamp.strftime(fmt)}')

        if not all(t == times[0] for t in times):
            logger.warning(f'USRP clocks not synced. times recorded: {times}')
        else:
            logger.warning('All USRP clocks in sync (to the second)')

    def tune_usrps(self):
        """Tune every channel of every USRP to the center frequency.

        NOTE(review): the previous revision computed a command time
        (last PPS + 2 s) but never applied it, so the tune requests are
        issued immediately and are not time-aligned across devices. If
        coherent tuning is required, timed commands still need to be added.
        """
        for usrp in self.uhd_usrp_sources:
            u = usrp['usrp']
            for c in range(usrp['n_channels']):
                u.set_center_freq(self.Fc_hz, c)

    def get_samp_rate(self):
        """Return the configured sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store a new sample rate and apply it to every USRP source.

        The sources live in ``self.uhd_usrp_sources``; there is no
        ``self.uhd_usrp_source_0`` attribute, so the previous implementation
        raised ``AttributeError`` on every call.
        """
        self.samp_rate = samp_rate
        for usrp in self.uhd_usrp_sources:
            usrp['usrp'].set_samp_rate(self.samp_rate)

    def start(self, delay_sec=0):
        """
        Override the start method to insert a command timer

        Args:
            delay_sec: When > 0, every USRP is told to start streaming at
                (last PPS second of the first USRP + delay_sec); otherwise
                streaming starts immediately.

        TODO: B210 two channel mode throws "USRP Source Block caught rx error code: 2" because the time
        gets multiplied by 2 when starting the recording. Check whether UHD 4.2 fixes this
        """
        # all USRPs should be in sync so use the time from one
        usrp_last_pps_time = self.uhd_usrp_sources[0]['usrp'].get_time_last_pps().get_real_secs()
        dtstamp = datetime.fromtimestamp(usrp_last_pps_time)
        logger.info(f'Before stream start: system time {time.time()} usrp time {usrp_last_pps_time} ({dtstamp.strftime(fmt)})')

        if delay_sec > 0:
            for u in self.uhd_usrp_sources:
                u['usrp'].set_start_time(uhd.time_spec_t(int(usrp_last_pps_time)+delay_sec))
            logger.warning(f'Starting recording in {int(usrp_last_pps_time)+delay_sec - int(time.time())} s at time {int(usrp_last_pps_time)+delay_sec}')
            usrp_last_pps_time = self.uhd_usrp_sources[0]['usrp'].get_time_last_pps().get_real_secs()
            dtstamp = datetime.fromtimestamp(usrp_last_pps_time)
            logger.info(f'Check time again system time {time.time()} usrp time {usrp_last_pps_time} ({dtstamp.strftime(fmt)})')
        else:
            logger.warning('Starting recording immediately')

        logger.info('Calling start()')
        gr.top_block.start(self)

        usrp_last_pps_time = self.uhd_usrp_sources[0]['usrp'].get_time_last_pps().get_real_secs()
        dtstamp = datetime.fromtimestamp(usrp_last_pps_time)
        logger.info(f'Time after issuing start stream command: {time.time()} usrp time {usrp_last_pps_time} ({dtstamp.strftime(fmt)})')


if __name__ == '__main__':
    cfg = json.loads(dual_record_settings_json)
    tb = bulk_usrp_record(cfg)

    def sig_handler(sig=None, frame=None):
        """Stop the flow graph cleanly on SIGINT/SIGTERM and exit."""
        tb.stop()
        tb.wait()
        sys.exit(0)

    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    logger.info('start recording')
    # TODO: B210 two channel mode multiplies time by 2 upon start, so anything above 0 here will throw rx error code 2
    tb.start(0)

    try:
        input('Press Enter to quit: ')
    except EOFError:
        pass

    tb.stop()
    tb.wait()

    logger.warning('verifying clock after recording')
    tb.check_clock()
    logger.info(cfg)