登录
  • 人们都希望被别人需要 却往往事与愿违
  • 投资是预测资产收益的活动, 而投机是预测市场心理的活动@凯恩斯

使用Python将海量MySQL数据导入Elastic Search/MongoDB

编程 Benny小土豆 10500次浏览 6948字 18个评论
文章目录[显示]
这篇文章在 2018年07月12日13:39:17 更新了哦~

话说啊,几天之前,我接到了这么一个任务,就是把大量MySQL的数据导入到Elastic Search中,再久之前我还有个把数据导入到MongoDB中。MongoDB和ES本身还是比较接近的,只不过这次导入到ES的数据量非常大,最大的表大概有4GiB左右,咱得必须想点方法优化一下这个导入过程。

当然了,凡是涉及到对海量数据进行处理的,都可以用本篇提到的思路进行处理。

咱先说说思路。咋办捏?最笨的办法哈,从MySQL中读取一行,根据列名转换成对应形式的字典,然后插入到MongoDB/ES中,然后再读一行,转换,插入,直到读取完。这样吧,说实话,效率是有点低,但是也能完成任务对吧。包括俺的mentor、还有其他Python的专家们,都反对俺这种读一行写一行的处理方式。

其实啊,我也知道insert_many()executemany()writelines()这种批量的方法,另一种思路呢,就是把数据存到一个列表里,然后迭代批量操作。但是怎奈我都是处理很大的数据的,如果一下子把这些东西塞到内存里,怕是没等运行完呢,就Memory Error或者发生大量的swap交换再Kernel Panic了吧。

当然了,第二种思路其实是能够有办法完成的,要不我写这篇文章是干嘛的!

当然了,咱要先从导入少量数据开始。

导入少量数据到MongoDB

废话少说,直接贴以前的代码。太懒了,直接贴图了。

使用Python将海量MySQL数据导入Elastic Search/MongoDB

大概就是这么回事,逻辑非常简单明了。

data = cur.fetchall()将数据库表中的内容(结果集)一下子塞到内存里,其形式为列表内套元组,然后下面根据它的长度进行循环,以此进行构造字典、调用插入,直到不满足循环条件。

当然了,大家看我注释能发现,pymongo还是有些坑的,比如说就关于Duplicate Key Error这个错误,如果把mongo_dic这个变量写到循环外面,那就要用copy()浅拷贝来让驱动生成不同的_id,如果放在循环里,那么每次每次循环驱动都会生成不同的_id啦。

这种做法有何问题呢?有很多

一行一个操作效率太低

这可能是这种做法最大的问题了,效率太低,对数据库压力也不小,唯一开心的是内存了。

无法快速重用代码

假如某一天我想导入另一个数据表,列很有可能是不同的,那么构造字典这一块就要推倒重来,很难做到快速重用代码。况且,用update()方法来构造字典好像不是那么常见,咱都是mongo_dic=dict(key1=value1,key2=value2)的。

无法处理大量数据

咦?咱开篇不是说到,一行一行的读是可以处理大量数据只不过效率偏低。

没错啊,但是那是指的使用fetchone()fetchmany()这种方法。在这个demo里我图省事用了fetchall(),如果表太大,内存就炸了。想要改进,那就得放到一个while循环里用fetchone(),这里就不用演示了,估计有点基础的人都能分分钟写出来。

其他小问题

什么docstring写的不对,不Pythonic,try...except不规范等等等就不用说了。我后来还写了一个OOP的版本。反正这玩意是,能用,但是性能太差,不好用;糊弄小盆友可以,工作上用就等着被开除吧(尽管我mentor说,嗯这次代码写得很干净,很不错,但是咱怎么能知足呢!)

痛定思痛

于是乎,在国庆假期之前,我被Python的多线程彻底坑了一次(GIL,全局解释器锁),然后顺带着开始了熟悉Elastic Search(然而现在依旧什么都不会);

然后在国庆假期之后,我的新任务就是熟悉ELK(Elastic Search+Logstash+Kibana),居然有一种大数据工程师的感觉。

然而我并没有抽出时间来熟悉这玩意,因为我忙着难过、写Telegram Bot、翻译、维护博客、维护两款主题来着

然后某一天,我就有了个小任务,要想办法把某个数据库里最大的4个表导入到另一台机器上的ES中。感受一下这13GiB的表……

使用Python将海量MySQL数据导入Elastic Search/MongoDB

按着MongoDB的思路来

此路不太好走,内心深处的我纠结了很久才决定回头。

构造四个字典

我:这算啥事,哥哥小时候很穷,新衣服都穿不起,没少吃苦,这点算啥,哪怕是曾经138列,我都敢再来一次。为什么我的眼里常含泪水,因为我对你爱的深沉啊

某童鞋:你真无聊。

fetchone读一行写一行

我:效率是有点低,但是任务还是能完成的。

老板:你丫拿着工资不干正事,这效率太低了。

CPU:我XXXX。算了,我忍。

硬盘:你妹!你是测试我4K性能呢么?

Python:哎内存大哥,把这个放进去,哎把那个拿出来,哎把这个放进去给原来的拿走,哎给这个拿走。

内存:有完没完……( ̄_ ̄|||)

读到一个列表里,一起插入

我:效率是高了点,就是不知道能不能行。

老板:嗯,看样子还是不错的哈……

内存:Python大哥你悠着点,我这才8G,你这表4G就占了一半了啊。

Python:内存大哥你放心,我保证把200MiB的表读入内存时就跟你要4G的地

硬盘:我了割草内核你干啥突然要来写入/swap,我这老胳膊老腿了,你有啥事找隔壁内存啊,我就是个备胎啊找你真爱内存去

内核:我也很无奈啊,没办法了只有你能救我我还是爱你的。哎,Python那家伙吃了太多内存,我等等看看要不要手起刀落施展生杀大权

其他程序:喂喂喂 内核,我要I/O,阻塞好久了!!快,快!一会主人逼急了断电了你负责么?

CPU:大家干啥呢那么热闹?

内核:大家别急,让我为民除害,杀了Python这个……负  心  汉

Python:内核大哥,别别别,不是我的错,是我主人太太太…

主人:哎Python你干啥啊读200MiB的数据库跟人要4G的内存,造反啊

Kernel Panic了

神说要有光,就有了光。坐在椅子上的我仰天长笑(并做出一种神经病一样的诡异表情):Let there be light.

处理大量结果集、读取多行并写入ES的方法

既然如此,那么咱只好采取读多行,写入ES,再读取多行,写入ES,直到全部完成为止。

鉴于这样的需求,我们需要解决那些问题呢?

读取多行时的内存消耗

我使用的MySQL驱动是由MySQL官方维护的MySQL Connector/Python,而不是大部分网上教程所用的MySQLdb(MySQL-Python)。不选择用MySQLdb的原因是它上次更新是2014年,这都2017年了,真是不敢用不更新的基础库啊

使用Python将海量MySQL数据导入Elastic Search/MongoDB

图片来源,pypi 

根据MySQL Connector官方文档的说明,Connector默认不会缓存结果集。换句话说,在执行cur.execute()之后,程序应该负责处理数据(大白话,你就是select了一个100TB的表,内存也是不会暴涨的)。假如结果集很小,能够一次性处理,那么可以选择buffered=True来缓存数据(或者是设置另一个游标),咱的经验嘛,fetchall()也行,就是得每次运行之后都来一遍。

注意:貌似MySQLdb是默认缓存的。

那如果结果集很大呢,自然不能fetchall(),也不能设置buffered=True了,而要用fetchone()或者fetchmany()。其中fetchmany()这个方法就神奇了,它的参数是指定本次读取多少行结果集(不写就是1行,和fetchone()一样),然后再次运行会继续读取对应行的结果集,一行都不对丢,全都读完了返回空列表。所以咱程序员们完全不用考虑offset和limit这类烦人的问题。

PS,我觉得这就是库提供的处理大结果集的方式……没找到还有没有更好的方法了。

哎,咱得说一句,网上的这群人啊,你抄我我抄你,你说有用吧还说得过去,都是没用的错误的,有意思嘛。

啊呸,跑题太远了,咱得说说读取多行时怎么限制内存消耗。俺想到了魔术方法__sizeof__sys.getsizeof(),这俩的区别嘛,sys.getsizeof()会调用对象的__sizeof__方法,如果对象有垃圾回收器的话,还会加上垃圾回收器的内存占用量。

所以咱测试占用100MiB内存是这么写的:

使用Python将海量MySQL数据导入Elastic Search/MongoDB

别说,还挺准。

所以咱测试结果集占用内存的时候也差不多是这么写的,然后发现,坑爹了。待我堆上一个Excel折线图:

使用Python将海量MySQL数据导入Elastic Search/MongoDB

横轴是getsizeof显示的结果集占用内存的数值(单位理应为字节),取得的数据量依次为1000、10000、10W、20W、30W。我也不知道为啥这个getsizeof显示的数值是不正确的,结果集是标准的内置对象啊,可能他只是显示了外层的列表的内存占用,我还得乘以内层的元组内存占用?(是的我算上了乘以内部元组的大小,结果还是错误的,但也是线性的,还望高人指教怎么做乘法才会得到正确的结果)

我是不是该计算y=kx一次函数了?反正就是,这玩意能用,非得计算,或者做参考,还是可以的。

后来经过一段时间的尝试,我终于用它拿到了正确的内存占用,代码可以看这里。之前拿到的结果是呈线性的原因是getsizeof只会获得最外层数据类型的内存占用,所以如果想获得整个变量的内存占用,要递归到基础的不可变类型。当然这递归效率就不太高了,

反正管他怎么的,管他看不看这个折线图的,咱起码可以好好利用fetchmany()这个方法了,所以这块控制的代码大概就是下面这个样子:

cur.execute('SELECT * FROM sometable')
while True:
  data = cur.fetchmany(SIZE)
  if data:
    pass
  else:
    break

其中SIZE可以放到文件的最开始,或者写到配置文件中,甚至是另写一个函数、方法来控制计算内存占用。

批量插入ES

这一点似乎没什么好说的,类似MongoDB,基本上他们需要的批量插入的数据类型就是列表里面包含着字典,也就是[{},{}...],我们只需要构造字典,在列表中不停的使用append()方法,就可以达到这个目的了。

手动构造字典?我拒绝

咱在最上面的demo,是手动根据数据库的列名来构造字典的key的,但是这表说不定很复杂,俺以前穷,吃点苦手动也就算了;虽说我现在也很穷,但是,我傲娇啊,我高冷啊,就要这么不近人情,不想手动搞四个字典!(再加上这代码也没办法复用啊)

根据列名自动获得键

想起来了,咱可以通过查询语句得知表里有那些列,然后把列名取出来作为字典的key,再正常执行查询得到值,然后使用某种方法把这俩动态的捏到一起,再append()到SIZE的次数~( ̄▽ ̄)~*

没错,看起来这个思路是很好的。

那么咋搞出来列名呢?

cur.execute('SHOW COLUMNS from tb')
col_data = cur.fetchall()
col_field = [i[0] for i in col_data]

由于列名也不能多到哪去,直接fetchall()肯定是没问题的,顺便应用了一下列表推导,这样col_field这个列表里就会保存有整个表的列名啦。

 

根据键和表内容动态构造字典

下面可能就到了本篇中最不好搞的一部分了,反正我也不知道我是怎么搞出来的,大概是看了一眼《流畅的Python》吧,差点连字典推导都用上了(ノ*・ω・)ノ

动态构造字典大概是这样的,其中datafetchmany()的结果集(列表套元组),bulk_dic是一个列表(我这名字起的不太好):

for i in range(len(data)):
  es_dic = dict(zip(col_field, data[i]))
  bulk_dic.append(es_dic)

别问我这动态构造字典咋搞出来的,我也是吭吃瘪肚的胡编乱造了好久才弄出来的(哎这就是我和大牛的差距啊,人家一杯茶的功夫,我得搞一个小时)

友情提示:

为什么不尝试使用下pymysql.cursors.DictCursor呢?是吧是吧,DictCursor最方便了

齐活!拼凑代码

所以,基本上思路都有了,各个模块的功能也都实现了,那么就剩下把代码组合在一起,然后测试了!

大体思路就是,利用上面介绍的fetchmany()限制内存占用,利用col_field和取得的结果集构造一个正确的字典append()到一个列表里,执行插入,然后重新开始一轮循环。

俺最终写出来的代码大概是这样子的(懒,贴图):

使用Python将海量MySQL数据导入Elastic Search/MongoDB

exe_time是俺写的一个装饰器,用于测量一个函数的执行时间。只要调用这个函数,传递两个参数,就可以不管不顾了,这个函数会把表的名字当作_index插入到ES中。当然了,我这个SIZE=100有点小,应该根据机器配置进行适当的调整

其中还有几点我不是很理解的地方、以及我觉得有必要再多说几句的地方(我也是瞎猫碰死耗子啊

paramStyle

看注释可以得知,

cur.execute('SHOW COLUMNS from %s' % tb)

这一句实际上是存在注入风险的。

按照mysql.connector的参数风格pyformat,正常应该是字典或者元组模式就可以的,

字典示例代码如下:

cursor.execute("SELECT spam FROM eggs WHERE lumberjack = %(jack)s", {'jack': lumberjack})

元组示例代码如下:

cursor.execute("SELECT spam FROM eggs WHERE lumberjack = %s", (lumberjack,))

然而在实际过程中却可耻的抛异常了,不晓得为什么,只好用%和格式化字符拼接下了。

_index与_type

对于_index,如果接触过ES的人大概知道,_index大概相当于关系数据库里面的数据库;_type就有点像表了;

咱看个表格吧,对比下SQL、NoSQL和ES,只是个人的理解,未必准确:

SQL Database Table Row Column
NoSQL Database Collection Document Field
ES Index Type Document Field

当然了,由于mentor的要求是把每一个表名作为index(数据库),所以上面代码中我是这么写的:

es_dic.update(_index=tb, _type='hey')

这个_type就是随意起的了;

可能你的要求是index叫test,type叫表名,那就 es_dic.update(_index='test', _type=tb),很简单的。

哦对了,我自然还是会写一个函数从information_schema中得出指定数据库中最大的四个表,然后自动处理的,我怎么会自己写四个调用呢!!!我这么傲娇这么高冷,要是你叫高晓松我叫韩寒我肯定天天把“高处不甚寒”挂在嘴边

理论与实际总是存在差距的

圈里有一句话,

In theory, there is no difference between theory and practice. In practice there is.

上面的这堆代码,在你处理非常非常大的表的时候,很有可能还会因为种种原因抛异常的(基本上是因为超时啊什么的),为了处理这个问题,我们需要在构造es的时候加几个参数,我也是开始导入了才发现还有这么多坑……

es = Elasticsearch(timeout=30, max_retries=10, retry_on_timeout=True)

在连接到数据库之后修改三个参数,以免超时提示Lost connection to MySQL server

cur.execute("SET GLOBAL max_allowed_packet=1073741824")
cur.execute('SET GLOBAL CONNECT_TIMEOUT = 60')
cur.execute('SET SESSION NET_READ_TIMEOUT = 600')

当然,具体数值需要大家自己掌握。

最后的总结

所以所以,这段函数能干啥?

接受俩参数,一个数据库名,一个表名,将表名作为index把全部数据批量高效可靠导入到ES中,不用担心内存问题,你只需要关心硬盘和选择一个合适的SIZE。

想要批量高效的导入数据到MongoDB也只不过是把helpers.bulk()换成pymongo的insert_many()而已。当然了,想要批量写入文件(比如说csv文件)……呃,还不更容易,都不用构造字典,用好控制内存那段代码就行了。

最后,还希望能够有高手指出,还有没有更方便简洁的方法,比如说什么什么库本身就提供了这个功能等等……

PS,把这个代码开源到GitHub,供大家学习使用,也欢迎PR……
开源地址

紧跟时代潮流,commit是要带签名的。搞得我现在基本上不想用图形化的客户端了,哎,装逼要紧啊,装完赶紧跑ε=ε=ε=(~ ̄▽ ̄)~

使用Python将海量MySQL数据导入Elastic Search/MongoDB

哦对了,秀一下我的kibana!!看起来就很高大上,是吧。

使用Python将海量MySQL数据导入Elastic Search/MongoDB

本地开发主题测试用WordPress的内心独白:

终于也有哥出头露面的一天了!


文章版权归原作者所有丨本站默认采用CC-BY-NC-SA 4.0协议进行授权|
转载必须包含本声明,并以超链接形式注明原作者和本文原始地址:
https://dmesg.app/mysql-es-mongo.html
喜欢 (23)
分享:-)
关于作者:
If you have any further questions, feel free to contact me in English or Chinese.
发表我的评论
取消评论

                     

去你妹的实名制!

  • 昵称 (必填)
  • 邮箱 (必填,不要邮件提醒可以随便写)
  • 网址 (选填)
(18)个小伙伴在吐槽
  1. 我的实现是采用SScursor 读取mysql ,多线程+bulk写入es,写入在20000左右,瓶颈在mysql的读取,导了六千万条的数据花了一个多小时
    gooff2019-07-31 13:36 回复
  2. 如果导入一部分发生失败了要怎么定位
    哈哈2018-07-05 14:05 回复
    • 看报错的信息是什么啊
      Benny小土豆2018-07-05 14:39 回复
  3. 如何导入一部分发生失败呢
    哈哈2018-07-05 14:05 回复
  4. 开源了吗,求代码一看啊,大神。。。https://github.com/BennyThink/Intern-Life 这个下面真没找到啊。。。 :sad:
    Devops2018-04-23 14:31 回复
    • 在mission 4下面,看这里
      Benny小土豆2018-04-23 14:34 回复
      • :razz: 好滴,好到了,回复的很及时呐 ,感谢感谢~
        Devops2018-04-23 14:36 回复
        • ?不客气~~欢迎⭐
          Benny小土豆2018-04-23 14:37
  5. 已知:https://github.com/ShadowRZ/Bash-Beginners-Guide-CN/commit/a213e44de091b960592682549a3f51a5b84a563e(注意我故意加的Signed-off-by
    求问:要不要用这个noreply地址给我密钥加一个Meta-UID???
    布偶君2017-11-18 12:21 回复
    • 你是啥意思?我没搞懂……
      Benny小土豆2017-11-18 14:13 回复
  6. 今天要不是因为博主贴出了github主页,我都不知道博主有一个校园网测试脚本,我之前是用SlowDNS这个APP测试的
    cok3302017-11-12 22:07 回复
    • ?好像那篇博文里也贴了唉~
      Benny小土豆2017-11-12 22:12 回复
      • 刚刚看到readme里面的指引才发现那边贴了,不过我九月初研究时还没有这个脚本呢。 网关是锐捷。 话说我用openvpn(port=53;base on softether server)和slowdns都能穿透未认证网络。但是用博主的脚本的话,认证以后测试会祝贺我;未认证测试,就一直停留在start testing,看了代码不知何解? 话说那个IP是博主的服务器吗?还是个公共DNS? 我是在Android 5.1下用termux 0.54测试的,已经apt install python==3.6,直接cd到下载目录然后python udp53.py执行之,未有报错。 我接着在未认证网络用nslookup sina.cn,8.8.8.8迅速返回了结果。不解呢~
        cok3302017-11-12 22:37 回复
        • 9月?这个脚本16年9月就有了。
          脚本测试的是向我的服务器发起TCP 53的连接请求,如果被拒绝的话(服务器的TCP 53端口没有程序监听,所以内核会拒绝连接),那么就是放行了53;之所以没添加UDP 53的测试一是因为如果放行TCP 53,那么UDP 53也肯定是放行的,二是因为……服务器的UDP 53被占用了,再加上UDP53测试起来比较麻烦。
          呐,你放心,未认证的网络自然nslookup都会有结果的,要不DNS隧道怎么会好用呢。
          脚本报错不好用的话,最好填个issue,详细描述下你的环境啊、复现过程啊什么的。当然更欢迎Pull Request了
          Benny小土豆2017-11-12 22:51
        • 哦对了,我的softether server是在实验室机房里面,和手机同在校园局域网内(10.x.x.x),可以不用认证直连外部网的。 slowdns是连接德国和尼德兰的服务器。
          cok3302017-11-12 22:54
        • 好吧,这也是个办法。
          Benny小土豆2017-11-12 22:58
  7. 一楼
    Kios2017-11-04 11:15 回复
    • 你好,我是王大锤。
      Benny小土豆2017-11-04 14:06 回复