Concurrent Processing Documentation
Documentation for thread.ConcurrentProcessing.
Why Concurrent Processing?
Concurrent Processing is used to speed up the processing of large datasets by splitting the workload across multiple threads.
Traditionally, this is achieved with a for loop.
my_dataset = [] # Large dataset

def my_data_processor(Data_In) -> Data_Out:
    ...

processed_data = []
for data in my_dataset:
    # Append each result instead of overwriting the list
    processed_data.append(my_data_processor(data))

print(processed_data) # Processed data
While this is simple and decent enough for a small dataset, this is not ideal for large datasets, especially when runtime matters.
By using thread.ConcurrentProcessing, we can split the large dataset into multiple chunks and process each chunk simultaneously.
Concurrent Processing is not true parallelism. Learn more here.
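To make the idea concrete, here is a minimal sketch of chunked, threaded processing built only on the standard library's threading module. It is not the implementation of thread.ConcurrentProcessing; the name process_in_threads and its internals are assumptions made for illustration.

```python
import threading
from typing import Callable, List, Sequence, TypeVar

DataIn = TypeVar("DataIn")
DataOut = TypeVar("DataOut")


def process_in_threads(
    function: Callable[[DataIn], DataOut],
    dataset: Sequence[DataIn],
    max_threads: int = 8,
) -> List[DataOut]:
    """Hypothetical helper: process contiguous chunks of dataset in threads."""
    total = len(dataset)
    if total == 0:
        return []

    thread_count = min(max_threads, total)
    results: list = [None] * total  # one slot per item keeps the original order

    def work(start: int, stop: int) -> None:
        # Each thread writes only to its own index range
        for index in range(start, stop):
            results[index] = function(dataset[index])

    # Spread the items as evenly as possible over the threads
    base, extra = divmod(total, thread_count)
    threads = []
    start = 0
    for n in range(thread_count):
        stop = start + base + (1 if n < extra else 0)
        worker = threading.Thread(target=work, args=(start, stop))
        worker.start()
        threads.append(worker)
        start = stop

    for worker in threads:
        worker.join()
    return results


squares = process_in_threads(lambda x: x * x, list(range(10)), max_threads=4)
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Results are written by index, so the output order matches the input order regardless of which thread finishes first.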
How It Works
Determine Thread Count
The number of threads used is determined by the following formula:
thread_count = min(max_threads, len(dataset))
This ensures that the number of threads used will always be less than or equal to the length of the dataset, which prevents redundant threads from being initialized for small datasets.
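As a quick illustration of the formula, the following sketch wraps it in a function and evaluates it for a small and a large dataset. The function name pick_thread_count is hypothetical and exists only for this example.

```python
def pick_thread_count(max_threads: int, dataset_length: int) -> int:
    """Apply the formula: never use more threads than there are items."""
    return min(max_threads, dataset_length)


print(pick_thread_count(8, 3))     # 3: a 3-item dataset only needs 3 threads
print(pick_thread_count(8, 1000))  # 8: capped by max_threads
```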
Chunking
The dataset is split as evenly as possible into chunks, preserving the order of data. Chunks follow the structure:
chunks = [[1, 2, 3, ...], [50, 51, 52, ...], ...]
Let $N$ be the length of the dataset and let $M$ be the number of threads.
The individual chunk lengths decrease down the chunk list. The length of each chunk will be either $\lceil N / M \rceil$ or $\lfloor N / M \rfloor$.
The chunks generated are generators, meaning they will not take up much memory.
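The chunking rules described above can be sketched as follows. This is an illustrative reconstruction, not the library's code: iter_chunks is a hypothetical name, and it assumes the longer chunks come first, which is what "chunk lengths decrease down the chunk list" suggests.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def iter_chunks(dataset: Sequence[T], thread_count: int) -> Iterator[Iterator[T]]:
    """Yield thread_count lazy chunks that preserve the dataset order."""
    base, extra = divmod(len(dataset), thread_count)
    start = 0
    for n in range(thread_count):
        # The first `extra` chunks get one additional item
        stop = start + base + (1 if n < extra else 0)
        # A generator expression: items are only read when the chunk is consumed
        yield (dataset[i] for i in range(start, stop))
        start = stop


chunks = [list(chunk) for chunk in iter_chunks(list(range(1, 11)), 3)]
print(chunks)  # [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]
```

With 10 items and 3 threads, the chunk lengths are 4, 3 and 3, so every chunk has either the rounded-up or the rounded-down share of the dataset.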
Importing the class
import thread
thread.ConcurrentProcessing
from thread import ConcurrentProcessing
Quick Start
There are 2 main ways of initializing a concurrent processing object.
On-Demand
You can create a simple process by initializing thread.ConcurrentProcessing and passing the function and dataset.
def my_data_processor(Data_In) -> Data_Out: ...

# Recommended way
my_processor = ConcurrentProcessing(
    function = my_data_processor,
    dataset = list(range(0, n))
)

# OR

# Not the recommended way
my_processor = ConcurrentProcessing(my_data_processor, list(range(0, n)))
It can be run by invoking the start() method.
my_processor.start()
Decorated Function
You can decorate a function with thread.processor, which uses thread.ConcurrentProcessing.
When the decorated function is invoked, it will automatically be run in a new thread each time and return a thread.ConcurrentProcessing object.
A decorated function's signature is overwritten, replacing the first argument to require a sequence of the Data_In type.
import thread
@thread.processor
def my_target(Data_In, arg1, arg2, *, arg3: bool = False) -> Data_Out: ...
dataset: Sequence[type[Data_In]]
worker = my_target(dataset, arg1, arg2, arg3 = True) # thread.ConcurrentProcessing()
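The signature change can be illustrated with a small stand-in decorator built on the standard library. Everything here (SketchWorker, sketch_processor) is hypothetical and simplified to a single thread; it only shows how a function written for one item ends up being called with a whole sequence and returning a worker object.

```python
import functools
import threading
from typing import Any, Callable, List, Sequence


class SketchWorker:
    """Hypothetical stand-in for the object returned by a decorated call."""

    def __init__(self, function: Callable[..., Any], dataset: Sequence[Any],
                 args: tuple, kwargs: dict) -> None:
        self.results: List[Any] = []
        self._thread = threading.Thread(
            target=self._run, args=(function, dataset, args, kwargs)
        )

    def _run(self, function, dataset, args, kwargs) -> None:
        # The original function still receives one item at a time
        self.results = [function(item, *args, **kwargs) for item in dataset]

    def start(self) -> None:
        self._thread.start()

    def join(self) -> None:
        self._thread.join()


def sketch_processor(function: Callable[..., Any]) -> Callable[..., SketchWorker]:
    @functools.wraps(function)
    def wrapper(dataset: Sequence[Any], *args: Any, **kwargs: Any) -> SketchWorker:
        worker = SketchWorker(function, dataset, args, kwargs)
        worker.start()  # started automatically on every call
        return worker
    return wrapper


@sketch_processor
def scale(value: int, factor: int, *, negate: bool = False) -> int:
    result = value * factor
    return -result if negate else result


worker = scale([1, 2, 3], 10, negate=True)  # first argument is now a sequence
worker.join()
print(worker.results)  # [-10, -20, -30]
```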
Did you know?
Decorators can take in keyword arguments that change the behavior of the thread.
import thread
@thread.processor(name = 'my_thread', suppress_errors = True)
def my_target(): ...
See the full list of arguments here
Compatibility
Data processing is usually achieved with external libraries like pandas.
However, there is no native support for dataset objects without both of the __len__() and __getitem__() methods.
This is primarily because:
- The __len__() method is used to determine the length of the dataset via len(dataset).
- The __getitem__() method is used to access items in the dataset via dataset[index].
This is also why thread.ConcurrentProcessing does not support Generator objects or Iterator objects out of the box.
Work-around
We now non-natively support almost all dataset types.
We stopped explicitly supporting the Sequence type and instead now use Protocols to check if __len__() or __getitem__() are implemented.
We also added context-specific optional/required _length and _get_value arguments when initializing thread.ConcurrentProcessing.
You can find out more about the valid _length and _get_value arguments here.
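A structural check of this kind can be sketched with typing.Protocol and runtime_checkable from the standard library. The protocol and function names below are assumptions for illustration and are not taken from the library's source.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SupportsLength(Protocol):
    def __len__(self) -> int: ...


@runtime_checkable
class SupportsGetItem(Protocol):
    def __getitem__(self, index: int) -> Any: ...


def natively_supported(dataset: object) -> bool:
    """True when the dataset implements both __len__ and __getitem__."""
    return isinstance(dataset, SupportsLength) and isinstance(dataset, SupportsGetItem)


print(natively_supported([1, 2, 3]))                # True: lists implement both
print(natively_supported(x for x in range(3)))      # False: generators implement neither
print(natively_supported(iter([1, 2, 3])))          # False: iterators implement neither
```

A runtime isinstance check against a protocol only verifies that the methods exist, not that their signatures or behavior match.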
Mapping
Dataset type | _length Required | _get_value Required
Has __len__ and __getitem__ | No | No
Does not have __len__ and __getitem__ | Yes | Yes
Does not have __len__ and has __getitem__ | Yes | No
Has __len__ and does not have __getitem__ | No | Yes
Example
Now you do not have to pre-convert the dataset to a supported dataset type.
from thread import ConcurrentProcessing
myDataFrame: ...
process = ConcurrentProcessing(
    function = lambda x: x + 1,
    dataset = myDataFrame,
    _length = myDataFrame.getLength(),
    _get_value = lambda d, i: d.getIndex(i)
)
Static type checking will reflect whether _length and _get_value are required or optional depending on the dataset type.
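One way such arguments could be resolved at initialization is sketched below. This is an assumption about the general approach, not the library's implementation; resolve_length, resolve_getter and the Frame class are hypothetical names.

```python
from typing import Any, Callable, Optional, Union


def resolve_length(
    dataset: Any,
    _length: Union[int, Callable[[Any], int], None] = None,
) -> int:
    """Use _length if given (int or callable), otherwise fall back to len()."""
    if _length is None:
        return len(dataset)
    if callable(_length):
        return _length(dataset)
    return _length


def resolve_getter(
    _get_value: Optional[Callable[[Any, int], Any]] = None,
) -> Callable[[Any, int], Any]:
    """Use _get_value if given, otherwise fall back to dataset[index]."""
    if _get_value is None:
        return lambda d, i: d[i]
    return _get_value


class Frame:
    """Hypothetical dataset type without __len__ or __getitem__."""

    def __init__(self, rows) -> None:
        self._rows = list(rows)

    def getLength(self) -> int:
        return len(self._rows)

    def getIndex(self, index: int):
        return self._rows[index]


frame = Frame([10, 20, 30])
length = resolve_length(frame, Frame.getLength)      # callable form
getter = resolve_getter(lambda d, i: d.getIndex(i))
items = [getter(frame, i) for i in range(length)]
print(items)  # [10, 20, 30]
```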
Initialization
This section covers the required and optional arguments for initializing a concurrent process.
Required
function
(Data_In, *args, **kwargs) -> Data_Out
This should be a function that takes in an item from the dataset, with or without additional arguments, and returns Data_Out.
Arguments and keyword arguments, excluding the first argument passed to the function, can be passed through args and kwargs.
Data_Out will be written to the generated thread's Thread._returned_value and can be accessed via ConcurrentProcessing.results or ConcurrentProcessing.get_return_values().
function can be passed as the first argument to ConcurrentProcessing.__init__(), although it is recommended to use only keyword arguments.
import thread
thread.ConcurrentProcessing(lambda x: x + 1, [])
thread.ConcurrentProcessing(function = lambda x: x + 1, dataset = [])
Best Practices
While you can use a lambda function, it is best to use a normal function for your LSP/Linter to infer types.
from thread import ConcurrentProcessing
worker = ConcurrentProcessing(function = lambda x: x + 1, dataset = [1, 2, 3])
worker.start()
worker.join()
worker.results # This will be inferred as Unknown by your LSP/Linter
from thread import ConcurrentProcessing
def my_target(x: int) -> int:
    return x + 1
worker = ConcurrentProcessing(function = my_target, dataset = [1, 2, 3])
worker.start()
worker.join()
worker.results # This will be inferred as a list[int]
dataset
Dataset[Data_In]
This should be an iterable sequence of data, each item of which is passed as the first argument to function.
This can be of any type if you pass the corresponding _length and _get_value arguments. See here for more details.
import thread
def my_function(x: int) -> int:
    ...
thread.ConcurrentProcessing(function = my_function, dataset = [1, 2, 3])
thread.ConcurrentProcessing(function = my_function, dataset = ('hi')) # This will be highlighted by your LSP/Linter
Optional
max_threads
int
(default: 8)
This is the maximum number of threads that will be created by thread.ConcurrentProcessing.
This value is not always the number of threads created. See here for more details.
_get_value
(Dataset, int) -> Data_In
(default: None)
Whether this is required depends on the dataset type. See here for more details.
This is invoked every time a value is retrieved from the dataset.
from thread import ConcurrentProcessing
dataset: MyDatasetType = ...
ConcurrentProcessing(
    function = my_function,
    dataset = dataset,
    _get_value = lambda d, index: d[index],
)

ConcurrentProcessing(
    function = my_function,
    dataset = dataset,
    _get_value = lambda d, index: d.getIndex(index),
)
_length
int | (Dataset) -> int
(default: dataset.len)
Whether this is required depends on the dataset type. See here for more details.
This is the length of the dataset that will be processed by thread.ConcurrentProcessing.
When a function is passed, it is invoked only once when thread.ConcurrentProcessing is initialized.
from thread import ConcurrentProcessing
dataset: MyDatasetType = ...
ConcurrentProcessing(
    function = my_function,
    dataset = dataset,
    _length = 5,
)

def get_length(dataset: MyDatasetType) -> int: ...

ConcurrentProcessing(
    function = my_function,
    dataset = dataset,
    _length = get_length,
)
*args / **kwargs
(default: None)
Any / Mapping[str, Any]
These arguments are passed to thread.Thread.__init__(), then threading.Thread.__init__().
If kwargs contains an argument named args, it will automatically be removed from kwargs and joined with ConcurrentProcessing.__init__().args.
See the thread.Thread documentation for more details.
See the threading documentation for more details.
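The handling of an args entry inside kwargs might look roughly like the sketch below. The function name merge_args is hypothetical, and the ordering (positional arguments first, then the values taken from kwargs) is an assumption, since the text above does not specify it.

```python
from typing import Any, Dict, Tuple


def merge_args(args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
    """Move kwargs['args'] (if present) into the positional arguments."""
    remaining = dict(kwargs)            # copy so the caller's dict is not mutated
    extra = remaining.pop("args", ())   # assumed ordering: appended after args
    return (*args, *extra), remaining


merged_args, merged_kwargs = merge_args((1, 2), {"args": (3,), "daemon": True})
print(merged_args)    # (1, 2, 3)
print(merged_kwargs)  # {'daemon': True}
```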
Properties
Attributes
These are attributes of the thread.ConcurrentProcessing class.
results
List[Data_Out]
This is a list of the data that was returned by the function in thread.ConcurrentProcessing.
status
thread.ThreadStatus
This is the current status of the thread.
These Are The Possible Values
Methods
These are methods of the thread.ConcurrentProcessing class.
start
() -> None
This starts the processing.
Simply invoke ConcurrentProcessing.start() on a ConcurrentProcessing object.
import thread
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.start()
Exceptions Raised
is_alive
() -> bool
This indicates whether the threads are still alive.
Simply invoke ConcurrentProcessing.is_alive() on a ConcurrentProcessing object.
import thread
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.is_alive()
Exceptions Raised
get_return_values
() -> List[Data_Out]
This halts the current thread execution until the processing completes and returns the values returned by function.
Simply invoke ConcurrentProcessing.get_return_values() on a ConcurrentProcessing object.
import thread
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.get_return_values()
Exceptions Raised
join
(timeout: float = None) -> None
This halts the current thread execution until the ConcurrentProcessing completes or the timeout is exceeded.
A None value for timeout will have the same effect as passing float("inf") as a timeout.
Simply invoke ConcurrentProcessing.join() on a ConcurrentProcessing object.
import thread
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.join(5)
worker.join()
Exceptions Raised
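The timeout semantics mirror those of the standard library's threading.Thread.join(), which the sketch below demonstrates directly. It uses threading.Thread rather than ConcurrentProcessing, so it only illustrates the underlying behavior: a join with a timeout returns even if the work is unfinished, while a join without one waits until completion.

```python
import threading

release = threading.Event()


def blocked_task() -> None:
    # Stays alive until the main thread sets the event
    release.wait()


worker = threading.Thread(target=blocked_task)
worker.start()

worker.join(0.05)                         # gives up waiting after 0.05 seconds
still_running = worker.is_alive()
print(still_running)                      # True: the timeout expired first

release.set()                             # let the task finish
worker.join()                             # no timeout: waits until the thread ends
finished = not worker.is_alive()
print(finished)                           # True
```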
kill
(yielding: bool = False, timeout: float = 5) -> bool
This schedules the threads to be killed.
If yielding is True, it halts the current thread execution until the threads are killed or the timeout is exceeded.
Similar to ConcurrentProcessing.join()
, a None value for timeout will have the same effect as passing float("inf")
as a timeout.
Simply invoke ConcurrentProcessing.kill()
on a ConcurrentProcessing object.
import thread
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.kill(True, 10)
worker.kill(False)
worker.kill()
Exceptions Raised
This only schedules the threads to be killed and does not kill them immediately.
This means that if function is in a long time.sleep() call, it will only be killed after the call returns and execution moves on to the next line.
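The deferred behavior can be reproduced with a CPython-specific mechanism: asking the interpreter to raise an exception inside another thread via ctypes.pythonapi.PyThreadState_SetAsyncExc. Whether thread.ConcurrentProcessing uses exactly this call is an assumption; the sketch, with its hypothetical schedule_kill helper, only shows why a scheduled kill takes effect after the current blocking call returns.

```python
import ctypes
import threading
import time


def long_running() -> None:
    while True:
        time.sleep(0.01)  # the exception is delivered after this call returns


def schedule_kill(target: threading.Thread) -> bool:
    """Hypothetical helper: ask CPython to raise SystemExit inside target."""
    affected = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(target.ident), ctypes.py_object(SystemExit)
    )
    return affected == 1  # 1 means exactly one thread state was modified


worker = threading.Thread(target=long_running, daemon=True)
worker.start()

scheduled = schedule_kill(worker)  # returns immediately, nothing is killed yet
worker.join(timeout=5)             # wait for the pending exception to land
killed = not worker.is_alive()
print(scheduled, killed)
```

The exception is only raised once the target thread is back in the interpreter loop, so a thread stuck in a long blocking call keeps running until that call finishes.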