# -*- coding: utf-8 -*- # # SPDX-License-Identifier: GPL-3.0 # # GNU Radio Python Flow Graph # Title: Csp Fsm Core # Author: Chad M. Spooner # GNU Radio version: 3.8.1.0 from gnuradio import blocks from gnuradio import gr from gnuradio.filter import firdes import sys import signal class csp_fsm_core(gr.hier_block2): def __init__(self, alpha=0, block_size=65536, samp_rate=1000000, smooth_width_samples=100): gr.hier_block2.__init__( self, "Csp Fsm Core", gr.io_signature(1, 1, gr.sizeof_gr_complex*1), gr.io_signaturev(2, 2, [gr.sizeof_float*block_size, gr.sizeof_float*block_size]), ) ################################################## # Parameters ################################################## self.alpha = alpha self.block_size = block_size self.samp_rate = samp_rate self.smooth_width_samples = smooth_width_samples ################################################## # Variables ################################################## self.alpha_up = alpha_up = round(block_size*(alpha/samp_rate)) ################################################## # Blocks ################################################## self.blocks_stream_to_vector_0_0_0 = blocks.stream_to_vector(gr.sizeof_gr_complex*1, block_size) self.blocks_nlog10_ff_0 = blocks.nlog10_ff(10, block_size, 0) self.blocks_multiply_xx_0_0 = blocks.multiply_vcc(1) self.blocks_multiply_const_vxx_0_0 = blocks.multiply_const_cc(1.0/block_size) self.blocks_moving_average_xx_0_0 = blocks.moving_average_cc(smooth_width_samples, 1.0/smooth_width_samples, 4000, 1) self.blocks_delay_0_1_1 = blocks.delay(gr.sizeof_gr_complex*1, block_size - int(smooth_width_samples/2.0)) self.blocks_delay_0_1_0 = blocks.delay(gr.sizeof_gr_complex*1, block_size - int(alpha_up/2.0)) self.blocks_delay_0_0 = blocks.delay(gr.sizeof_gr_complex*1, int(alpha_up + 0.5)) self.blocks_conjugate_cc_0_0 = blocks.conjugate_cc() self.blocks_complex_to_mag_0_0 = blocks.complex_to_mag(block_size) ################################################## # Connections ################################################## self.connect((self.blocks_complex_to_mag_0_0, 0), (self.blocks_nlog10_ff_0, 0)) self.connect((self.blocks_complex_to_mag_0_0, 0), (self, 1)) self.connect((self.blocks_conjugate_cc_0_0, 0), (self.blocks_multiply_xx_0_0, 0)) self.connect((self.blocks_delay_0_0, 0), (self.blocks_multiply_xx_0_0, 1)) self.connect((self.blocks_delay_0_1_0, 0), (self.blocks_moving_average_xx_0_0, 0)) self.connect((self.blocks_delay_0_1_1, 0), (self.blocks_stream_to_vector_0_0_0, 0)) self.connect((self.blocks_moving_average_xx_0_0, 0), (self.blocks_delay_0_1_1, 0)) self.connect((self.blocks_multiply_const_vxx_0_0, 0), (self.blocks_delay_0_1_0, 0)) self.connect((self.blocks_multiply_xx_0_0, 0), (self.blocks_multiply_const_vxx_0_0, 0)) self.connect((self.blocks_nlog10_ff_0, 0), (self, 0)) self.connect((self.blocks_stream_to_vector_0_0_0, 0), (self.blocks_complex_to_mag_0_0, 0)) self.connect((self, 0), (self.blocks_conjugate_cc_0_0, 0)) self.connect((self, 0), (self.blocks_delay_0_0, 0)) def get_alpha(self): return self.alpha def set_alpha(self, alpha): self.alpha = alpha self.set_alpha_up(round(self.block_size*(self.alpha/self.samp_rate))) def get_block_size(self): return self.block_size def set_block_size(self, block_size): self.block_size = block_size self.set_alpha_up(round(self.block_size*(self.alpha/self.samp_rate))) self.blocks_delay_0_1_0.set_dly(self.block_size - int(self.alpha_up/2.0)) self.blocks_delay_0_1_1.set_dly(self.block_size - int(self.smooth_width_samples/2.0)) self.blocks_multiply_const_vxx_0_0.set_k(1.0/self.block_size) def get_samp_rate(self): return self.samp_rate def set_samp_rate(self, samp_rate): self.samp_rate = samp_rate self.set_alpha_up(round(self.block_size*(self.alpha/self.samp_rate))) def get_smooth_width_samples(self): return self.smooth_width_samples def set_smooth_width_samples(self, smooth_width_samples): self.smooth_width_samples = smooth_width_samples self.blocks_delay_0_1_1.set_dly(self.block_size - int(self.smooth_width_samples/2.0)) self.blocks_moving_average_xx_0_0.set_length_and_scale(self.smooth_width_samples, 1.0/self.smooth_width_samples) def get_alpha_up(self): return self.alpha_up def set_alpha_up(self, alpha_up): self.alpha_up = alpha_up self.blocks_delay_0_0.set_dly(int(self.alpha_up + 0.5)) self.blocks_delay_0_1_0.set_dly(self.block_size - int(self.alpha_up/2.0))