import threading
import time
import subprocess
def ping(ip):
# subprocess.popen() 建立子程序
# ping -c 2 指定ping的次數 count為2
# 指定bash -c是因為linux中可能有很多shell,bash -c 是為了保證命令使用 bash shell 來執行。c for choose
# check = subprocess.popen(["/bin/bash","-c","ping -c 2 127.0.0.1"],stdin=subprocess.pipe,stdout=subprocess.pipe)
#也可以直接指定shell=true表示使用作業系統預設的shell來執行命令
#將子程序的標準輸入和標準輸出都傳送給subprocess.pipe
#返回乙個代表子程序的物件
p = subprocess.popen("ping -c 2 "+ip,shell=true,stdin=subprocess.pipe,stdout=subprocess.pipe)
#讀取子程序的標準輸出
data = p.stdout.read()
# print(data.decode('utf-8'))
if "ttl" in data.decode("utf-8"):
print(ip+":\tup")
else:
print(ip+":\tdead")
def main():
for i in range(1,255):
ip = "101.200.142."+str(i)
worker = threading.thread(target=ping,args = (ip,))
worker.start()
time.sleep(1000)
if __name__ == '__main__':
main()
改進:
import threading
import time
import subprocess
def ping(ip):
p = subprocess.popen("ping -c 2 "+ip,shell=true,stdin=subprocess.pipe,stdout=subprocess.pipe)
data = p.stdout.read()
if "ttl" in data.decode("utf-8"):
print(ip+":\tup")
else:
print(ip+":\tdead")
def main():
workers =
for i in range(1,255):
ip = "192.168.0."+str(i)
w = threading.thread(target=ping,args = (ip,))
w.start()
for i in range(1,255):
workers[i-1].join() #檢查對應執行緒是否完成工作,如果沒有則阻塞
if __name__ == '__main__':
main()
改進:
'''
author: your name
date: 2021-01-13 21:24:05
lastedittime: 2021-01-14 23:09:14
lasteditors: please set lasteditors
description: in user settings edit
filepath: /pythonlearning/hack/channel.py
'''import threading
import queue
import subprocess
class worker(threading.thread):
def __init__(self, queue, id):
threading.thread.__init__(self)
self._queue = queue # channel
self._id = id # 執行緒編號
def run(self):
while not self._queue.empty():
ip = self._queue.get()
print("worker %d get %s"%(self._id,ip))
self._ping(ip)
def _ping(self,ip):
p = subprocess.popen("ping -c 2 "+ip, shell=true,
stdin=subprocess.pipe, stdout=subprocess.pipe)
data = p.stdout.read()
if "ttl" in data.decode("utf-8"):
print("worker %d find %s is up"%(self._id,ip))
else:
print("worker %d find %s is down"%(self._id,ip))
def main():
threads =
threads_count = 255
q = queue.queue() # channel
# 將所有ip位址都塞入管道中
for i in range(1, 255):
q.put("192.168.0."+str(i))
# 將管道共享給每個thread (記憶體位址) 多個執行緒搶同乙個管道中的東西
for i in range(threads_count):
# 啟動所有的執行緒
for t in threads:
t.start()
# 檢查每個執行緒的工作是否完成
for t in threads:
t.join()
if __name__ == '__main__':
main()
Python實現的乙個簡單LRU cache
起因 我的同事需要乙個固定大小的cache,如果記錄在cache中,直接從cache中讀取,否則從資料庫中讀取。python的dict 是乙個非常簡單的cache,但是由於資料量很大,記憶體很可能增長的過大,因此需要限定記錄數,並用lru演算法丟棄舊記錄。key 是整型,value是10kb左右的p...
C 實現乙個簡單的unordered map
無序關聯容器的底層是乙個鏈式雜湊表,包括下列四種 無序關聯容器 特點unordered set 無序集合,只儲存key且不允許重複 unordered multiset 無序多重集合,只儲存key且允許重複 unordered map 無序對映表,儲存key value且不允許重複 unordere...
Python 實現乙個簡單的多執行緒
import threading def main str print str def create thread num,args threads for i in range num try t threading.thread target main,args args t.start exc...