Frank的学习之路

Day_11_总结_进程与线程

进程与线程

今日内容:

    进程相关:

        1.守护进程

        2.互斥锁

        3.进程间通信IPC:管道、进程queue

        4.生产者消费者模型(******

       

    线程相关:

        1.线程VS进程(******

        2.开启线程的两种方式(******

        3.线程对象的相关方法或属性(******

        4.守护线程

        5.线程互斥锁

        6.GIL全局解释器锁(******

        7.死锁与递归锁

        8.信号量

        9.事件

        10.线程queue

       

        11.进程池与线程池(******

            同步

            异步

            阻塞

            非阻塞

         

    单线程下实现并发

        1.协程

'''

1.什么是守护进程

    守护进程本质是一个“子进程”,守护进程会在主进程运行完毕(指的是主进程代码运行完毕)的情况下跟着一起结束

    特点:

        1.是一个“子进程”

        2.守护进程内的任务在父进程完毕的情况下直接死掉

2.为何要用守护进程

3.如何用

'''

1

from multiprocessing import Process

import os,time

def task():

    print('%s is running' %os.getppid())

    time.sleep(2)

    print('%s is done' %os.getpid())

if __name__ == '__main__':

    p=Process(target=task)

    p.daemon=True

    p.start()

print('')

2

#主进程代码运行完毕,守护进程就会结束

from multiprocessing import Process

from threading import Thread

import time

def foo():

    print(123)

    time.sleep(1)

    print("end123")

def bar():

    print(456)

    time.sleep(1)

    print("end456")

if __name__ == '__main__':

    p1=Process(target=foo)

    p2=Process(target=bar)

    p1.daemon=True

    p1.start()

    p2.start()

print("main----")

执行结果:


二、互斥锁

#互斥锁的原理:就是将多任务的对共享数据修改操作有并发变成“串行”,牺牲了效率保证数据安全

from multiprocessing import Process,Lock

import json

import time,random

def search(i):

    with open('db.json','rt',encoding='utf-8')as f:

        dic=json.load(f)

        time.sleep(1)

        print('路人%s查看到剩余票数:%s' %(i,dic['count']))

def get(i):

    with open('db.json','rt',encoding='utf-8')as f:

        dic=json.load(f)

    if dic['count']>0:

        #有票

        dic['count']-=1

        time.sleep(random.randint(1,3))

        with open('db.json','wt',encoding='utf-8')as f:

            json.dump(dic,f)

            print('路人%s抢票成功' %i)

    else:

        print('路人%s抢票失败' %i)

def task(i,mutex):

    search(i)

    mutex.acquire()

    get(i)

    mutex.release()

if __name__ == '__main__':

    mutex=Lock()

    for i in range(1,11):

        p=Process(target=task,args=(i,mutex))

        p.start()

        # p.join()

print('')

db.json

{"count": 0}

执行结果:

#IPC机制:管道,队列

from multiprocessing import Queue

q=Queue(3)

q.put('first')

q.put({'count':2})

q.put([3,])

# q.put([4,])

print(q.get())

print(q.get())

print(q.get())

# print(q.get())

了解:

q=Queue(3)

q.put(1,block=True,timeout=3)

q.put(2,block=True,timeout=3)

q.put(3,block=True,timeout=3)

# q.put(4,block=True,timeout=3)

# q.put_nowait(1) #q.put(1,block=False)

# q.put_nowait(2) #q.put(2,block=False)

# q.put_nowait(3) #q.put(3,block=False)

# q.put_nowait(4) #q.put(4,block=False)

# q.put(1)

# q.put(2)

# q.put(3)

print(q.get_nowait())

print(q.get_nowait())

print(q.get_nowait())

执行结果:

四、生产者消费者模型

'''

1.什么是生产者消费者模型

    生产者:代指生产数据的任务

    消费者:代指处理数据的任务

    模型:

        生产者生产数据然后交给消费者去处理

       

    特点:

        1.实现生产者与消费者解耦合

        2.平衡了生产者的生产能力和消费者的消费能力

       

2.为何要用

    当程序中存在明显的两类任务:一类负责造数据,另外一类负责处理数据,就可以用生产者消费者模型实现

    解耦合和从而提高效率

    

3.如何实现

    实现方式一:

        生产者--->queue<---消费者

'''

1

from multiprocessing import Process,Queue

import time,random

def producer(food,name,q):

    for i in range(3):

        time.sleep(random.randint(1,3))

        res='%s%s' %(food,i)

        q.put(res)

        print('厨师%s生产了%s' %(name,res))

    q.put(None)

def consumer(name,q):

    while True:

        res=q.get()

        if res is None:break

        time.sleep(random.randint(1,3))

        print('吃货%s吃了%s' %(name,res))

if __name__ == '__main__':

    q=Queue()

    #生产者

    p1=Process(target=producer,args=('包子','frank',q))

    #消费者

    c1=Process(target=consumer,args=('sunny',q))

    p1.start()

    c1.start()

print('')

执行结果:

2

from multiprocessing import Process,Queue

import time,random

def producer(food,name,q):

    for i in range(3):

        time.sleep(random.randint(1,3))

        res='%s%s' %(food,i)

        q.put(res)

        print('厨师%s生产了%s' %(name,res))

def consumer(name,q):

    while True:

        res=q.get()

        if res is None:break

        time.sleep(random.randint(1,3))

        print('吃货%s吃了%s' %(name,res))

if __name__ == '__main__':

    q=Queue()

    #生产者

    p1=Process(target=producer,args=('包子','frank1',q))

    p2 = Process(target=producer, args=('', 'frank2', q))

    p3 = Process(target=producer, args=('花生', 'frank3', q))

    #消费者

    c1=Process(target=consumer,args=('sunny1',q))

    c2 = Process(target=consumer, args=('sunny2', q))

    p1.start()

    p2.start()

    p3.start()

    c1.start()

    c2.start()

    p1.join()

    p2.join()

    p3.join()

    q.put(None)

    q.put(None)

    print('')

执行结果:

3

from multiprocessing import Process,JoinableQueue

import time,random

def producer(food,name,q):

    for i in range(3):

        time.sleep(random.randint(1,3))

        res='%s%s' %(food,i)

        q.put(res)

        print('厨师%s生产了%s' %(name,res))

def consumer(name,q):

    while True:

        res=q.get()

        if res is None:break

        time.sleep(random.randint(1,3))

        print('吃货%s吃了%s' %(name,res))

        q.task_done()

       

if __name__ == '__main__':

    q=JoinableQueue()

    #生产者

    p1=Process(target=producer,args=('包子','frank1',q))

    p2 = Process(target=producer, args=('', 'frank2', q))

    p3 = Process(target=producer, args=('花生', 'frank3', q))

    #消费者

    c1=Process(target=consumer,args=('sunny1',q))

    c2 = Process(target=consumer, args=('sunny2', q))

    c1.daemon=True

    c2.daemon=True

    p1.start()

    p2.start()

    p3.start()

    c1.start()

    c2.start()

    p1.join()

    p2.join()

    p3.join()

    q.join()#主进程最后一行代码运行完毕,生产者全部正常死亡,消费者也没有存在的意义

执行结果:

注意:JoinableQueue  q.task_done()   c1.daemon=True   c2.daemon=True  q.join()

五、线程理论

1.什么是线程

         进程其实是一个资源单位,每启动一个进程,改进程内至少有一个线程(主线程,“父线程”)

         线程代表的是该进程内代码的过程

2.为何要用线程

         线程的两个特点:

1.同一进程下的多个线程共享改进程的资源

2.开启一个线程的开销要远远小于一个进程

3.如何用线程

六、开启线程的两种方式

1;

from threading import Thread

import time

def task(name):

    print('%s is running' %name)

    time.sleep(2)

    print('%s is done' %name)

if __name__ == '__main__':

    t=Thread(target=task,args=('线程1',))

    t.start() #几乎是信号发出的同时,线程就开启了,证明线程的创建开销远远小于进程

    print('')#主线程的生命周就是其所在进程的生命周期,进程应该在进程内所有线程运行完毕了才应该结束

执行结果:

2

from threading import Thread

import time

class Mythread(Thread):

    def run(self):

        print('%s is running' %self.name)

        time.sleep(2)

        print('%s is done' %self.name)

if __name__ == '__main__':

    t=Mythread()

    t.start()

print('')

执行结果:

七、线程对象其他相关属性或方法

1join

from threading import Thread

import time

def task(name):

    print('%s is running' %name)

    time.sleep(1)

    print('%s is done' %name)

if __name__ == '__main__':

    t=Thread(target=task,args=('线程1',))

    t.start()

    t.join()

print('')

执行结果:

2:子线程共享资源

from threading import  Thread

from  multiprocessing import  Process

import  time

n=10

def task():

    global n

    n=0

if __name__ == '__main__':

    t=Thread(target=task)

    t.start()

    t.join()

print('',n)

执行结果:

3:子进程资源独立

from threading import  Thread

from  multiprocessing import  Process

import  time

n=10

def task():

    global n

    n=0

if __name__ == '__main__':

    t=Process(target=task)

    t.start()

    t.join()

print('',n)

执行结果:

4current_thread

from  threading import Thread,current_thread

import time

def task():

    print('%s is running' %current_thread().name)

    time.sleep(1)

    print('%s is done' %current_thread().name)

   

if __name__ == '__main__':

    t=Thread(target=task,name='线程1')

    t.start()

print('',current_thread().name)

执行结果:

5:其他,线程存活个数

from threading import  Thread,current_thread,active_count

import time

def task():

    print('%s is running' %current_thread().name)

    time.sleep(1)

    print('%s is done' %current_thread().name)

if __name__ == '__main__':

    t=Thread(target=task,name='线程1')

    t.start()

    print('',current_thread().name)

print(active_count())

执行结果:

八、守护线程

1

from threading import  Thread

import time

def task(name):

    print('%s is runing' %name)

    time.sleep(1)

    print('%s is done' %name)

if __name__ == '__main__':

    t=Thread(target=task,args=('线程1',))

    t.daemon=True

    t.start()

    print('')

#守护进程会在主进程内那个主线程代码运行完毕后立即销毁

#守护线程会在该进程内所有非守护线程都运行完毕后才结束

2:守护线程

from threading import Thread

import time

def foo():

    print(123)

    time.sleep(1)

    print("end123")

def bar():

    print(456)

    time.sleep(1)

    print("end456")

if __name__ == '__main__':

    t1=Thread(target=foo)

    t2=Thread(target=bar)

    t1.daemon=True

    t1.start()

    t2.start()

print('main---------')

九、线程互斥锁

from threading import  Thread,Lock

import time

mutex=Lock()

n=100

def task():

    # global n

    # mutex.acquire()

    # temp=n

    # time.sleep(0.1)

    # n=temp-1

    # mutex.release()

    # 写法二:

    global n

    with mutex:

        temp=n

        time.sleep(0.1)

        n=temp-1

if __name__ == '__main__':

    l=[]

    star=time.time()

    for i in range(100):

        t=Thread(target=task)

        l.append(t)

        t.start()

    for t in l:

        t.join()

print(n,time.time()-star)

执行结果:

十、GIL全局解释器锁

'''

1.什么是GIL

    GIL本质就是一把互斥锁,有了GIL的存在会导致同一个进程下的多个线程不能够并行但能够并发

2.为何要有GIL

3.有了GIL以后,应该如何使用多进程,多线程

'''

1:计算密集型:多进程效率高

from multiprocessing import Process

from threading import Thread

import os,time

def work():

    res=0

    for i in range(10000):

        res*=i

if __name__ == '__main__':

    l=[]

    print(os.cpu_count())

    start=time.time()

    for i in range(os.cpu_count()):

        p=Thread(target=work)

        # print(p)

        l.append(p)

        p.start()

    # print(l)

    for p in l:

        p.join()

    stop=time.time()

print('run time is %s' %(stop -start))

例二、I/O密集型:多线程高效率

from multiprocessing import Process

from threading import Thread

import os,time

def work():

    time.sleep(2)

if __name__ == '__main__':

    l=[]

    print(os.cpu_count())

    start=time.time()

    for i in range(400):

        p=Process(target=work)

        l.append(p)

        print(l)

        p.start()

    for p in l:

        p.join()

    stop=time.time()

print('run time is %s' %(stop-start))

例四:

from threading import  Thread,Lock

import time

mutex=Lock()

n=100

def task():

    global n

    with mutex:

        temp=n

        time.sleep(0.1)

        n=temp-1

if __name__ == '__main__':

    for i in range(3):

        t=Thread(target=task)

        t.start()

十一、死锁现象与递归锁

from threading import Thread,Lock,RLock

import time

# mutexA=Lock()

# mutexB=Lock()  #这种操作会出现死锁现象

mutexB=mutexA=RLock()

class Mythread(Thread):

    def run(self):

        self.f1()

        self.f2()

    def f1(self):

        mutexA.acquire()

        print('%s 抢到了A' %self.name)

        mutexB.acquire()

        print('%s 抢到了B' %self.name)

        mutexB.release()

        mutexA.release()

    def f2(self):

        mutexB.acquire()

        print('%s 抢到了B' %self.name)

        time.sleep(1)

        mutexA.acquire()

        print('%s 抢到了A' %self.name)

        mutexA.release()

        mutexB.release()

if __name__ == '__main__':

    for i in range(10):

        t=Mythread()

        t.start()

执行结果:

十二、信号量

import time,random

sm=Semaphore(2)

def task(i):

    sm.acquire()

    print('学生%s 正在排队吃饭' %i)

    time.sleep(random.randint(1,3))

    sm.release()

if __name__ == '__main__':

    for i in range(6):

        t=Thread(target=task,args=(i,))

        t.start()

注意:同一个时刻只有2个人买饭

执行结果:

十三、事件

from threading import Thread,Event

import time,random

event=Event()

def light():

    print('红灯正在亮着...')

    time.sleep(random.randint(1,3))

    event.set() #event.clear()

def car(i):

    print('%s 正在等待绿灯...' %i)

    event.wait()

    print('%s 正在通行...' %i)

if __name__ == '__main__':

    for i in range(3):

        t=Thread(target=car,args=(i,))

        t.start()

    t1=Thread(target=light)

t1.start()

执行结果:

十四、线程queue

import  queue

q=queue.Queue(3) #队列:先进先出

q.put(1)

q.put(2)

q.put(3)

print(q.get())

print(q.get())

print(q.get())

执行结果:

q=queue.LifoQueue() #堆栈:后进先出

q.put(1)

q.put(2)

q.put(3)

print(q.get())

print(q.get())

print(q.get())

执行结果:

q=queue.PriorityQueue(3) #优先级队列,数字越小优先级越高

q.put((10,'first'))

q.put((-1,'second'))

q.put((11,'third'))

print(q.get())

print(q.get())

print(q.get())

执行结果:

返回顶部