Dask 是什么?
介绍
Dask 是一个开源的 Python 库,专为并行计算和大数据处理设计。它提供了与 Pandas 和 NumPy 类似的高层次接口,同时支持将计算分布到多核、集群或云环境中。Dask 通过分块 ( chunking ) 和延迟计算 ( lazy evaluation ) 技术,实现了高效的数据处理和计算加速。
如何安装?
Dask 提供多种安装方式,你可以使用 conda
,pip
或者直接从源码安装。
具体的安装命令可查看官网: How to Install Dask 。
核心组件
- Dask Arrays:分块处理的多维数组,支持并行线性代数运算。
- Dask DataFrame:分块处理的表格数据,兼容 Pandas 操作 ( 如 groupby, join ) 。
- Dask Delayed:装饰器 @dask.delayed,用于并行化任意 Python 函数。
- Dask Distributed:分布式调度器,提供容错、动态负载均衡和诊断工具。
我们在项目中使用的时候可以通过以下方式引用:
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
# 实际应用中可能不需要全部引用,根据具体所需要处理的数据来定。
对于我们项目里面表格的处理,DataFrame 是比较合适的。而 Dask DataFram 和 Pandas 是完全兼容的,API 是一致的。
Load Data
import pandas as pd
df = pd.read_parquet ( 's3://mybucket/myfile.parquet' )
df.head ( )
import dask.dataframe as dd
df = dd.read_parquet ( 's3://mybucket/myfile.*.parquet' )
df.head ( )
Data Processing
import pandas as pd
df = df [df.value >= 0]
joined = df.merge ( other, on="account" )
result = joined.groupby ( "account" ) .value.mean ( )
result
import dask.dataframe as dd
df = df [df.value >= 0]
joined = df.merge ( other, on="account" )
result = joined.groupby ( "account" ) .value.mean ( )
result.compute ( )
这里可以注意到 Dask 是并行处理版本的 Pandas。而且 Dask 是懒计算,只有当调用 .compute ( )
的时候才会进行。
与 Pandas 的关键区别
操作 | Pandas | Dask |
---|---|---|
数据过滤 | 立即执行,内存中处理 | 生成任务,延迟执行,并行分块 |
表合并 | 单机内存 | 分布式 Shuffle |
分组聚合 | 直接计算全局结果 | 局部聚合 + 全局合并 |
执行时机 | 立即执行 | 需要调用 .compute ( ) 或者 .persist ( ) 触发计算 |
示例
示例 1: data_clean.py
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import numpy as np
import pandas as pd
# ----------------------------
# 步骤 1: 启动分布式集群 ( 本地模拟 )
# ----------------------------
# 创建一个本地集群 ( 实际部署时替换为真实集群地址 )
cluster = LocalCluster (
n_workers=4, # 4 个工作进程
threads_per_worker=2, # 每个进程 2 个线程
memory_limit="4GB" # 每个进程内存限制
)
client = Client ( cluster ) # 连接到集群
# 打印集群信息
print ( f"Dask Dashboard: {client.dashboard_link}" ) # 监控任务执行
# ----------------------------
# 步骤 2: 生成模拟数据 ( 实际场景从 HDFS/S3 读取 )
# ----------------------------
def generate_data ( ) :
# 创建分布式 CSV 文件 ( 模拟 10 个分块 )
for i in range ( 10 ) :
users = np.random.choice ( [f"user_{x}" for x in range ( 1000 )], size=10000 )
amounts = np.round ( np.random.normal ( 100,50,10000 ) , 2 )
amounts = np.where ( amounts < 0,0, amounts ) # 生成少量负值
df = pd.DataFrame ( {"user": users, "amount": amounts} )
df.to_csv ( f"./data/transactions_{i}.csv", index=False )
generate_data ( ) # 生成测试数据
# ----------------------------
# 步骤 3: 分布式数据处理
# ----------------------------
# 从分布式存储读取数据 ( 此处模拟本地文件 )
ddf = dd.read_csv ( "./data/transactions_*.csv" )
# 过滤异常交易 ( 惰性操作 )
ddf_filtered = ddf [ddf["amount"] > 0]
# 计算每个用户的总金额
result = ddf_filtered.groupby ( "user" ) ["amount"].sum ( )
# ----------------------------
# 步骤 4: 触发计算并持久化
# ----------------------------
# 将过滤后的数据缓存在集群内存中 ( 加速后续操作 )
ddf_filtered = client.persist ( ddf_filtered ) # 分布式缓存
# 计算最终结果并保存到分布式存储 ( 此处模拟输出到本地 CSV )
result = result.compute ( ) # 触发计算
result.to_csv ( "./output/user_total_amounts.csv", index_label="user" )
# 关闭集群 ( 实际生产环境通常长期运行 )
client.close ( )
cluster.close ( )
示例 2: install_plugin.py
from dask.distributed import PipInstall
from dask.dataframe import read_sql_table
from dask.distributed import Client
# 连接 Dask 集群
client = Client ( "tcp://dask-scheduler:8786" )
plugin = PipInstall ( packages=["sqlalchemy", "psycopg2-binary"], pip_options=["--upgrade"] )
client.register_plugin ( plugin )
# 分布式读取表数据 ( 按 id 列分块 )
ddf = read_sql_table (
"电视剧",
"postgresql://iger: ige@192.168.45.6:55462/postgres",
schema="chinese",
index_col="drama_id", # 用于分区的列 ( 必须有索引 )
npartitions=10, # 分区数 ( 建议与 CPU 核心数对齐 )
)
# 查看前 5 行 ( 触发实际查询 )
print ( ddf.head ( ))
贡献者
更新日志
2025/8/5 08:34
查看所有更新日志