Thread starter: ReneeBK

[Elephas] Distributed Deep Learning with Keras & Spark


#1 · ReneeBK posted on 2017-9-8 03:19:33


Elephas: Distributed Deep Learning with Keras & Spark

Elephas is an extension of Keras which allows you to run distributed deep learning models at scale with Spark. Elephas currently supports a number of applications, including:

  • Data-parallel training of deep learning models
  • Distributed hyper-parameter optimization
  • Distributed training of ensemble models

Schematically, Elephas works as follows.

[Elephas schematic diagram]

Table of contents:

  • Elephas: Distributed Deep Learning with Keras & Spark
  • Introduction
  • Getting started
  • Installation
  • Basic example
  • Spark ML example
  • Usage of data-parallel models
  • Model updates (optimizers)
  • Update frequency
  • Update mode
  • Asynchronous updates with read and write locks (mode='asynchronous')
  • Asynchronous updates without locks (mode='hogwild')
  • Synchronous updates (mode='synchronous')
  • Degree of parallelization (number of workers)
  • Distributed hyper-parameter optimization
  • Distributed training of ensemble models
  • Discussion
  • Future work & contributions
  • Literature

Introduction

Elephas brings deep learning with Keras to Spark. Elephas intends to keep the simplicity and high usability of Keras, thereby allowing for fast prototyping of distributed models that can be run on massive data sets. For an introductory example, see the following IPython notebook.

ἐλέφας is Greek for ivory, and an accompanying project to κέρας, meaning horn. If this seems weird to mention, like a bad dream, you should confirm it actually is at the Keras documentation. Elephas also means elephant, as in stuffed yellow elephant.

Elephas implements a class of data-parallel algorithms on top of Keras, using Spark's RDDs and data frames. Keras models are initialized on the driver, then serialized and shipped to the workers, along with data and broadcasted model parameters. The Spark workers deserialize the model, train on their chunk of data and send their gradients back to the driver. The "master" model on the driver is updated by an optimizer, which receives the gradients either synchronously or asynchronously.
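The following toy sketch is not part of Elephas; it is included here purely to illustrate the synchronous data-parallel idea described above, simulating the driver/worker interaction with plain NumPy: each simulated worker computes a gradient on its own data shard, and the driver averages those gradients and updates the master parameters.

import numpy as np

# Toy linear model y = X.dot(w) with squared-error loss; illustrative only.
rng = np.random.RandomState(0)
X = rng.randn(1000, 5)
true_w = rng.randn(5)
y = X.dot(true_w)

w = np.zeros(5)                                  # "master" parameters held on the driver
shards = np.array_split(np.arange(1000), 4)      # four simulated workers

def shard_gradient(w, idx):
    # Gradient of 0.5 * mean((X.dot(w) - y)**2) on one worker's data shard.
    Xi, yi = X[idx], y[idx]
    return Xi.T.dot(Xi.dot(w) - yi) / len(idx)

for step in range(100):
    grads = [shard_gradient(w, idx) for idx in shards]   # workers compute in parallel
    w -= 0.1 * np.mean(grads, axis=0)                    # driver applies the averaged update

print(np.allclose(w, true_w, atol=1e-2))  # the master weights converge to true_w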

Getting started

Installation

Install Elephas from PyPI with:

pip install elephas

Depending on your OS, you may need to install some prerequisite modules (LAPACK, BLAS, a Fortran compiler) first. For example, on Ubuntu Linux:

sudo apt-get install liblapack-dev libblas-dev gfortran

A quick way to install Spark locally is to use Homebrew on macOS:

brew install apache-spark

or Linuxbrew on Linux:

brew install apache-spark

The brew version of Spark may be outdated at times. To build from source, simply follow the instructions in the Spark download section or use the following commands:

wget http://apache.mirrors.tds.net/spark/spark-1.5.2/spark-1.5.2-bin-hadoop2.6.tgz -P ~
sudo tar zxvf ~/spark-* -C /usr/local
sudo mv /usr/local/spark-* /usr/local/spark

After that, make sure to add these path variables to your shell profile (e.g. ~/.zshrc):

export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
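
As a quick sanity check that the Spark installation and its Python bindings work, a minimal smoke test along these lines can be run (illustrative only; the app name and master URL are arbitrary choices, not part of the Elephas docs):

from pyspark import SparkContext, SparkConf

# Start a local Spark context and run a trivial job to confirm the setup works.
conf = SparkConf().setAppName('Smoke_Test').setMaster('local[2]')
sc = SparkContext(conf=conf)
print(sc.version)                            # prints the Spark version in use
print(sc.parallelize(range(100)).count())    # should print 100
sc.stop()
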
Using Docker

Install and get Docker running by following the instructions at https://www.docker.com/.

Building

The build takes quite a while to run the first time, since many packages need to be downloaded and installed. In the same directory as the Dockerfile, run the following command:

docker build . -t pyspark/elephas

Running

The following command starts a container with the Jupyter Notebook server listening for HTTP connections on port 8899 (since local Jupyter notebooks use 8888), without authentication configured:

docker run -d -p 8899:8888 pyspark/elephas

Hidden content of this post:

https://github.com/maxpumperla/elephas



#2 · ReneeBK posted on 2017-9-8 03:20:45
Basic example

After installing both Elephas and Spark, training a model is done schematically as follows:

# Create a local pyspark context
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('Elephas_App').setMaster('local[8]')
sc = SparkContext(conf=conf)

# Define and compile a Keras model
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import SGD
model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer=SGD())

# Create an RDD from numpy arrays (X_train and Y_train prepared beforehand)
from elephas.utils.rdd_utils import to_simple_rdd
rdd = to_simple_rdd(sc, X_train, Y_train)

A SparkModel is defined by passing the Spark context and the Keras model. Additionally, one has to choose an optimizer used for updating the Elephas model, an update frequency, a parallelization mode and the degree of parallelism, i.e. the number of workers:

from elephas.spark_model import SparkModel
from elephas import optimizers as elephas_optimizers

adagrad = elephas_optimizers.Adagrad()
spark_model = SparkModel(sc, model, optimizer=adagrad, frequency='epoch', mode='asynchronous', num_workers=2)
spark_model.train(rdd, nb_epoch=20, batch_size=32, verbose=0, validation_split=0.1, num_workers=8)

Run your script using spark-submit:

spark-submit --driver-memory 1G ./your_script.py

Increasing the driver memory even further may be necessary, as the set of parameters in a network may be very large and collecting them on the driver eats up a lot of resources. See the examples folder for a few working examples.
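
After training, the weights of the "master" model live on the driver, so the underlying Keras model can be evaluated locally like any other Keras model. A minimal sketch, assuming held-out arrays X_test and Y_test exist and that the SparkModel exposes the trained network as spark_model.master_network (the attribute name may differ between Elephas versions):

# Hedged sketch: `master_network` is an assumption about the Elephas API, not from the README.
master = spark_model.master_network
loss = master.evaluate(X_test, Y_test, batch_size=32, verbose=0)
print('Local test loss:', loss)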

#3 · MouJack007 posted on 2017-9-8 03:23:20

Thanks to the OP for sharing!

#4 · MouJack007 posted on 2017-9-8 03:23:56

#5 · ReneeBK posted on 2017-9-8 03:24:39
Spark MLlib example

Following up on the last example, to create an RDD of LabeledPoints for supervised training from pairs of numpy arrays, use:

from elephas.utils.rdd_utils import to_labeled_point
lp_rdd = to_labeled_point(sc, X_train, Y_train, categorical=True)

Training on a given LabeledPoint RDD is very similar to what we've seen already:

from elephas.spark_model import SparkMLlibModel
from elephas import optimizers as elephas_optimizers

adadelta = elephas_optimizers.Adadelta()
spark_model = SparkMLlibModel(sc, model, optimizer=adadelta, frequency='batch', mode='hogwild', num_workers=2)
spark_model.train(lp_rdd, nb_epoch=20, batch_size=32, verbose=0, validation_split=0.1, categorical=True, nb_classes=nb_classes)
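
For orientation only: to_labeled_point produces an RDD of MLlib LabeledPoint objects. A rough hand-rolled equivalent (a sketch of the idea, not the Elephas internals) would look like this for one-hot encoded labels:

from pyspark.mllib.regression import LabeledPoint
import numpy as np

# Each sample becomes a LabeledPoint(label, features); for categorical data the
# label is recovered from the one-hot vector via argmax.
lp_rdd_manual = sc.parallelize(
    [LabeledPoint(float(np.argmax(y)), x.ravel()) for x, y in zip(X_train, Y_train)]
)
print(lp_rdd_manual.first())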

#6 · ReneeBK posted on 2017-9-8 03:25:26
Spark ML example

To train a model with a Spark ML estimator on a data frame, use the following syntax:

from elephas.ml.adapter import to_data_frame
from elephas.ml_model import ElephasEstimator
from elephas import optimizers as elephas_optimizers

df = to_data_frame(sc, X_train, Y_train, categorical=True)
test_df = to_data_frame(sc, X_test, Y_test, categorical=True)

adadelta = elephas_optimizers.Adadelta()
estimator = ElephasEstimator(sc, model,
        nb_epoch=nb_epoch, batch_size=batch_size, optimizer=adadelta, frequency='batch', mode='asynchronous', num_workers=2,
        verbose=0, validation_split=0.1, categorical=True, nb_classes=nb_classes)

fitted_model = estimator.fit(df)
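
As with any Spark ML estimator, the fitted model is a Transformer that adds a prediction column to the data frame. A short usage sketch, consistent with the fuller pipeline example later in this thread:

# Apply the fitted model to the held-out data frame and inspect a few predictions.
prediction = fitted_model.transform(test_df)
prediction.select("label", "prediction").show(10)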

#7 · ReneeBK posted on 2017-9-8 03:26:48
# Distributed hyper-parameter optimization with hyperas/hyperopt on Spark via Elephas.
from __future__ import print_function

import pickle

from hyperopt import Trials, STATUS_OK, tpe

from hyperas import optim
from hyperas.distributions import choice, uniform

from elephas.hyperparam import HyperParamModel

from pyspark import SparkContext, SparkConf


def data():
    '''
    Data providing function:

    Make sure to have every relevant import statement included here and return data as
    used in the model function below. This function is separated from model() so that hyperopt
    won't reload the data for each evaluation run.
    '''
    from keras.datasets import mnist
    from keras.utils import np_utils
    (X_train, y_train), (X_test, y_test) = mnist.load_data()
    X_train = X_train.reshape(60000, 784)
    X_test = X_test.reshape(10000, 784)
    X_train = X_train.astype('float32')
    X_test = X_test.astype('float32')
    X_train /= 255
    X_test /= 255
    nb_classes = 10
    Y_train = np_utils.to_categorical(y_train, nb_classes)
    Y_test = np_utils.to_categorical(y_test, nb_classes)
    return X_train, Y_train, X_test, Y_test


def model(X_train, Y_train, X_test, Y_test):
    '''
    Model providing function:

    Create a Keras model with double curly brackets dropped in as needed.
    The return value has to be a valid python dictionary with two customary keys:
        - loss: specify a numeric evaluation metric to be minimized
        - status: just use STATUS_OK and see the hyperopt documentation if not feasible
    The last one is optional, though recommended, namely:
        - model: specify the model just created so that we can later use it again.
    '''
    from keras.models import Sequential
    from keras.layers.core import Dense, Dropout, Activation
    from keras.optimizers import RMSprop

    model = Sequential()
    model.add(Dense(512, input_shape=(784,)))
    model.add(Activation('relu'))
    model.add(Dropout({{uniform(0, 1)}}))
    model.add(Dense({{choice([256, 512, 1024])}}))
    model.add(Activation('relu'))
    model.add(Dropout({{uniform(0, 1)}}))
    model.add(Dense(10))
    model.add(Activation('softmax'))

    rms = RMSprop()
    model.compile(loss='categorical_crossentropy', optimizer=rms)

    model.fit(X_train, Y_train,
              batch_size={{choice([64, 128])}},
              nb_epoch=1,
              show_accuracy=True,
              verbose=2,
              validation_data=(X_test, Y_test))
    score, acc = model.evaluate(X_test, Y_test, show_accuracy=True, verbose=0)
    print('Test accuracy:', acc)
    return {'loss': -acc, 'status': STATUS_OK, 'model': model.to_yaml(), 'weights': pickle.dumps(model.get_weights())}


# Create Spark context
conf = SparkConf().setAppName('Elephas_Hyperparameter_Optimization').setMaster('local[8]')
sc = SparkContext(conf=conf)

# Define hyper-parameter model and run optimization.
hyperparam_model = HyperParamModel(sc)
hyperparam_model.minimize(model=model, data=data, max_evals=5)
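
For orientation, the double-curly-bracket markers in model() above are hyperas templates; they correspond roughly to a plain hyperopt search space such as the following (an illustrative mapping, not code used by Elephas, and the key names here are arbitrary):

from hyperopt import hp

# Approximate hyperopt equivalent of the hyperas {{...}} templates used above.
space = {
    'dropout_1': hp.uniform('dropout_1', 0, 1),
    'dense_units': hp.choice('dense_units', [256, 512, 1024]),
    'dropout_2': hp.uniform('dropout_2', 0, 1),
    'batch_size': hp.choice('batch_size', [64, 128]),
}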

#8 · ReneeBK posted on 2017-9-8 03:27:47
from __future__ import absolute_import
from __future__ import print_function

from keras.datasets import mnist
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import Adam
from keras.utils import np_utils

from elephas.ml_model import ElephasEstimator
from elephas.ml.adapter import to_data_frame
from elephas import optimizers as elephas_optimizers

from pyspark import SparkContext, SparkConf
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml import Pipeline


# Define basic parameters
batch_size = 64
nb_classes = 10
nb_epoch = 1

# Load data
(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_train = x_train.reshape(60000, 784)
x_test = x_test.reshape(10000, 784)
x_train = x_train.astype("float32")
x_test = x_test.astype("float32")
x_train /= 255
x_test /= 255
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Convert class vectors to binary class matrices
y_train = np_utils.to_categorical(y_train, nb_classes)
y_test = np_utils.to_categorical(y_test, nb_classes)

model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))


# Compile model
adam = Adam()
model.compile(loss='categorical_crossentropy', optimizer=adam)

# Create Spark context
conf = SparkConf().setAppName('Mnist_Spark_MLP').setMaster('local[8]')
sc = SparkContext(conf=conf)

# Build Spark data frames from numpy features and labels
df = to_data_frame(sc, x_train, y_train, categorical=True)
test_df = to_data_frame(sc, x_test, y_test, categorical=True)

# Define Elephas optimizer
adadelta = elephas_optimizers.Adadelta()

# Initialize Spark ML Estimator
estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_yaml())
estimator.set_optimizer_config(adadelta.get_config())
estimator.set_nb_epoch(nb_epoch)
estimator.set_batch_size(batch_size)
estimator.set_num_workers(1)
estimator.set_verbosity(0)
estimator.set_validation_split(0.1)
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)

# Fitting a model returns a Transformer
pipeline = Pipeline(stages=[estimator])
fitted_pipeline = pipeline.fit(df)

# Evaluate the Spark model by evaluating the underlying Keras model
prediction = fitted_pipeline.transform(test_df)
pnl = prediction.select("label", "prediction")
pnl.show(100)

# Note: on Spark 2.x and later, DataFrames no longer expose map() directly; use pnl.rdd.map(...) there.
prediction_and_label = pnl.map(lambda row: (row.label, row.prediction))
metrics = MulticlassMetrics(prediction_and_label)
print(metrics.precision())
print(metrics.recall())
