https://github.com/bluebrain/bluepyparallel

Provides an embarrassingly parallel tool with sql backend

Science Score: 26.0%

This score indicates how likely this project is to be science-related based on various indicators:

  • CITATION.cff file
  • codemeta.json file
    Found codemeta.json file
  • .zenodo.json file
    Found .zenodo.json file
  • DOI references
  • Academic publication links
  • Academic email domains
  • Institutional organization owner
  • JOSS paper metadata
  • Scientific vocabulary similarity
    Low similarity (12.5%) to scientific vocabulary

Keywords

parallel parallel-computing parallel-programming parallelization python python3
Last synced: 6 months ago

Repository

Provides an embarrassingly parallel tool with sql backend

Basic Info
Statistics
  • Stars: 1
  • Watchers: 4
  • Forks: 2
  • Open Issues: 0
  • Releases: 2
Topics
parallel parallel-computing parallel-programming parallelization python python3
Created almost 2 years ago · Last pushed 6 months ago
Metadata Files
Readme Changelog Contributing License Authors

README.md

BluePyParallel: Bluebrain Python Embarrassingly Parallel library

Provides an embarrassingly parallel tool with an SQL backend.

Introduction

Provides an embarrassingly parallel tool with an SQL backend, inspired by BluePyMM by @wvangeit.

Installation

This package should be installed using pip:

```bash
pip install bluepyparallel
```

Usage

General computation

```python
from bluepyparallel import init_parallel_factory  # import path assumed; not shown in the original example

factory_name = "multiprocessing"  # Can also be None, dask or ipyparallel
batch_size = 10  # This value is used to split the data into batches before processing them
chunk_size = 1000  # This value is used to gather the elements to process before sending them to the workers

# Setup the parallel factory
parallel_factory = init_parallel_factory(
    factory_name,
    batch_size=batch_size,
    chunk_size=chunk_size,
    processes=4,  # This parameter is specific to the multiprocessing factory
)

# Get the mapper from the factory
mapper = parallel_factory.get_mapper()

# Use the mapper to map the given function to each element of mapped_data and gather the results
result = sorted(mapper(function, mapped_data, *function_args, **function_kwargs))
```
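
For concreteness, a minimal end-to-end sketch is shown below. It assumes the top-level import used above and the `processes` keyword of the multiprocessing factory; the `square` function is just a toy placeholder, not part of the library.

```python
from bluepyparallel import init_parallel_factory  # import path assumed


def square(x):
    """Toy function standing in for a real, expensive computation."""
    return x * x


if __name__ == "__main__":
    # Build a multiprocessing factory with 4 worker processes (keyword assumed, as above)
    parallel_factory = init_parallel_factory("multiprocessing", processes=4)
    mapper = parallel_factory.get_mapper()

    # The mapper behaves like the built-in map: apply the function to each element
    results = sorted(mapper(square, range(10)))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```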

Working with Pandas

This library provides a specific function to work with large :class:`pandas.DataFrame` objects: :func:`bluepyparallel.evaluator.evaluate`. This function converts the DataFrame into a list of dicts (one per row), maps the given function to each element, and finally gathers the results.

Example:

```python
import pandas as pd

from bluepyparallel import evaluate  # import path assumed; not shown in the original example

input_df = pd.DataFrame(index=[1, 2], columns=['data'], data=[100, 200])


def evaluation_function(row):
    result_1, result_2 = compute_something(row['data'])  # placeholder for the actual computation
    return {'new_column_1': result_1, 'new_columns_2': result_2}


# Use evaluate to map the given function to each row of the DataFrame
result_df = evaluate(
    input_df,  # This is the DataFrame to process
    evaluation_function,  # This is the function that should be applied to each row of the DataFrame
    parallel_factory="multiprocessing",  # This could also be a Factory previously defined
    new_columns=[['new_column_1', 0], ['new_columns_2', None]],  # This defines the default values of the new columns
)
assert result_df.columns.tolist() == ['data', 'new_column_1', 'new_columns_2']
```

It is, in a way, a generalisation of the `pandas.apply` method.
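
For comparison, the same per-row logic without BluePyParallel could be written with plain `pandas.apply`. This is a sequential sketch reusing the `evaluation_function` and `compute_something` placeholders from above, and it has no batching, parallel workers, or checkpointing:

```python
# Sequential equivalent: apply the function row by row and attach the new columns
new_cols = input_df.apply(lambda row: pd.Series(evaluation_function(row)), axis=1)
result_df = pd.concat([input_df, new_cols], axis=1)
```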

Working with an SQL backend

Since it is aimed at time-consuming functions, the library also provides a checkpoint-and-resume mechanism based on an SQL backend. The backend uses the SQLAlchemy library, so it can work with a large variety of database types (SQLite, PostgreSQL, MySQL, ...). To activate this feature, just pass a URL that can be processed by SQLAlchemy to the db_url parameter of :func:`bluepyparallel.evaluator.evaluate`.

.. note:: A specific driver might have to be installed to access the database (e.g. `psycopg2 <https://www.psycopg.org/docs/>`_ for PostgreSQL).
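
For instance, setting up the PostgreSQL case might look as follows; `psycopg2-binary` is just one common packaging of the driver, not something mandated by BluePyParallel:

```bash
# Install the driver next to BluePyParallel (psycopg2-binary is one common choice)
pip install bluepyparallel psycopg2-binary
```

The corresponding `db_url` would then follow the usual SQLAlchemy form, e.g. `postgresql+psycopg2://user:password@host:5432/dbname`.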

Example:

```python
# Use the mapper to map the given function to each element of the DataFrame
result_df = evaluate(
    input_df,  # This is the DataFrame to process
    evaluation_function,  # This is the function that should be applied to each row of the DataFrame
    parallel_factory="multiprocessing",  # This could also be a Factory previously defined
    db_url="sqlite:///db.sql",  # This could also just be "db.sql" and would be automatically turned into a SQLite URL
)
```

Now, if the computation crashes for any reason, the partial results are stored in the db.sql file. If the crash was due to an external cause (so that executing the code again should work), it is possible to resume the computation from the last computed element. Thus, only the missing elements are computed, which can save a lot of time.
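
In practice, resuming is just a matter of rerunning the same call with the same database URL; the sketch below assumes that rows already stored in the database are detected and skipped, as described above:

```python
# Rerun the exact same call after a crash: rows already present in db.sql are
# read back from the database and only the missing rows are recomputed.
result_df = evaluate(
    input_df,
    evaluation_function,
    parallel_factory="multiprocessing",
    db_url="sqlite:///db.sql",  # same URL as the interrupted run
)
```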

Running with distributed Dask MPI on HPC systems

This is an example of an sbatch script that can be adapted to execute a script on multiple nodes and workers with distributed Dask and MPI. In this example, the code called by run.py should be parallelized using BluePyParallel.

The Dask environment variables are not strictly required, but they are highly recommended and can be fine-tuned.

```bash
#!/bin/bash -l

# Dask configuration
export DASK_DISTRIBUTED__LOGGING__DISTRIBUTED="info"
export DASK_DISTRIBUTED__WORKER__USE_FILE_LOCKING=False
export DASK_DISTRIBUTED__WORKER__MEMORY__TARGET=False  # don't spill to disk
export DASK_DISTRIBUTED__WORKER__MEMORY__SPILL=False  # don't spill to disk
export DASK_DISTRIBUTED__WORKER__MEMORY__PAUSE=0.80  # pause execution at 80% memory use
export DASK_DISTRIBUTED__WORKER__MEMORY__TERMINATE=0.95  # restart the worker at 95% use
export DASK_DISTRIBUTED__WORKER__MULTIPROCESSING_METHOD=spawn
export DASK_DISTRIBUTED__WORKER__DAEMON=True

# Reduce dask profile memory usage/leak (see https://github.com/dask/distributed/issues/4091)
export DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL=10000ms  # Time between statistical profiling queries
export DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE=1000000ms  # Time between starting new profile

# Split tasks to avoid some dask errors (e.g. "Event loop was unresponsive in Worker")
export PARALLEL_BATCH_SIZE=1000

srun -v run.py
```
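
Submitting the script then follows the usual Slurm workflow; the file name and resource request below are placeholders and are not prescribed by BluePyParallel:

```bash
# Submit the job script; node/task counts and wall time are placeholders
sbatch --nodes=2 --ntasks-per-node=36 --time=02:00:00 run_dask_mpi.sbatch
```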

To ensure that only the evaluate function runs in parallel with Dask, the parallel factory has to be initialised before anything else is done in the code. For example, run.py could look like:

```python
import pandas as pd

from bluepyparallel import evaluate, init_parallel_factory  # import path assumed

if __name__ == "__main__":
    parallel_factory = init_parallel_factory('dask_dataframe')
    df = pd.read_csv("input_data.csv")
    df = some_preprocessing(df)
    df = evaluate(df, function_to_evaluate, parallel_factory=parallel_factory)
    df.to_csv("output_data.csv")
```

This is because everything before init_parallel_factory is executed by every MPI rank (i.e. it runs in parallel on all ranks), since MPI is not initialized yet.

.. note:: We recommend using dask_dataframe instead of dask, as it is in practice more stable for large computations.

Funding & Acknowledgment

The development of this software was supported by funding to the Blue Brain Project, a research center of the École polytechnique fédérale de Lausanne (EPFL), from the Swiss government’s ETH Board of the Swiss Federal Institutes of Technology.

For license and authors, see LICENSE.txt and AUTHORS.md respectively.

Copyright © 2023-2024 Blue Brain Project/EPFL

Owner

  • Name: The Blue Brain Project
  • Login: BlueBrain
  • Kind: organization
  • Email: bbp.opensource@epfl.ch
  • Location: Geneva, Switzerland

Open Source Software produced and used by the Blue Brain Project

GitHub Events

Total
  • Delete event: 2
  • Push event: 7
  • Pull request review event: 1
  • Pull request event: 5
  • Fork event: 1
  • Create event: 3
Last Year
  • Delete event: 2
  • Push event: 7
  • Pull request review event: 1
  • Pull request event: 5
  • Fork event: 1
  • Create event: 3

Issues and Pull Requests

Last synced: 6 months ago

All Time
  • Total issues: 0
  • Total pull requests: 8
  • Average time to close issues: N/A
  • Average time to close pull requests: about 18 hours
  • Total issue authors: 0
  • Total pull request authors: 3
  • Average comments per issue: 0
  • Average comments per pull request: 0.63
  • Merged pull requests: 8
  • Bot issues: 0
  • Bot pull requests: 2
Past Year
  • Issues: 0
  • Pull requests: 2
  • Average time to close issues: N/A
  • Average time to close pull requests: about 23 hours
  • Issue authors: 0
  • Pull request authors: 1
  • Average comments per issue: 0
  • Average comments per pull request: 0.0
  • Merged pull requests: 2
  • Bot issues: 0
  • Bot pull requests: 2
Top Authors
Issue Authors
Pull Request Authors
  • adrien-berchet (6)
  • dependabot[bot] (5)
  • arnaudon (3)
Top Labels
Issue Labels
Pull Request Labels
dependencies (5) github_actions (1)

Packages

  • Total packages: 1
  • Total downloads:
    • pypi: 121 last month
  • Total dependent packages: 1
  • Total dependent repositories: 0
  • Total versions: 8
  • Total maintainers: 3
pypi.org: bluepyparallel

Provides an embarrassingly parallel tool with sql backend.

  • Versions: 8
  • Dependent Packages: 1
  • Dependent Repositories: 0
  • Downloads: 121 last month
Rankings
Dependent packages count: 9.8%
Average: 37.1%
Dependent repos count: 64.4%
Last synced: 6 months ago

Dependencies

.github/workflows/commitlint.yml actions
  • actions/checkout v4 composite
  • actions/setup-node v4 composite
.github/workflows/publish-sdist.yml actions
  • actions/checkout v4 composite
  • actions/setup-python v5 composite
  • pypa/gh-action-pypi-publish release/v1 composite
.github/workflows/run-tox.yml actions
  • actions/cache v4 composite
  • actions/checkout v4 composite
  • actions/setup-python v5 composite
  • actions/upload-artifact v4 composite
  • codecov/codecov-action v4 composite
  • mikepenz/action-junit-report v4 composite
package.json npm
pyproject.toml pypi
  • dask [dataframe, distributed]>=2021.11
  • dask-mpi >=2021.11
  • distributed >=2021.11
  • ipyparallel >=6.3
  • packaging >=20
  • pandas >=1.3
  • sqlalchemy >=1.4.24
  • sqlalchemy <2; python_version<'3.8'
  • sqlalchemy-utils >=0.37.2
  • tqdm >=3.7
setup.py pypi