提问者:小点点

使用pandas的“大数据”工作流程


在学熊猫的过程中,我已经想了好几个月了。我在日常工作中使用SAS,它的核心支持非常棒。然而,SAS作为一款软件是可怕的,原因还有很多。

有一天我希望用python和pandas来取代我对SAS的使用,但是我目前缺乏一个用于大型数据集的核心外工作流。我说的不是需要分布式网络的“大数据”,而是大到内存装不下,小到硬盘装不下的文件。

我首先想到的是使用hdfstore在磁盘上保存大型数据集,并只将需要的部分拉入数据帧中进行分析。其他人提到MongoDB是一个更容易使用的替代方案。我的问题是:

有哪些最佳实践工作流可以完成以下任务:

  1. 将平面文件加载到永久磁盘数据库结构中
  2. 查询该数据库以检索要输入pandas数据结构的数据
  3. 在处理熊猫中的碎片后更新数据库

真实世界的例子将非常受欢迎,特别是来自任何使用熊猫处理“大数据”的人。

编辑--我希望它如何工作的一个示例:

  1. 迭代地导入一个大型平面文件,并将其存储在一个永久的磁盘数据库结构中。这些文件通常太大,内存无法容纳。
  2. 为了使用Pandas,我希望读取内存中可以容纳的这些数据的子集(通常一次只有几列)。
  3. 我将通过对选定列执行各种操作来创建新列。
  4. 然后我必须将这些新列追加到数据库结构中。

我试图找到执行这些步骤的最佳实践方法。在阅读有关熊猫和pytables的链接时,添加一个新的专栏似乎是个问题。

编辑--具体回答Jeff的问题:

  1. 我正在建立消费信贷风险模型。数据类型包括电话,SSN和地址特征;财产价值;诸如犯罪记录,破产等贬损性信息。我每天使用的数据集平均有近1,000到2,000个混合数据类型的字段:既有数值型数据又有字符型数据的连续型,名词型和序数型变量。我很少追加行,但我确实执行了许多创建新列的操作。
  2. 典型的操作涉及使用条件逻辑将多个列组合成一个新的复合列。例如,如果var1>;2则newvar='A'elif var2=4则newvar='B'。这些操作的结果是,我的数据集中的每个记录都有一个新列。
  3. 最后,我想将这些新列追加到磁盘数据结构中。我将重复第2步,使用交叉表和描述性统计数据探索数据,试图找到有趣的,直观的关系来建模。
  4. 一个典型的项目文件通常约为1GB。文件被组织成这样一种方式,其中一行由使用者数据的记录组成。对于每个记录,每行的列数相同。这种情况将永远存在。
  5. 在创建新列时,很少会按行设置子集。但是,在创建报告或生成描述性统计信息时,对行进行子集是非常常见的。例如,我可能想为特定的业务领域创建一个简单的频率,比如零售信用卡。要做到这一点,我只选择那些line of business=retail的记录以及我想要报告的列。但是,在创建新列时,我将提取所有数据行,只提取操作所需的列。
  6. 建模过程要求我分析每个列,寻找与某个结果变量的有趣关系,并创建描述这些关系的新复合列。我所研究的专栏通常是以小的集合来完成的。例如,我将集中在一组例如20列仅仅处理财产价值的专栏上,并观察它们与拖欠贷款的关系。一旦探索了这些内容并创建了新的专栏,我就会转到另一组专栏,比如大学教育,然后重复这个过程。我所做的是创建候选变量来解释我的数据和某些结果之间的关系。在这个过程的最后,我应用了一些学习技术,从这些复合列中创建一个等式。

我很少向数据集中添加行。我几乎总是在创建新的专栏(变量或统计学/机器学习术语中的特性)。


共3个答案

匿名用户

我经常以这种方式使用几十GB的数据例如,我在磁盘上有表,我通过查询读取这些表,创建数据并追加回来。

值得阅读本线程后面的文档和一些关于如何存储数据的建议。

将影响数据存储方式的详细信息,如:
尽可能提供详细信息;我可以帮你发展一个结构。

  1. 数据大小,行数,列数,列类型;您是追加行,还是仅仅追加列?
  2. 典型的操作会是什么样子。例如。对列执行查询以选择一组行和特定列,然后执行操作(在内存中),创建新列,保存这些列。
    (给出一个玩具示例可以使我们提供更具体的建议。)
  3. 处理完之后,你要做什么?步骤2是特殊的还是可重复的?
  4. 输入平面文件:多少个,粗略总大小,单位为GB。这些是如何组织的,例如,通过记录?是否每个文件都包含不同的字段,或者每个文件都包含一些记录,其中所有字段都包含在每个文件中?
  5. 您是否曾经根据条件选择行(记录)的子集(例如,选择具有字段A>5的行)?然后执行某些操作,还是只使用所有记录选择字段A,B,C(然后执行某些操作)?
  6. 您是否“处理”所有的列(分组),或者是否有一个很好的比例可以只用于报告(例如,您希望保留数据,但不需要将该列显式地拉入到最终结果时间)?

确保您至少安装了熊猫0.10.1

逐块读取迭代文件和多个表查询。

由于pytables被优化为按行操作(这就是您查询的内容),因此我们将为每组字段创建一个表。这样可以很容易地选择一小组字段(这将与一个大表一起工作,但这样做效率更高。。。我想我将来也许可以修复这个限制。。。无论如何这更直观):
(以下是伪代码。)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

读取文件并创建存储(实质上执行append_to_multiple所执行的操作):

for f in files:
   # read in the file, additional options may be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

现在文件中有了所有的表(实际上,如果您愿意,您可以将它们存储在单独的文件中,您可能必须将文件名添加到group_map中,但这可能不是必需的)。

以下是获取列并创建新列的方法:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

当您准备好进行后处理时:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

关于data_columns,实际上不需要定义任何data_columns;它们允许您基于该列对行进行子选择。例如。类似于:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

在最终的报表生成阶段,它们可能是您最感兴趣的(本质上,数据列与其他列是分开的,如果您定义了很多,这可能会影响效率)。

您还可能希望:

  • 创建一个函数,该函数接受一个字段列表,在groups_map中查找组,然后选择这些组并将结果连接起来,从而得到最终的框架(这实际上是select_as_multiple所做的工作)。这样结构对您来说就非常透明了。
  • 对某些数据列建立索引(使行子集更快)。
  • 启用压缩。

有问题就告诉我!

匿名用户

我认为上面的答案遗漏了一个我发现非常有用的简单方法。

当我的文件太大而无法加载到内存中时,我会将该文件拆分成多个较小的文件(按行或按列)

例如:如果有30天的交易数据,大小约为30GB,我每天将其分解为一个大小约为1GB的文件。我随后分别处理每个文件,并在结束时汇总结果

最大的优点之一是它允许并行处理文件(多个线程或进程)

另一个优点是文件操作(如示例中的添加/删除日期)可以通过常规shell命令来完成,这在更高级/复杂的文件格式中是不可能的

这种方法并不涵盖所有场景,但在许多场景中都非常有用

匿名用户

这个问题两年后的今天,出现了一个与熊猫相当的“核心外”熊猫:达斯克。太棒了!虽然它并不支持熊猫的所有功能,但你可以用它做得更好。