Skip to content

Dask 是什么?

约 1011 字大约 3 分钟

分布式大数据

2025-04-15

介绍

Dask 是一个开源的 Python 库,专为并行计算和大数据处理设计。它提供了与 Pandas 和 NumPy 类似的高层次接口,同时支持将计算分布到多核、集群或云环境中。Dask 通过分块(chunking)和延迟计算(lazy evaluation)技术,实现了高效的数据处理和计算加速。

如何安装?

Dask 提供多种安装方式,你可以使用 condapip 或者直接从源码安装。

具体的安装命令可查看官网:How to Install Dask

核心组件

Dask Overview
Dask Overview
  • ​​Dask Arrays​​:分块处理的多维数组,支持并行线性代数运算。
  • ​Dask DataFrame​​:分块处理的表格数据,兼容 Pandas 操作(如 groupby,join)。
  • Dask Delayed​​:装饰器 @dask.delayed,用于并行化任意 Python 函数。
  • Dask Distributed​:分布式调度器,提供容错、动态负载均衡和诊断工具。

我们在项目中使用的时候可以通过以下方式引用:

import numpy as np
import pandas as pd

import dask.dataframe as dd
import dask.array as da
import dask.bag as db

# 实际应用中可能不需要全部引用,根据具体所需要处理的数据来定。

对于我们项目里面表格的处理,DataFrame 是比较合适的。而 Dask DataFram 和 Pandas 是完全兼容的,API 是一致的。

Load Data
import pandas as pd

df = pd.read_parquet('s3://mybucket/myfile.parquet')
df.head()
import dask.dataframe as dd

df = dd.read_parquet('s3://mybucket/myfile.*.parquet')
df.head()

与 Pandas 的关键区别

操作PandasDask
数据过滤立即执行,内存中处理生成任务,延迟执行,并行分块
表合并单机内存分布式 Shuffle
分组聚合直接计算全局结果局部聚合 + 全局合并
执行时机立即执行需要调用 .compute() 或者 .persist() 触发计算

示例

示例 1:data_clean.py
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import numpy as np
import pandas as pd

# ----------------------------
# 步骤 1: 启动分布式集群(本地模拟)
# ----------------------------
# 创建一个本地集群(实际部署时替换为真实集群地址)
cluster = LocalCluster(
    n_workers=4,          # 4个工作进程
    threads_per_worker=2, # 每个进程2个线程
    memory_limit="4GB"    # 每个进程内存限制
)
client = Client(cluster)  # 连接到集群

# 打印集群信息
print(f"Dask Dashboard: {client.dashboard_link}")  # 监控任务执行

# ----------------------------
# 步骤 2: 生成模拟数据(实际场景从 HDFS/S3 读取)
# ----------------------------
def generate_data():
    # 创建分布式 CSV 文件(模拟10个分块)
    for i in range(10):
        users = np.random.choice([f"user_{x}" for x in range(1000)], size=10000)
        amounts = np.round(np.random.normal(100, 50, 10000), 2)
        amounts = np.where(amounts < 0, 0, amounts)  # 生成少量负值

        df = pd.DataFrame({"user": users, "amount": amounts})
        df.to_csv(f"./data/transactions_{i}.csv", index=False)

generate_data()  # 生成测试数据

# ----------------------------
# 步骤 3: 分布式数据处理
# ----------------------------
# 从分布式存储读取数据(此处模拟本地文件)
ddf = dd.read_csv("./data/transactions_*.csv")

# 过滤异常交易(惰性操作)
ddf_filtered = ddf[ddf["amount"] > 0]

# 计算每个用户的总金额
result = ddf_filtered.groupby("user")["amount"].sum()

# ----------------------------
# 步骤 4: 触发计算并持久化
# ----------------------------
# 将过滤后的数据缓存在集群内存中(加速后续操作)
ddf_filtered = client.persist(ddf_filtered)  # 分布式缓存

# 计算最终结果并保存到分布式存储(此处模拟输出到本地 CSV)
result = result.compute()                   # 触发计算
result.to_csv("./output/user_total_amounts.csv", index_label="user")

# 关闭集群(实际生产环境通常长期运行)
client.close()
cluster.close()

贡献者

更新日志

2025/4/28 00:52
查看所有更新日志
  • 49eec-improve(docs): code highlight
  • c4231-improve(docs): text typo
  • 081a2-feat(docs): add new article

Keep It Simple