MySQL單表千萬(wàn)級(jí)數(shù)據(jù)處理的思路分享
在處理過(guò)程中,今天上午需要更新A字段,下午爬蟲(chóng)組完成了規(guī)格書(shū)或圖片的爬取又需要更新圖片和規(guī)格書(shū)字段,由于單表千萬(wàn)級(jí)深度翻頁(yè)會(huì)導(dǎo)致處理速度越來(lái)越慢。
select a,b,c from db.tb limit 10000 offset 9000000
但是時(shí)間是有限的,是否有更好的方法去解決這種問(wèn)題呢?
改進(jìn)思路是否有可以不需要深度翻頁(yè)也可以進(jìn)行數(shù)據(jù)更新的憑據(jù)?是的,利用自增id列
觀察數(shù)據(jù)特征此單表有自增id列且為主鍵,根據(jù)索引列查詢數(shù)據(jù)和更新數(shù)據(jù)是最理想的途徑。
select a,b, c from db.tb where id=9999999;update db.tb set a=x where id=9999999;多進(jìn)程處理
每個(gè)進(jìn)程處理一定id范圍內(nèi)的數(shù)據(jù),這樣既避免的深度翻頁(yè)又可以同時(shí)多進(jìn)程處理數(shù)據(jù)。提高數(shù)據(jù)查詢速度的同時(shí)也提高了數(shù)據(jù)處理速度。下面是我編寫(xiě)的任務(wù)分配函數(shù),供參考:
def mission_handler(all_missions, worker_mission_size): ''' 根據(jù)總?cè)蝿?wù)數(shù)和每個(gè)worker的任務(wù)數(shù)計(jì)算出任務(wù)列表, 任務(wù)列表元素為(任務(wù)開(kāi)始id, 任務(wù)結(jié)束id)。 例: 總?cè)蝿?wù)數(shù)100個(gè),每個(gè)worker的任務(wù)數(shù)40, 那么任務(wù)列表為:[(1, 40), (41, 80), (81, 100)] :param all_missions: 總?cè)蝿?wù)數(shù) :param worker_mission_size: 每個(gè)worker的最大任務(wù)數(shù) :return: [(start_id, end_id), (start_id, end_id), ...] ''' worker_mission_ids = [] current_id = 0 while current_id <= all_missions:start_id = all_missions if current_id + 1 >= all_missions else current_id + 1end_id = all_missions if current_id + worker_mission_size >= all_missions else current_id + worker_mission_sizeif start_id == end_id: if worker_mission_ids[-1][1] == start_id:breakworker_mission_ids.append((start_id, end_id))current_id += worker_mission_size return worker_mission_ids
假設(shè)單表id最大值為100, 然后我們希望每個(gè)進(jìn)程處理20個(gè)id,那么任務(wù)列表將為:
>>> mission_handler(100, 40)[(1, 40), (41, 80), (81, 100)]
那么,進(jìn)程1將只需要處理id between 1 to 40的數(shù)據(jù);進(jìn)程2將只需要處理id between 41 to 80的數(shù)據(jù);進(jìn)程3將只需要處理id between 81 to 100的數(shù)據(jù)。
from concurrent.futures import ProcessPoolExecutordef main(): # 自增id最大值 max_id = 30000000 # 單worker處理數(shù)據(jù)量 worker_mission_size = 1000000 # 使用多進(jìn)程進(jìn)行處理 missions = mission_handler(max_id, worker_mission_size) workers = [] executor = ProcessPoolExecutor() for idx, mission in enumerate(missions):start_id, end_id = missionworkers.append(executor.submit(data_handler, start_id, end_id, idx))def data_handler(start_id, end_id, worker_id): pass思路總結(jié) 避免深度翻頁(yè)進(jìn)而使用自增id進(jìn)行查詢數(shù)據(jù)和數(shù)據(jù) 使用多進(jìn)程處理數(shù)據(jù) 數(shù)據(jù)處理技巧
記錄處理成功與處理失敗的數(shù)據(jù)id,以便后續(xù)跟進(jìn)處理
# 用另外一張表記錄處理狀態(tài)insert into db.tb_handle_status(row_id, success) values (999, 0);
循環(huán)體內(nèi)進(jìn)行異常捕獲,避免程序異常退出
def data_handler(start_id, end_id, worker_id): # 數(shù)據(jù)連接 conn, cursor = mysql() current_id = start_idtry: while current_id <= end_id:try: # TODO 數(shù)據(jù)處理代碼 passexcept Exception as e: # TODO 記錄處理結(jié)果 # 數(shù)據(jù)移動(dòng)到下一條 current_id += 1 continueelse: # 無(wú)異常,繼續(xù)處理下一條數(shù)據(jù) current_id += 1except Exception as e: return ’worker_id({}): result({})’.format(worker_id, False)finally: # 數(shù)據(jù)庫(kù)資源釋放 cursor.close() conn.close()return ’worker_id({}): result({})’.format(worker_id, True)
更新數(shù)據(jù)庫(kù)數(shù)據(jù)盡量使用批量提交
sql = '''update db.tb set a=%s, b=%s where id=%s'''values = [ (’a_value’, ’b_value’, 9999), (’a_value’, ’b_value’, 9998), ... ]# 批量提交,減少網(wǎng)絡(luò)io以及鎖獲取頻率cursor.executemany(sql, values)
以上就是MySQL單表千萬(wàn)級(jí)數(shù)據(jù)處理的思路分享的詳細(xì)內(nèi)容,更多關(guān)于MySQL單表千萬(wàn)級(jí)數(shù)據(jù)處理的資料請(qǐng)關(guān)注好吧啦網(wǎng)其它相關(guān)文章!
相關(guān)文章:
1. mysql-bin.000001文件的來(lái)源及處理方法2. MySQL雙主(主主)架構(gòu)配置方案3. MySQL 的啟動(dòng)選項(xiàng)和系統(tǒng)變量實(shí)例詳解4. Access數(shù)據(jù)庫(kù)安全的幾個(gè)問(wèn)題5. DB2中多種常用功能的解決方法(1)6. Sqlserver之死鎖查詢以及批量解鎖的實(shí)現(xiàn)方法7. SQLite3 API 編程手冊(cè)8. 初識(shí)SQLITE3數(shù)據(jù)庫(kù)9. SQL Server數(shù)據(jù)庫(kù)連接查詢和子查詢實(shí)戰(zhàn)案例10. 一文帶你了解MySQL的左連接與右連接
