Source code for pyvx.vx

"""
:mod:`pyvx.vx` --- C-like Python API
==========================================

The functions specified by the `OpenVX`_ standard are provided in form of two
modules,  :mod:`pyvx.vx` that provide the vxXxxfunctions and :class:`pyvx.vxu`
that provide the vxuXxx functions. Pleaserefer to the `OpenVX speficication`_
for a description of the API. The modulenames vx and vxu is used instead of a
vx/vxu prefix on all symbols. The initialexample on page 12 of the
specification would in python look like this:

.. code-block:: python

    from pyvx import vx

    context = vx.CreateContext()
    images = [
        vx.CreateImage(context, 640, 480, vx.DF_IMAGE_UYVY),
        vx.CreateImage(context, 640, 480, vx.DF_IMAGE_S16),
        vx.CreateImage(context, 640, 480, vx.DF_IMAGE_U8),
    ]
    graph = vx.CreateGraph(context)
    virts = [
        vx.CreateVirtualImage(graph, 0, 0, vx.DF_IMAGE_VIRT),
        vx.CreateVirtualImage(graph, 0, 0, vx.DF_IMAGE_VIRT),
        vx.CreateVirtualImage(graph, 0, 0, vx.DF_IMAGE_VIRT),
        vx.CreateVirtualImage(graph, 0, 0, vx.DF_IMAGE_VIRT),
    ]
    vx.ChannelExtractNode(graph, images[0], vx.CHANNEL_Y, virts[0])
    vx.Gaussian3x3Node(graph, virts[0], virts[1])
    vx.Sobel3x3Node(graph, virts[1], virts[2], virts[3])
    vx.MagnitudeNode(graph, virts[2], virts[3], images[1])
    vx.PhaseNode(graph, virts[2], virts[3], images[2])
    status = vx.VerifyGraph(graph)
    print status
    if status == vx.SUCCESS:
        status = vx.ProcessGraph(graph)
    else:
        print("Verification failed.")
    vx.ReleaseContext(context)

For a compact example on how to call all the functions in the API check
out `test_vx.py`_.

.. _`OpenVX`: https://www.khronos.org/openvx
.. _`OpenVX speficication`: https://www.khronos.org/registry/vx/specs/OpenVX_1.0_Provisional_Specifications.zip
.. _`test_vx.py`: https://github.com/hakanardo/pyvx/tree/master/test/test_vx.py

The API is kept as
close as possible to the C API, but the few changes listed below were
made. Mostly due to the usage of pointers in C.

    * The vx prefix is removed for each function name. The module name
      forms a similar role in python.

    * The *ReleaseXxx* and *RemoveNode* functions take a normal object (as
      returned by the
      corresponding CreateXxx) as argument and not a pointer to a pointer.

    * Out arguments passed in as pointers are returned instead. The
      returned tuple will contain the original return value as it's
      first value and following it, the output arguments in the same
      order as they apear in the C signature.

    * In/Out arguemnts are passed in as values and then returned in the
      same manner as the out arguments.

    * Any python object implementing the buffer interface can be passed
      instead of pointers to blocks of data. This includes both
      *array.array* and *numpy.ndarray* objects.

    * Python buffer objects are returned instead of pointers to blocks
      of data.

    * *QueryXxx* functions have the signature
        .. code-block:: python

            (status, value) = vx.QueryXxx(context, attribute, c_type, python_type=None)

      where *c_type* is a string specifying the type of the attribute,
      for example "vx_uint32", and *python_type* can be set to *str* for
      string-valued attributes.

    * *SetXxxAttribute* functions have the signature
        .. code-block:: python

            status = vx.SetXxxAttribute(context, attribute, value, c_type=None)

      where *c_type* is a string specifying the type of the attribute,
      for example "vx_uint32".

    * *CreateUniformImage* have the signature
        .. code-block:: python

            image = vx.CreateUniformImage(context, width, height, color, value, c_type)

        where value is a python *int* and *c_type* a string specifying
        it's type. For example "vx_uint32".

    * Normal python functions can be used instead of function pointers.

    * *LoadKernels* can load python modules if it is passed a string that
      is the name of an importable python module. In that case it will
      import *PublishKernels* from it and call
      *PublishKernels(context)*.

    * *CreateScalar* and *WriteScalarValue* take a python int as value.

    * Objects are not implicitly casted to/from references. Use
      :func:`pyvx.vx.reference` and :func:`pyvx.vx.from_reference` instead.

    * The typedefed structures called vx_xxx_t can be allocated using
      vx.xxx_t(...). See below.

"""

from weakref import WeakKeyDictionary
import sys

from pyvx.types import *

keep_alive = WeakKeyDictionary()

_reference_types = {ffi.typeof(s)
                    for s in ['vx_context', 'vx_image', 'vx_graph', 'vx_node', 'vx_scalar',
                              'vx_delay', 'vx_lut', 'vx_distribution', 'vx_threshold', 'vx_kernel',
                              'vx_matrix', 'vx_convolution', 'vx_pyramid', 'vx_remap', 'vx_array',
                              'vx_parameter', 'vx_reference']}

def _get_attribute(func, ref, attribute, c_type, python_type):
    if ffi.typeof(c_type).kind != 'array':
        val = ffi.new(c_type + '*')
        status = func(ref, attribute, val, ffi.sizeof(c_type))
        val = val[0]
    else:
        val = ffi.new(c_type)
        status = func(ref, attribute, val, ffi.sizeof(c_type))

    if python_type is str:
        val = ffi.string(val).decode("utf8")
    elif python_type is not None:
        val = python_type(val)

    return status, val

def _set_attribute(func, ref, attribute, value, c_type):
    if c_type is not None:
        assert ffi.typeof(c_type).kind == 'primitive'
        value = ffi.new(c_type + '*', value)
    s = ffi.sizeof(ffi.typeof(value).item)
    return func(ref, attribute, value, s)

def _enum2ctype(data_type):
    data_type_name = ffi.string(ffi.cast("enum vx_type_e", data_type))
    assert data_type_name.startswith('VX_TYPE_')
    return 'vx_' + data_type_name[8:].lower()

def _callback(ctype, callback, parent, error):
    callback = ffi.callback(ctype, error=error)(callback)
    keep_alive.setdefault(parent, []).append(callback)
    return callback

# CONTEXT
def ReleaseContext(context):
    c = ffi.new('vx_context *', context)
    return lib.vxReleaseContext(c)

def QueryContext(context, attribute, c_type, python_type=None):
    return _get_attribute(lib.vxQueryContext, context, attribute, c_type, python_type)

def SetContextAttribute(context, attribute, value, c_type=None):
    return _set_attribute(lib.vxSetContextAttribute, context, attribute, value, c_type)


# IMAGE

def ReleaseImage(image):
    ref = ffi.new('vx_image *', image)
    return lib.vxReleaseImage(ref)

def QueryImage(image, attribute, c_type, python_type=None):
    return _get_attribute(lib.vxQueryImage, image, attribute, c_type, python_type)

def SetImageAttribute(image, attribute, value, c_type=None):
    return _set_attribute(lib.vxSetImageAttribute, image, attribute, value, c_type)

def CreateUniformImage(context, width, height, color, value, c_type):
    if ffi.typeof(c_type).kind != 'array':
        c_type += '*'
    value = ffi.new(c_type, value)
    return lib.vxCreateUniformImage(context, width, height, color, value)

def CreateImageFromHandle(context, color, addrs, ptrs, import_type):
    if not isinstance(addrs, (tuple, list)):
        addrs = (addrs,)
    if not isinstance(ptrs, (tuple, list)):
        ptrs = (ptrs,)

    addrs = ffi.new('vx_imagepatch_addressing_t[]', [a[0] for a in addrs])
    ptrs = ffi.new('void *[]', [ffi.from_buffer(p) for p in ptrs])
    return lib.vxCreateImageFromHandle(context, color, addrs, ptrs, import_type)

def AccessImagePatch(image, rect, plane_index, addr, ptr, usage):
    if addr is None:
        addr = ffi.new('vx_imagepatch_addressing_t *')
    if ptr is not None:
        ptr = ffi.from_buffer(ptr)
    ptr_p = ffi.new('void **', ptr)
    size = ComputeImagePatchSize(image, rect, plane_index)
    status = lib.vxAccessImagePatch(image, rect, plane_index, addr, ptr_p, usage)
    return status, addr, ffi.buffer(ptr_p[0], size)

def CommitImagePatch(image, rect, plane_index, addr, ptr):
    ptr = ffi.from_buffer(ptr)
    return lib.vxCommitImagePatch(image, rect, plane_index, addr, ptr)

def FormatImagePatchAddress1d(ptr, index, addr):
    ptr = ffi.from_buffer(ptr)
    p = lib.vxFormatImagePatchAddress1d(ptr, index, addr)
    return ffi.buffer(p, addr.stride_x)

def FormatImagePatchAddress2d(ptr, x, y, addr):
    ptr = ffi.from_buffer(ptr)
    p = lib.vxFormatImagePatchAddress2d(ptr, x, y, addr)
    return ffi.buffer(p, addr.stride_x)

def GetValidRegionImage(image):
    rect = rectangle_t(0,0,0,0)
    status = lib.vxGetValidRegionImage(image, rect)
    return status, rect


# KERNEL

def ReleaseKernel(kernel):
    ref = ffi.new('vx_kernel *', kernel)
    return lib.vxReleaseKernel(ref)

def QueryKernel(kernel, attribute, c_type, python_type=None):
    return _get_attribute(lib.vxQueryKernel, kernel, attribute, c_type, python_type)

def SetKernelAttribute(kernel, attribute, value, c_type=None):
    return _set_attribute(lib.vxSetKernelAttribute, kernel, attribute, value, c_type)

def AddKernel(context, name, enumeration, func_ptr, numParams, input, output, init, deinit):
    func_ptr = _callback("vx_kernel_f", func_ptr, context, FAILURE)
    input = _callback("vx_kernel_input_validate_f", input, context, FAILURE)
    output = _callback("vx_kernel_output_validate_f", output, context, FAILURE)
    if init is None:
        init = ffi.NULL
    else:
        init = _callback("vx_kernel_initialize_f", init, context, FAILURE)
    if deinit is None:
        deinit = ffi.NULL
    else:
        deinit = _callback("vx_kernel_deinitialize_f", deinit, context, FAILURE)
    return lib.vxAddKernel(context, name, enumeration, func_ptr, numParams, input, output, init, deinit)

def SetMetaFormatAttribute(meta, attribute, value, c_type=None):
    return _set_attribute(lib.vxSetMetaFormatAttribute, meta, attribute, value, c_type)

def LoadKernels(context, module):
    if sys.version_info > (3,) and not isinstance(module, bytes):
        s = lib.vxLoadKernels(context, bytes(module, "utf8"))
    else:
        s = lib.vxLoadKernels(context, module)
    if s == SUCCESS:
        return s
    try:
        d = {}
        exec("import %s as mod" % module, d)
        mod = d['mod']
    except ImportError:
        return FAILURE
    return mod.PublishKernels(context)


# GRAPH

def ReleaseGraph(graph):
    ref = ffi.new('vx_graph *', graph)
    return lib.vxReleaseGraph(ref)

def QueryGraph(graph, attribute, c_type, python_type=None):
    return _get_attribute(lib.vxQueryGraph, graph, attribute, c_type, python_type)

def SetGraphAttribute(graph, attribute, value, c_type=None):
    return _set_attribute(lib.vxSetGraphAttribute, graph, attribute, value, c_type)


# NODE

def ReleaseNode(node):
    ref = ffi.new('vx_node *', node)
    return lib.vxReleaseNode(ref)

def QueryNode(node, attribute, c_type, python_type=None):
    return _get_attribute(lib.vxQueryNode, node, attribute, c_type, python_type)

def SetNodeAttribute(node, attribute, value, c_type=None):
    return _set_attribute(lib.vxSetNodeAttribute, node, attribute, value, c_type)

def RemoveNode(node):
    ref = ffi.new('vx_node *', node)
    return lib.vxReleaseNode(ref)

def AssignNodeCallback(node, callback):
    if callback is not None:
        callback = _callback("vx_nodecomplete_f", callback, node, ACTION_ABANDON)
    else:
        callback = ffi.NULL
    return lib.vxAssignNodeCallback(node, callback)


# PARAMETER

def ReleaseParameter(parameter):
    ref = ffi.new('vx_parameter *', parameter)
    return lib.vxReleaseParameter(ref)

def QueryParameter(parameter, attribute, c_type, python_type=None):
    return _get_attribute(lib.vxQueryParameter, parameter, attribute, c_type, python_type)


# SCALAR

def CreateScalar(context, data_type, value):
    ptr = ffi.new(_enum2ctype(data_type) + '*', value)
    return lib.vxCreateScalar(context, data_type, ptr)

def ReleaseScalar(scalar):
    ref = ffi.new('vx_scalar *', scalar)
    return lib.vxReleaseScalar(ref)

def QueryScalar(scalar, attribute, c_type, python_type=None):
    return _get_attribute(lib.vxQueryScalar, scalar, attribute, c_type, python_type)

def ReadScalarValue(scalar):
    s, data_type = QueryScalar(scalar, SCALAR_ATTRIBUTE_TYPE, "vx_enum")
    ptr = ffi.new(_enum2ctype(data_type) + '*')
    s = lib.vxReadScalarValue(scalar, ptr)
    return s, ptr[0]

def WriteScalarValue(scalar, value):
    s, data_type = QueryScalar(scalar, SCALAR_ATTRIBUTE_TYPE, "vx_enum")
    ptr = ffi.new(_enum2ctype(data_type) + '*', value)
    return lib.vxWriteScalarValue(scalar, ptr)


# REFERENCE

[docs]def reference(reference): """ Cast the object *reference* into a "vx_reference" object. """ if ffi.typeof(reference) not in _reference_types: raise TypeError("Can't cast %r to vx_reference" % reference) return ffi.cast('vx_reference', reference)
[docs]def from_reference(ref): """ Cast the "vx_reference" object *ref* into it's specific type (i.e. "vx_image" or "vx_graqph" or ...). """ s, data_type = QueryReference(ref, REF_ATTRIBUTE_TYPE, 'vx_enum') return ffi.cast(_enum2ctype(data_type), ref)
def QueryReference(reference, attribute, c_type, python_type=None): return _get_attribute(lib.vxQueryReference, reference, attribute, c_type, python_type) # DELAY def ReleaseDelay(delay): ref = ffi.new('vx_delay *', delay) return lib.vxReleaseDelay(ref) def QueryDelay(delay, attribute, c_type, python_type=None): return _get_attribute(lib.vxQueryDelay, delay, attribute, c_type, python_type) # LOGGING def RegisterLogCallback(context, callback, reentrant): def wrapper(context, ref, status, string): callback(context, ref, status, ffi.string(string)) cb = _callback('vx_log_callback_f', wrapper, context, None) lib.vxRegisterLogCallback(context, cb, reentrant) # LUT def ReleaseLUT(lut): ref = ffi.new('vx_lut *', lut) return lib.vxReleaseLUT(ref) def QueryLUT(lut, attribute, c_type, python_type=None): return _get_attribute(lib.vxQueryLUT, lut, attribute, c_type, python_type) def AccessLUT(lut, ptr, usage): if ptr is not None: ptr = ffi.from_buffer(ptr) ptr_p = ffi.new('void **', ptr) status = lib.vxAccessLUT(lut, ptr_p, usage) _, size = QueryLUT(lut, LUT_ATTRIBUTE_COUNT, 'vx_size') return (status, ffi.buffer(ptr_p[0], size)) def CommitLUT(lut, ptr): ptr = ffi.from_buffer(ptr) return lib.vxCommitLUT(lut, ptr) # DISTRIBUTION def ReleaseDistribution(distribution): ref = ffi.new('vx_distribution *', distribution) return lib.vxReleaseDistribution(ref) def QueryDistribution(distribution, attribute, c_type, python_type=None): return _get_attribute(lib.vxQueryDistribution, distribution, attribute, c_type, python_type) def AccessDistribution(distribution, ptr, usage): if ptr is not None: ptr = ffi.from_buffer(ptr) ptr_p = ffi.new('void **', ptr) status = lib.vxAccessDistribution(distribution, ptr_p, usage) _, size = QueryDistribution(distribution, DISTRIBUTION_ATTRIBUTE_SIZE, 'vx_size') return (status, ffi.buffer(ptr_p[0], size)) def CommitDistribution(distribution, ptr): ptr = ffi.from_buffer(ptr) return lib.vxCommitDistribution(distribution, ptr) # THRESHOLD def ReleaseThreshold(threshold): ref = ffi.new('vx_threshold *', threshold) return lib.vxReleaseThreshold(ref) def QueryThreshold(threshold, attribute, c_type, python_type=None): return _get_attribute(lib.vxQueryThreshold, threshold, attribute, c_type, python_type) def SetThresholdAttribute(threshold, attribute, value, c_type=None): return _set_attribute(lib.vxSetThresholdAttribute, threshold, attribute, value, c_type) # MATRIX def ReleaseMatrix(matrix): ref = ffi.new('vx_matrix *', matrix) return lib.vxReleaseMatrix(ref) def QueryMatrix(matrix, attribute, c_type, python_type=None): return _get_attribute(lib.vxQueryMatrix, matrix, attribute, c_type, python_type) def ReadMatrix(mat, array): array = ffi.from_buffer(array) return lib.vxReadMatrix(mat, array) def WriteMatrix(mat, array): array = ffi.from_buffer(array) return lib.vxWriteMatrix(mat, array) # CONVOLUTION def ReleaseConvolution(convolution): ref = ffi.new('vx_convolution *', convolution) return lib.vxReleaseConvolution(ref) def QueryConvolution(convolution, attribute, c_type, python_type=None): return _get_attribute(lib.vxQueryConvolution, convolution, attribute, c_type, python_type) def SetConvolutionAttribute(convolution, attribute, value, c_type=None): return _set_attribute(lib.vxSetConvolutionAttribute, convolution, attribute, value, c_type) def WriteConvolutionCoefficients(conv, array): array = ffi.from_buffer(array) return lib.vxWriteConvolutionCoefficients(conv, array) def ReadConvolutionCoefficients(conv, array): array = ffi.from_buffer(array) return lib.vxReadConvolutionCoefficients(conv, array) # PYRAMID def ReleasePyramid(pyramid): ref = ffi.new('vx_pyramid *', pyramid) return lib.vxReleasePyramid(ref) def QueryPyramid(pyramid, attribute, c_type, python_type=None): return _get_attribute(lib.vxQueryPyramid, pyramid, attribute, c_type, python_type) # REMAP def ReleaseRemap(remap): ref = ffi.new('vx_remap *', remap) return lib.vxReleaseRemap(ref) def QueryRemap(remap, attribute, c_type, python_type=None): return _get_attribute(lib.vxQueryRemap, remap, attribute, c_type, python_type) def GetRemapPoint(table, dst_x, dst_y): src_x = ffi.new('vx_float32 *') src_y = ffi.new('vx_float32 *') status = lib.vxGetRemapPoint(table, dst_x, dst_y, src_x, src_y) return status, src_x[0], src_y[0] # ARRAY def ReleaseArray(array): ref = ffi.new('vx_array *', array) return lib.vxReleaseArray(ref) def QueryArray(array, attribute, c_type, python_type=None): return _get_attribute(lib.vxQueryArray, array, attribute, c_type, python_type) def FormatArrayPointer(ptr, index, stride): if sys.version_info > (3,): return memoryview(ptr)[index * stride:] else: return buffer(ptr, index * stride) def ArrayItem(type, ptr, index, stride): return ffi.cast(type + '*', ffi.from_buffer(FormatArrayPointer(ptr, index, stride))) def AddArrayItems(arr, count, ptr, stride): if not isinstance(ptr, ffi.CData): ptr = ffi.from_buffer(ptr) return lib.vxAddArrayItems(arr, count, ptr, stride) def AccessArrayRange(arr, start, end, stride, ptr, usage): if ptr is not None: ptr = ffi.from_buffer(ptr) ptr_p = ffi.new('void **', ptr) stride_p = ffi.new('vx_size *', stride) status = lib.vxAccessArrayRange(arr, start, end, stride_p, ptr_p, usage) _, item_size = QueryArray(arr, ARRAY_ATTRIBUTE_ITEMSIZE, 'vx_size') return (status, stride_p[0], ffi.buffer(ptr_p[0], item_size * (end - start + 1))) def CommitArrayRange(arr, start, end, ptr): ptr = ffi.from_buffer(ptr) return lib.vxCommitArrayRange(arr, start, end, ptr)