好湿?好紧?好多水好爽自慰,久久久噜久噜久久综合,成人做爰A片免费看黄冈,机机对机机30分钟无遮挡

主頁 > 知識庫 > 5分鐘快速掌握Python定時任務框架的實現

5分鐘快速掌握Python定時任務框架的實現

熱門標簽:惠州龍門400電話要怎么申請 上海企業外呼系統 熱門電銷機器人 智能機器人電銷神器 okcc外呼系統怎么調速度 外呼電信系統 萬利達百貨商場地圖標注 河南虛擬外呼系統公司 電話機器人哪里有賣

APScheduler 簡介

在實際開發中我們經常會碰上一些重復性或周期性的任務,比如像每天定時爬取某個網站的數據、一定周期定時運行代碼訓練模型等,類似這類的任務通常需要我們手動來進行設定或調度,以便其能夠在我們設定好的時間內運行。

在 Windows 上我們可以通過計劃任務來手動實現,而在 Linux 系統上往往我們會用到更多關于 crontab 的相關操作。但手動管理并不是一個很好的選擇,如果我們需要有十幾個不同的定時任務需要管理,那么每次通過人工來進行干預未免有些笨拙,那這時候就真的是「人工智能」了。

所以將這些定時任務的調度代碼化才是能夠讓我們很好地從這種手動管理的純人力操作中解脫出來。

在 Python 生態中對于定時任務的一些操作主要有那么幾個:

  • schedule:第三方模塊,該模塊適合比較輕量級的一些調度任務,但卻不適用于復雜時間的調度
  • APScheduler:第三方定時任務框架,是對 Java 第三方定時任務框架 Quartz 的模仿與移植,能提供比 schedule 更復雜的應用場景,并且各種組件都是模塊化,易于使用與二次開發。
  • Celery Beat:屬于 celery 這分布式任務隊列第三方庫下的一個定時任務組件,如果使用需要配合 RabbitMQ 或 Redis 這類的消息隊列套件,需要花費一定的時間在環境搭建上,但在高版本中已經不支持 Windows。

所以為了滿足能夠相對復雜的時間條件,又不需要在前期的環境搭建上花費很多時間的前提下,選擇 APScheduler 來對我們的調度任務或定時任務進行管理是個性價比極高的選擇。而本文主要會帶你快速上手有關 APScheduler 的使用。

APScheduler 概念與組件

雖然說官方文檔上的內容不是很多,而且所列舉的 API 不是很多,但這側面也反映了這一框架的簡單易用。所以在使用 APScheduler 之前,我們需要對這個框架的一些概念簡單了解,主要有那么以下幾個:

  • 觸發器(trigger)
  • 任務持久化(job stores)
  • 執行器(executor)
  • 調度器(scheduler)

觸發器(trigger)

所謂的觸發器就是用以觸發定時任務的組件,在 APScheduler 中主要是指時間觸發器,并且主要有三類時間觸發器可供使用:

  • date:日期觸發器。日期觸發器主要是在某一日期時間點上運行任務時調用,是 APScheduler 里面最簡單的一種觸發器。所以通常也適用于一次性的任務或作業調度。
  • interval:間隔觸發器。間隔觸發器是在日期觸發器基礎上擴展了對時間部分,比如時、分、秒、天、周這幾個部分的設定。是我們用以對重復性任務進行設定或調度的一個常用調度器。設定了時間部分之后,從起始日期開始(默認是當前)會按照設定的時間去執行任務。
  • cron:cron 表達式觸發器。cron 表達式觸發器就等價于我們 Linux 上的 crontab,它主要用于更復雜的日期時間進行設定。但需要注意的是,APScheduler 不支持 6 位及以上的 cron 表達式,最多只支持到 5 位。

任務持久化(job stores)

任務持久化主要是用于將設定好的調度任務進行存儲,即便是程序因為意外情況,如斷電、電腦或服務器重啟時,只要重新運行程序時,APScheduler 就會根據對存儲好的調度任務結果進行判斷,如果出現已經過期但未執行的情況會進行相應的操作。

APScheduler 為我們提供了多種持久化任務的途徑,默認是使用 memory 也就是內存的形式,但內存并不是持久化最好的方式。最好的方式則是通過像數據庫這樣的載體來將我們的定時任務寫入到磁盤當中,只要磁盤沒有損壞就能將數據給恢復。

APScheduler 支持的且常用的數據庫主要有:

  • sqlalchemy 形式的數據庫,這里就主要是指各種傳統的關系型數據庫,如 MySQL、PostgreSQL、SQLite 等。
  • mongodb 非結構化的 Mongodb 數據庫,該類型數據庫經常用于對非結構化或版結構化數據的存儲或操作,如 JSON。
  • redis 內存數據庫,通常用作數據緩存來使用,當然通過一些主從復制等方式也能實現當中數據的持久化或保存。

通常我們可以在創建 Scheduler 實例時創建,或是單獨為任務指定。配置的方式相對簡單,我們只需要指定對應的數據庫鏈接即可。

執行器(executor)

執行器顧名思義就是執行我們任務的對象,在計算機內通常要么是 CPU 調度任務,要么是單獨維護一個線程來運行任務。所以 APScheduler 里的執行器通常就是 ThreadPoolExecutor 或 ProcessPoolExecutor 這樣的線程池和進程池兩種。

當然如果是和協程或異步相關的任務調度,還可以使用對應的 AsyncIOExecutor、TwistedExecutor 和 GeventExecutor 三種執行器。

調度器(scheduler)

調度器的選擇主要取決于你當前的程序環境以及 APScheduler 的用途。根據用途的不同,APScheduler 又提供了以下幾種調度器:

  • BlockingScheduler:阻塞調度器,當程序中沒有任何存在主進程之中運行東西時,就則使用該調度器。
  • BackgroundScheduler:后臺調度器,在不使用后面任何的調度器且希望在應用程序內部運行時的后臺啟動時才進行使用,如當前你已經開啟了一個 Django 或 Flask 服務。
  • AsyncIOScheduler:AsyncIO 調度器,如果代碼是通過 asyncio 模塊進行異步操作,使用該調度器。
  • GeventScheduler:Gevent 調度器,如果代碼是通過 gevent 模塊進行協程操作,使用該調度器
  • TornadoScheduler:Tornado 調度器,在 Tornado 框架中使用
  • TwistedScheduler:Twisted 調度器,在基于 Twisted 的框架或應用程序中使用
  • QtScheduler:Qt 調度器,在構建 Qt 應用中進行使用。

通常情況下如果不是和 Web 項目或應用集成共存,那么往往都首選 BlockingScheduler 調度器來進行操作,它會在當前進程中啟動相應的線程來進行任務調度與處理;反之,如果是和 Web 項目或應用共存,那么需要選擇 BackgroundScheduler 調度器,因為它不會干擾當前應用的線程或進程狀況。

基于對以上的概念和組件認識,我們就能基本上摸清 APScheduler 的運行流程:

  • 設定調度器(scheduler)用以對任務的調度與安排進行全局統籌
  • 對相應的函數或方法上設定相應的觸發器(trigger),并添加到調度器中
  • 如有任務持久化(job stores)需要則需要設定對應的持久化層,否則默認使用內存存儲任務
  • 當觸發器被觸發時,就將任務交由執行器(executor)進行執行

APScheduler 快速上手

雖然 APScheduler 里面的概念和組件看起來有點多,但在使用上并不算很復雜,我們可以通過本節的示例就能夠很快使用。

選擇對應的 scheduler

在使用之前我們需要先實例化一個 scheduler 對象,所有的 scheduler 對象都被放在了 apscheduler.schedulers 模塊下,我們可以直接通過查看 API 文檔或者借助 IDE 補全的提示來獲取相應的 scheduler 對象。

這里我直接選取了最基礎的 BlockingScheduler:

# main.py
 
from apscheduler.schedulers.blocking import BlockingScheduler
 
scheduler = BlockingScheduler()

配置 scheduler

對于 scheduler 的一些配置我們可以直接在實例化對象時就進行配置,當然也可以在創建實例化對象之后再進行配置。

實例化時進行參數配置:

# main.py
from datetime import datetime
 
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
 
# 任務持久化 使用 SQLite
jobstores = {
  'default': SQLAlchemyJobStore(url = 'sqlite:///jobs.db')
}
# 執行器配置
executors = {
  'default': ThreadPoolExecutor(20),
}
# 關于 Job 的相關配置,見官方文檔 API
job_defaults = {
  'coalesce': False,
  'next_run_time': datetime.now()
}
scheduler = BlockingScheduler(
 jobstores = jobstores,
 executors = executors,
 job_defaults = job_defaults,
 timezone = 'Asia/Shanghai'
)

或是通過 scheduler.configure 方法進行同樣的操作:

scheduler = BlockingScheduler()
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone='Asia/Shanghai')

添加并執行你的任務

創建 scheduler 對象之后,我們需要調用其下的 add_job() 或是 scheduled_job() 方法來將我們需要執行的函數進行注冊。前者是以傳參的形式指定對應的函數名,而后者則是以裝飾器的形式直接對我們要執行的函數進行修飾。

比如我現在有一個輸出此時此刻時間的函數 now():

from datetime import datetime
 
def now(trigger):
  print(f"trigger:{trigger} -> {datetime.now()}")

然后我打算每 5 秒的時候運行一次,那我們使用 add_job() 可以這樣寫:

if __name__ == '__main__':
  scheduler.add_job(now, trigger = "interval", args = ("interval",), seconds = 5)
  scheduler.start()

在調用 start() 方法之后調度器就會開始執行,并在控制臺上看到對應的結果了:

trigger:interval -> 2021-01-16 21:19:43.356674
trigger:interval -> 2021-01-16 21:19:46.679849
trigger:interval -> 2021-01-16 21:19:48.356595

當然使用 @scheduled_job 的方式來裝飾我們的任務或許會更加自由一些,于是上面的例子就可以寫成這樣:

@scheduler.scheduled_job(trigger = "interval", args = ("interval",), seconds = 5)
def now(trigger):
  print(f"trigger:{trigger} -> {datetime.now()}")
 
if __name__ == '__main__':
  scheduler.start()

運行之后就會在控制臺看到同樣的結果了。

不過需要注意的是,添加任務一定要在 start() 方法執行前調用,否則會找不到任務或是拋出異常。

將 APScheduler 集成到 Web 項目中

如果你是正在做有關的 Web 項目且存在一些定時任務,那么得益于 APScheduler 由于多樣的調度器,我們能夠將其和我們的項目結合到一起。

如果你正在使用 Flask,那么 Flask-APScheduler 這一別人寫好的第三方包裝庫就很適合你,雖然它沒有相關的文檔,但只要你了解了前面我所介紹的有關于 APScheduler 的概念和組件,你就能很輕易地看懂這個第三方庫倉庫里的示例代碼。

如果你使用的不是 Flask 框架,那么 APScheduler 本身也提供了一些對任務或作業的增刪改查操作,我們可以自己編寫一套合適的 API。

這里我使用的是 FastAPI 這一目前流行的 Web 框架。demo 項目結構如下:

temp-scheduler
├── config.py    # 配置項
├── main.py     # API 文件
└── scheduler.py  # APScheduler 相關設置

安裝依賴

這里我們需要的依賴不多,只需要簡單幾個即可:

pip install fastapi apscheduler sqlalchemy uvicorn

配置項

如果項目中模塊過多,那么使用一個文件或模塊來進行統一管理是最好的選擇。這里的 config.py 我們主要像 Flask 的配置那樣簡單設定:

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
 
class SchedulerConfig:
 
  JOBSTORES = {"default": SQLAlchemyJobStore(url="sqlite:///job.db")}
  EXECUTORS = {"default": ThreadPoolExecutor(20)}
  JOB_DEFAULTS = {"coalesce": False}
 
  @classmethod
  def to_dict(cls):
    return {
      "jobstores": cls.JOBSTORES,
      "executors": cls.EXECUTORS,
      "job_defaults": cls.JOB_DEFAULTS,
    }

在 SchedulerConfig 配置項中我們可以自己實現一個 to_dict() 類方法,以便我們后續傳參時通過解包的方式直接傳入配置參數即可。

Scheduler 相關設置

scheduler.py 模塊的設定也比較簡單,即設定對應的 scheduler 調度器即可。由于是演示 demo 我還將要定期執行的任務也放在了這個模塊當中:

import logging
from datetime import datetime
 
from apscheduler.schedulers.background import BackgroundScheduler
 
from config import SchedulerConfig
 
scheduler = BackgroundScheduler()
logger = logging.getLogger(__name__)
 
def init_scheduler() -> None:
  # config scheduler
  scheduler.configure(**SchedulerConfig.to_dict())
 
  logger.info("scheduler is running...")
 
  # schedule test
  scheduler.add_job(
    func=mytask,
    trigger="date",
    args=("APScheduler Initialize.",),
    next_run_time=datetime.now(),
  )
  scheduler.start()
 
def mytask(message: str) -> None:
  print(f"[{datetime.now()}] message: {message}")

在這一部分中:

  • init_scheduler() 方法主要用于在 API 服務啟動時被調用,然后對 scheduler 對象的配置以及測試
  • mytask() 則是我們要定期執行的任務,后續我們可以通過 APScheduler 提供的方法來自行添加任務

API 設置

在 main.py 模塊就主要存放著我們由 FastAPI 所構建的相關 API。如果在后續開發時存在多個接口,此時就需要將不同接口放在不同模塊文件中,以達到路由的分發與管理,類似于 Flask 的藍圖模式。

import logging
import uuid
from datetime import datetime
from typing import Any, Dict, Optional, Sequence, Union
 
from fastapi import FastAPI
from pydantic import BaseModel
 
from scheduler import init_scheduler, mytask, scheduler
 
logger = logging.getLogger(__name__)
 
app = FastAPI(title="APScheduler API")
app.add_event_handler("startup", init_scheduler)
 
class Job(BaseModel):
  id: Union[int, str, uuid.UUID]
  name: Optional[str] = None
  func: Optional[str] = None
  args: Optional[Sequence[Optional[str]]] = None
  kwargs: Optional[Dict[str, Any]] = None
  executor: Optional[str] = None
  misfire_grace_time: Optional[str] = None
  coalesce: Optional[bool] = None
  max_instances: Optional[int] = None
  next_run_time: Optional[Union[str, datetime]] = None
 
@app.post("/add")
def add_job(
  message: str,
  trigger: str,
  trigger_args: Optional[dict],
  id: Union[str, int, uuid.UUID],
):
  try:
    scheduler.add_job(
      func=mytask,
      trigger=trigger,
      kwargs={"message": message},
      id=id,
      **trigger_args,
    )
  except Exception as e:
    logger.exception(e.args)
    return {"status_code": 0, "message": "添加失敗"}
  return {"status_code": 1, "message": "添加成功"}
 
@app.delete("/delete/{id}")
def delete_job(id: Union[str, int, uuid.UUID]):
  """delete exist job by id"""
  try:
    scheduler.remove_job(job_id=id)
  except Exception:
    return dict(
      message="刪除失敗",
      status_code=0,
    )
  return dict(
    message="刪除成功",
    status_code=1,
  )
 
@app.put("/reschedule/{id}")
def reschedule_job(
  id: Union[str, int, uuid.UUID], trigger: str, trigger_args: Optional[dict]
):
  try:
    scheduler.reschedule_job(job_id=id, trigger=trigger, **trigger_args)
  except Exception as e:
    logger.exception(e.args)
    return dict(
      message="修改失敗",
      status_code=0,
    )
  return dict(
    message="修改成功",
    status_code=1,
  )
 
@app.get("/job")
def get_all_jobs():
  jobs = None
  try:
    job_list = scheduler.get_jobs()
    if job_list:
      jobs = [Job(**task.__getstate__()) for task in job_list]
  except Exception as e:
    logger.exception(e.args)
    return dict(
      message="查詢失敗",
      status_code=0,
      jobs=jobs,
    )
  return dict(
    message="查詢成功",
    status_code=1,
    jobs=jobs,
  )
 
@app.get("/job/{id}")
def get_job_by_id(id: Union[int, str, uuid.UUID]):
  jobs = []
  try:
    job = scheduler.get_job(job_id=id)
    if job:
      jobs = [Job(**job.__getstate__())]
  except Exception as e:
    logger.exception(e.args)
    return dict(
      message="查詢失敗",
      status_code=0,
      jobs=jobs,
    )
  return dict(
    message="查詢成功",
    status_code=1,
    jobs=jobs,
  )

以上代碼看起來很多,其實核心的就那么幾點:

FastAPI 對象 app 的初始化。這里用到的 add_event_handler() 方法就有點像 Flask 中的 before_first_request,會在 Web 服務請求伊始進行操作,理解為初始化相關的操作即可。

API 接口路由。路由通過 app 對象下的對應 HTTP 方法來實現,如 GET、POST、PUT 等。這里的裝飾器用法其實也和 Flask 很類似,就不多贅述。

scheduler 對象的增刪改查。從 scheduler.py 模塊中引入我們創建好的 scheduler 對象之后就可以直接用來做增刪改查的操作:

  • 增:使用 add_job() 方法,其主要的參數是要運行的函數(或方法)、觸發器以及觸發器參數等
  • 刪:使用 delete_job() 方法,我們需要傳入一個對應任務的 id 參數,用以能夠查找到對應的任務
  • 改:使用 reschedule_job() 方法,這里也需要一個對應任務的 id 參數,以及需要重新修改的觸發器及其參數
  • 查:使用 get_jobs() 和 get_job() 兩個方法,前者是直接獲取到當前調度的所有任務,返回的是一個包含了 APScheduler.job.Job 對象的列表,而后者是通過 id 參數來查找對應的任務對象;這里我通過底層源碼使用 __getstate__() 來獲取到任務的相關信息,這些信息我們通過事先設定好的 Job 對象來對其進行序列化,最后將信息從接口中返回。

運行

完成以上的所有操作之后,我們就可以打開控制臺,進入到該目錄下并激活我們的虛擬環境,之后運行:

uvicorn main:app 

之后我們就能在 FastAPI 默認的地址 http://127.0.0.1:8000/docs  中看到關于全部接口的 Swagger 文檔頁面了:

fastapi 集成的 swagger 頁面

之后我們可以直接在文檔里面或使用 Postman 來自己進行接口測試即可。

結尾

本文介紹了有關于 APScheduler 框架的概念及其用法,并進行了簡單的實踐。

得益于 APScheduler 的模塊化設計才可以讓我們更方便地去理解、使用它,并將其運用到我們實際的開發過程中。

從 APScheduler 目前的 Github 倉庫代碼以及 issue 來看,作者已經在開始重構 4.0 版本,當中的一些源代碼和 API 也有較大的變動,相信在 4.0 版本中將會引入更多的新特性。

但如果現階段你正打算使用或已經使用 APScheduler 用于實際生產中,那么希望本文能對會你有所幫助。

到此這篇關于5分鐘快速掌握Python定時任務框架的實現的文章就介紹到這了,更多相關Python 定時任務內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
  • 支持python的分布式計算框架Ray詳解
  • python 常用的異步框架匯總整理
  • Python編程pydantic觸發及訪問錯誤處理

標簽:綿陽 百色 淮安 秦皇島 周口 合肥 周口 綏化

巨人網絡通訊聲明:本文標題《5分鐘快速掌握Python定時任務框架的實現》,本文關鍵詞  5分鐘,快速,掌握,Python,定時,;如發現本文內容存在版權問題,煩請提供相關信息告之我們,我們將及時溝通與處理。本站內容系統采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《5分鐘快速掌握Python定時任務框架的實現》相關的同類信息!
  • 本頁收集關于5分鐘快速掌握Python定時任務框架的實現的相關信息資訊供網民參考!
  • 推薦文章
    主站蜘蛛池模板: 久久er99热精品一区二区| 粗大猛烈进出呻吟声的视频| 久久中文字幕综合婷婷| 在ktv里面被客人吃奶| 女人被男人靠到爽视频30分钟| 佸伦短篇小说| 久久夜色精品国产欧美乱极品| 怡红院怡春脘| 久久久久精品白浆无码是什么意思| 国精一二二产品无人区免费应用| 国产精品高潮呻吟AV无码橙色| 天堂av2017男人的天堂| аⅴ中文在线天堂| 综合在线视频| 久久电影国产| 精品无码亚洲一区二区三区毛 | 在线播放真实国产乱子伦| 女被?c??扒衣服种草莓| 亚天堂| 成年色黄大色黄大片视频| 日本性交| 91高清免费视频| 国产的视频| 禁欲高肉h| 你的好大用力深一点| 粗大撑开紧窄惨叫| aⅴ女同性看片| 怡红院在线视频精品观看| 亚洲一区第一页| 日韩推理片在线免费看网站| 激烈大尺度呻吟床震视频| 男同在线观看未删减| 99福利备好纸巾| 亚洲偷窥女厕一区二区三区| FreeXXXXⅩ性大陆美女| 他的昂扬对准她湿润的入口| GOGO全球高清大尺度电影戏梦巴黎| 亚洲天堂久久久| japan极品人妻hd| 17c在线精品无码秘?入口| 日本热妇|