Python學習教程:定時庫APScheduler的原理及用法

Python學習教程之定時庫APScheduler的原理及用法:

APScheduler簡介

APscheduler全稱Advanced Python Scheduler

作用為在指定的時間規則執行指定的作業。

指定時間規則的方式可以是間隔多久執行,可以是指定日期時間的執行,也可以類似Linux系統中Crontab中的方式執行任務。

指定的任務就是一個Python函數。

APScheduler組件

APScheduler版本 3.6.3

APScheduler中幾個重要的概念

Job 作業

作用

Job作為APScheduler最小執行單位。

創建Job時指定執行的函數,函數中所需參數,Job執行時的一些設置信息。

構建說明

id:指定作業的唯一ID

name:指定作業的名字

trigger:apscheduler定義的觸發器,用于確定Job的執行時間,根據設置的trigger規則,計算得到下次執行此job的

時間, 滿足時將會執行

executor:apscheduler定義的執行器,job創建時設置執行器的名字,根據字符串你名字到scheduler獲取到執行此

job的 執行器,執行job指定的函數

max_instances:執行此job的最大實例數,executor執行job時,根據job的id來計算執行次數,根據設置的最大實例數

來確定是否可執行

next_run_time:Job下次的執行時間,創建Job時可以指定一個時間[datetime],不指定的話則默認根據trigger獲取觸

發時間

misfire_grace_time:Job的延遲執行時間,例如Job的計劃執行時間是21:00:00,但因服務重啟或其他原因導致

21:00:31才執行,如果設置此key為40,則該job會繼續執行,否則將會丟棄此job

coalesce:Job是否合并執行,是一個bool值。例如scheduler停止20s后重啟啟動,而job的觸發器設置為5s執行

一次,因此此job錯過了4個執行時間,如果設置為是,則會合并到一次執行,否則會逐個執行

func:Job執行的函數

args:Job執行函數需要的位置參數

kwargs:Job執行函數需要的關鍵字參數

Trigger 觸發器

Trigger綁定到Job,在scheduler調度篩選Job時,根據觸發器的規則計算出Job的觸發時間,然后與當前時間比較

確定此Job是否會被執行,總之就是根據trigger規則計算出下一個執行時間。

Trigger有多種種類,指定時間的DateTrigger,指定間隔時間的IntervalTrigger,像Linux的crontab

一樣的CronTrigger

目前APScheduler支持觸發器:

DateTrigger

IntervalTrigger

CronTrigger

Executor 執行器

Executor在scheduler中初始化,另外也可通過scheduler的add_executor動態添加Executor。

每個executor都會綁定一個alias,這個作為唯一標識綁定到Job,在實際執行時會根據Job綁定的executor

找到實際的執行器對象,然后根據執行器對象執行Job

Executor的種類會根據不同的調度來選擇,如果選擇AsyncIO作為調度的庫,那么選擇AsyncIOExecutor,如果

選擇tornado作為調度的庫,選擇TornadoExecutor,如果選擇啟動進程作為調度,

選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以

Executor的選擇需要根據實際的scheduler來選擇不同的執行器

目前APScheduler支持的Executor:

AsyncIOExecutor

GeventExecutor

ThreadPoolExecutor

ProcessPoolExecutor

TornadoExecutor

TwistedExecutor

Jobstore 作業存儲

Jobstore在scheduler中初始化,另外也可通過scheduler的add_jobstore動態添加Jobstore。每個jobstore都會

綁定一個alias,scheduler在Add Job時,根據指定的jobstore在scheduler中找到相應的jobstore,

并將job添加到jobstore中。

Jobstore主要是通過pickle庫的loads和dumps【實現核心是通過python的__getstate__和__setstate__重寫實現】,

每次變更時將Job動態保存到存儲中,使用時再動態的加載出來,作為存儲的可以是redis,也可以是數據庫【通過

sqlarchemy這個庫集成多種數據庫】,也可以是mongodb等

目前APScheduler支持的Jobstore:

MemoryJobStore

MongoDBJobStore

RedisJobStore

RethinkDBJobStore

SQLAlchemyJobStore

ZooKeeperJobStore

Event 事件

Event是APScheduler在進行某些操作時觸發相應的事件,用戶可以自定義一些函數來監聽這些事件,

當觸發某些Event時,做一些具體的操作

常見的比如。Job執行異常事件 EVENT_JOB_ERROR。Job執行時間錯過事件 EVENT_JOB_MISSED。

目前APScheduler定義的Event

EVENT_SCHEDULER_STARTED

EVENT_SCHEDULER_START

EVENT_SCHEDULER_SHUTDOWN

EVENT_SCHEDULER_PAUSED

EVENT_SCHEDULER_RESUMED

EVENT_EXECUTOR_ADDED

EVENT_EXECUTOR_REMOVED

EVENT_JOBSTORE_ADDED

EVENT_JOBSTORE_REMOVED

EVENT_ALL_JOBS_REMOVED

EVENT_JOB_ADDED

EVENT_JOB_REMOVED

EVENT_JOB_MODIFIED

EVENT_JOB_EXECUTED

EVENT_JOB_ERROR

EVENT_JOB_MISSED

EVENT_JOB_SUBMITTED

EVENT_JOB_MAX_INSTANCES

Listener 監聽事件

Listener表示用戶自定義監聽的一些Event,當Job觸發了EVENT_JOB_MISSED事件時可以根據需求做一些其他處理。

Scheduler 調度器

Scheduler是APScheduler的核心,所有相關組件通過其定義。scheduler啟動之后,將開始按照配置的任務進行調度。

除了依據所有定義Job的trigger生成的將要調度時間喚醒調度之外。當發生Job信息變更時也會觸發調度。

scheduler可根據自身的需求選擇不同的組件,如果是使用AsyncIO則選擇AsyncIOScheduler,使用tornado則選擇

TornadoScheduler。

目前APScheduler支持的Scheduler:

AsyncIOScheduler

BackgroundScheduler

BlockingScheduler

GeventScheduler

QtScheduler

TornadoScheduler

TwistedScheduler

這里前面一期的Python學習教程里有提到過!

Scheduler工作流程圖

這里重點挑選兩個重要的流程畫一個簡陋的流程圖,來看一下scheduler的工作原理。其一個是添加add job,另一是scheduler每次喚醒調度時的執行過程

Scheduler添加job流程

Scheduler調度流程

APScheduler使用示例

AsyncIO調度示例

import asyncio

import datetime

from apscheduler.events import EVENT_JOB_EXECUTED

from apscheduler.executors.asyncio import AsyncIOExecutor

from apscheduler.jobstores.redis import RedisJobStore # 需要安裝redis

from apscheduler.schedulers.asyncio import AsyncIOScheduler

from apscheduler.triggers.interval import IntervalTrigger

from apscheduler.triggers.cron import CronTrigger

# 定義jobstore 使用redis 存儲job信息

default_redis_jobstore = RedisJobStore(

db=2,

jobs_key="apschedulers.default_jobs",

run_times_key="apschedulers.default_run_times",

host="127.0.0.1",

port=6379,

password="test"

)

# 定義executor 使用asyncio是的調度執行規則

first_executor = AsyncIOExecutor()

# 初始化scheduler時,可以直接指定jobstore和executor

init_scheduler_options = {

"jobstores": {

# first 為 jobstore的名字,在創建Job時直接直接此名字即可

"default": default_redis_jobstore

},

"executors": {

# first 為 executor 的名字,在創建Job時直接直接此名字,執行時則會使用此executor執行

"first": first_executor

},

# 創建job時的默認參數

"job_defaults": {

'coalesce': False, # 是否合并執行

'max_instances': 1 # 最大實例數

}

}

# 創建scheduler

scheduler = AsyncIOScheduler(**init_scheduler_options)

# 啟動調度

scheduler.start()

second_redis_jobstore = RedisJobStore(

db=2,

jobs_key="apschedulers.second_jobs",

run_times_key="apschedulers.second_run_times",

host="127.0.0.1",

port=6379,

password="test"

)

scheduler.add_jobstore(second_redis_jobstore, 'second')

# 定義executor 使用asyncio是的調度執行規則

second_executor = AsyncIOExecutor()

scheduler.add_executor(second_executor, "second")

# *********** 關于 APScheduler中有關Event相關使用示例 *************

# 定義函數監聽事件

def job_execute(event):

"""

監聽事件處理

:param event:

:return:

"""

print(

"job執行job:\ncode => {}\njob.id => {}\njobstore=>{}".format(

event.code,

event.job_id,

event.jobstore

))

# 給EVENT_JOB_EXECUTED[執行完成job事件]添加回調,這里就是每次Job執行完成了我們就輸出一些信息

scheduler.add_listener(job_execute, EVENT_JOB_EXECUTED)

# *********** 關于 APScheduler中有關Job使用示例 *************

# 使用的是asyncio,所以job執行的函數可以是一個協程,也可以是一個普通函數,AsyncIOExecutor會根據配置的函數來進行調度,

# 如果是協程則會直接丟入到loop中,如果是普通函數則會啟用線程處理

# 我們定義兩個函數來看看執行的結果

def interval_func(message):

print("現在時間: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

print("我是普通函數")

print(message)

async def async_func(message):

print("現在時間: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

print("我是協程")

print(message)

# 將上述的兩個函數按照不同的方式創造觸發器來執行

# *********** 關于 APScheduler中有關Trigger使用示例 *************

# 使用Trigger有兩種方式,一種是用類創建使用,另一個是使用字符串的方式

# 使用字符串指定別名, scheduler初始化時已將其定義的trigger加載,所以指定字符串可以直接使用

if scheduler.get_job("interval_func_test", "default"):

# 存在的話,先刪除

scheduler.remove_job("interval_func_test", "default")

# 立馬開始 2分鐘后結束, 每10s執行一次 存儲到first jobstore second執行

scheduler.add_job(interval_func, "interval",

args=["我是10s執行一次,存放在jobstore default, executor default"],

seconds=10,

id="interval_func_test",

jobstore="default",

executor="default",

start_date=datetime.datetime.now(),

end_date=datetime.datetime.now() + datetime.timedelta(seconds=240))

# 先創建tigger

trigger = IntervalTrigger(seconds=5)

if scheduler.get_job("interval_func_test_2", "second"):

# 存在的話,先刪除

scheduler.remove_job("interval_func_test_2", "second")

# 每隔5s執行一次

scheduler.add_job(async_func, trigger, args=["我是每隔5s執行一次,存放在jobstore second, executor = second"],

id="interval_func_test_2",

jobstore="second",

executor="second")

# 使用協程的函數執行,且使用cron的方式配置觸發器

if scheduler.get_job("cron_func_test", "default"):

# 存在的話,先刪除

scheduler.remove_job("cron_func_test", "default")

# 立馬開始 每10s執行一次

scheduler.add_job(async_func, "cron",

args=["我是 每分鐘 30s 時執行一次,存放在jobstore default, executor default"],

second='30',

id="cron_func_test",

jobstore="default",

executor="default")

# 先創建tigger

trigger = CronTrigger(second='20,40')

if scheduler.get_job("cron_func_test_2", "second"):

# 存在的話,先刪除

scheduler.remove_job("cron_func_test_2", "second")

# 每隔5s執行一次

scheduler.add_job(async_func, trigger, args=["我是每分鐘 20s 40s時各執行一次,存放在jobstore second, executor = second"],

id="cron_func_test_2",

jobstore="second",

executor="second")

# 使用創建trigger對象直接創建

print("啟動: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

asyncio.get_event_loop().run_forever()

輸出結果部分截取

啟動之后,每隔5s運行一次的JOB

啟動: 2019-12-05 14:13:11

【這部分是定義的協程函數輸出的內容】

現在時間: 2019-12-05 14:13:16

我是協程

我是每隔5s執行一次,存放在jobstore second, executor = second

【這部分是監聽job執行完成之后的回調輸出】

job執行job:

code => 4096

job.id => interval_func_test_2

jobstore=>second

在20s和40s時各執行一次的Job

現在時間: 2019-12-05 14:13:20

我是協程

我是每分鐘 20s 40s時各執行一次,存放在jobstore second, executor = second

job執行job:

code => 4096

job.id => cron_func_test_2

jobstore=>second

每隔10s執行一次的job

現在時間: 2019-12-05 14:13:21

我是普通函數

我是10s執行一次,存放在jobstore default, executor default

現在時間: 2019-12-05 14:13:21

我是協程

我是每隔5s執行一次,存放在jobstore second, executor = second

job執行job:

code => 4096

job.id => interval_func_test

jobstore=>default

job執行job:

code => 4096

job.id => interval_func_test_2

jobstore=>second

每隔5s執行一次的Job

現在時間: 2019-12-05 14:13:26

我是協程

我是每隔5s執行一次,存放在jobstore second, executor = second

job執行job:

code => 4096

job.id => interval_func_test_2

jobstore=>second

每分鐘30s時執行一次

現在時間: 2019-12-05 14:13:30

我是協程

我是 每分鐘 30s 時執行一次,存放在jobstore default, executor default

job執行job:

code => 4096

job.id => cron_func_test

jobstore=>default

總結

apscheduler的工作原理及用法基本這樣。

apscheduler強大的地方是可以集成到tornado,django,flask等框架,也可以單獨運行。比如CronTrigger還有更強大的用法,可以參照官網的cron用法

上面例子只列舉了一些常規用法,其實還有一些更切合實際的用法,利用APSchedulder的特性,動態的添加Job,暫停Job,刪除Job,重啟Job等。先按照功能性質定義好不同的函數,然后開發一個web服務。在web服務中動態操作各種Job,可以想象在監控系統中根據需求添加一些任務,豈不美哉。

有時間將這部分做一個例子再來分享,大家可以期待一下下次的Python學習教程,本期Python教程有不清楚的地方可以留言哈!

免責聲明:本文僅代表文章作者的個人觀點,與本站無關。其原創性、真實性以及文中陳述文字和內容未經本站證實,對本文以及其中全部或者部分內容文字的真實性、完整性和原創性本站不作任何保證或承諾,請讀者僅作參考,并自行核實相關內容。

http://www.uswqb.club/style/images/nopic.gif
分享
評論
首頁
高速公路之王电子游艺
北京pk10精准计 哪些股票有投资价值 广西十一选五开奖结果走势图百度乐彩 华天科技股票 股票配资平台哪个好多少钱 山西省快乐十分走势 55125中国彩吧 世界杯比分文字直播 行情炒股 最赚钱的网站 欢乐游戏棋牌官网 三分pk10是哪里的 大赢家比分足球比分既时比分直播 甘肃11选5遗漏一定牛 互联网赚钱商机 熊猫娱乐官方下载