进程与线程
今日内容:
进程相关:
1.守护进程
2.互斥锁
3.进程间通信IPC:管道、进程queue
4.生产者消费者模型(******)
线程相关:
1.线程VS进程(******)
2.开启线程的两种方式(******)
3.线程对象的相关方法或属性(******)
4.守护线程
5.线程互斥锁
6.GIL全局解释器锁(******)
7.死锁与递归锁
8.信号量
9.事件
10.线程queue
11.进程池与线程池(******)
同步
异步
阻塞
非阻塞
单线程下实现并发
1.协程
'''
1.什么是守护进程
守护进程本质是一个“子进程”,守护进程会在主进程运行完毕(指的是主进程代码运行完毕)的情况下跟着一起结束
特点:
1.是一个“子进程”
2.守护进程内的任务在父进程完毕的情况下直接死掉
2.为何要用守护进程
3.如何用
'''
例1:
from multiprocessing import Process
import os,time
def task():
print('%s is running' %os.getppid())
time.sleep(2)
print('%s is done' %os.getpid())
if __name__ == '__main__':
p=Process(target=task)
p.daemon=True
p.start()
print('主')
例2:
#主进程代码运行完毕,守护进程就会结束
from multiprocessing import Process
from threading import Thread
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(1)
print("end456")
if __name__ == '__main__':
p1=Process(target=foo)
p2=Process(target=bar)
p1.daemon=True
p1.start()
p2.start()
print("main----")
执行结果:
二、互斥锁
#互斥锁的原理:就是将多任务的对共享数据修改操作有并发变成“串行”,牺牲了效率保证数据安全
from multiprocessing import Process,Lock
import json
import time,random
def search(i):
with open('db.json','rt',encoding='utf-8')as f:
dic=json.load(f)
time.sleep(1)
print('路人%s查看到剩余票数:%s' %(i,dic['count']))
def get(i):
with open('db.json','rt',encoding='utf-8')as f:
dic=json.load(f)
if dic['count']>0:
#有票
dic['count']-=1
time.sleep(random.randint(1,3))
with open('db.json','wt',encoding='utf-8')as f:
json.dump(dic,f)
print('路人%s抢票成功' %i)
else:
print('路人%s抢票失败' %i)
def task(i,mutex):
search(i)
mutex.acquire()
get(i)
mutex.release()
if __name__ == '__main__':
mutex=Lock()
for i in range(1,11):
p=Process(target=task,args=(i,mutex))
p.start()
# p.join()
print('主')
db.json
{"count": 0}
执行结果:
#IPC机制:管道,队列
from multiprocessing import Queue
q=Queue(3)
q.put('first')
q.put({'count':2})
q.put([3,])
# q.put([4,])
print(q.get())
print(q.get())
print(q.get())
# print(q.get())
了解:
q=Queue(3)
q.put(1,block=True,timeout=3)
q.put(2,block=True,timeout=3)
q.put(3,block=True,timeout=3)
# q.put(4,block=True,timeout=3)
# q.put_nowait(1) #q.put(1,block=False)
# q.put_nowait(2) #q.put(2,block=False)
# q.put_nowait(3) #q.put(3,block=False)
# q.put_nowait(4) #q.put(4,block=False)
# q.put(1)
# q.put(2)
# q.put(3)
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
执行结果:
四、生产者消费者模型
'''
1.什么是生产者消费者模型
生产者:代指生产数据的任务
消费者:代指处理数据的任务
模型:
生产者生产数据然后交给消费者去处理
特点:
1.实现生产者与消费者解耦合
2.平衡了生产者的生产能力和消费者的消费能力
2.为何要用
当程序中存在明显的两类任务:一类负责造数据,另外一类负责处理数据,就可以用生产者消费者模型实现
解耦合和从而提高效率
3.如何实现
实现方式一:
生产者--->queue<---消费者
'''
例1:
from multiprocessing import Process,Queue
import time,random
def producer(food,name,q):
for i in range(3):
time.sleep(random.randint(1,3))
res='%s%s' %(food,i)
q.put(res)
print('厨师%s生产了%s' %(name,res))
q.put(None)
def consumer(name,q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('吃货%s吃了%s' %(name,res))
if __name__ == '__main__':
q=Queue()
#生产者
p1=Process(target=producer,args=('包子','frank',q))
#消费者
c1=Process(target=consumer,args=('sunny',q))
p1.start()
c1.start()
print('主')
执行结果:
例2:
from multiprocessing import Process,Queue
import time,random
def producer(food,name,q):
for i in range(3):
time.sleep(random.randint(1,3))
res='%s%s' %(food,i)
q.put(res)
print('厨师%s生产了%s' %(name,res))
def consumer(name,q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('吃货%s吃了%s' %(name,res))
if __name__ == '__main__':
q=Queue()
#生产者
p1=Process(target=producer,args=('包子','frank1',q))
p2 = Process(target=producer, args=('糖', 'frank2', q))
p3 = Process(target=producer, args=('花生', 'frank3', q))
#消费者
c1=Process(target=consumer,args=('sunny1',q))
c2 = Process(target=consumer, args=('sunny2', q))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.put(None)
q.put(None)
print('主')
执行结果:
例3:
from multiprocessing import Process,JoinableQueue
import time,random
def producer(food,name,q):
for i in range(3):
time.sleep(random.randint(1,3))
res='%s%s' %(food,i)
q.put(res)
print('厨师%s生产了%s' %(name,res))
def consumer(name,q):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('吃货%s吃了%s' %(name,res))
q.task_done()
if __name__ == '__main__':
q=JoinableQueue()
#生产者
p1=Process(target=producer,args=('包子','frank1',q))
p2 = Process(target=producer, args=('糖', 'frank2', q))
p3 = Process(target=producer, args=('花生', 'frank3', q))
#消费者
c1=Process(target=consumer,args=('sunny1',q))
c2 = Process(target=consumer, args=('sunny2', q))
c1.daemon=True
c2.daemon=True
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.join()#主进程最后一行代码运行完毕,生产者全部正常死亡,消费者也没有存在的意义
执行结果:
注意:JoinableQueue q.task_done() c1.daemon=True c2.daemon=True q.join()
五、线程理论
1.什么是线程
进程其实是一个资源单位,每启动一个进程,改进程内至少有一个线程(主线程,“父线程”)
线程代表的是该进程内代码的过程
2.为何要用线程
线程的两个特点:
1.同一进程下的多个线程共享改进程的资源
2.开启一个线程的开销要远远小于一个进程
3.如何用线程
六、开启线程的两种方式
例1;
from threading import Thread
import time
def task(name):
print('%s is running' %name)
time.sleep(2)
print('%s is done' %name)
if __name__ == '__main__':
t=Thread(target=task,args=('线程1',))
t.start() #几乎是信号发出的同时,线程就开启了,证明线程的创建开销远远小于进程
print('主')#主线程的生命周就是其所在进程的生命周期,进程应该在进程内所有线程运行完毕了才应该结束
执行结果:
例2:
from threading import Thread
import time
class Mythread(Thread):
def run(self):
print('%s is running' %self.name)
time.sleep(2)
print('%s is done' %self.name)
if __name__ == '__main__':
t=Mythread()
t.start()
print('主')
执行结果:
七、线程对象其他相关属性或方法
例1:join
from threading import Thread
import time
def task(name):
print('%s is running' %name)
time.sleep(1)
print('%s is done' %name)
if __name__ == '__main__':
t=Thread(target=task,args=('线程1',))
t.start()
t.join()
print('主')
执行结果:
例2:子线程共享资源
from threading import Thread
from multiprocessing import Process
import time
n=10
def task():
global n
n=0
if __name__ == '__main__':
t=Thread(target=task)
t.start()
t.join()
print('主',n)
执行结果:
例3:子进程资源独立
from threading import Thread
from multiprocessing import Process
import time
n=10
def task():
global n
n=0
if __name__ == '__main__':
t=Process(target=task)
t.start()
t.join()
print('主',n)
执行结果:
例4:current_thread
from threading import Thread,current_thread
import time
def task():
print('%s is running' %current_thread().name)
time.sleep(1)
print('%s is done' %current_thread().name)
if __name__ == '__main__':
t=Thread(target=task,name='线程1')
t.start()
print('主',current_thread().name)
执行结果:
例5:其他,线程存活个数
from threading import Thread,current_thread,active_count
import time
def task():
print('%s is running' %current_thread().name)
time.sleep(1)
print('%s is done' %current_thread().name)
if __name__ == '__main__':
t=Thread(target=task,name='线程1')
t.start()
print('主',current_thread().name)
print(active_count())
执行结果:
八、守护线程
例1:
from threading import Thread
import time
def task(name):
print('%s is runing' %name)
time.sleep(1)
print('%s is done' %name)
if __name__ == '__main__':
t=Thread(target=task,args=('线程1',))
t.daemon=True
t.start()
print('主')
#守护进程会在主进程内那个主线程代码运行完毕后立即销毁
#守护线程会在该进程内所有非守护线程都运行完毕后才结束
例2:守护线程
from threading import Thread
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(1)
print("end456")
if __name__ == '__main__':
t1=Thread(target=foo)
t2=Thread(target=bar)
t1.daemon=True
t1.start()
t2.start()
print('main---------')
九、线程互斥锁
from threading import Thread,Lock
import time
mutex=Lock()
n=100
def task():
# global n
# mutex.acquire()
# temp=n
# time.sleep(0.1)
# n=temp-1
# mutex.release()
# 写法二:
global n
with mutex:
temp=n
time.sleep(0.1)
n=temp-1
if __name__ == '__main__':
l=[]
star=time.time()
for i in range(100):
t=Thread(target=task)
l.append(t)
t.start()
for t in l:
t.join()
print(n,time.time()-star)
执行结果:
十、GIL全局解释器锁
'''
1.什么是GIL
GIL本质就是一把互斥锁,有了GIL的存在会导致同一个进程下的多个线程不能够并行但能够并发
2.为何要有GIL
3.有了GIL以后,应该如何使用多进程,多线程
'''
例1:计算密集型:多进程效率高
from multiprocessing import Process
from threading import Thread
import os,time
def work():
res=0
for i in range(10000):
res*=i
if __name__ == '__main__':
l=[]
print(os.cpu_count())
start=time.time()
for i in range(os.cpu_count()):
p=Thread(target=work)
# print(p)
l.append(p)
p.start()
# print(l)
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop -start))
例二、I/O密集型:多线程高效率
from multiprocessing import Process
from threading import Thread
import os,time
def work():
time.sleep(2)
if __name__ == '__main__':
l=[]
print(os.cpu_count())
start=time.time()
for i in range(400):
p=Process(target=work)
l.append(p)
print(l)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))
例四:
from threading import Thread,Lock
import time
mutex=Lock()
n=100
def task():
global n
with mutex:
temp=n
time.sleep(0.1)
n=temp-1
if __name__ == '__main__':
for i in range(3):
t=Thread(target=task)
t.start()
十一、死锁现象与递归锁
from threading import Thread,Lock,RLock
import time
# mutexA=Lock()
# mutexB=Lock() #这种操作会出现死锁现象
mutexB=mutexA=RLock()
class Mythread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print('%s 抢到了A锁' %self.name)
mutexB.acquire()
print('%s 抢到了B锁' %self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print('%s 抢到了B锁' %self.name)
time.sleep(1)
mutexA.acquire()
print('%s 抢到了A锁' %self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(10):
t=Mythread()
t.start()
执行结果:
十二、信号量
import time,random
sm=Semaphore(2)
def task(i):
sm.acquire()
print('学生%s 正在排队吃饭' %i)
time.sleep(random.randint(1,3))
sm.release()
if __name__ == '__main__':
for i in range(6):
t=Thread(target=task,args=(i,))
t.start()
注意:同一个时刻只有2个人买饭
执行结果:
十三、事件
from threading import Thread,Event
import time,random
event=Event()
def light():
print('红灯正在亮着...')
time.sleep(random.randint(1,3))
event.set() #event.clear()
def car(i):
print('车%s 正在等待绿灯...' %i)
event.wait()
print('车%s 正在通行...' %i)
if __name__ == '__main__':
for i in range(3):
t=Thread(target=car,args=(i,))
t.start()
t1=Thread(target=light)
t1.start()
执行结果:
十四、线程queue
import queue
q=queue.Queue(3) #队列:先进先出
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
执行结果:
q=queue.LifoQueue() #堆栈:后进先出
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
执行结果:
q=queue.PriorityQueue(3) #优先级队列,数字越小优先级越高
q.put((10,'first'))
q.put((-1,'second'))
q.put((11,'third'))
print(q.get())
print(q.get())
print(q.get())
执行结果: