掌握大数据处理:使用Dask高效处理大型CSV文件
掌握大数据处理:使用Dask高效处理大型CSV文件
在当今数据驱动的商业环境中,数据已成为每个企业不可或缺的资源。然而,并非所有数据都存储在简单的数据库中。许多公司仍然依赖老式的CSV文件来存储和交换所有的表格数据,因为这是数据存储的最简单形式。
随着公司的发展,数据收集将呈指数级增长。这些文件的大小可能会显著累积,使得使用如Pandas这样的常见库无法加载它们。这些大型CSV文件会减慢许多数据活动,并耗尽我们的系统资源,这也是为什么许多专业人士尝试使用大数据的替代解决方案。
上述问题正是Dask诞生的原因。Dask是一个强大的Python库,专为数据操作设计,但具有并行计算能力。它允许用户处理超出机器内存的数据,通过将其分解为可管理的分区并并行执行操作。Dask还使用惰性评估来管理内存,其中任何计算都被优化,并且仅在明确请求时执行。
随着Dask成为许多数据专业人士的重要工具,本文将探讨如何使用Dask处理CSV文件目录,尤其是在内存不足的情况下。
使用Dask处理CSV文件
首先,我们准备一个样本CSV数据集。你可以使用你的实际数据集或Kaggle上的样本数据集,我将在这里使用后者。将文件放入'data'文件夹并重命名它们。
准备好数据集后,我们安装Dask库以供使用。
pip install dask[complete]
如果安装成功,我们可以使用Dask来读取和处理我们的CSV目录。
首先,我们查看文件夹内的所有CSV数据集。我们可以使用以下代码来实现。
import dask.dataframe as dd
import glob
file_pattern = "data/*.csv"
files = glob.glob(file_pattern)
输出将类似于下面的列表。如果你的数据文件夹中有许多CSV文件,它可能会更长。
['data/features_3_sec.csv', 'data/features_30_sec.csv']
使用上述列表,我们将使用Dask CSV读取器读取所有CSV文件。
ddf = dd.read_csv(file_pattern, assume_missing=True)
在上述代码中,Dask不会立即将CSV数据加载到内存中。相反,它创建了一个惰性DataFrame,其中每个(或部分)成为一个分区。我们还假设一个缺失参数将使推断的数据类型更加灵活。
在后台,Dask已经自动化了并行化过程,因此当我们调用Dask CSV读取器时,我们不需要手动分割数据;它已经将其分解为可管理的块大小。
我们可以通过读取CSV文件目录来检查分区的数量。
print("Number of partitions:", ddf.npartitions)
输出类似于“Number of partitions: 2”。
让我们尝试使用以下代码过滤数据。
filtered_ddf = ddf[ddf["rms_mean"] > 0.1]
你可能熟悉上述操作,因为它们类似于Pandas过滤。然而,Dask在每个操作上懒惰地应用了这些操作,以便不将所有数据加载到内存中。
然后,我们可以使用以下代码对过滤后的数据集执行计算操作。
mean_spectral_centroid_mean = filtered_ddf["spectral_centroid_mean"].mean().compute()
print("Mean of feature2 for rows where rms_mean > 0.1:", mean_spectral_centroid_mean)
输出将类似于以下内容。
Mean of feature2 for rows where rms_mean > 0.1: 2406.2594844026335
在上述代码中,我们对所有分区执行均值操作,并且仅在使用触发器时才执行实际计算。最终结果将存储在内存中。
如果你想保存经过所有计算过程的每个分区,我们可以使用以下代码。
filtered_ddf.to_csv("output/filtered_*.csv", index=False)
CSV数据集将是所有先前过滤的分区,并存储在我们的本地。
现在,我们可以使用以下代码来控制内存限制、工作线程数量和线程。
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1, memory_limit="2GB")
通过工作线程,我们指的是可以独立执行任务的单独进程。我们还为每个工作线程分配一个线程,以便工作线程可以在不同核心上与其他工作线程并行运行任务。最后,我们设置内存限制,以便进程不会超过我们的限制。
说到内存,我们可以使用blocksize参数控制每个分区中的数据量。
ddf_custom = dd.read_csv("data/*.csv", blocksize="5MB", assume_missing=True)
blocksize参数将对每个分区的大小进行限制。这种灵活性是Dask的优势之一,允许用户无论文件大小如何都能高效工作。
最后,我们可以使用以下代码对每个分区分别执行每个操作,而不是在所有分区上聚合它。
partition_means = ddf_custom["spectral_centroid_mean"].map_partitions(lambda df: df.mean()).compute()
print(partition_means)
结果将类似于以下数据系列。
0 2201.780898
1 2021.533468
2 2376.124512
dtype: float64
你可以看到,自定义的blocksize将我们的2个CSV文件分为3个分区,并且我们可以对每个分区进行操作。
这就是使用Dask处理CSV文件目录的简单介绍。你可以尝试使用你的CSV数据集并执行更复杂的操作。
结论
CSV文件是许多公司用作数据存储的标准文件,它可能会累积并且大小变大。通常的库,如Pandas,很难处理这些大数据文件,这使得我们考虑替代解决方案。Dask库正是为了解决这个问题而诞生的。
在本文中,我们了解到Dask可以从目录中读取多个CSV文件,将数据分区为可管理的块,并通过惰性评估执行并行计算,提供对内存和处理资源的灵活控制。这些示例展示了Dask在数据操作活动中的强大之处。
希望这对你有所帮助!