mongodb python同步两个数据库数据

有时候需要做一些迁移工作,需要对mongodb进行迁移。默认的工具貌似也十分好用的。缺少像Navicat 之于mysql的这样神级的软件。
 
所以自己动手写代码完成:
 
# -*- coding: utf-8 -*-
# @Time : 2022/4/6 4:41
# @File : database_migrate.py
# @Author : Rocky C@www.30daydo.com
import time
from loguru import logger
import pymongo

ignore_db = ['admin', 'config', 'local',
] # 忽略更新的库

ignore_col = [('db_stock','dfcf_list_full')]

logger.add('mongo.log')

# 数据库同步
def get_client(user, password, host, port):
connect_uri = f'mongodb://{user}:{password}@{host}:{port}'
client = pymongo.MongoClient(connect_uri)
return client


def origin():
return get_client('admin', 'password', '127.0.0.1', '27017')


def target():
return get_client('root', 'password', '127.0.0.1', '27017')


def transfer():
origin_client = origin()
target_client = target()
dbs = get_db_name(origin_client)
for db in dbs:
for col in get_collection_name(origin_client, db):

if (db,col) in ignore_col:
continue
items = []
logger.info(f'正在更新{db} {col}')
for i in get_item(origin_client, db, col):
items.append(i)

insert_item(target_client, db, col, items)
logger.info(f'更新数据库 {db} {col}')
# time.sleep(0.5)

def get_item(client, db_name, col):
return client[db_name][col].find()



def insert_item(client, db_name, col, data):
batch = 1000
count = len(data)//batch + 1
for i in range(count):
item = data[i*batch:(i+1)*batch]

if len(item)==0:
continue

try:
client[db_name][col].insert_many(item)
except Exception as e:
logger.error(e)
logger.error(f'{db_name} {col} 插入出错')


def get_db_name(client):
db_name = client.list_database_names()
dbs = []
for db in db_name:
if db not in ignore_db:
dbs.append(db)
return dbs

def delete_col(client,db,col):
try:
client[db][col].delete_many({})
except Exception as e:
logger.error(e)
logger.error(db)
logger.error(col)
return False
else:
return True

def server_compare():
'''
比较2个数据库是否相同,只是单纯比较条数
'''
origin_client = origin()
target_client = target()
dbs = get_db_name(origin_client)

for db in dbs:
for col in get_collection_name(origin_client, db):
origin_count = origin_client[db][col].count_documents({})
target_count = target_client[db][col].count_documents({})
if origin_count!=target_count:
logger.info(f'collection {db} {col}有区别')
#
if delete_col(target_client,db,col):
items = []
logger.info(f'正在更新{db} {col}')
for i in get_item(origin_client, db, col):
items.append(i)

insert_item(target_client, db, col, items)
logger.info(f'更新数据库 {db} {col}')
time.sleep(1)




def get_collection_name(client, db_name):
collection_names = client[db_name].list_collection_names(session=None)
return collection_names

def main():
server_compare()

if __name__ == '__main__':
main()

 原理就是不断迭代,不同的数据库,里面的不同的collection。
对于同名collection,通过条数是否一致,来决定是否要把原数据复制过来。 
 
保存上面文件为main.py
 
执行 python main.py
 
就可以进行数据同步工作啦。

0 个评论

要回复文章请先登录注册