python mongodb大数据(>3GB)转移Mysql数据库

数据约为5GB左右,如果直接用
for i in doc.find({})
进行逐行遍历的话,游标就会超时,而且越到后面速度越慢.
 
 于是使用了分段遍历的方法.
# -*-coding=utf-8-*-
import pandas as pd
import json
import pymongo
from sqlalchemy import create_engine

# 将mongo数据转移到mysql

client = pymongo.MongoClient('xxx')
doc = client['spider']['meituan']
engine = create_engine('mysql+pymysql://xxx:xxx@xxx:/xxx?charset=utf8')


def classic_method():
temp =
start = 0
# 数据太大还是会爆内存,或者游标丢失
for i in doc.find().batch_size(500):
start += 1
del i['_id']
temp.append(i)
print(start)

print('start to save to mysql')
df = pd.read_json(json.dumps(temp))
df = df.set_index('poiid', drop=True)
df.to_sql('meituan', con=engine, if_exists='replace')
print('done')


def chunksize_move():
block = 10000
total = doc.find({}).count()
iter_number = total // block

for i in range(iter_number + 1):
small_part = doc.find({}).limit(block).skip(i * block)
list_data =

for item in small_part:
del item['_id']
del item['crawl_time']
item['poiid'] = int(item['poiid'])
for k, v in item.items():
if isinstance(v, dict) or isinstance(v, list):

item[k] = json.dumps(v, ensure_ascii=False)

list_data.append(item)

df = pd.DataFrame(list_data)
df = df.set_index('poiid', drop=True)

try:
df.to_sql('meituan', con=engine, if_exists='append')
print('to sql {}'.format(i))
except Exception as e:
print(e)

chunksize_move()

 

to_sql.PNG

速度比一次批量的要快不少.

0 个评论

要回复文章请先登录注册