Elephas

Elephas: Distributed Deep Learning with Keras & Spark - Published in JOSS (2022)

https://github.com/danielenricocahall/elephas

Science Score: 93.0%

This score indicates how likely this project is to be science-related based on various indicators:

  • CITATION.cff file
  • codemeta.json file
    Found codemeta.json file
  • .zenodo.json file
    Found .zenodo.json file
  • DOI references
    Found 1 DOI reference(s) in JOSS metadata
  • Academic publication links
    Links to: arxiv.org
  • Committers with academic emails
  • Institutional organization owner
  • JOSS paper metadata
    Published in Journal of Open Source Software

Keywords from Contributors

mesh

Scientific Fields

Artificial Intelligence and Machine Learning Computer Science - 55% confidence
Mathematics Computer Science - 43% confidence
Engineering Computer Science - 40% confidence
Last synced: 4 months ago

Repository

Distributed Deep learning with Keras & Spark

Basic Info
  • Host: GitHub
  • Owner: danielenricocahall
  • License: mit
  • Language: Python
  • Default Branch: master
  • Homepage:
  • Size: 6.95 MB
Statistics
  • Stars: 20
  • Watchers: 0
  • Forks: 7
  • Open Issues: 9
  • Releases: 19
Fork of maxpumperla/elephas
Created about 5 years ago · Last pushed 5 months ago
Metadata Files
Readme Contributing Funding License

README.md

Elephas: Distributed Deep Learning with Keras & Spark


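Elephas is an extension of Keras, which allows you to run distributed deep learning models at scale with Spark. Elephas currently supports a number of applications, including data-parallel training of deep learning models, distributed inference and evaluation, and integration with Spark MLlib and Spark ML.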

Schematically, elephas works as follows.

[Figure: Elephas schematic]

Table of contents:

* Elephas: Distributed Deep Learning with Keras & Spark
* Introduction
* Getting started
* Basic Spark integration
* Distributed Inference and Evaluation
* Spark MLlib integration
* Spark ML integration
* Hadoop integration
* Distributed hyper-parameter optimization
* Distributed training of ensemble models
* Discussion
* Literature

Introduction

Elephas brings deep learning with Keras to Spark. Elephas intends to keep the simplicity and high usability of Keras, thereby allowing for fast prototyping of distributed models, which can be run on massive data sets. For an introductory example, see the following iPython notebook.

ἐλέφας is Greek for ivory, and the project is a companion to κέρας (Keras), meaning horn. If this seems a weird thing to mention, like a bad dream, the Keras documentation confirms that it actually is one. Elephas also means elephant, as in stuffed yellow elephant.

Elephas implements a class of data-parallel algorithms on top of Keras, using Spark's RDDs and data frames. Keras models are initialized on the driver, then serialized and shipped to the workers, along with the data and broadcast model parameters. Spark workers deserialize the model, train on their chunk of data, and send their gradients back to the driver. The "master" model on the driver is updated by an optimizer, which takes gradients either synchronously or asynchronously.
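The synchronous variant of this scheme can be illustrated with a toy example. The sketch below is not Elephas code: it uses plain Python, a one-parameter model `y = w * x`, and hypothetical helper names (`split_into_chunks`, `worker_gradient`, `train_synchronously`) chosen only for illustration. It assumes that each worker reports a gradient for its chunk and that the driver averages the gradients before applying a single update.

```python
# Toy illustration of synchronous data-parallel training (not Elephas code).
# Model: y = w * x, loss: mean squared error.

def split_into_chunks(data, num_workers):
    """Deal the samples round-robin into one chunk per worker."""
    return [data[i::num_workers] for i in range(num_workers)]


def worker_gradient(w, chunk):
    """Gradient of the mean squared error of y = w * x on one chunk."""
    return sum(2.0 * (w * x - y) * x for x, y in chunk) / len(chunk)


def train_synchronously(data, num_workers=4, epochs=50, learning_rate=0.01):
    w = 0.0  # "master" parameter held by the driver
    chunks = split_into_chunks(data, num_workers)
    for _ in range(epochs):
        # Each worker receives the current w and computes a gradient on its chunk.
        gradients = [worker_gradient(w, chunk) for chunk in chunks]
        # The driver waits for all workers, averages, and applies one update.
        w -= learning_rate * sum(gradients) / len(gradients)
    return w


data = [(float(x), 3.0 * x) for x in range(1, 9)]  # samples of y = 3x
w = train_synchronously(data)
print(round(w, 3))
```

In an asynchronous scheme the driver would instead apply each worker's update as soon as it arrives, without waiting for the others.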

Getting started

Install elephas from PyPI with the command below; Spark will be installed through pyspark for you.

```bash
pip install elephas
```

That's it, you should now be able to run Elephas examples.

Basic Spark integration

After installing Elephas, you can train a model as follows. First, create a local pyspark context:

```python
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('Elephas_App').setMaster('local[8]')
sc = SparkContext(conf=conf)
```

Next, you define and compile a Keras model:

```python
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.optimizers import SGD

model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer=SGD())
```

and create an RDD from numpy arrays (or however you want to create an RDD):

```python
from elephas.utils.rdd_utils import to_simple_rdd
rdd = to_simple_rdd(sc, x_train, y_train)
```

The basic model in Elephas is the SparkModel. You initialize a SparkModel by passing in a compiled Keras model, an update frequency and a parallelization mode. After that you can simply fit the model on your RDD. Elephas fit has the same options as a Keras model, so you can pass epochs, batch_size etc. as you're used to from tensorflow.keras.

```python
from elephas.spark_model import SparkModel, AsynchronousSparkModel

spark_model = SparkModel(model)

# or, if you want to use the asynchronous training paradigm
spark_model = AsynchronousSparkModel(model, frequency='epoch', mode='asynchronous')

spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)
```

Your script can now be run using spark-submit:

```bash
spark-submit --driver-memory 1G ./your_script.py
```

Increasing the driver memory even further may be necessary, as the set of parameters in a network may be very large and collecting them on the driver eats up a lot of resources. See the examples folder for a few working examples.
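For a rough sense of scale, the parameter count of the example model above can be worked out by hand. The following is a back-of-the-envelope sketch, not an Elephas utility: the helper names are hypothetical, float32 storage (4 bytes per value) is assumed, and the idea that the driver may hold roughly one parameter copy per worker while collecting results is an assumption made for illustration only.

```python
def dense_parameter_count(layer_sizes):
    """Weights plus biases for a stack of fully connected layers."""
    return sum(n_in * n_out + n_out
               for n_in, n_out in zip(layer_sizes, layer_sizes[1:]))


def parameter_megabytes(num_parameters, bytes_per_value=4):
    """Size of one copy of the parameters, assuming float32 values."""
    return num_parameters * bytes_per_value / 1024 ** 2


# Layer widths of the example model: 784 inputs, two hidden layers of 128, 10 outputs.
# Activation and Dropout layers carry no parameters.
num_parameters = dense_parameter_count([784, 128, 128, 10])
num_workers = 8  # hypothetical worker count, matching local[8]

print(num_parameters)
print(parameter_megabytes(num_parameters) * num_workers)
```

This small network is far from the limit, but the same arithmetic applied to a model with hundreds of millions of parameters quickly reaches gigabytes.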

Distributed Inference and Evaluation

The SparkModel can also be used for distributed inference (prediction) and evaluation. Similar to the fit method, the predict and evaluate methods conform to the Keras Model API.

```python
from elephas.spark_model import SparkModel

# create/train the model, similar to the previous section (Basic Spark Integration)
model = ...
spark_model = SparkModel(model, ...)
spark_model.fit(...)

x_test, y_test = ...  # load test data

predictions = spark_model.predict(x_test)  # perform inference
evaluation = spark_model.evaluate(x_test, y_test)  # perform evaluation/scoring
```

The paradigm is identical to the data parallelism in training, as the model is serialized and shipped to the workers and used to evaluate a chunk of the testing data. The predict method will take either a numpy array or an RDD.
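The chunk-wise paradigm can be pictured with a small stand-alone sketch. It is written in plain Python rather than Spark, and the names `partition`, `predict_partition`, and `distributed_predict` are hypothetical; the "model" is reduced to a single weight so that the example stays self-contained.

```python
def partition(items, num_partitions):
    """Split items into contiguous partitions so that order is preserved."""
    size, remainder = divmod(len(items), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < remainder else 0)
        partitions.append(items[start:end])
        start = end
    return partitions


def predict_partition(weight, chunk):
    """Stand-in for a worker applying its deserialized model copy to one chunk."""
    return [weight * x for x in chunk]


def distributed_predict(weight, x_test, num_workers=3):
    results = []
    for chunk in partition(x_test, num_workers):
        # In Spark the chunks would be processed in parallel on the workers.
        results.extend(predict_partition(weight, chunk))
    return results


x_test = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
predictions = distributed_predict(3.0, x_test)
print(predictions)
```

Because the partitions are contiguous and results are concatenated in partition order, the predictions line up with the input samples.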

Spark MLlib integration

Following up on the last example, to use Spark's MLlib library with Elephas, you create an RDD of LabeledPoints for supervised training as follows:

```python
from elephas.utils.rdd_utils import to_labeled_point
lp_rdd = to_labeled_point(sc, x_train, y_train, categorical=True)
```

Training a given LabeledPoint-RDD is very similar to what we've seen already:

```python
from elephas.spark_model import SparkMLlibModel
spark_model = SparkMLlibModel(model, frequency='batch', mode='hogwild')
spark_model.train(lp_rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1,
                  categorical=True, nb_classes=nb_classes)
```
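A LabeledPoint stores a single numeric label, whereas the Keras model above is trained on one-hot vectors. The sketch below shows the kind of conversion that the `categorical=True` and `nb_classes` arguments suggest; this reading of the flags is an assumption, the helper names are hypothetical, and the code is plain Python rather than the Elephas implementation.

```python
def one_hot_to_index(one_hot):
    """Position of the largest entry, i.e. the class index of a one-hot vector."""
    return max(range(len(one_hot)), key=lambda i: one_hot[i])


def index_to_one_hot(index, nb_classes):
    """Inverse direction: rebuild the one-hot vector the network is trained on."""
    return [1.0 if i == index else 0.0 for i in range(nb_classes)]


y_train = [[0.0, 0.0, 1.0], [1.0, 0.0, 0.0]]
labels = [float(one_hot_to_index(y)) for y in y_train]
print(labels)
print([index_to_one_hot(int(label), 3) for label in labels])
```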

Spark ML integration

To train a model with a SparkML estimator on a data frame, use the following syntax.

```python
from elephas.ml.adapter import to_data_frame
from elephas.ml_model import ElephasEstimator

df = to_data_frame(sc, x_train, y_train, categorical=True)
test_df = to_data_frame(sc, x_test, y_test, categorical=True)

estimator = ElephasEstimator(model, epochs=epochs, batch_size=batch_size, frequency='batch', mode='asynchronous',
                             categorical=True, nb_classes=nb_classes)
fitted_model = estimator.fit(df)
```

Fitting an estimator results in a SparkML transformer, which we can use for predictions and other evaluations by calling the transform method on it.

```python
import numpy as np
from pyspark.mllib.evaluation import MulticlassMetrics

prediction = fitted_model.transform(test_df)
pnl = prediction.select("label", "prediction")
pnl.show(100)

# MulticlassMetrics expects (prediction, label) pairs, in that order
prediction_and_label = pnl.rdd.map(lambda row: (float(np.argmax(row.prediction)), row.label))

metrics = MulticlassMetrics(prediction_and_label)
print(metrics.weightedPrecision)
print(metrics.weightedRecall)
```
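To make the two printed numbers concrete, here is a plain-Python sketch of support-weighted precision and recall computed from `(prediction, label)` pairs. It follows the usual definition (per-class scores weighted by the share of true samples in each class) and is meant as an illustration, not as the Spark implementation; the function name is hypothetical.

```python
from collections import Counter


def weighted_precision_recall(prediction_and_label):
    """Support-weighted precision and recall from (prediction, label) pairs."""
    pairs = list(prediction_and_label)
    true_positives = Counter(p for p, y in pairs if p == y)
    predicted = Counter(p for p, _ in pairs)
    actual = Counter(y for _, y in pairs)
    total = len(pairs)
    precision = recall = 0.0
    for label, support in actual.items():
        weight = support / total  # share of samples whose true class is `label`
        if predicted[label]:
            precision += weight * true_positives[label] / predicted[label]
        recall += weight * true_positives[label] / support
    return precision, recall


pairs = [(0.0, 0.0), (0.0, 0.0), (1.0, 0.0), (1.0, 1.0)]
precision, recall = weighted_precision_recall(pairs)
print(precision, recall)
```

Weighted recall computed this way equals plain accuracy, which gives a quick sanity check on the metric.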

If the model uses a custom activation function, layer, or loss function, it must be supplied using the set_custom_objects method:

```python
def custom_activation(x):
    ...


class CustomLayer(Layer):
    ...


model = Sequential()
model.add(CustomLayer(...))

estimator = ElephasEstimator(model, epochs=epochs, batch_size=batch_size)
estimator.set_custom_objects({'custom_activation': custom_activation, 'CustomLayer': CustomLayer})
```

Hadoop Integration

In addition to saving locally, models may be saved directly into a network-accessible Hadoop cluster.

```python
spark_model.save('/absolute/file/path/model.h5', to_hadoop=True)
```

Models saved on a network-accessible Hadoop cluster may be loaded as follows.

```python
from elephas.spark_model import load_spark_model

spark_model = load_spark_model('/absolute/file/path/model.h5', from_hadoop=True)
```

Distributed hyper-parameter optimization

UPDATE: As of 3.0.0, Hyper-parameter optimization features have been removed, since Hyperas is no longer active and was causing versioning compatibility issues. To use these features, install version 2.1 or below.

Hyper-parameter optimization with elephas is based on hyperas, a convenience wrapper for hyperopt and keras. Each Spark worker executes a number of trials, the results are collected, and the best model is returned. As the distributed mode in hyperopt (using MongoDB) is somewhat difficult to configure and error prone at the time of writing, we chose to implement parallelization ourselves. Right now, the only available optimization algorithm is random search.

The first part of this example is more or less directly taken from the hyperas documentation. We define data and model as functions; hyper-parameter ranges are defined through double braces. See the hyperas documentation for more on how this works.

```python
from hyperopt import STATUS_OK
from hyperas.distributions import choice, uniform


def data():
    from tensorflow.keras.datasets import mnist
    from tensorflow.keras.utils import to_categorical
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.reshape(60000, 784)
    x_test = x_test.reshape(10000, 784)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255
    x_test /= 255
    nb_classes = 10
    y_train = to_categorical(y_train, nb_classes)
    y_test = to_categorical(y_test, nb_classes)
    return x_train, y_train, x_test, y_test


def model(x_train, y_train, x_test, y_test):
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, Dropout, Activation
    from tensorflow.keras.optimizers import RMSprop

    model = Sequential()
    model.add(Dense(512, input_shape=(784,)))
    model.add(Activation('relu'))
    model.add(Dropout({{uniform(0, 1)}}))
    model.add(Dense({{choice([256, 512, 1024])}}))
    model.add(Activation('relu'))
    model.add(Dropout({{uniform(0, 1)}}))
    model.add(Dense(10))
    model.add(Activation('softmax'))

    rms = RMSprop()
    # Track accuracy so that evaluate() returns (loss, accuracy)
    model.compile(loss='categorical_crossentropy', optimizer=rms, metrics=['accuracy'])

    model.fit(x_train, y_train,
              batch_size={{choice([64, 128])}},
              epochs=1,
              verbose=2,
              validation_data=(x_test, y_test))
    score, acc = model.evaluate(x_test, y_test, verbose=0)
    print('Test accuracy:', acc)
    return {'loss': -acc, 'status': STATUS_OK, 'model': model.to_json()}
```

Once the basic setup is defined, running the minimization is done in just a few lines of code:

```python
from elephas.hyperparam import HyperParamModel
from pyspark import SparkContext, SparkConf

# Create Spark context
conf = SparkConf().setAppName('ElephasHyperparameterOptimization').setMaster('local[8]')
sc = SparkContext(conf=conf)

# Define hyper-parameter model and run optimization
hyperparam_model = HyperParamModel(sc)
hyperparam_model.minimize(model=model, data=data, max_evals=5)
```
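The parallelization scheme described in this section (each worker runs its own random trials, the driver collects them and keeps the best) can be sketched without Spark or hyperas. Everything in the block below is a stand-in: `objective` is a hypothetical loss that replaces actual model training, and the workers are simulated by a loop with one random seed each.

```python
import random


def objective(dropout):
    """Hypothetical stand-in for training a model and returning its loss."""
    return (dropout - 0.3) ** 2


def run_trials(seed, max_evals):
    """What one worker does: sample max_evals settings and score each of them."""
    rng = random.Random(seed)
    trials = []
    for _ in range(max_evals):
        dropout = rng.uniform(0.0, 1.0)
        trials.append({'loss': objective(dropout), 'dropout': dropout})
    return trials


def minimize(num_workers=4, max_evals=5):
    # Each worker gets its own seed so that the workers explore different points.
    all_trials = []
    for worker_id in range(num_workers):
        all_trials.extend(run_trials(seed=worker_id, max_evals=max_evals))
    # The driver collects every trial and keeps the one with the lowest loss.
    return min(all_trials, key=lambda trial: trial['loss']), all_trials


best, all_trials = minimize()
print(best)
```

With random search the trials are independent of each other, which is what makes this form of parallelization straightforward: no worker needs to know what the others have tried.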

Distributed training of ensemble models

UPDATE: As of 3.0.0, Hyper-parameter optimization features have been removed, since Hyperas is no longer active and was causing versioning compatibility issues. To use these features, install version 2.1 or below.

Building on the last section, it is possible to train ensemble models with elephas by running hyper-parameter optimization on large search spaces and defining a resulting voting classifier on the top-n performing models. With data and model defined as above, this is as simple as running

```python
result = hyperparam_model.best_ensemble(nb_ensemble_models=10, model=model, data=data, max_evals=5)
```

In this example an ensemble of 10 models is built, based on optimization of at most 5 runs on each of the Spark workers.
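The idea of a voting classifier over the top-n models can be shown in a few lines. This is a minimal sketch under stated assumptions rather than the Elephas implementation: the trials are hypothetical, each "model" is just a threshold function, and ties are not handled because an odd number of binary voters is used.

```python
from collections import Counter


def top_n(trials, n):
    """Keep the n trials with the lowest loss."""
    return sorted(trials, key=lambda trial: trial['loss'])[:n]


def vote(models, sample):
    """Majority vote over the class predicted by each model."""
    votes = Counter(model(sample) for model in models)
    return votes.most_common(1)[0][0]


# Hypothetical trials: each "model" is a threshold classifier on one number.
trials = [
    {'loss': 0.10, 'model': lambda x: int(x > 0.5)},
    {'loss': 0.40, 'model': lambda x: int(x > 0.9)},
    {'loss': 0.20, 'model': lambda x: int(x > 0.4)},
    {'loss': 0.15, 'model': lambda x: int(x > 0.6)},
]
ensemble = [trial['model'] for trial in top_n(trials, 3)]
print(vote(ensemble, 0.55))
```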

Hugging Face Models Training and Inference

Note: Due to incompatibilities with Keras 3.0, which would ultimately limit the Tensorflow version we can upgrade to, and the announcement that HuggingFace will no longer support Tensorflow, HuggingFace support has been removed from Elephas.

As of 6.0.0, Elephas supports distributed training (and inference) with HuggingFace models (using the Tensorflow/Keras backend), currently for text classification, token classification, and causal language modeling only, and in the "synchronous" training mode. In future releases, we hope to expand this to other types of models and to the "asynchronous" and "hogwild" training modes. This can be accomplished using the SparkHFModel:

```python
import numpy as np
from elephas.spark_model import SparkHFModel
from elephas.utils.rdd_utils import to_simple_rdd
from sklearn.datasets import fetch_20newsgroups
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification
from tensorflow.keras.optimizers import SGD

batch_size = ...
epochs = ...
num_workers = ...

newsgroups = fetch_20newsgroups(subset='train')
x = newsgroups.data
y = newsgroups.target

encoder = LabelEncoder()
y_encoded = encoder.fit_transform(y)

x_train, x_test, y_train, y_test = train_test_split(x, y_encoded, test_size=0.2)

model_name = 'albert-base-v2'

# Note: the expectation is that text data is being supplied - tokenization is handled during training
# spark_context is an existing SparkContext
rdd = to_simple_rdd(spark_context, x_train, y_train)

model = TFAutoModelForSequenceClassification.from_pretrained(model_name, num_labels=len(np.unique(y_encoded)))
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer_kwargs = {'padding': True, 'truncation': True}  # ... plus any further tokenizer options

model.compile(optimizer=SGD(), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
spark_model = SparkHFModel(model, num_workers=num_workers, mode="synchronous", tokenizer=tokenizer,
                           tokenizer_kwargs=tokenizer_kwargs, loader=TFAutoModelForSequenceClassification)

spark_model.fit(rdd, epochs=epochs, batch_size=batch_size)

predictions = spark_model.predict(spark_context.parallelize(x_test))
```

More examples can be seen in the `examples` directory, namely `"hf_causal_modeling.py"`, `"hf_token_classification.py"`, and `"hf_text_classification.py"`.

The computational model is the same as for Keras models, except the model is serialized and deserialized differently due to differences in the HuggingFace API.

To use this capability, just install this package with the huggingface extra:

```bash
pip install elephas[huggingface]
```

Discussion

Premature parallelization may not be the root of all evil, but it is not always the best idea either. Keep in mind that more workers mean less data per worker, and parallelizing a model is no substitute for actual learning. So, if your data fits comfortably into memory and you're happy with the training speed of the model, consider just using keras.

One exception to this rule may be that you're already working within the Spark ecosystem and want to leverage what's there. The above SparkML example shows how to use evaluation modules from Spark, and maybe you wish to further process the outcome of an elephas model down the road. In this case, we recommend using elephas as a simple wrapper by setting num_workers=1.

Note that right now elephas restricts itself to data-parallel algorithms for two reasons. First, Spark simply makes it very easy to distribute data. Second, neither Spark nor Theano makes it particularly easy to split up the actual model into parts, thus making model-parallelism practically impossible to realize.

Having said all that, we hope you learn to appreciate elephas as a playground for data-parallel deep-learning algorithms that is pretty easy to set up and use.

Literature

[1] J. Dean, G.S. Corrado, R. Monga, K. Chen, M. Devin, QV. Le, MZ. Mao, M’A. Ranzato, A. Senior, P. Tucker, K. Yang, and AY. Ng. Large Scale Distributed Deep Networks.

[2] F. Niu, B. Recht, C. Re, and S.J. Wright. HOGWILD!: A Lock-Free Approach to Parallelizing Stochastic Gradient Descent.

[3] C. Noel, S. Osindero. Dogwild! — Distributed Hogwild for CPU & GPU

Maintainers / Contributions

This great project was started by Max Pumperla, and is currently maintained by Daniel Cahall (https://github.com/danielenricocahall). If you have any questions, please feel free to open up an issue or send an email to danielenricocahall@gmail.com. If you want to contribute, feel free to submit a PR, or start a conversation about how we can go about implementing something.

Owner

  • Name: Danny
  • Login: danielenricocahall
  • Kind: user
  • Location: Philadelphia, PA
  • Company: Disney Streaming Services

JOSS Publication

Elephas: Distributed Deep Learning with Keras & Spark
Published
December 15, 2022
Volume 7, Issue 80, Page 4073
Authors
Max Pumperla
IU Internationale Hochschule, Erfurt, Germany, Anyscale Inc, San Francisco, USA
Daniel Cahall
Independent researcher, Philadelphia, USA
Editor
Patrick Diehl ORCID
Tags
Distributed Computing Deep Learning Keras Tensorflow Apache Spark

GitHub Events

Total
  • Create event: 7
  • Release event: 3
  • Issues event: 3
  • Watch event: 3
  • Issue comment event: 2
  • Push event: 63
  • Pull request review comment event: 1
  • Pull request event: 8
  • Fork event: 2
Last Year
  • Create event: 7
  • Release event: 3
  • Issues event: 3
  • Watch event: 3
  • Issue comment event: 2
  • Push event: 63
  • Pull request review comment event: 1
  • Pull request event: 8
  • Fork event: 2

Committers

Last synced: 5 months ago

All Time
  • Total Commits: 694
  • Total Committers: 28
  • Avg Commits per committer: 24.786
  • Development Distribution Score (DDS): 0.512
Past Year
  • Commits: 62
  • Committers: 2
  • Avg Commits per committer: 31.0
  • Development Distribution Score (DDS): 0.032
Top Committers
Name Email Commits
danielenricocahall d****l@g****m 339
Max Pumperla m****a@g****m 256
Yoseph Zuskin z****h@g****m 37
Oscar Pan o****n@a****m 21
Samangooei s****o@a****m 5
Ana Maria Martinez Fernandez a****f@g****m 4
dependabot[bot] 4****] 4
JONATHAN-DS\Jonathan j****n@o****m 2
Jose Alvarez j****z@i****m 2
Ivan Montero i****o@g****m 2
Liang-Chi Hsieh v****a@g****m 2
Sebasteuo s****4@e****r 2
jordan vega j****0@g****m 2
Kevin Mader k****r 2
Leo Gallucci e****3@g****m 1
Sina Samangooei s****x@g****m 1
Icyblade Dai i****e 1
Gaurav Yeole g****e@g****m 1
Andrea Bergonzo a****n 1
Alberto Fumagalli a****4@g****m 1
A-Fayez92 6****2 1
Willem Meints w****m@m****l 1
Mostafa m****r@m****m 1
10056727 t****r@b****m 1
Yuan Yifan y****n@l****m 1
n4nagappan n****n@g****m 1
spencerimp s****z@g****m 1
stevekludt@gmail.com a****r@g****m 1
Committer Domains (Top 20 + Academic)

Issues and Pull Requests

Last synced: 4 months ago

All Time
  • Total issues: 19
  • Total pull requests: 37
  • Average time to close issues: 10 months
  • Average time to close pull requests: 21 days
  • Total issue authors: 10
  • Total pull request authors: 3
  • Average comments per issue: 2.42
  • Average comments per pull request: 0.05
  • Merged pull requests: 31
  • Bot issues: 0
  • Bot pull requests: 0
Past Year
  • Issues: 0
  • Pull requests: 11
  • Average time to close issues: N/A
  • Average time to close pull requests: about 11 hours
  • Issue authors: 0
  • Pull request authors: 2
  • Average comments per issue: 0
  • Average comments per pull request: 0.09
  • Merged pull requests: 8
  • Bot issues: 0
  • Bot pull requests: 0
Top Authors
Issue Authors
  • danielenricocahall (9)
  • nmoran (2)
  • skythomp16 (1)
  • dvchoo (1)
  • blankdots (1)
  • GeoffDuniam (1)
  • raditya1117 (1)
  • Mett92 (1)
  • yvsandeep (1)
  • Yoseph-Zuskin (1)
Pull Request Authors
  • danielenricocahall (34)
  • Yoseph-Zuskin (2)
  • Sebasteuo (1)
Top Labels
Issue Labels
bug (8) enhancement (2) documentation (1)
Pull Request Labels

Packages

  • Total packages: 1
  • Total downloads:
    • pypi 19,542 last-month
  • Total docker downloads: 216
  • Total dependent packages: 0
  • Total dependent repositories: 18
  • Total versions: 49
  • Total maintainers: 2
pypi.org: elephas

Distributed deep learning on Spark with Keras

  • Versions: 49
  • Dependent Packages: 0
  • Dependent Repositories: 18
  • Downloads: 19,542 Last month
  • Docker Downloads: 216
Rankings
Downloads: 1.4%
Docker downloads count: 2.8%
Dependent repos count: 3.4%
Average: 7.9%
Dependent packages count: 10.1%
Stargazers count: 14.5%
Forks count: 15.3%
Last synced: 4 months ago

Dependencies

.github/workflows/ci.yaml actions
  • actions/checkout v3 composite
  • actions/setup-python v4 composite
.github/workflows/docs.yaml actions
  • actions/checkout v2 composite
  • actions/setup-python v2 composite
setup.py pypi
  • cython *
  • flask *
  • h5py ==3.3.0
  • pyspark <3.4
  • tensorflow >=2,
poetry.lock pypi
  • absl-py 2.1.0
  • aiohappyeyeballs 2.3.4
  • aiohttp 3.10.0
  • aiosignal 1.3.1
  • astunparse 1.6.3
  • async-timeout 4.0.3
  • attrs 24.1.0
  • blinker 1.8.2
  • cachetools 5.4.0
  • certifi 2024.7.4
  • charset-normalizer 3.3.2
  • click 8.1.7
  • colorama 0.4.6
  • coverage 7.6.0
  • cython 0.29.37
  • datasets 2.20.0
  • dill 0.3.8
  • exceptiongroup 1.2.2
  • execnet 2.1.1
  • filelock 3.15.4
  • findspark 2.0.1
  • flask 2.3.3
  • flatbuffers 24.3.25
  • frozenlist 1.4.1
  • fsspec 2024.5.0
  • gast 0.6.0
  • google-auth 2.32.0
  • google-auth-oauthlib 1.2.1
  • google-pasta 0.2.0
  • grpcio 1.65.2
  • h5py 3.11.0
  • huggingface-hub 0.24.5
  • idna 3.7
  • importlib-metadata 8.2.0
  • iniconfig 2.0.0
  • itsdangerous 2.2.0
  • jinja2 3.1.4
  • joblib 1.4.2
  • keras 2.15.0
  • libclang 18.1.1
  • markdown 3.6
  • markupsafe 2.1.5
  • ml-dtypes 0.3.2
  • mock 5.1.0
  • multidict 6.0.5
  • multiprocess 0.70.16
  • numpy 1.23.5
  • oauthlib 3.2.2
  • opt-einsum 3.3.0
  • packaging 24.1
  • pandas 2.2.2
  • pep8 1.7.1
  • pluggy 1.5.0
  • protobuf 4.25.4
  • py4j 0.10.9.7
  • pyarrow 17.0.0
  • pyarrow-hotfix 0.6
  • pyasn1 0.6.0
  • pyasn1-modules 0.4.0
  • pyspark 3.5.1
  • pytest 8.3.2
  • pytest-cache 1.0
  • pytest-cov 5.0.0
  • pytest-pep8 1.0.6
  • pytest-spark 0.6.0
  • python-dateutil 2.9.0.post0
  • pytz 2024.1
  • pyyaml 6.0.1
  • regex 2024.7.24
  • requests 2.32.3
  • requests-oauthlib 2.0.0
  • rsa 4.9
  • safetensors 0.4.3
  • scikit-learn 1.5.1
  • scipy 1.13.1
  • setuptools 72.1.0
  • six 1.16.0
  • tensorboard 2.15.2
  • tensorboard-data-server 0.7.2
  • tensorflow 2.15.1
  • tensorflow-cpu-aws 2.15.1
  • tensorflow-estimator 2.15.0
  • tensorflow-intel 2.15.1
  • tensorflow-io-gcs-filesystem 0.37.1
  • termcolor 2.4.0
  • threadpoolctl 3.5.0
  • tokenizers 0.19.1
  • tomli 2.0.1
  • tqdm 4.66.4
  • transformers 4.42.4
  • typing-extensions 4.12.2
  • tzdata 2024.1
  • urllib3 2.2.2
  • werkzeug 3.0.3
  • wheel 0.43.0
  • wrapt 1.14.1
  • xxhash 3.4.1
  • yarl 1.9.4
  • zipp 3.19.2
pyproject.toml pypi
  • datasets ^2.20.0 develop
  • mock ^5.1.0 develop
  • pytest ^8.3.1 develop
  • pytest-cov ^5.0.0 develop
  • pytest-pep8 ^1.0.6 develop
  • pytest-spark ^0.6.0 develop
  • scikit-learn ^1.5.1 develop
  • transformers ^4.42.4 develop
  • Cython ^0.29.33
  • Flask ^2.3.3
  • h5py 3.11.0
  • numpy 1.23.5
  • pyspark <=3.5.1
  • python >=3.9,<3.12
  • tensorflow >2.2,<=2.15.1
  • transformers <=4.42.4