楼主: olympic
1992 7

[程序分享] Dask介绍 [推广有奖]

  • 0关注
  • 9粉丝

已卖:482份资源

泰斗

51%

还不是VIP/贵宾

-

威望
0
论坛币
65159 个
通用积分
7567.9673
学术水平
200 点
热心指数
243 点
信用等级
179 点
经验
4002 点
帖子
29105
精华
0
在线时间
10018 小时
注册时间
2014-4-10
最后登录
2025-12-29

初级热心勋章 中级热心勋章 20周年荣誉勋章

楼主
olympic 发表于 2021-3-19 21:58:57 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币
Dask是一款用于分析计算的灵活并行计算库。
Dask 是一个开源项目,为你提供 NumPy 数组、Pandas Dataframes 以及常规 list 的抽象,
        允许你使用多核处理器并行运行它们的操作。
Dask是一个并行计算库,能在集群中进行分布式计算,能以一种更方便简洁的方式处理大数据量,
    与Spark这些大数据处理框架相比较,Dask更轻。Dask更侧重与其他框架,
    如:Numpy,Pandas,Scikit-learning相结合,从而使其能更加方便进行分布式并行计算。
Dask由两部分组成:
    ·针对计算优化的动态任务调度。这与Airflow,Luigi,Celery或Make类似,但针对交互式计算工作负载进行了优化。
    · "大数据"集合, 像并行数组,数据框和列表一样,它们将通用接口(如NumPy,Pandas或Python迭代器)扩展到大于内存或分布式环境。
        这些并行集合运行在动态任务调度器之上。
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Ask Das scikit-learn Dataframe Learning

沙发
olympic 发表于 2021-3-19 22:00:18
安装dask
· 在线安装:
    pip install dask: # 仅安装核心部分
    pip install "dask[complete]" : 安装完整版本
· 离线安装:
    方法步骤[https://blog.csdn.net/weixin_39916734/article/details/95345351]
    注意: dask的有一些库要求的python版本 > 2.7.8 或者 3.4——版本过低容易被坑

藤椅
olympic 发表于 2021-3-19 22:01:27
数据结构
· Arrays:
    Dask中的Arrays(位于包dask.arrays下),其实就是对Numpy中的ndarray的部分接口进行了改进,从而方便处理大数据量。
    对于大数据集,特别是其大小大于内存时,如果我们要对其计算,按照传统的方式,会将其全部塞进内存里,
    那么这就会报Out-Of-Memory错误,当然,我们也可以一次读取一部分数据,那么我们是否可以提前将大数据集进行分块处理了,
    只需要控制每块数据集不超过内存,从而满足In-Memory计算了?Dask就是这样做的。
· Dataframes:
    Dataframe是基于Pandas Dataframe改进的一个可以并行处理大数据量的数据结构,即使大于内存的数据也是能够处理的.
· Bags:
    对于Bags,其最主要的是用于半结构化的大数据集,比如日志或者博客等等

板凳
olympic 发表于 2021-3-19 22:02:21
[High Level Graph]

比如我们在创建Dask Dataframe时,其实是通过HighLevelGraph构建了一个任务图,那么这个任务图是什么?
其实他本质上就是一个字典结构(Dict),从组成元素来看,一共由两部分组成,
一个是动作(可看做是Task Graph中的节点),一个是依赖(可看做是Task Graph中的边)

报纸
olympic 发表于 2021-3-19 22:02:48
[分布式]

在Dask 分布式中(也可以是伪分布式,即在本机中通过线程或者进程来并行处理),共有三种角色:Client端,Scheduler端以及Worker端,
· Client负责提交Task给Scheduler
· Scheduler负责对提交的Task按照一定的策略分发给Worker
· Worker进行实际的计算、数据存储,在此期间,Scheduler时刻关注着Worker的状态。

地板
olympic 发表于 2021-3-19 22:03:07
[延时计算与即时计算]

· Delayed:
    只需在一个普通的Python Function上面通过dask.delayed函数进行封装,就能得到一个Delayed对象
    只是构建了一个Task Graph,并没有进行实际的计算,只有调用compute的时候,才开始进行计算
· Future:
    对于Future是立即执行的,可以通过submit、map方法将一个Function提交给Scheduler,
    在后台,Scheduler会对提交的任务进行处理并分发给Workers进行实际的计算。
    当任务提交后,会返回一个指向任务运行结果的Key值,即Future对象,
    我们可以跟踪其当前状态,当然我们也可以通过result和gather方法等待任务完成后从而将结果收集到本地

7
olympic 发表于 2021-3-19 22:03:30
[简单使用]

· 读取csv
    import dask.dataframe as dd
    df = dd.read_csv('2015-*-*.csv')
    df.groupby(df.user_id).value.mean().compute()
· 读取hdf5
    import dask.array as da
    f = h5py.File('myfile.hdf5')
    x = da.from_array(f['/big-data'], chunks=(1000, 1000))
    x - x.mean(axis=1).compute()
· Dask Bag
    import dask.bag as db
    b = db.read_text('2015-*-*.json.gz').map(json.loads)
    b.pluck('name').frequencies().topk(10, lambda pair: pair[1]).compute()
· Dask delayed
    from dask import delayed
    L = []
    for fn in filenames:                  # Use for loops to build up computation
        data = delayed(load)(fn)          # Delay execution of function
        L.append(delayed(process)(data))  # Build connections between variables
    result = delayed(summarize)(L)
    result.compute()
· Client
    client = Client(processes=True)
    from_sequence(seq[, partition_size, npartitions]): Create a dask Bag from Python sequence.
    dask.compute(): Compute several dask collections at once.

8
sigmund 在职认证  发表于 2021-3-23 00:20:13
很好的介绍!

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群
GMT+8, 2025-12-30 11:44