最近學習了下,celery原始碼,看了一點點皮毛後,自己動手寫了個簡易的celery,通過redis作為broker,沒有複雜的路由匹配規則,佇列和任務之間乙個直接匹配的簡易規則。這裡對專案簡單的記錄下。
****** 是celery類所在位置,具體實現了celery的啟動,載入配置檔案,任務裝飾器;
utils 下base是任務類的實現,任務的發布方法,任務繫結celery例項方法;
redisbase 是連線redis資料的類,主要是往佇列裡插入資料;
example 是我自己寫的用例
# -*- coding: utf-8 -*-
'''******celery @2019 03 26
author xuxiaolong
redis 作為broker
redis 作為結果返回result_backend
'''from time import sleep
from utils.base import basetask
import json
import multiprocessing
from importlib import import_module
from utils.redisbase import redishelper
class celery(object):
def __init__(self, name):
self.name = name
self.queuedic = dict()
self._task = dict()
def start(self):
'''啟動方法
從redis list 中獲取message ,並找到對應的任務例項去執行,通過呼叫task.runtask()方法執行
'''#_redis = redishelper(self.host,self.port,self.db,self.password)
#為每乙個佇列開啟乙個程序,執行對應的任務
queue = set([v["queue"] for v in self.taskdic.values()])
# pool = multiprocessing.pool(processes=len(queue))
def runloop(queue):
_redis = redishelper(host=self.host, port=self.port, db=self.db, password=self.password)
while true:
retjson = _redis.lpop(queue)
print retjson
if retjson is none:
sleep(5) #加了個休眠,避免terminal 刷的太快
continue
message = json.loads(retjson)
#print message
#print self.taskdic
for fun in self.queuedic[queue]:
#print message["name"] == fun.split(".")[-1:][0]
if message["name"] == fun.split(".")[-1:][0]:
print self._task[message["name"].encode('utf-8')](*message["args"],**message["kwargs"])
for q in queue:
runloop(q)
#pool.close()
#pool.join()
def config_from_object(self, include=none):
'''獲取基本的配置資訊'''
self.config = import_module(include)
# redis_url 配置redis位址資訊
redis,hostpass, portdb = self.config.redis_url.split(':')
self.password, self.host = hostpass.split('@')
self.port, self.db = portdb.split('/')
# celery_route配置路由資訊
self.taskdic = self.config.celery_route
for k,v in self.taskdic.items():
if v["queue"] not in self.queuedic.keys():
self.queuedic[v["queue"]] = [k]
else:
#任務裝飾器
def task(self,*args,**kwargs):
#print args,kwargs
def create_inner_task():
def create_tasks(func):
ret = self.create_task_fromfun(func)
return ret
return create_tasks
return create_inner_task()
#通過函式建立任務物件
def create_task_fromfun(self,func):
tasks = type(func.__name__, (basetask,), dict())()
#print isinstance(tasks,basetask)
if tasks.name not in self._task:
self._task[tasks.name] = tasks.run
tasks.bind(self)
return tasks
這裡task裝飾器的,裝飾的是任務函式,原理是,將任務函式,轉換成乙個任務物件的例項,也就是basetask,並把方法體,賦給basetask的run方法,利用的是type元類的來實現的,
tasks = type(func.name, (basetask,), dict())()
這裡有個小地方需要注意下,任務函式需要轉成非繫結的方法,也就是通過staticmethod方法,裝成靜態方法;
start 方法,可以理解成監聽函式,是乙個無限迴圈,不斷的從配置的佇列中讀取資料,然後,交給tasks去執行,也就是呼叫我們上面說的,run方法
# -*- coding: utf-8 -*-
import json
from redisbase import redishelper
class basetask(object):
'''任務類的基類,所有任務的拓展都繼承此類
'''def runtask(self,*args,**kwargs):
'''任務執行的方法
'''messagedic = dict()
messagedic["name"] = self.name
messagedic["args"] = args
messagedic["kwargs"] = kwargs
print self.run(*args,**kwargs)
if k.split(".")[-1:][0] == self.name:
print _redis.lpush(v["queue"],json.dumps(messagedic))
''''''
runtask 是我們的任務的發布函式,通過在客戶端呼叫,往指定佇列插入資料,服務端讀取執行
redisbase 就不介紹了,沒啥可說的,就是運算元據庫的
exampe 是示例 :
celery.py檔案**:
from ****** import celery
# 載入配置
def add(x,y):
#print(x+y)
return x+y
if __name__ == '__main__':
這裡例項化我們的celery,並且實現具體的任務函式,記得一定要加上task裝飾器,和celery一樣 的,
celery.py 是在服務端執行的,它會去具體執行任務;
config.py
是配置檔案,安裝這個規則寫就行,示例如下:
redis_url=「redis//?****:8004/0」
celery_route=}
run.py:
from example.celery import add
if __name__ == '__main__':
print(add.runtask(1,2))
這是在客戶端執行的,具體的任務發布,通過runtask來發布 簡易版redux實現
redux其實只有幾個重要的api,getstate,subscribe和dispatch,getstate用來獲取狀態,subscribe監聽狀態的改變,dispatch派發事件改變狀態,下面就來看下。首先是createstore,它接收三個引數,分別是reducer函式,初始狀態值,還有就是中介...
簡易版的Tween
與之前的tween 類似,只是這個為簡潔版 動畫處理器 緩動效果 param obj dom物件 param prop 要改變的樣式屬性,如left 填opacity時,1表示不透明,0表示完全透明 param v1 初始值 不帶px param v2 終止值 不帶px param opt obje...
用C實現簡易版掃雷
用兩個盤實現該遊戲 乙個是雷盤,乙個是展示盤 就是玩遊戲的盤 該 可以實現以下幾個功能 1.列印雷盤和展示盤。隨機產生雷的位置 2.保證第一次掃雷不會被炸死。3.點一下可以展開一片。4.判斷是否贏。注意 要注意兩個盤的座標和下標。還有,呼叫函式和傳參。test.c include include i...