Source code for numpy_ipps.conversion

"""Arithmetic Integer Functions."""
import numpy as _numpy

import numpy_ipps._detail.dispatch as _dispatch
import numpy_ipps._detail.metaclass.selector as _selector
import numpy_ipps.policies
import numpy_ipps.utils


class SwapBytes:
    """SwapBytes Function.

    Out-of-place byte swap.  ``__call__`` forwards to the backend resolved
    under the name ``"SwapBytes"``; ``_numpy_backend`` performs the same
    operation with :meth:`numpy.ndarray.byteswap`.
    """

    __slots__ = ("_ipps_backend",)

    # Element types this function is declared for.
    # NOTE(review): presumably consumed by the selector machinery, which is
    # not visible in this file -- confirm.
    dtype_candidates = (
        _numpy.int16,
        _numpy.uint16,
        _numpy.int32,
        _numpy.uint32,
        _numpy.int64,
        _numpy.uint64,
        _numpy.float32,
        _numpy.float64,
    )
    _ipps_kind = _selector.Kind.UNARY

    def __init__(self, dtype, size=None):
        """Resolve the ``"SwapBytes"`` backend for ``dtype``.

        Parameters
        ----------
        dtype
            Element type passed to the dispatcher.
        size : optional
            Accepted but not read by this constructor.
        """
        # Every element width is dispatched with the same UNSIGNED tag.
        unsigned = numpy_ipps.policies.TagPolicy.UNSIGNED
        tag_policies = numpy_ipps.policies.Policies(
            bytes1=unsigned,
            bytes2=unsigned,
            bytes4=unsigned,
            bytes8=unsigned,
        )
        signature = ("void*", "void*", "int")
        self._ipps_backend = _dispatch.ipps_function(
            "SwapBytes", signature, dtype, policies=tag_policies
        )

    def __call__(self, src, dst):
        """Run the backend on ``src.size`` elements, writing into ``dst``.

        The returned status is stored in ``numpy_ipps.status``.

        Raises
        ------
        AssertionError
            If ``src`` holds more elements than ``dst``, or if the backend
            returns a non-zero status (only while assertions are enabled).
        """
        assert dst.size >= src.size, "src and dst size not compatible."

        status = self._ipps_backend(src.cdata, dst.cdata, src.size)
        numpy_ipps.status = status
        assert status == 0, "DEBUG: Bad Intel IPP Signal status {}".format(
            status
        )

    def _numpy_backend(self, src, dst):
        """NumPy implementation: copy the byte-swapped ``src`` into ``dst``."""
        swapped = src.ndarray.byteswap()
        dst.ndarray[:] = swapped
class SwapBytes_I:
    """SwapBytes_I Function.

    In-place byte swap.  ``__call__`` forwards to the backend resolved
    under the name ``"SwapBytes_I"``; ``_numpy_backend`` performs the same
    operation with :meth:`numpy.ndarray.byteswap`.
    """

    __slots__ = ("_ipps_backend",)

    # Element types this function is declared for.
    # NOTE(review): presumably consumed by the selector machinery, which is
    # not visible in this file -- confirm.
    dtype_candidates = (
        _numpy.int16,
        _numpy.uint16,
        _numpy.int32,
        _numpy.uint32,
        _numpy.int64,
        _numpy.uint64,
        _numpy.float32,
        _numpy.float64,
    )
    _ipps_kind = _selector.Kind.UNARY_I

    def __init__(self, dtype, size=None):
        """Resolve the ``"SwapBytes_I"`` backend for ``dtype``.

        Parameters
        ----------
        dtype
            Element type passed to the dispatcher.
        size : optional
            Accepted but not read by this constructor.
        """
        # Every element width is dispatched with the same UNSIGNED tag.
        unsigned = numpy_ipps.policies.TagPolicy.UNSIGNED
        tag_policies = numpy_ipps.policies.Policies(
            bytes1=unsigned,
            bytes2=unsigned,
            bytes4=unsigned,
            bytes8=unsigned,
        )
        signature = ("void*", "int")
        self._ipps_backend = _dispatch.ipps_function(
            "SwapBytes_I", signature, dtype, policies=tag_policies
        )

    def __call__(self, src_dst):
        """Run the backend over all ``src_dst.size`` elements in place.

        The returned status is stored in ``numpy_ipps.status``.

        Raises
        ------
        AssertionError
            If the backend returns a non-zero status (only while assertions
            are enabled).
        """
        status = self._ipps_backend(src_dst.cdata, src_dst.size)
        numpy_ipps.status = status
        assert status == 0, "DEBUG: Bad Intel IPP Signal status {}".format(
            status
        )

    def _numpy_backend(self, src_dst):
        """NumPy implementation: byte-swap ``src_dst`` in place."""
        # byteswap(inplace=True) already mutates the array; the result is
        # then assigned back through the slice, as in the original code.
        swapped = src_dst.ndarray.byteswap(inplace=True)
        src_dst.ndarray[:] = swapped
class Flip:
    """Flip Function.

    Out-of-place element-order reversal.  ``__call__`` forwards to the
    backend resolved under the name ``"Flip"``; ``_numpy_backend`` performs
    the same operation with :func:`numpy.flip`.
    """

    __slots__ = ("_ipps_backend",)
    dtype_candidates = numpy_ipps.policies.default_candidates
    _ipps_kind = _selector.Kind.UNARY

    def __init__(self, dtype, size=None):
        """Resolve the ``"Flip"`` backend for ``dtype``.

        Parameters
        ----------
        dtype
            Element type passed to the dispatcher.
        size : optional
            Accepted but not read by this constructor.
        """
        tags = numpy_ipps.policies.TagPolicy
        # 1- and 2-byte elements use the UNSIGNED tag, 4- and 8-byte
        # elements use the FLOAT tag.
        tag_policies = numpy_ipps.policies.Policies(
            bytes1=tags.UNSIGNED,
            bytes2=tags.UNSIGNED,
            bytes4=tags.FLOAT,
            bytes8=tags.FLOAT,
        )
        signature = ("void*", "void*", "int")
        self._ipps_backend = _dispatch.ipps_function(
            "Flip", signature, dtype, policies=tag_policies
        )

    def __call__(self, src, dst):
        """Run the backend on ``src.size`` elements, writing into ``dst``.

        The returned status is stored in ``numpy_ipps.status``.

        Raises
        ------
        AssertionError
            If ``src`` holds more elements than ``dst``, or if the backend
            returns a non-zero status (only while assertions are enabled).
        """
        assert dst.size >= src.size, "src and dst size not compatible."

        status = self._ipps_backend(src.cdata, dst.cdata, src.size)
        numpy_ipps.status = status
        assert status == 0, "DEBUG: Bad Intel IPP Signal status {}".format(
            status
        )

    def _numpy_backend(self, src, dst):
        """NumPy implementation: copy the reversed ``src`` into ``dst``."""
        reversed_view = _numpy.flip(src.ndarray)
        dst.ndarray[:] = reversed_view
class Flip_I:
    """Flip_I Function.

    In-place element-order reversal.  ``__call__`` forwards to the backend
    resolved under the name ``"Flip_I"``; ``_numpy_backend`` performs the
    same operation with :func:`numpy.flip`.
    """

    __slots__ = ("_ipps_backend",)
    dtype_candidates = numpy_ipps.policies.default_candidates
    _ipps_kind = _selector.Kind.UNARY_I

    def __init__(self, dtype, size=None):
        """Resolve the ``"Flip_I"`` backend for ``dtype``.

        Parameters
        ----------
        dtype
            Element type passed to the dispatcher.
        size : optional
            Accepted but not read by this constructor.
        """
        tags = numpy_ipps.policies.TagPolicy
        # 1- and 2-byte elements use the UNSIGNED tag, 4- and 8-byte
        # elements use the FLOAT tag.
        tag_policies = numpy_ipps.policies.Policies(
            bytes1=tags.UNSIGNED,
            bytes2=tags.UNSIGNED,
            bytes4=tags.FLOAT,
            bytes8=tags.FLOAT,
        )
        signature = ("void*", "int")
        self._ipps_backend = _dispatch.ipps_function(
            "Flip_I", signature, dtype, policies=tag_policies
        )

    def __call__(self, src_dst):
        """Run the backend over all ``src_dst.size`` elements in place.

        The returned status is stored in ``numpy_ipps.status``.

        Raises
        ------
        AssertionError
            If the backend returns a non-zero status (only while assertions
            are enabled).
        """
        status = self._ipps_backend(src_dst.cdata, src_dst.size)
        numpy_ipps.status = status
        assert status == 0, "DEBUG: Bad Intel IPP Signal status {}".format(
            status
        )

    def _numpy_backend(self, src_dst):
        """NumPy implementation: reverse ``src_dst`` and store it back."""
        reversed_view = _numpy.flip(src_dst.ndarray)
        src_dst.ndarray[:] = reversed_view
class Convert:
    """Convert Function.

    Out-of-place element-type conversion.  ``__call__`` forwards to the
    backend resolved under the name ``"Convert"``; ``_numpy_backend``
    performs the conversion with :meth:`numpy.ndarray.astype`.
    """

    __slots__ = (
        "_ipps_backend",
        "_dst_dtype",
    )

    # (source dtype, destination dtype) pairs this function is declared for.
    dtype_candidates = (
        (_numpy.int8, _numpy.int16),
        (_numpy.int8, _numpy.float32),
        (_numpy.uint8, _numpy.float32),
        (_numpy.int16, _numpy.int32),
        (_numpy.int16, _numpy.float32),
        (_numpy.uint16, _numpy.float32),
        (_numpy.int32, _numpy.float64),
        (_numpy.float32, _numpy.float64),
    )
    _ipps_kind = _selector.Kind.UNARY

    def __init__(self, dtype_src, dtype_dst, size=None):
        """Resolve the ``"Convert"`` backend for a dtype pair.

        Parameters
        ----------
        dtype_src
            Element type of the input, passed to the dispatcher.
        dtype_dst
            Element type of the output, passed to the dispatcher and kept
            for the NumPy implementation.
        size : optional
            Accepted but not read by this constructor.
        """
        self._dst_dtype = dtype_dst

        signature = ("void*", "void*", "int")
        self._ipps_backend = _dispatch.ipps_function(
            "Convert", signature, dtype_src, dtype_dst
        )

    def __call__(self, src, dst):
        """Run the backend on ``src.size`` elements, writing into ``dst``.

        The returned status is stored in ``numpy_ipps.status``.

        Raises
        ------
        AssertionError
            If ``src`` holds more elements than ``dst``, or if the backend
            returns a non-zero status (only while assertions are enabled).
        """
        assert dst.size >= src.size, "src and dst size not compatible."

        status = self._ipps_backend(src.cdata, dst.cdata, src.size)
        numpy_ipps.status = status
        assert status == 0, "DEBUG: Bad Intel IPP Signal status {}".format(
            status
        )

    def _numpy_backend(self, src, dst):
        """NumPy implementation: cast ``src`` to the destination dtype."""
        converted = src.ndarray.astype(self._dst_dtype, casting="unsafe")
        dst.ndarray[:] = converted
class Threshold:
    """Threshold Function.

    Out-of-place thresholding on the absolute value.  ``__call__`` forwards
    to the backend resolved under the name ``"Threshold-LTAbs"``.  The NumPy
    implementation keeps every element whose magnitude is at least the
    level and replaces the others with ``sign(element) * level``.
    """

    __slots__ = (
        "_ipps_backend",
        "_ipps_level",
        "_dtype",
        # Never assigned in this class; kept so the slot layout is unchanged.
        "_ge_buffer",
    )
    dtype_candidates = (
        _numpy.int16,
        _numpy.int32,
        _numpy.float32,
        _numpy.float64,
    )
    _ipps_kind = _selector.Kind.UNARY

    def __init__(self, dtype, level, size=None):
        """Resolve the ``"Threshold-LTAbs"`` backend for ``dtype``.

        Parameters
        ----------
        dtype
            Element type; passed to the dispatcher and also called as a
            constructor to rebuild the level in the NumPy implementation.
        level
            Threshold level, cast to the C type matching ``dtype``.
        size : optional
            Accepted but not read by this constructor.
        """
        self._dtype = dtype

        ctype = _dispatch.as_ctype_str(dtype)
        self._ipps_level = numpy_ipps.utils.cast(ctype, level)
        self._ipps_backend = _dispatch.ipps_function(
            "Threshold-LTAbs",
            (
                "void*",
                "void*",
                "int",
                ctype,
            ),
            dtype,
        )

    def __call__(self, src, dst):
        """Run the backend on ``src.size`` elements, writing into ``dst``.

        The returned status is stored in ``numpy_ipps.status``.

        Raises
        ------
        AssertionError
            If ``src`` holds more elements than ``dst``, or if the backend
            returns a non-zero status (only while assertions are enabled).
        """
        assert src.size <= dst.size, "src and dst size not compatible."

        numpy_ipps.status = self._ipps_backend(
            src.cdata, dst.cdata, src.size, self._ipps_level
        )
        assert (
            numpy_ipps.status == 0
        ), "DEBUG: Bad Intel IPP Signal status {}".format(numpy_ipps.status)

    def _numpy_backend(self, src, dst):
        """NumPy implementation of the thresholding.

        The magnitude is computed into a temporary rather than into ``dst``.
        Using ``dst`` as scratch would overwrite ``src`` before it is read
        again whenever the two share memory, losing the sign of every
        element.  This matches what ``Threshold_I`` already does.
        """
        level = self._dtype(self._ipps_level)
        magnitude = _numpy.absolute(src.ndarray, casting="unsafe")
        dst.ndarray[:] = _numpy.where(
            magnitude >= level,
            src.ndarray,
            _numpy.sign(src.ndarray, casting="unsafe") * level,
        )
class Threshold_I:
    """Threshold_I Function.

    In-place thresholding on the absolute value.  ``__call__`` forwards to
    the backend resolved under the name ``"Threshold-LTAbs_I"``.  The NumPy
    implementation keeps every element whose magnitude is at least the
    level and replaces the others with ``sign(element) * level``.
    """

    __slots__ = (
        "_ipps_backend",
        "_ipps_level",
        "_dtype",
    )
    dtype_candidates = (
        _numpy.int16,
        _numpy.int32,
        _numpy.float32,
        _numpy.float64,
    )
    _ipps_kind = _selector.Kind.UNARY_I

    def __init__(self, dtype, level, size=None):
        """Resolve the ``"Threshold-LTAbs_I"`` backend for ``dtype``.

        Parameters
        ----------
        dtype
            Element type; passed to the dispatcher and also called as a
            constructor to rebuild the level in the NumPy implementation.
        level
            Threshold level, cast to the C type matching ``dtype``.
        size : optional
            Accepted but not read by this constructor.
        """
        self._dtype = dtype

        ctype = _dispatch.as_ctype_str(dtype)
        self._ipps_level = numpy_ipps.utils.cast(ctype, level)

        signature = ("void*", "int", ctype)
        self._ipps_backend = _dispatch.ipps_function(
            "Threshold-LTAbs_I", signature, dtype
        )

    def __call__(self, src_dst):
        """Run the backend over all ``src_dst.size`` elements in place.

        The returned status is stored in ``numpy_ipps.status``.

        Raises
        ------
        AssertionError
            If the backend returns a non-zero status (only while assertions
            are enabled).
        """
        status = self._ipps_backend(
            src_dst.cdata, src_dst.size, self._ipps_level
        )
        numpy_ipps.status = status
        assert status == 0, "DEBUG: Bad Intel IPP Signal status {}".format(
            status
        )

    def _numpy_backend(self, src_dst):
        """NumPy implementation of the in-place thresholding."""
        magnitude = _numpy.absolute(src_dst.ndarray, casting="unsafe")
        keep = magnitude >= self._dtype(self._ipps_level)
        values = src_dst.ndarray
        signed_level = _numpy.sign(
            src_dst.ndarray, casting="unsafe"
        ) * self._dtype(self._ipps_level)
        src_dst.ndarray[:] = _numpy.where(keep, values, signed_level)