[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Discuss-gnuradio] Experimental waterfall display module
From: |
David Carr |
Subject: |
[Discuss-gnuradio] Experimental waterfall display module |
Date: |
Mon, 15 Nov 2004 16:08:45 -0600 |
User-agent: |
Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.7.3) Gecko/20040928 MultiZilla/1.6.4.0b |
Here is an experimental waterfall display... Its not perfect but it
does seem to work. I need some help to make the window start up with
the correct dimensions. Just drag the window a little bigger to see the
whole display. I'm sure many other improvements could be made as well.
-David Carr
#!/usr/bin/env python
#
# Copyright 2003,2004 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.
#
from gnuradio import gr
from gnuradio.wxgui import stdgui
import wx
from wxPython.wx import *
import Numeric
import os
import threading
# FIXME this should be rewritten to use hierarchical modules (when they're
ready)
# ========================================================================
# returns (block, win).
# block requires a single input stream of float
# win is a subclass of wxWindow
def make_waterfall_sink_f (fg, parent, label, fft_size, input_rate):
(r_fd, w_fd) = os.pipe ()
fft_rate = 10
s2p = gr.serial_to_parallel (gr.sizeof_float, fft_size)
one_in_n = gr.keep_one_in_n (gr.sizeof_float * fft_size,
int (input_rate / fft_size / fft_rate))
fft = gr.fft_vfc (fft_size, True, True)
dst = gr.file_descriptor_sink (gr.sizeof_gr_complex * fft_size, w_fd)
fg.connect (s2p, one_in_n)
fg.connect (one_in_n, fft)
fg.connect (fft, dst)
block = s2p # head of pipeline
win = waterfall_window (fft_info (r_fd, fft_size, input_rate, True, label),
parent)
return (block, win)
# ========================================================================
# returns (block, win).
# block requires a single input stream of gr_complex
# win is a subclass of wxWindow
def make_waterfall_sink_c (fg, parent, label, fft_size, input_rate):
(r_fd, w_fd) = os.pipe ()
fft_rate = 20
s2p = gr.serial_to_parallel (gr.sizeof_gr_complex, fft_size)
one_in_n = gr.keep_one_in_n (gr.sizeof_gr_complex * fft_size,
int (input_rate / fft_size / fft_rate))
fft = gr.fft_vcc (fft_size, True, True)
dst = gr.file_descriptor_sink (gr.sizeof_gr_complex * fft_size, w_fd)
fg.connect (s2p, one_in_n)
fg.connect (one_in_n, fft)
fg.connect (fft, dst)
block = s2p # head of pipeline
win = waterfall_window (fft_info (r_fd, fft_size, input_rate, False,
label), parent)
return (block, win)
# ------------------------------------------------------------------------
myDATA_EVENT = wx.NewEventType()
EVT_DATA_EVENT = wx.PyEventBinder (myDATA_EVENT, 0)
class DataEvent(wx.PyEvent):
def __init__(self, data):
wx.PyEvent.__init__(self)
self.SetEventType (myDATA_EVENT)
self.data = data
def Clone (self):
self.__class__ (self.GetId())
class fft_info:
def __init__ (self, file_descriptor, fft_size, sample_rate, input_is_real,
title = "fft"):
self.file_descriptor = file_descriptor
self.fft_size = fft_size
self.sample_rate = sample_rate
self.input_is_real = input_is_real
self.title = title;
class input_watcher (threading.Thread):
def __init__ (self, file_descriptor, fft_size, event_receiver, **kwds):
threading.Thread.__init__ (self, **kwds)
self.setDaemon (1)
self.file_descriptor = file_descriptor
self.fft_size = fft_size
self.event_receiver = event_receiver
self.keep_running = True
self.start ()
def run (self):
# print "input_watcher: pid = ", os.getpid ()
while (self.keep_running):
s = os.read (self.file_descriptor, gr.sizeof_gr_complex *
self.fft_size)
if not s:
self.keep_running = False
break
complex_data = Numeric.fromstring (s, Numeric.Complex32)
de = DataEvent (complex_data)
wx.PostEvent (self.event_receiver, de)
# print "run: len(complex_data) = ", len(complex_data)
del de
class waterfall_window (wxPanel ):
bm = None
def __init__ (self, info, parent, id = -1,
pos = wx.DefaultPosition, size = wx.DefaultSize,
style = wx.DEFAULT_FRAME_STYLE,name=""):
wxPanel.__init__ (self, parent)
self.info = info;
self.bm = wx.EmptyBitmap(self.info.fft_size,300,-1)
dc1 = wx.MemoryDC()
dc1.SelectObject(self.bm)
dc1.Clear()
EVT_PAINT( self, self.OnPaint )
EVT_DATA_EVENT (self, self.set_data)
wx.EVT_CLOSE (self, self.on_close_window)
self.input_watcher = input_watcher (info.file_descriptor,
info.fft_size,
self)
def OnPaint(self, event):
dc = wxPaintDC(self)
self.DoDrawing(dc)
def DoDrawing(self, dc=None):
if dc is None:
dc = wxClientDC(self)
dc.DrawBitmap( self.bm,0,0,False )
def on_close_window (self, event):
print "waterfall_window:on_close_window"
self.keep_running = False
def set_data (self, evt):
data = evt.data
dB = 20 * Numeric.log10 (abs(data) + 1e-8)
l = len (dB)
dc1 = wx.MemoryDC()
dc1.SelectObject(self.bm)
dc1.Blit(0,1,self.info.fft_size,300,dc1,0,0,wx.COPY,False,-1,-1)
if self.info.input_is_real:
d_max = l/2
p_width = 2
else:
d_max = l
p_width = 1
for x_pos in range(0, d_max):
value = int(dB[x_pos] * 1.5)
if value > 255:
value = 255
elif value < 0:
value = 0
colour = wx.Colour(value, value, value)
dc1.SetPen(wx.Pen(colour, 1, wx.SOLID))
dc1.SetBrush(wx.Brush(colour, wx.SOLID))
dc1.DrawRectangle(x_pos*p_width, 0, p_width, 1)
self.DoDrawing (None)
# ----------------------------------------------------------------
# Standalone test app
# ----------------------------------------------------------------
class test_app_flow_graph (stdgui.gui_flow_graph):
def __init__(self, frame, panel, vbox, argv):
stdgui.gui_flow_graph.__init__ (self, frame, panel, vbox, argv)
# build our flow graph
input_rate = 2e6
src = gr.sig_source_c (input_rate, gr.GR_SIN_WAVE, .3e6, 1e6)
block, waterfall_win = make_waterfall_sink_c (self, panel, "Secret
Data", 512, input_rate)
self.connect (src, block)
vbox.Add (waterfall_win, 1, wx.EXPAND)
def main ():
app = stdgui.stdapp (test_app_flow_graph, "WATERFALL Sink Test App")
app.MainLoop ()
if __name__ == '__main__':
main ()
# ----------------------------------------------------------------
- [Discuss-gnuradio] Experimental waterfall display module,
David Carr <=