import numpy as np from gnuradio import gr, gr_unittest, blocks import pmt class signal_sink(gr.sync_block): def __init__(self): gr.sync_block.__init__(self, name="sink", in_sig=[np.float32], out_sig=None) self.signal = pmt.from_bool(False) self.port_name = pmt.intern("sig") self.message_port_register_out(self.port_name) def work(self, input_items, output_items): #self.message_port_pub(self.port_name, self.signal) return len(input_items[0]) class qa_msg_passing(gr_unittest.TestCase): def setUp(self): self.tb = gr.top_block() def tearDown(self): self.tb = None def test_copy(self): src_data = np.arange(1000) src = blocks.vector_source_f(src_data) copy = blocks.copy(gr.sizeof_float) msg_debug = blocks.message_debug() sig_sink = signal_sink() self.tb.connect(src, copy, sig_sink) self.tb.msg_connect(sig_sink, "sig", copy, "en") # This causes problems #self.tb.msg_connect(sig_sink, "sig", msg_debug, "print") # This does not # Run once (this run is successful) self.tb.run() # Run again (this run hangs) src.rewind() self.tb.start() print("Debug: 2nd top_block run started...") self.tb.wait() print("Debug: 2nd top_block run exited...") if __name__ == '__main__': #import os #print("Blocked waiting for GDB attach (pid = {})".format(os.getpid())) #raw_input("Press Enter to continue...") gr_unittest.run(qa_msg_passing)