#!/usr/bin/env python ################################################## # Gnuradio Python Flow Graph # Title: Integrated Radio Astronomy Receiver--test graph # Author: Marcus Leech # Description: gnuradio flow graph # Generated: Wed Jun 29 00:24:15 2011 ################################################## from gnuradio import eng_notation from gnuradio import gr from gnuradio import uhd from gnuradio import window from gnuradio.eng_option import eng_option from gnuradio.gr import firdes from gnuradio.wxgui import fftsink2 from grc_gnuradio import wxgui as grc_wxgui from optparse import OptionParser import wx class uhd_test_ra_receiver(grc_wxgui.top_block_gui): def __init__(self, freq=1420.4058e6, gain=55, ratepsr=50000, bandwidth=10.0e6, devaddr="addr=192.168.10.2", setisize=65536*4): grc_wxgui.top_block_gui.__init__(self, title="Integrated Radio Astronomy Receiver--test graph") ################################################## # Parameters ################################################## self.freq = freq self.gain = gain self.ratepsr = ratepsr self.bandwidth = bandwidth self.devaddr = devaddr self.setisize = setisize ################################################## # Variables ################################################## self.seti_rate = seti_rate = 5 ################################################## # Blocks ################################################## self.wxgui_fftsink2_0 = fftsink2.fft_sink_c( self.GetWin(), baseband_freq=freq, y_per_div=10, y_divs=10, ref_level=50, ref_scale=2.0, sample_rate=bandwidth, fft_size=1024, fft_rate=10, average=True, avg_alpha=0.250, title="FFT Plot", peak_hold=False, ) self.Add(self.wxgui_fftsink2_0.win) self.uhd_usrp_source_0 = uhd.usrp_source( device_addr=devaddr, io_type=uhd.io_type.COMPLEX_FLOAT32, num_channels=1, ) self.uhd_usrp_source_0.set_samp_rate(bandwidth) self.uhd_usrp_source_0.set_center_freq(freq, 0) self.uhd_usrp_source_0.set_gain(gain, 0) self.gr_stream_to_vector_0 = gr.stream_to_vector(gr.sizeof_gr_complex*1, int(setisize)) self.gr_single_pole_iir_filter_xx_1 = gr.single_pole_iir_filter_ff(1.0/(bandwidth/setisize), int(setisize)) self.gr_single_pole_iir_filter_xx_0 = gr.single_pole_iir_filter_ff(1.0/(bandwidth/(ratepsr/2.0)), 1) self.gr_keep_one_in_n_1 = gr.keep_one_in_n(gr.sizeof_float*int(setisize), int(bandwidth/setisize)/seti_rate) self.gr_keep_one_in_n_0 = gr.keep_one_in_n(gr.sizeof_float*1, int(bandwidth/ratepsr)) self.gr_file_sink_1 = gr.file_sink(gr.sizeof_float*1, "/dev/null") self.gr_file_sink_1.set_unbuffered(True) self.gr_file_sink_0_0 = gr.file_sink(gr.sizeof_float*int(setisize), "/dev/null") self.gr_file_sink_0_0.set_unbuffered(True) self.gr_fft_vxx_0 = gr.fft_vcc(int(setisize), True, (window.blackmanharris(int(bandwidth))), False) self.gr_complex_to_mag_squared_0 = gr.complex_to_mag_squared(1) self.gr_complex_to_mag_0 = gr.complex_to_mag(int(setisize)) ################################################## # Connections ################################################## self.connect((self.gr_stream_to_vector_0, 0), (self.gr_fft_vxx_0, 0)) self.connect((self.gr_fft_vxx_0, 0), (self.gr_complex_to_mag_0, 0)) self.connect((self.gr_single_pole_iir_filter_xx_0, 0), (self.gr_keep_one_in_n_0, 0)) self.connect((self.gr_complex_to_mag_squared_0, 0), (self.gr_single_pole_iir_filter_xx_0, 0)) self.connect((self.gr_keep_one_in_n_0, 0), (self.gr_file_sink_1, 0)) self.connect((self.gr_complex_to_mag_0, 0), (self.gr_single_pole_iir_filter_xx_1, 0)) self.connect((self.gr_single_pole_iir_filter_xx_1, 0), (self.gr_keep_one_in_n_1, 0)) self.connect((self.gr_keep_one_in_n_1, 0), (self.gr_file_sink_0_0, 0)) self.connect((self.uhd_usrp_source_0, 0), (self.wxgui_fftsink2_0, 0)) self.connect((self.uhd_usrp_source_0, 0), (self.gr_complex_to_mag_squared_0, 0)) self.connect((self.uhd_usrp_source_0, 0), (self.gr_stream_to_vector_0, 0)) def get_freq(self): return self.freq def set_freq(self, freq): self.freq = freq self.wxgui_fftsink2_0.set_baseband_freq(self.freq) self.uhd_usrp_source_0.set_center_freq(self.freq, 0) def get_gain(self): return self.gain def set_gain(self, gain): self.gain = gain self.uhd_usrp_source_0.set_gain(self.gain, 0) def get_ratepsr(self): return self.ratepsr def set_ratepsr(self, ratepsr): self.ratepsr = ratepsr self.gr_keep_one_in_n_0.set_n(int(self.bandwidth/self.ratepsr)) self.gr_single_pole_iir_filter_xx_0.set_taps(1.0/(self.bandwidth/(self.ratepsr/2.0))) def get_bandwidth(self): return self.bandwidth def set_bandwidth(self, bandwidth): self.bandwidth = bandwidth self.gr_keep_one_in_n_1.set_n(int(self.bandwidth/self.setisize)/self.seti_rate) self.gr_keep_one_in_n_0.set_n(int(self.bandwidth/self.ratepsr)) self.gr_single_pole_iir_filter_xx_0.set_taps(1.0/(self.bandwidth/(self.ratepsr/2.0))) self.wxgui_fftsink2_0.set_sample_rate(self.bandwidth) self.uhd_usrp_source_0.set_samp_rate(self.bandwidth) self.gr_single_pole_iir_filter_xx_1.set_taps(1.0/(self.bandwidth/self.setisize)) def get_devaddr(self): return self.devaddr def set_devaddr(self, devaddr): self.devaddr = devaddr def get_setisize(self): return self.setisize def set_setisize(self, setisize): self.setisize = setisize self.gr_keep_one_in_n_1.set_n(int(self.bandwidth/self.setisize)/self.seti_rate) self.gr_single_pole_iir_filter_xx_1.set_taps(1.0/(self.bandwidth/self.setisize)) def get_seti_rate(self): return self.seti_rate def set_seti_rate(self, seti_rate): self.seti_rate = seti_rate self.gr_keep_one_in_n_1.set_n(int(self.bandwidth/self.setisize)/self.seti_rate) if __name__ == '__main__': parser = OptionParser(option_class=eng_option, usage="%prog: [options]") parser.add_option("", "--freq", dest="freq", type="eng_float", default=eng_notation.num_to_str(1420.4058e6), help="Set freq [default=%default]") parser.add_option("", "--gain", dest="gain", type="eng_float", default=eng_notation.num_to_str(55), help="Set gain [default=%default]") parser.add_option("", "--ratepsr", dest="ratepsr", type="eng_float", default=eng_notation.num_to_str(50000), help="Set ratepsr [default=%default]") parser.add_option("", "--bandwidth", dest="bandwidth", type="eng_float", default=eng_notation.num_to_str(10.0e6), help="Set None [default=%default]") parser.add_option("", "--devaddr", dest="devaddr", type="string", default="addr=192.168.10.2", help="Set devaddr [default=%default]") parser.add_option("", "--setisize", dest="setisize", type="intx", default=65536*4, help="Set setisize [default=%default]") (options, args) = parser.parse_args() tb = uhd_test_ra_receiver(freq=options.freq, gain=options.gain, ratepsr=options.ratepsr, bandwidth=options.bandwidth, devaddr=options.devaddr, setisize=options.setisize) tb.Run(True)