
Mp utils

tablite.mp_utils

Attributes

tablite.mp_utils.lookup_ops = {'in': _in, 'not in': not_in, '<': operator.lt, '<=': operator.le, '>': operator.gt, '>=': operator.ge, '!=': operator.ne, '==': operator.eq} module-attribute

tablite.mp_utils.filter_ops = {'>': operator.gt, '>=': operator.ge, '==': operator.eq, '<': operator.lt, '<=': operator.le, '!=': operator.ne, 'in': _in} module-attribute

tablite.mp_utils.filter_ops_from_text = {'gt': '>', 'gteq': '>=', 'eq': '==', 'lt': '<', 'lteq': '<=', 'neq': '!=', 'in': _in} module-attribute
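
These dictionaries map operator tokens to callables so that filter expressions can be resolved by string lookup. A minimal usage sketch (assumes tablite is installed; expected values shown in comments):

from tablite.mp_utils import lookup_ops, filter_ops_from_text

ge = lookup_ops[">="]                    # operator.ge
print(ge(5, 3))                          # True

symbol = filter_ops_from_text["lteq"]    # '<='
print(lookup_ops[symbol](2, 3))          # True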


Functions

tablite.mp_utils.not_in(a, b)

Source code in tablite/mp_utils.py
def not_in(a, b):
    return not operator.contains(str(a), str(b))
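
Returns True when str(b) does not occur as a substring of str(a). A quick illustration:

from tablite.mp_utils import not_in

print(not_in("haystack", "needle"))   # True  -- "needle" not in "haystack"
print(not_in("haystack", "stack"))    # False -- "stack" is a substring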

tablite.mp_utils.is_mp(fields: int) -> bool

PARAMETER DESCRIPTION
fields

number of fields

TYPE: int

RETURNS DESCRIPTION
bool

True if multiprocessing should be used for the given number of fields, otherwise False

Source code in tablite/mp_utils.py
def is_mp(fields: int) -> bool:
    """

    Args:
        fields (int): number of fields

    Returns:
        bool
    """
    if Config.MULTIPROCESSING_MODE == Config.FORCE:
        return True

    if Config.MULTIPROCESSING_MODE == Config.FALSE:
        return False

    if fields < Config.SINGLE_PROCESSING_LIMIT:
        return False

    if max(psutil.cpu_count(logical=False), 1) < 2:
        return False

    return True
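
Usage sketch (the Config import path tablite.config is an assumption; the result depends on Config.MULTIPROCESSING_MODE, Config.SINGLE_PROCESSING_LIMIT and the number of physical cores):

from tablite.config import Config
from tablite.mp_utils import is_mp

print(is_mp(10))                                  # False for a small field count
print(is_mp(Config.SINGLE_PROCESSING_LIMIT * 2))  # True on a multi-core machine, unless disabled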

tablite.mp_utils.select_processing_method(fields, sp, mp)

PARAMETER DESCRIPTION
fields

number of fields

TYPE: int

sp

method for single processing

TYPE: callable

mp

method for multiprocessing

TYPE: callable

RETURNS DESCRIPTION
callable

The multiprocessing method mp if is_mp(fields) is True, otherwise the single-processing method sp.

Source code in tablite/mp_utils.py
def select_processing_method(fields, sp, mp):
    """

    Args:
        fields (int): number of fields
        sp (callable): method for single processing
        mp (callable): method for multiprocessing

    Returns:
        callable: mp if is_mp(fields) else sp
    """
    return mp if is_mp(fields) else sp
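
A minimal sketch with two hypothetical implementations of the same operation; sp_sum and mp_sum are placeholders, not tablite functions:

from tablite.mp_utils import select_processing_method

def sp_sum(values):   # single-process version
    return sum(values)

def mp_sum(values):   # stand-in for a version that fans work out to worker processes
    return sum(values)

rows = list(range(1_000))
func = select_processing_method(len(rows), sp_sum, mp_sum)
print(func(rows))     # 499500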

tablite.mp_utils.maskify(arr)

Source code in tablite/mp_utils.py
def maskify(arr):
    none_mask = [False] * len(arr)  # Setting the default

    for i in range(len(arr)):
        if arr[i] is None:  # Check if our value is None
            none_mask[i] = True
            arr[i] = 0  # Remove None from the original array

    return none_mask
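
maskify mutates its argument: it returns a boolean mask marking where None occurred and replaces those entries with 0, so the list can later be copied into a typed numpy array. A quick illustration:

from tablite.mp_utils import maskify

values = [1, None, 3, None]
mask = maskify(values)
print(mask)     # [False, True, False, True]
print(values)   # [1, 0, 3, 0]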

tablite.mp_utils.share_mem(inp_arr, dtype)

Source code in tablite/mp_utils.py
def share_mem(inp_arr, dtype):
    len_ = len(inp_arr)
    size = np.dtype(dtype).itemsize * len_
    shape = (len_,)

    out_shm = shared_memory.SharedMemory(create=True, size=size)  # the co_processors will read this.
    out_arr_index = np.ndarray(shape, dtype=dtype, buffer=out_shm.buf)
    out_arr_index[:] = inp_arr

    return out_arr_index, out_shm
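
share_mem copies the input into a newly created shared memory block and returns both the numpy view and the SharedMemory handle; the caller is responsible for closing and unlinking the block. A minimal sketch:

import numpy as np
from tablite.mp_utils import share_mem

arr, shm = share_mem([1, 2, 3, 4], np.int64)
print(arr)        # [1 2 3 4], backed by shared memory
print(shm.name)   # the name worker processes use to attach

shm.close()
shm.unlink()      # free the block once all processes are done with it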

tablite.mp_utils.map_task(data_shm_name, index_shm_name, destination_shm_name, shape, dtype, start, end)

Source code in tablite/mp_utils.py
def map_task(data_shm_name, index_shm_name, destination_shm_name, shape, dtype, start, end):
    # connect
    shared_data = shared_memory.SharedMemory(name=data_shm_name)
    data = np.ndarray(shape, dtype=dtype, buffer=shared_data.buf)

    shared_index = shared_memory.SharedMemory(name=index_shm_name)
    index = np.ndarray(shape, dtype=np.int64, buffer=shared_index.buf)

    shared_target = shared_memory.SharedMemory(name=destination_shm_name)
    target = np.ndarray(shape, dtype=dtype, buffer=shared_target.buf)
    # work
    target[start:end] = np.take(data[start:end], index[start:end])
    # disconnect
    shared_data.close()
    shared_index.close()
    shared_target.close()
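
map_task attaches to three shared memory blocks by name and writes np.take(data[start:end], index[start:end]) into the destination slice. It is normally dispatched to a worker pool; the sketch below calls it inline in a single process for illustration:

import numpy as np
from tablite.mp_utils import map_task, share_mem

data, data_shm = share_mem(np.array([10, 20, 30, 40], dtype=np.int64), np.int64)
index, index_shm = share_mem(np.array([3, 2, 1, 0], dtype=np.int64), np.int64)
dest, dest_shm = share_mem(np.zeros(4, dtype=np.int64), np.int64)

map_task(data_shm.name, index_shm.name, dest_shm.name, (4,), np.int64, 0, 4)
print(dest)   # [40 30 20 10]

for shm in (data_shm, index_shm, dest_shm):
    shm.close()
    shm.unlink()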

tablite.mp_utils.reindex_task(src, dst, index_shm, shm_shape, start, end)

Source code in tablite/mp_utils.py
def reindex_task(src, dst, index_shm, shm_shape, start, end):
    # connect
    existing_shm = shared_memory.SharedMemory(name=index_shm)
    shared_index = np.ndarray(shm_shape, dtype=np.int64, buffer=existing_shm.buf)
    # work
    array = load_numpy(src)
    new = np.take(array, shared_index[start:end])
    np.save(dst, new, allow_pickle=True, fix_imports=False)
    # disconnect
    existing_shm.close()
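
reindex_task loads the numpy file at src, gathers the rows named by the shared index slice, and saves the result to dst. A sketch with hypothetical temporary paths, run inline rather than in a worker process (assumes load_numpy accepts a plain file path):

import numpy as np
from tablite.mp_utils import reindex_task, share_mem

src, dst = "/tmp/source.npy", "/tmp/reindexed.npy"   # hypothetical paths
np.save(src, np.array([10, 20, 30, 40]))

index, index_shm = share_mem(np.array([2, 0], dtype=np.int64), np.int64)
reindex_task(src, dst, index_shm.name, index.shape, 0, 2)
print(np.load(dst))   # [30 10]

index_shm.close()
index_shm.unlink()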