I appear to be running into a deadlock when I try to merge two flow-graph paths. I've created an FFT block to perform strided FFTs. The basic idea is to store samples in an internal buffer until I have a full matrix. Once the matrix is full, the block performs a strided FFT into a second internal buffer and writes that buffer to the output. The block consumes the input samples it is given, but a given call does not always return output samples.
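To make "strided FFT" concrete, here is a minimal NumPy-only sketch of the transform step on one full matrix. The helper name `strided_fft` is made up for illustration and is not part of the block; it assumes the samples arrive row-major, so that `dim1` is the contiguous dimension.

```python
import numpy as np

def strided_fft(samples, dim0, dim1, axis=0, forward=True):
    """FFT a flat run of dim0*dim1 samples viewed as a dim0 x dim1 matrix."""
    matrix = np.asarray(samples, dtype=np.complex64).reshape(dim0, dim1)
    transform = np.fft.fft if forward else np.fft.ifft
    # With axis=0 each column is transformed, i.e. samples that sit
    # dim1 apart in the flat stream (hence "strided").
    return transform(matrix, axis=axis).astype(np.complex64).ravel()

x = np.arange(8, dtype=np.complex64)
y = strided_fft(x, dim0=4, dim1=2)
```

With `dim0=4, dim1=2`, the even-indexed outputs `y[0::2]` are the FFT of the even-indexed inputs `x[0::2]`, and likewise for the odd indices.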
This block works great in a flow-graph until I try to merge two paths where one path contains the FFT block. So my simplified flow graph looks like
where there is a second connection from [Throttle] --> [Multiply].
Some restrictions: I need to use tags, so not using vectors on input/output makes things easier. I'm dealing with matrix sizes on the order of 128x512. I need to avoid tagged stream blocks.
Some things I've tried: inserting Copy blocks in various places; setting the min buffer size on the Advanced tab of every block to 10*(matrix size); inserting Delay blocks on the second path; and changing the forecast function to request noutput_items, 0, or noutput_items rounded up to an integer multiple of the matrix size.
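For reference, the "rounded up to an integer multiple of the matrix size" variant of forecast can be written with integer arithmetic only. This is a sketch; `round_up_to_matrix` is a hypothetical helper name, and 128x512 is the matrix size mentioned above.

```python
def round_up_to_matrix(noutput_items, n_matrix):
    """Smallest multiple of n_matrix that is >= noutput_items (integer math only)."""
    return ((noutput_items + n_matrix - 1) // n_matrix) * n_matrix

n_matrix = 128 * 512  # 65536 samples per matrix
for n in (1, 4096, 65536, 65537):
    print(n, "->", round_up_to_matrix(n, n_matrix))
```

With this rounding, any request for between 1 and 65536 output items asks for a full 65536 input items, and a request for 65537 asks for 131072.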
Inserting a Strided FFT block on the second path does seem to get things to move, but inserting a delay does not. I could create a block that uses the same buffering scheme as the FFT, but I'm uncertain if that is an actual solution.
Any thoughts as to why this deadlocks? See code below.
import numpy as np
from gnuradio import gr

class fft_py_cc(gr.basic_block):
    """
    Perform FFT/IFFT on a matrix of dim0 x dim1 where dim1 is contiguous.
    """
    def __init__(self, dim0, dim1, axis=0, forward=True):
        self.axis = axis
        self.dim0 = dim0
        self.dim1 = dim1
        self.forward = forward
        self.buffer = np.zeros((dim0,dim1), dtype=np.complex64)
        self.bufferOut = np.zeros((dim0,dim1), dtype=np.complex64)
        self.iBuffer = 0
        self.iBufferOut = dim0*dim1 # Set to the last sample to indicate not valid
        gr.basic_block.__init__(self,
            name="fft_py_cc",
            in_sig=[np.complex64],
            out_sig=[np.complex64])

    def forecast(self, noutput_items, ninput_items_required):
        # Request noutput_items rounded up to a whole number of matrices
        nMatrix = self.dim0*self.dim1
        nRequired = np.floor( (noutput_items - 1)/nMatrix )*nMatrix + nMatrix
        for i in range(len(ninput_items_required)):
            ninput_items_required[i] = int(nRequired)
            #ninput_items_required[i] = noutput_items
            #ninput_items_required[i] = 0

    def general_work(self, input_items, output_items):
        nIn = input_items[0].shape[0]
        nOut = output_items[0].shape[0]
        iIn = 0
        iOut = 0
        nBuf = self.dim0*self.dim1
        nBo = self.dim0*self.dim1
        iBuf = self.iBuffer
        iBo = self.iBufferOut
        iteration = 0
        while True:
            # Copy input samples to input buffer
            nBufferRemain = nBuf - iBuf
            nInputRemain = nIn - iIn
            nCopy = min(nBufferRemain, nInputRemain)
            if nCopy > 0:
                self.buffer.ravel()[iBuf:iBuf + nCopy] = input_items[0][iIn:iIn + nCopy]
                iIn += nCopy
                iBuf += nCopy

            # FFT if ready (input buffer is full and output buffer is available)
            if iBuf == nBuf and iBo == nBo:
                if self.forward:
                    self.bufferOut[:] = np.fft.fft(self.buffer, axis=self.axis)
                else:
                    self.bufferOut[:] = np.fft.ifft(self.buffer, axis=self.axis)
                iBuf = 0
                iBo = 0

            # Copy to output
            nBufferOutRemain = nBo - iBo
            nOutputRemain = nOut - iOut
            nCopy = min(nBufferOutRemain, nOutputRemain)
            if nCopy > 0:
                output_items[0][iOut:iOut + nCopy] = self.bufferOut.ravel()[iBo:iBo + nCopy]
                iOut += nCopy
                iBo += nCopy

            # Check if we are done
            if iOut >= nOut or iIn >= nIn:
                break

            # Debug aid: warn (without breaking) if the loop runs suspiciously long
            iteration += 1
            if iteration == 10000000:
                print("Error: FFT may be stuck.")

        # Update and report consumed and processed samples
        self.iBuffer = iBuf
        self.iBufferOut = iBo
        self.consume(0, iIn)
        return iOut
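
To show what "consuming samples but not always returning samples" looks like from the outside, here is a GNU Radio-free sketch of the same buffering loop. `MatrixBufferSim` and its `work` method are hypothetical stand-ins, not GNU Radio API; the sketch models only the forward FFT and says nothing about how the scheduler actually sizes its calls.

```python
import numpy as np

class MatrixBufferSim(object):
    """Hypothetical stand-in for fft_py_cc's buffering, with no GNU Radio dependency."""

    def __init__(self, dim0, dim1, axis=0):
        self.shape = (dim0, dim1)
        self.axis = axis
        self.n = dim0 * dim1
        self.buf_in = np.zeros(self.n, dtype=np.complex64)
        self.buf_out = np.zeros(self.n, dtype=np.complex64)
        self.i_in = 0         # fill level of the input buffer
        self.i_out = self.n   # read position in the output buffer (n means drained)

    def work(self, samples, max_out):
        """Mimic one general_work call; return (items consumed, output array)."""
        out = np.zeros(max_out, dtype=np.complex64)
        consumed = produced = 0
        while True:
            # Fill the input buffer from the offered samples
            n_copy = min(self.n - self.i_in, len(samples) - consumed)
            if n_copy > 0:
                self.buf_in[self.i_in:self.i_in + n_copy] = samples[consumed:consumed + n_copy]
                consumed += n_copy
                self.i_in += n_copy
            # Transform once the input is full and the previous output is drained
            if self.i_in == self.n and self.i_out == self.n:
                matrix = self.buf_in.reshape(self.shape)
                self.buf_out[:] = np.fft.fft(matrix, axis=self.axis).ravel()
                self.i_in = 0
                self.i_out = 0
            # Drain the output buffer into the available output space
            n_copy = min(self.n - self.i_out, max_out - produced)
            if n_copy > 0:
                out[produced:produced + n_copy] = self.buf_out[self.i_out:self.i_out + n_copy]
                produced += n_copy
                self.i_out += n_copy
            if produced >= max_out or consumed >= len(samples):
                break
        return consumed, out[:produced]

sim = MatrixBufferSim(dim0=2, dim1=2)
first = sim.work(np.ones(3, dtype=np.complex64), max_out=8)
second = sim.work(np.ones(3, dtype=np.complex64), max_out=8)
print(first[0], len(first[1]))
print(second[0], len(second[1]))
```

In this toy 2x2 case the first call consumes 3 samples and produces none, while the second call consumes 3 more and produces one full matrix of 4 samples. Output only ever appears in bursts of whole matrices.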