discuss-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Discuss-gnuradio] Experimental waterfall display module


From: David Carr
Subject: [Discuss-gnuradio] Experimental waterfall display module
Date: Mon, 15 Nov 2004 16:08:45 -0600
User-agent: Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.7.3) Gecko/20040928 MultiZilla/1.6.4.0b

Here is an experimental waterfall display... It's not perfect, but it does seem to work. I need some help to make the window start up with the correct dimensions. Just drag the window a little bigger to see the whole display. I'm sure many other improvements could be made as well.

-David Carr


#!/usr/bin/env python
#
# Copyright 2003,2004 Free Software Foundation, Inc.
# 
# This file is part of GNU Radio
# 
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
# 
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.
# 

from gnuradio import gr
from gnuradio.wxgui import stdgui
import wx
from wxPython.wx import *
import Numeric
import os
import threading
    


# FIXME this should be rewritten to use hierarchical modules (when they're ready)

# ========================================================================
# returns (block, win).
#   block requires a single input stream of float
#   win is a subclass of wxWindow

def make_waterfall_sink_f (fg, parent, label, fft_size, input_rate):
    """Build a waterfall display sink for a stream of floats.

    Connects serial_to_parallel -> keep_one_in_n -> fft_vfc ->
    file_descriptor_sink inside `fg`.  The FFT output is written to the
    write end of a pipe; the read end is handed to a waterfall_window.

    Arguments:
      fg          flow graph in which the blocks are connected
      parent      parent wx window for the display
      label       title stored in the fft_info given to the window
      fft_size    number of points per FFT frame
      input_rate  sample rate of the incoming stream

    Returns (block, win): block is the head of the pipeline (connect the
    float source to it), win is the waterfall_window.
    """
    (r_fd, w_fd) = os.pipe ()
    fft_rate = 10                     # target number of FFT frames per second

    # input_rate / fft_size is the number of frames per second; dividing by
    # fft_rate gives how many frames to skip.  For low input rates this
    # truncates to 0, which is not a usable decimation factor, so keep
    # every frame in that case.
    decimation = max (1, int (input_rate / fft_size / fft_rate))

    s2p = gr.serial_to_parallel (gr.sizeof_float, fft_size)
    one_in_n = gr.keep_one_in_n (gr.sizeof_float * fft_size, decimation)
    fft = gr.fft_vfc (fft_size, True, True)
    dst = gr.file_descriptor_sink (gr.sizeof_gr_complex * fft_size, w_fd)

    fg.connect (s2p, one_in_n)
    fg.connect (one_in_n, fft)
    fg.connect (fft, dst)

    block = s2p                       # head of pipeline

    info = fft_info (r_fd, fft_size, input_rate, True, label)
    win = waterfall_window (info, parent)

    return (block, win)

# ========================================================================
# returns (block, win).
#   block requires a single input stream of gr_complex
#   win is a subclass of wxWindow

def make_waterfall_sink_c (fg, parent, label, fft_size, input_rate):
    """Build a waterfall display sink for a stream of gr_complex.

    Connects serial_to_parallel -> keep_one_in_n -> fft_vcc ->
    file_descriptor_sink inside `fg`.  The FFT output is written to the
    write end of a pipe; the read end is handed to a waterfall_window.

    Arguments:
      fg          flow graph in which the blocks are connected
      parent      parent wx window for the display
      label       title stored in the fft_info given to the window
      fft_size    number of points per FFT frame
      input_rate  sample rate of the incoming stream

    Returns (block, win): block is the head of the pipeline (connect the
    complex source to it), win is the waterfall_window.
    """
    (r_fd, w_fd) = os.pipe ()
    fft_rate = 20                     # target number of FFT frames per second

    # input_rate / fft_size is the number of frames per second; dividing by
    # fft_rate gives how many frames to skip.  For low input rates this
    # truncates to 0, which is not a usable decimation factor, so keep
    # every frame in that case.
    decimation = max (1, int (input_rate / fft_size / fft_rate))

    s2p = gr.serial_to_parallel (gr.sizeof_gr_complex, fft_size)
    one_in_n = gr.keep_one_in_n (gr.sizeof_gr_complex * fft_size, decimation)
    fft = gr.fft_vcc (fft_size, True, True)
    dst = gr.file_descriptor_sink (gr.sizeof_gr_complex * fft_size, w_fd)

    fg.connect (s2p, one_in_n)
    fg.connect (one_in_n, fft)
    fg.connect (fft, dst)

    block = s2p                       # head of pipeline

    info = fft_info (r_fd, fft_size, input_rate, False, label)
    win = waterfall_window (info, parent)

    return (block, win)

# ------------------------------------------------------------------------

# Custom wx event type used by DataEvent, and the binder that
# waterfall_window uses to attach its set_data handler to that event type.
myDATA_EVENT = wx.NewEventType()
EVT_DATA_EVENT = wx.PyEventBinder (myDATA_EVENT, 0)


class DataEvent(wx.PyEvent):
    """wx event of type myDATA_EVENT that carries a data payload.

    input_watcher posts one of these per FFT frame; the payload is
    available to the handler as the `data` attribute.
    """
    def __init__(self, data):
        wx.PyEvent.__init__(self)
        self.SetEventType (myDATA_EVENT)
        self.data = data

    def Clone (self):
        """Return a new DataEvent carrying the same payload."""
        # The copy must be returned to the caller, and the constructor
        # takes the data payload (not the event id).
        return self.__class__ (self.data)


class fft_info:
    """Plain record describing one FFT data stream for a display window.

    Attributes (all copied verbatim from the constructor arguments):
      file_descriptor  descriptor the FFT frames are read from
      fft_size         number of points per FFT frame
      sample_rate      sample rate of the original stream
      input_is_real    true when the FFT input was real rather than complex
      title            display title, "fft" unless overridden
    """
    def __init__ (self, file_descriptor, fft_size, sample_rate,
                  input_is_real, title = "fft"):
        # Display-related settings.
        self.title = title
        self.input_is_real = input_is_real
        # Stream geometry.
        self.sample_rate = sample_rate
        self.fft_size = fft_size
        # Where the data comes from.
        self.file_descriptor = file_descriptor
        
class input_watcher (threading.Thread):
    """Daemon thread that reads FFT frames from a file descriptor and
    posts each one to a wx event receiver as a DataEvent.

    The thread starts itself from the constructor and runs until
    keep_running is cleared or the descriptor reports end of file.
    """
    def __init__ (self, file_descriptor, fft_size, event_receiver, **kwds):
        """
        file_descriptor  descriptor to read frames from
        fft_size         number of complex points per frame
        event_receiver   wx object the DataEvents are posted to
        **kwds           passed through to threading.Thread.__init__
        """
        threading.Thread.__init__ (self, **kwds)
        self.setDaemon (1)
        self.file_descriptor = file_descriptor
        self.fft_size = fft_size
        self.event_receiver = event_receiver
        self.keep_running = True
        self.start ()

    def _read_frame (self, nbytes):
        """Read exactly nbytes from the descriptor.

        Returns the data, or an empty string if end of file is reached
        before a complete frame has been read.
        """
        # os.read may return fewer bytes than requested (e.g. when a frame
        # is larger than what the pipe delivers in one go), so keep
        # reading until the frame is complete.
        data = os.read (self.file_descriptor, nbytes)
        while data and len (data) < nbytes:
            more = os.read (self.file_descriptor, nbytes - len (data))
            if not more:
                # EOF in the middle of a frame: drop the partial frame.
                return more
            data = data + more
        return data

    def run (self):
        # print "input_watcher: pid = ", os.getpid ()
        frame_bytes = gr.sizeof_gr_complex * self.fft_size
        while (self.keep_running):
            s = self._read_frame (frame_bytes)
            if not s:
                # end of file: nothing more will arrive
                self.keep_running = False
                break

            complex_data = Numeric.fromstring (s, Numeric.Complex32)
            wx.PostEvent (self.event_receiver, DataEvent (complex_data))
            # print "run: len(complex_data) = ", len(complex_data)
    

class waterfall_window (wxPanel ):
    """Panel that shows incoming FFT frames as a scrolling greyscale
    waterfall.

    An off-screen bitmap, info.fft_size pixels wide and _history_rows
    pixels high, holds the picture.  Every DataEvent shifts the bitmap
    down by one row and paints the new frame into row 0, so the newest
    data is at the top.  An input_watcher thread, created in the
    constructor, reads the frames from info.file_descriptor and posts
    the events.
    """
    bm = None
    _history_rows = 300               # height of the waterfall bitmap

    def __init__ (self, info, parent, id = -1,
                  pos = wx.DefaultPosition, size = wx.DefaultSize,
                  style = wx.DEFAULT_FRAME_STYLE,name=""):
        """
        info    fft_info describing the stream to display
        parent  parent wx window
        id, pos, size
                forwarded to wxPanel
        style, name
                accepted for interface compatibility but not forwarded:
                the default style is a frame style, which is not meant
                for a panel
        """
        wxPanel.__init__ (self, parent, id, pos, size)

        self.info = info

        self.bm = wx.EmptyBitmap(self.info.fft_size, self._history_rows, -1)

        # Start from a cleared bitmap.
        dc1 = wx.MemoryDC()
        dc1.SelectObject(self.bm)
        dc1.Clear()
        # Select the bitmap out again so it is free to be drawn later.
        dc1.SelectObject(wx.NullBitmap)

        EVT_PAINT( self, self.OnPaint )
        EVT_DATA_EVENT (self, self.set_data)
        wx.EVT_CLOSE (self, self.on_close_window)

        self.input_watcher = input_watcher (info.file_descriptor,
                                            info.fft_size,
                                            self)

    def OnPaint(self, event):
        """Paint handler: redraw the waterfall bitmap."""
        dc = wxPaintDC(self)
        self.DoDrawing(dc)

    def DoDrawing(self, dc=None):
        """Draw the waterfall bitmap at the panel origin.

        dc  device context to draw on; a wxClientDC for this panel is
            created when None.
        """
        if dc is None:
            dc = wxClientDC(self)
        dc.DrawBitmap( self.bm,0,0,False )

    def on_close_window (self, event):
        """Close handler: ask the reader thread to stop."""
        print ("waterfall_window:on_close_window")
        self.keep_running = False
        # The flag that the reader loop actually tests lives on the thread
        # object.  The loop only re-checks it after its current read
        # returns.
        self.input_watcher.keep_running = False

    def set_data (self, evt):
        """DataEvent handler: add one FFT frame to the top of the waterfall.

        evt.data is converted to dB (20 * log10 of the magnitude), scaled
        by 1.5 and clamped to 0..255 to give a grey level per bin.
        """
        data = evt.data
        dB = 20 * Numeric.log10 (abs(data) + 1e-8)   # 1e-8 avoids log10(0)
        l = len (dB)

        if self.info.input_is_real:
            # Only the first half of the bins is drawn, two pixels wide
            # each, so the row still spans the whole bitmap width.
            d_max = l // 2
            p_width = 2
        else:
            d_max = l
            p_width = 1

        dc1 = wx.MemoryDC()
        dc1.SelectObject(self.bm)
        # Scroll the existing picture down by one row.
        dc1.Blit(0,1,self.info.fft_size,self._history_rows,
                 dc1,0,0,wx.COPY,False,-1,-1)

        for x_pos in range(d_max):
            value = min(255, max(0, int(dB[x_pos] * 1.5)))
            colour = wx.Colour(value, value, value)

            dc1.SetPen(wx.Pen(colour, 1, wx.SOLID))
            dc1.SetBrush(wx.Brush(colour, wx.SOLID))
            dc1.DrawRectangle(x_pos*p_width, 0, p_width, 1)

        # A bitmap should not be drawn while it is still selected into a
        # memory DC, so release it before putting it on screen.
        dc1.SelectObject(wx.NullBitmap)
        self.DoDrawing (None)


# ----------------------------------------------------------------
# Standalone test app
# ----------------------------------------------------------------

class test_app_flow_graph (stdgui.gui_flow_graph):
    """Flow graph for the standalone test: a complex sine source feeding
    a 512-point waterfall sink that is added to the supplied vbox."""
    def __init__(self, frame, panel, vbox, argv):
        stdgui.gui_flow_graph.__init__ (self, frame, panel, vbox, argv)

        # build our flow graph
        input_rate = 2e6

        src = gr.sig_source_c (input_rate, gr.GR_SIN_WAVE, .3e6, 1e6)
        # The label has to be a single-line string literal.
        block, waterfall_win = make_waterfall_sink_c (self, panel,
                                                      "Secret Data",
                                                      512, input_rate)
        self.connect (src, block)
        vbox.Add (waterfall_win, 1, wx.EXPAND)


def main ():
    """Create the standalone waterfall test application and run its
    event loop."""
    stdgui.stdapp (test_app_flow_graph, "WATERFALL Sink Test App").MainLoop ()

# Run the standalone test app only when this file is executed as a script.
if __name__ == '__main__':
    main ()

# ----------------------------------------------------------------

reply via email to

[Prev in Thread] Current Thread [Next in Thread]