Source code for numpy_ipps.filtering.fir

"""FIR Functions."""
import collections.abc as _abc
import logging as _logging

import numpy as _numpy
import scipy.signal as _signal

import numpy_ipps._detail.debug as _debug
import numpy_ipps._detail.dispatch as _dispatch
import numpy_ipps._detail.metaclass.selector as _selector
import numpy_ipps.policies
import numpy_ipps.signal
import numpy_ipps.utils


[docs]class FIR: """FIR Function. ``dst <- convolve( src, kernel )`` """ __slots__ = ( "_ipps_backend", "_ipps_mem_spec", "_ipps_mem_buffer", "_ipps_kernel", "_ipps_dlySrc", "_ipps_dlyDst", "_ipps_method", "_numpy_method", "_numpy_broadcast", "_numpy_factor", "_numpy_phase", ) dtype_candidates = numpy_ipps.policies.float_candidates _ipps_kind = _selector.Kind.UNARY def __init__( self, kernel, method=numpy_ipps.Method.DIRECT, continuous=False, rate=None, ): if ( __debug__ and rate is not None and not isinstance(rate, _abc.Sequence) ): _debug.log_and_raise( AssertionError, "FIR rate should be a pair.", name=__name__, ) if rate is not None and rate[0] == 1: rate = None if rate is None: self._numpy_factor, self._numpy_phase = 1, 0 else: if __debug__ and method is numpy_ipps.Method.FFT: _debug.log_and_raise( AssertionError, "Method for Multi-rate FIR should be Method.DIRECT.", name=__name__, ) self._numpy_factor, self._numpy_phase = rate if method is numpy_ipps.Method.DIRECT: self._ipps_method = numpy_ipps.utils.cast("int", 0x00000001) self._numpy_method = "direct" elif method is numpy_ipps.Method.FFT: self._ipps_method = numpy_ipps.utils.cast("int", 0x00000002) self._numpy_method = "fft" else: raise RuntimeError("Unknown method {}".format(method)) self._ipps_kernel = numpy_ipps.utils.ndarray(kernel) self._numpy_broadcast = -kernel.size + 1 if kernel.dtype == _numpy.float32: ipps_type = numpy_ipps.utils.cast("int", 13) elif kernel.dtype == _numpy.float64: ipps_type = numpy_ipps.utils.cast("int", 19) elif kernel.dtype == _numpy.complex64: ipps_type = numpy_ipps.utils.cast("int", 14) elif kernel.dtype == _numpy.complex128: ipps_type = numpy_ipps.utils.cast("int", 20) else: raise RuntimeError("Unknown dtype {}".format(kernel.dtype)) if continuous: self._ipps_dlySrc = numpy_ipps.utils.ndarray( _numpy.zeros(kernel.size, dtype=kernel.dtype) ) self._ipps_dlyDst = numpy_ipps.utils.ndarray( _numpy.empty(kernel.size, dtype=kernel.dtype) ) else: self._ipps_dlySrc = numpy_ipps.utils.ndarray() self._ipps_dlyDst = numpy_ipps.utils.ndarray() ipps_spec_size = numpy_ipps.utils.new("int*") ipps_buffer_size = numpy_ipps.utils.new("int*") ipps_fir_getsize = _dispatch.ipps_function( "FIRSRGetSize" if rate is None else "FIRMRGetSize", ("int",) + (tuple() if rate is None else ("int", "int")) + ("int", "int*", "int*"), ) ipps_fir_init = _dispatch.ipps_function( "FIRSRInit" if rate is None else "FIRMRInit", ("void*", "int") + (("int",) if rate is None else ("int", "int", "int", "int")) + ("void*",), kernel.dtype, ) if rate is None: numpy_ipps.status = ipps_fir_getsize( self._ipps_kernel.size, ipps_type, ipps_spec_size, ipps_buffer_size, ) else: up_factor = numpy_ipps.utils.cast("int", 1) down_factor = numpy_ipps.utils.cast("int", rate[0]) numpy_ipps.status = ipps_fir_getsize( self._ipps_kernel.size, up_factor, down_factor, ipps_type, ipps_spec_size, ipps_buffer_size, ) _debug.assert_status( numpy_ipps.status, message="Get FIR size", name=__name__ ) self._ipps_mem_spec = numpy_ipps.utils.ndarray( _numpy.empty(ipps_spec_size[0], dtype=_numpy.uint8) ) self._ipps_mem_buffer = numpy_ipps.utils.ndarray( _numpy.empty(ipps_buffer_size[0], dtype=_numpy.uint8) ) _logging.getLogger(__name__).info( "FIR allocations: spec {}o -- buffer {}o.".format( ipps_spec_size[0], ipps_buffer_size[0] ) ) if rate is None: numpy_ipps.status = ipps_fir_init( self._ipps_kernel.cdata, self._ipps_kernel.size, self._ipps_method, self._ipps_mem_spec.cdata, ) else: up_phase = numpy_ipps.utils.cast("int", 0) down_phase = numpy_ipps.utils.cast("int", rate[1]) numpy_ipps.status = ipps_fir_init( self._ipps_kernel.cdata, self._ipps_kernel.size, up_factor, up_phase, down_factor, down_phase, self._ipps_mem_spec.cdata, ) _debug.assert_status( numpy_ipps.status, message="Init FIR", name=__name__ ) self._ipps_backend = _dispatch.ipps_function( "FIRSR" if rate is None else "FIRMR", ( "void*", "void*", "int", "void*", "void*", "void*", "void*", ), kernel.dtype, ) def __call__(self, src, dst): numpy_ipps.status = self._ipps_backend( src.cdata, dst.cdata, dst.size, self._ipps_mem_spec.cdata, self._ipps_dlySrc.cdata, self._ipps_dlyDst.cdata, self._ipps_mem_buffer.cdata, ) assert ( numpy_ipps.status == 0 ), "DEBUG: Bad Intel IPP Signal status {}".format(numpy_ipps.status) numpy_ipps.utils.swap_ndarray(self._ipps_dlySrc, self._ipps_dlyDst) def _numpy_backend(self, src, dst): dst.ndarray[:] = _signal.convolve( src.ndarray, self._ipps_kernel.ndarray, mode="full", method=self._numpy_method, )[self._numpy_phase : self._numpy_broadcast : self._numpy_factor]