項目背景
在處理過程中,今天上午需要更新A字段,下午爬蟲組完成了規格書或圖片的爬取又需要更新圖片和規格書字段,由于單表千萬級深度翻頁會導致處理速度越來越慢。
select a,b,c from db.tb limit 10000 offset 9000000
但是時間是有限的,是否有更好的方法去解決這種問題呢?
改進思路
是否有可以不需要深度翻頁也可以進行數據更新的憑據?
是的,利用自增id列
觀察數據特征
此單表有自增id列且為主鍵,根據索引列查詢數據和更新數據是最理想的途徑。
select a,b, c from db.tb where id=9999999;
update db.tb set a=x where id=9999999;
多進程處理
每個進程處理一定id范圍內的數據,這樣既避免的深度翻頁又可以同時多進程處理數據。
提高數據查詢速度的同時也提高了數據處理速度。
下面是我編寫的任務分配函數,供參考:
def mission_handler(all_missions, worker_mission_size):
"""
根據總任務數和每個worker的任務數計算出任務列表, 任務列表元素為(任務開始id, 任務結束id)。
例: 總任務數100個,每個worker的任務數40, 那么任務列表為:[(1, 40), (41, 80), (81, 100)]
:param all_missions: 總任務數
:param worker_mission_size: 每個worker的最大任務數
:return: [(start_id, end_id), (start_id, end_id), ...]
"""
worker_mission_ids = []
current_id = 0
while current_id = all_missions:
start_id = all_missions if current_id + 1 >= all_missions else current_id + 1
end_id = all_missions if current_id + worker_mission_size >= all_missions else current_id + worker_mission_size
if start_id == end_id:
if worker_mission_ids[-1][1] == start_id:
break
worker_mission_ids.append((start_id, end_id))
current_id += worker_mission_size
return worker_mission_ids
假設單表id最大值為100, 然后我們希望每個進程處理20個id,那么任務列表將為:
>>> mission_handler(100, 40)
[(1, 40), (41, 80), (81, 100)]
那么,
進程1將只需要處理id between 1 to 40的數據;
進程2將只需要處理id between 41 to 80的數據;
進程3將只需要處理id between 81 to 100的數據。
from concurrent.futures import ProcessPoolExecutor
def main():
# 自增id最大值
max_id = 30000000
# 單worker處理數據量
worker_mission_size = 1000000
# 使用多進程進行處理
missions = mission_handler(max_id, worker_mission_size)
workers = []
executor = ProcessPoolExecutor()
for idx, mission in enumerate(missions):
start_id, end_id = mission
workers.append(executor.submit(data_handler, start_id, end_id, idx))
def data_handler(start_id, end_id, worker_id):
pass
思路總結
- 避免深度翻頁進而使用自增id進行查詢數據和數據
- 使用多進程處理數據
數據處理技巧
記錄處理成功與處理失敗的數據id,以便后續跟進處理
# 用另外一張表記錄處理狀態
insert into db.tb_handle_status(row_id, success) values (999, 0);
循環體內進行異常捕獲,避免程序異常退出
def data_handler(start_id, end_id, worker_id):
# 數據連接
conn, cursor = mysql()
current_id = start_id
try:
while current_id = end_id:
try:
# TODO 數據處理代碼
pass
except Exception as e:
# TODO 記錄處理結果
# 數據移動到下一條
current_id += 1
continue
else:
# 無異常,繼續處理下一條數據
current_id += 1
except Exception as e:
return 'worker_id({}): result({})'.format(worker_id, False)
finally:
# 數據庫資源釋放
cursor.close()
conn.close()
return 'worker_id({}): result({})'.format(worker_id, True)
更新數據庫數據盡量使用批量提交
sql = """update db.tb set a=%s, b=%s where id=%s"""
values = [
('a_value', 'b_value', 9999),
('a_value', 'b_value', 9998),
...
]
# 批量提交,減少網絡io以及鎖獲取頻率
cursor.executemany(sql, values)
以上就是MySQL單表千萬級數據處理的思路分享的詳細內容,更多關于MySQL單表千萬級數據處理的資料請關注腳本之家其它相關文章!
您可能感興趣的文章:- mysql千萬級數據大表該如何優化?
- MySQL千萬級數據表的優化實戰記錄