我使用tornado做了乙個實時日誌檢視的系統,遠端日誌檢視的部分使用了redis的訂閱和發布功能,
基本功能ok,不過,使用過程中出現了幾次too many open files…的錯誤,下面就是針對這一現象所做的總結。
剛開始不太清楚具體的原因,以為是檢視日誌的人比較多,存在大量連線導致的。重啟了實時日誌服務後恢復正常了。而且由於系統預設的程序開啟檔案上限是1024,在大併發場景下顯然是不夠的,因此就改了
這個上限值為10萬,沒有繼續深究。
後來,我做了個程序監控的工具,能在頁面中展示伺服器各個程序的詳細資訊,包括開啟檔案總數和具體的連線資訊。我發現日誌服務中存在大量的redis連線,而客戶端的連線數並不多,這顯然是不合理的。首先可以肯定的是我使用redis的方式有問題,於是我認真讀了一下redis模組的**,對發布訂閱部分的實現限了解清楚。python**中的訂閱方式是這樣的:
r = redis.strictredis(host=settings.redis_host, port=settings.redis_port,
password=settings.redis_passwd, db=5)
pubsub = r.pubsub(ignore_subscribe_messages=true)
# pubsub.subscribe(*args, **kwargs)
其中,乙個pubsub物件會維護乙個連線池,由於tornado是單執行緒的,因此,這個連線池只會有乙個連線我在**中是每乙個請求就會例項化一次redis獲取pubsub物件,因此,每一次請求都會新建立連線,
而之前的請求建立的連線在請求結束後就不再使用,由於**中沒有正確釋放這些連線,導致連線數逐漸增多。事實上,因為tornado是單執行緒的,因此,整個伺服器程序只需要維護乙個redis連線就可以了,多餘的連線都沒有用處。所以,不需要在每次請求時去例項化redis,只需要在服務啟動時例項化乙個連線就夠了,具體的做法就是上面的**寫到乙個函式裡,用乙個全域性變數儲存呼叫結果就行了,這樣,不管來多少請求,伺服器端始終只有乙個redis客戶端(redis會在斷開後自動重連,因此不用專門在每一次檢測這個唯一的連線的有效性)。原來的方式是每乙個請求對應乙個redis客戶端,所以,每乙個客戶端只需要訂閱乙個頻道就行了,頻道由伺服器和日誌路徑唯一確定。現在的話,這個唯一的redis客戶端就需要訂閱多個頻道了,之前的頻道確定方法就不可行了。因為同乙個日誌會被多個使用者檢視,頻道中的資訊被某乙個使用者讀取之後其他的使用者就
讀不到了。那麼就可以再加上使用者身份(remote_ip, remote_port)來確定唯一頻道。不過問題又來了,原來多個使用者同時讀取乙個日誌檔案時,只需要乙個程序執行日誌的實時讀取和傳送。現在的話,頻道變了,即使是乙個日誌檔案的檢視也需要多個程序來執行了,乙個頻道對應乙個程序。這樣既浪費程序資源,也很浪費網路資源。是否有更好的解決方案呢?
有的,可以借鑑redis中訂閱和發布的實現方式:
訂閱與發布
redis-server中維護乙個pubsub_channels 字典。key為頻道,value為訂閱這個頻道的客戶端列表。當有訊息傳送到頻道上時,這條訊息會被傳送到頻道對應的所有客戶端。由於只有乙個redis客戶端,但是使用者卻有很多,因此,可以維護乙個類似pubsub_channels的資料結構實現相同的功能。方法如下:還是根據伺服器和日誌確定唯一頻道,如果有多個使用者同時檢視同乙個日誌,以channel為key,value為乙個字典,儲存所有的檢視這個日誌的使用者。當收到頻道中的訊息時,根據這個channel找到所有的使用者連線,發資訊給這些使用者,實現**如下:
from collections import defaultdict
from logs.utility import get_last_lines, get_pubsub
class
subwebsocket
(tornado.websocket.websockethandler):
# 維護channel和使用者請求的資料結構
client_map = defaultdict(dict)
defopen
(self, *args, **kwargs):
print("opened")
defassemble_cmd
(self, log_path, cmd):
kill_cmd = "kill `ps aux|grep logtail.py|grep %s|grep -v grep|awk ''`" % (log_path,)
return
";".format(kill=kill_cmd, tail=cmd)
@gen.coroutine
defon_message
(self, message):
hostname, log_path, cmd = message.split("||")
# 確保只有乙個程序在實時讀取及發布日誌
cmd = self.assemble_cmd(log_path, cmd)
local = salt.client.localclient()
channel = settings.log_key.format(
server=hostname.strip(), log_path=log_path.strip())
self.register(channel)
if channel not
in pubsub.channels: # 避免重複訂閱
pubsub.subscribe(**)
local.cmd_async(hostname, "cmd.run", [cmd])
while
true:
ifnot pubsub.connection.can_read(timeout=0):
yield gen.sleep(0.05)
else:
pubsub.get_message()
@classmethod
defchannel_callback
(cls, message):
for client, callback in subwebsocket.client_map[message["channel"]].iteritems():
callback(message)
defclient_callback
(self, message):
line = format_line(message["data"])
try:
self.write_message(line)
except tornado.websocket.websocketclosederror:
self.unregister(message["channel"])
defregister
(self, channel):
client = self.request.connection.context.address
subwebsocket.client_map[channel][client] = self.client_callback
defunregister
(self, channel):
client = self.request.connection.context.address
subwebsocket.client_map[channel].pop(client, none)
ifnot subwebsocket.client_map[channel]:
subwebsocket.client_map.pop(channel, none)
pubsub.unsubscribe(channel)
defon_close
(self):
print("closed")
目前來看,這種方式是最優的方式了,使用最少的連線,最少的頻道,最少的程序以及最少的網路資源消耗。
完整的**參見:
github**
快速排序演算法實現(遞迴實現 棧實現)
基本思想 選擇乙個基準元素,比如選擇最後乙個元素,通過一趟掃瞄,將待排序列分成兩部分,一部分比基準元素小,一部分大於等於基準元素,此時基準元素在其排好序後的正確位置,又稱為軸位置,此位置的元素確定後不再參與排序,然後再用同樣的方法遞迴地排序劃分的兩部分。分析 快速排序是不穩定的排序。快速排序的時間複...
介面實現與配置實現
在實現系統功能的時候,通常會首先定義好功能的介面,在系統功能不斷被實現的過程中,慢慢的發現有些介面的實現很類似,這個時候通常會開始做一次抽象,形 成乙個共同的部分,慢慢的系統形成了乙個抽象的層次,而為了通用,通常是通過定義介面,形成乙個抽象類,抽象類中暴露出一些抽象方法供外部擴充套件實 現,逐步的積...
js分頁實現,前端實現。
主要是借鑑了網上乙個例子,自己重新加了樣式,新增了跳轉,修改了一些小地方,用於和大家一起分享,前端分頁的技巧,的資料是我已經寫好了,其實大家也可以前端渲染 然後再分頁,都是可以的。其實分頁最關鍵是這兩句 var startrow currentpage 1 pagesize 1 currentpag...