话说啊,几天之前,我接到了这么一个任务,就是把大量MySQL的数据导入到Elastic Search中,再久之前我还有个把数据导入到MongoDB中。MongoDB和ES本身还是比较接近的,只不过这次导入到ES的数据量非常大,最大的表大概有4GiB左右,咱得必须想点方法优化一下这个导入过程。
当然了,凡是涉及到对海量数据进行处理的,都可以用本篇提到的思路进行处理。
咱先说说思路。咋办捏?最笨的办法哈,从MySQL中读取一行,根据列名转换成对应形式的字典,然后插入到MongoDB/ES中,然后再读一行,转换,插入,直到读取完。这样吧,说实话,效率是有点低,但是也能完成任务对吧。包括俺的mentor、还有其他Python的专家们,都反对俺这种读一行写一行的处理方式。
其实啊,我也知道insert_many()
、executemany()
和writelines()
这种批量的方法,另一种思路呢,就是把数据存到一个列表里,然后迭代批量操作。但是怎奈我都是处理很大的数据的,如果一下子把这些东西塞到内存里,怕是没等运行完呢,就Memory Error或者发生大量的swap交换再Kernel Panic了吧。
当然了,第二种思路其实是能够有办法完成的,要不我写这篇文章是干嘛的!
当然了,咱要先从导入少量数据开始。
导入少量数据到MongoDB
废话少说,直接贴以前的代码。太懒了,直接贴图了。
大概就是这么回事,逻辑非常简单明了。
data = cur.fetchall()
将数据库表中的内容(结果集)一下子塞到内存里,其形式为列表内套元组,然后下面根据它的长度进行循环,以此进行构造字典、调用插入,直到不满足循环条件。
当然了,大家看我注释能发现,pymongo还是有些坑的,比如说就关于Duplicate Key Error
这个错误,如果把mongo_dic
这个变量写到循环外面,那就要用copy()
浅拷贝来让驱动生成不同的_id
,如果放在循环里,那么每次每次循环驱动都会生成不同的_id
啦。
这种做法有何问题呢?有很多
一行一个操作效率太低
这可能是这种做法最大的问题了,效率太低,对数据库压力也不小,唯一开心的是内存了。
无法快速重用代码
假如某一天我想导入另一个数据表,列很有可能是不同的,那么构造字典这一块就要推倒重来,很难做到快速重用代码。况且,用update()
方法来构造字典好像不是那么常见,咱都是mongo_dic=dict(key1=value1,key2=value2)
的。
无法处理大量数据
咦?咱开篇不是说到,一行一行的读是可以处理大量数据只不过效率偏低。
没错啊,但是那是指的使用fetchone()
,fetchmany()
这种方法。在这个demo里我图省事用了fetchall()
,如果表太大,内存就炸了。想要改进,那就得放到一个while循环里用fetchone()
,这里就不用演示了,估计有点基础的人都能分分钟写出来。
其他小问题
什么docstring写的不对,不Pythonic,try...except不规范等等等就不用说了。我后来还写了一个OOP的版本。反正这玩意是,能用,但是性能太差,不好用;糊弄小盆友可以,工作上用就等着被开除吧(尽管我mentor说,嗯这次代码写得很干净,很不错,但是咱怎么能知足呢!)
痛定思痛
于是乎,在国庆假期之前,我被Python的多线程彻底坑了一次(GIL,全局解释器锁),然后顺带着开始了熟悉Elastic Search(然而现在依旧什么都不会);
然后在国庆假期之后,我的新任务就是熟悉ELK(Elastic Search+Logstash+Kibana),居然有一种大数据工程师的感觉。
然而我并没有抽出时间来熟悉这玩意,因为我忙着难过、写Telegram Bot、翻译、维护博客、维护两款主题来着
然后某一天,我就有了个小任务,要想办法把某个数据库里最大的4个表导入到另一台机器上的ES中。感受一下这13GiB的表……
按着MongoDB的思路来
此路不太好走,内心深处的我纠结了很久才决定回头。
构造四个字典
我:这算啥事,哥哥小时候很穷,新衣服都穿不起,没少吃苦,这点算啥,哪怕是曾经138列,我都敢再来一次。为什么我的眼里常含泪水,因为我对你爱的深沉啊
某童鞋:你真无聊。
fetchone读一行写一行
我:效率是有点低,但是任务还是能完成的。
老板:你丫拿着工资不干正事,这效率太低了。
CPU:我XXXX。算了,我忍。
硬盘:你妹!你是测试我4K性能呢么?
Python:哎内存大哥,把这个放进去,哎把那个拿出来,哎把这个放进去给原来的拿走,哎给这个拿走。
内存:有完没完……( ̄_ ̄|||)
读到一个列表里,一起插入
我:效率是高了点,就是不知道能不能行。
老板:嗯,看样子还是不错的哈……
内存:Python大哥你悠着点,我这才8G,你这表4G就占了一半了啊。
Python:内存大哥你放心,我保证把200MiB的表读入内存时就跟你要4G的地。
硬盘:我了割草内核你干啥突然要来写入/swap,我这老胳膊老腿了,你有啥事找隔壁内存啊,我就是个备胎啊找你真爱内存去。
内核:我也很无奈啊,没办法了只有你能救我,我还是爱你的。哎,Python那家伙吃了太多内存,我等等看看要不要手起刀落施展生杀大权。
其他程序:喂喂喂 内核,我要I/O,阻塞好久了!!快,快!一会主人逼急了断电了你负责么?
CPU:大家干啥呢那么热闹?
内核:大家别急,让我为民除害,杀了Python这个……负 心 汉
Python:内核大哥,别别别,不是我的错,是我主人太太太…
主人:哎Python你干啥啊读200MiB的数据库跟人要4G的内存,造反啊
Kernel Panic了
神说要有光,就有了光。坐在椅子上的我仰天长笑(并做出一种神经病一样的诡异表情):Let there be light.
处理大量结果集、读取多行并写入ES的方法
既然如此,那么咱只好采取读多行,写入ES,再读取多行,写入ES,直到全部完成为止。
鉴于这样的需求,我们需要解决那些问题呢?
读取多行时的内存消耗
我使用的MySQL驱动是由MySQL官方维护的MySQL Connector/Python,而不是大部分网上教程所用的MySQLdb(MySQL-Python)。不选择用MySQLdb的原因是它上次更新是2014年,这都2017年了,真是不敢用不更新的基础库啊
图片来源,pypi
根据MySQL Connector官方文档的说明,Connector默认不会缓存结果集。换句话说,在执行cur.execute()
之后,程序应该负责处理数据(大白话,你就是select了一个100TB的表,内存也是不会暴涨的)。假如结果集很小,能够一次性处理,那么可以选择buffered=True
来缓存数据(或者是设置另一个游标),咱的经验嘛,fetchall()
也行,就是得每次运行之后都来一遍。
注意:貌似MySQLdb是默认缓存的。
那如果结果集很大呢,自然不能fetchall()
,也不能设置buffered=True
了,而要用fetchone()
或者fetchmany()
。其中fetchmany()
这个方法就神奇了,它的参数是指定本次读取多少行结果集(不写就是1行,和fetchone()
一样),然后再次运行会继续读取对应行的结果集,一行都不对丢,全都读完了返回空列表。所以咱程序员们完全不用考虑offset和limit这类烦人的问题。
PS,我觉得这就是库提供的处理大结果集的方式……没找到还有没有更好的方法了。
哎,咱得说一句,网上的这群人啊,你抄我我抄你,你说有用吧还说得过去,都是没用的错误的,有意思嘛。
啊呸,跑题太远了,咱得说说读取多行时怎么限制内存消耗。俺想到了魔术方法__sizeof__
和sys.getsizeof()
,这俩的区别嘛,sys.getsizeof()
会调用对象的__sizeof__
方法,如果对象有垃圾回收器的话,还会加上垃圾回收器的内存占用量。
所以咱测试占用100MiB内存是这么写的:
别说,还挺准。
所以咱测试结果集占用内存的时候也差不多是这么写的,然后发现,坑爹了。待我堆上一个Excel折线图:
横轴是getsizeof显示的结果集占用内存的数值(单位理应为字节),取得的数据量依次为1000、10000、10W、20W、30W。我也不知道为啥这个getsizeof显示的数值是不正确的,结果集是标准的内置对象啊,可能他只是显示了外层的列表的内存占用,我还得乘以内层的元组内存占用?(是的我算上了乘以内部元组的大小,结果还是错误的,但也是线性的,还望高人指教怎么做乘法才会得到正确的结果)
我是不是该计算y=kx一次函数了?反正就是,这玩意能用,非得计算,或者做参考,还是可以的。
getsizeof
只会获得最外层数据类型的内存占用,所以如果想获得整个变量的内存占用,要递归到基础的不可变类型。当然这递归效率就不太高了,反正管他怎么的,管他看不看这个折线图的,咱起码可以好好利用
fetchmany()
这个方法了,所以这块控制的代码大概就是下面这个样子:
cur.execute('SELECT * FROM sometable') while True: data = cur.fetchmany(SIZE) if data: pass else: break
其中SIZE
可以放到文件的最开始,或者写到配置文件中,甚至是另写一个函数、方法来控制计算内存占用。
批量插入ES
这一点似乎没什么好说的,类似MongoDB,基本上他们需要的批量插入的数据类型就是列表里面包含着字典,也就是[{},{}...]
,我们只需要构造字典,在列表中不停的使用append()
方法,就可以达到这个目的了。
手动构造字典?我拒绝
咱在最上面的demo,是手动根据数据库的列名来构造字典的key的,但是这表说不定很复杂,俺以前穷,吃点苦手动也就算了;虽说我现在也很穷,但是,我傲娇啊,我高冷啊,就要这么不近人情,不想手动搞四个字典!(再加上这代码也没办法复用啊)
根据列名自动获得键
想起来了,咱可以通过查询语句得知表里有那些列,然后把列名取出来作为字典的key,再正常执行查询得到值,然后使用某种方法把这俩动态的捏到一起,再append()
到SIZE的次数~( ̄▽ ̄)~*
没错,看起来这个思路是很好的。
那么咋搞出来列名呢?
cur.execute('SHOW COLUMNS from tb') col_data = cur.fetchall() col_field = [i[0] for i in col_data]
由于列名也不能多到哪去,直接fetchall()
肯定是没问题的,顺便应用了一下列表推导,这样col_field
这个列表里就会保存有整个表的列名啦。
根据键和表内容动态构造字典
下面可能就到了本篇中最不好搞的一部分了,反正我也不知道我是怎么搞出来的,大概是看了一眼《流畅的Python》吧,差点连字典推导都用上了(ノ*・ω・)ノ
动态构造字典大概是这样的,其中data
是fetchmany()
的结果集(列表套元组),bulk_dic
是一个列表(我这名字起的不太好):
for i in range(len(data)): es_dic = dict(zip(col_field, data[i])) bulk_dic.append(es_dic)
别问我这动态构造字典咋搞出来的,我也是吭吃瘪肚的胡编乱造了好久才弄出来的(哎这就是我和大牛的差距啊,人家一杯茶的功夫,我得搞一个小时)
为什么不尝试使用下pymysql.cursors.DictCursor呢?是吧是吧,DictCursor最方便了
齐活!拼凑代码
所以,基本上思路都有了,各个模块的功能也都实现了,那么就剩下把代码组合在一起,然后测试了!
大体思路就是,利用上面介绍的fetchmany()
限制内存占用,利用col_field和
取得的结果集构造一个正确的字典,append()
到一个列表里,执行插入,然后重新开始一轮循环。
俺最终写出来的代码大概是这样子的(懒,贴图):
exe_time
是俺写的一个装饰器,用于测量一个函数的执行时间。只要调用这个函数,传递两个参数,就可以不管不顾了,这个函数会把表的名字当作_index
插入到ES中。当然了,我这个SIZE=100
有点小,应该根据机器配置进行适当的调整。
其中还有几点我不是很理解的地方、以及我觉得有必要再多说几句的地方(我也是瞎猫碰死耗子啊)
paramStyle
看注释可以得知,
cur.execute('SHOW COLUMNS from %s' % tb)
这一句实际上是存在注入风险的。
按照mysql.connector的参数风格pyformat,正常应该是字典或者元组模式就可以的,
字典示例代码如下:
cursor.execute("SELECT spam FROM eggs WHERE lumberjack = %(jack)s", {'jack': lumberjack})
元组示例代码如下:
cursor.execute("SELECT spam FROM eggs WHERE lumberjack = %s", (lumberjack,))
然而在实际过程中却可耻的抛异常了,不晓得为什么,只好用%和格式化字符拼接下了。
_index与_type
对于_index
,如果接触过ES的人大概知道,_index
大概相当于关系数据库里面的数据库;_type
就有点像表了;
咱看个表格吧,对比下SQL、NoSQL和ES,只是个人的理解,未必准确:
SQL | Database | Table | Row | Column |
NoSQL | Database | Collection | Document | Field |
ES | Index | Type | Document | Field |
当然了,由于mentor的要求是把每一个表名作为index(数据库),所以上面代码中我是这么写的:
es_dic.update(_index=tb, _type='hey')
这个_type
就是随意起的了;
可能你的要求是index叫test,type叫表名,那就 es_dic.update(_index='test', _type=tb)
,很简单的。
哦对了,我自然还是会写一个函数从information_schema
中得出指定数据库中最大的四个表,然后自动处理的,我怎么会自己写四个调用呢!!!我这么傲娇这么高冷,要是你叫高晓松我叫韩寒我肯定天天把“高处不甚寒”挂在嘴边
理论与实际总是存在差距的
圈里有一句话,
In theory, there is no difference between theory and practice. In practice there is.
上面的这堆代码,在你处理非常非常大的表的时候,很有可能还会因为种种原因抛异常的(基本上是因为超时啊什么的),为了处理这个问题,我们需要在构造es的时候加几个参数,我也是开始导入了才发现还有这么多坑……
es = Elasticsearch(timeout=30, max_retries=10, retry_on_timeout=True)
在连接到数据库之后修改三个参数,以免超时提示Lost connection to MySQL server
cur.execute("SET GLOBAL max_allowed_packet=1073741824") cur.execute('SET GLOBAL CONNECT_TIMEOUT = 60') cur.execute('SET SESSION NET_READ_TIMEOUT = 600')
当然,具体数值需要大家自己掌握。
最后的总结
所以所以,这段函数能干啥?
接受俩参数,一个数据库名,一个表名,将表名作为index把全部数据批量高效可靠导入到ES中,不用担心内存问题,你只需要关心硬盘和选择一个合适的SIZE。
想要批量高效的导入数据到MongoDB也只不过是把helpers.bulk()
换成pymongo的insert_many()
而已。当然了,想要批量写入文件(比如说csv文件)……呃,还不更容易,都不用构造字典,用好控制内存那段代码就行了。
最后,还希望能够有高手指出,还有没有更方便简洁的方法,比如说什么什么库本身就提供了这个功能等等……
PS,把这个代码开源到GitHub,供大家学习使用,也欢迎PR……
开源地址
紧跟时代潮流,commit是要带签名的。搞得我现在基本上不想用图形化的客户端了,哎,装逼要紧啊,装完赶紧跑ε=ε=ε=(~ ̄▽ ̄)~
哦对了,秀一下我的kibana!!看起来就很高大上,是吧。
本地开发主题测试用WordPress的内心独白:
终于也有哥出头露面的一天了!
Signed-off-by
)求问:要不要用这个noreply地址给我密钥加一个Meta-UID???
脚本测试的是向我的服务器发起TCP 53的连接请求,如果被拒绝的话(服务器的TCP 53端口没有程序监听,所以内核会拒绝连接),那么就是放行了53;之所以没添加UDP 53的测试一是因为如果放行TCP 53,那么UDP 53也肯定是放行的,二是因为……服务器的UDP 53被占用了,再加上UDP53测试起来比较麻烦。
呐,你放心,未认证的网络自然nslookup都会有结果的,要不DNS隧道怎么会好用呢。
脚本报错不好用的话,最好填个issue,详细描述下你的环境啊、复现过程啊什么的。当然更欢迎Pull Request了