【python】-- 多进程的基本语法 、进程间数据交互与共享、进程锁和进程池的使用...

mac2024-10-16  50

多进程

进程之间是相互独立的,python是启动进程的时候,是启动的是原生进程。进程是没有GIL锁的,而且不存在锁的概念,进程之间的数据式不能共享的,而线程是可以的。

1、进程的定义

用muliprocessing这个包中的Process来定义多进程,跟定义多线程类似

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from multiprocessing import Process   # 导入进程模块 import time     def run(name):      time.sleep( 2 )      print ( "hello" , name)   if __name__ = = "__main__" :      p_obj_list = list ()    # 存放进程对象      for i in range ( 10 ):    # 启动10个进程          p = Process(target = run, args = ( "QQ{0}" . format (i),))  # 产生一个进程实例          p.start()   # 启动进程          p_obj_list.append(p)         for p in p_obj_list:          p.join()   # 等待进程结果

2、进程中加入线程

+ View Code ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from multiprocessing import Process import time,threading     def thread_run(name):   # 定义线程执行的方法      print ( "{0}:{1}" . format (name, threading.get_ident()))  # thread.get_ident ()返回当前线程的标识符,标识符是一个非零整数     def run(name):      time.sleep( 2 )      print ( "hello" , name)      t = threading.Thread(target = thread_run, args = (name,))   # 嵌入线程      t.start()   # 执行线程     if __name__ = = "__main__" :      p_obj_list = list ()      for i in range ( 10 ):          p = Process(target = run, args = ( "QQ{0}" . format (i),))          p.start()          p_obj_list.append(p)        for p in p_obj_list:          p.join()

3、父子进程

每个子进程都是由一个父进程启动的,每个程序也是有一个父进程

+ View Code ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from multiprocessing import Process import os     def info(title):      print (title)      print ( 'module name:' , __name__)      print ( 'parent process:' , os.getppid())  # 获得父进程ID      print ( 'process id:' , os.getpid())  # 获得子进程ID      print ( "\n\n" )     def f(name):      info( '\033[31;1m function f\033[0m' )      print ( 'hello' , name)   if __name__ = = '__main__' :      info( '\033[32;1m main process line\033[0m' )      p = Process(target = f, args = ( 'QQ' ,))      p.start()      p.join()

  

 

 

进程间数据交互与共享

知道不同进程之间内存是不共享的,要想实现两个进程间的通信需要用到multiprocessing库中的queue(队列)模块,这个multiprocessing库中的queue模块跟单纯的queue库是不一样的。进程导入前者(这里的queue是专门为进程之间的通信设计的)不出错,导入后者(这里的queue主要是线程间数据交互)出错。

1、线程访问queue

+ View Code ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import queue,threading     def f(q):      q.put([ 66 , None , 'hello word' ])   if __name__ = = '__main__' :      q = queue.Queue()   # 把这个q传给了子线程      p = threading.Thread(target = f, args = (q,))   # 子线程访问父线程的q      p.start()      print (q.get())      p.join()   #执行结果 [ 66 , None , 'hello word' ]

2、进程访问queue

+ View Code ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from multiprocessing import Process import queue     def f(q):      q.put([ 66 , None , 'hello word' ])   if __name__ = = '__main__' :      q = queue.Queue()   # 把这个q传给了子线程      p = Process(target = f, args = (q,))   # 子线程访问父线程的q      p.start()      print (q.get())      p.join()   #执行结果 Traceback (most recent call last):    File "C:/Users/dell/PycharmProjects/untitled/process/进程的定义.py" , line 77 , in <module>      p.start()    File "C:\Python36\lib\multiprocessing\process.py" , line 105 , in start      self ._popen = self ._Popen( self )    File "C:\Python36\lib\multiprocessing\context.py" , line 223 , in _Popen      return _default_context.get_context().Process._Popen(process_obj)    File "C:\Python36\lib\multiprocessing\context.py" , line 322 , in _Popen      return Popen(process_obj)    File "C:\Python36\lib\multiprocessing\popen_spawn_win32.py" , line 65 , in __init__      reduction.dump(process_obj, to_child)    File "C:\Python36\lib\multiprocessing\reduction.py" , line 60 , in dump      ForkingPickler( file , protocol).dump(obj) TypeError: can't pickle _thread.lock objects

3、进程访问multiprocessing库中的Queue模块

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing import Process,Queue     def f(q):      q.put([ 66 , None , 'hello word' ])   if __name__ = = '__main__' :      q = Queue()   # 把这个q传给了子线程      p = Process(target = f, args = (q,))   # 子线程访问父线程的q      p.start()      print (q.get())      p.join()   #执行结果 [ 66 , None , 'hello word' ]

父进程相当于克隆一个Q,把自己的Q克隆了一份交给子进程,子进程这个时候往Q里面放了一份数据,然后父进程又能实际的获取到。但是你克隆了一份是不是就和父进程没有关系了,为什么还能联系在一起呢?但是实际上:等于这两个Q里面的数据又把它序列化了,序列化到一个中间的地方,类似于翻译,然后反序列化给这个父进程这边来了,其实这两个Q就是通过pickle来序列化的,不是一个真正的Q。

小结:两个线程之间可以修改一个数据,不加锁,可能就会出错。现在进程中的Queue,是实现了数据的传递,不是在修改同一份数据,只是实现一个进程的数据传给了另外一个进程。

 

4、通过Pipe()实现进程间的数据交互,manger实现数据共享

上面的例子是通过进程中的Queue,来进行数据共享的,其实还有一种方式实现数据共享,那就是管道,pipe,以及数据共享manger。

4.1、Pipe()函数

管道函数会返回由管道双方连接的一组连接对象,该管道默认是双向的(双向的)。

? 1 2 3 4 5 6 7 8 9 10 11 12 13 from multiprocessing import Process, Pipe     def f(conn):      conn.send([ 66 , None , 'hello,word' ])  # 发送消息给父进程      conn.close()   if __name__ = = '__main__' :      parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下      p = Process(target = f, args = (child_conn,))      p.start()      print (parent_conn.recv())   # 接收子进程的消息      p.join()

4.2、接受多次和发送多次

+ View Code ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from multiprocessing import Process, Pipe     def f(conn):      conn.send([ 66 , None , 'hello,word' ])  # 发送消息给父进程      conn.send( "QQ" )  # 发送消息给父进程      print (conn.recv())   # 接收父进程的消息      conn.close()   if __name__ = = '__main__' :      parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下      p = Process(target = f, args = (child_conn,))      p.start()      print (parent_conn.recv())      print (parent_conn.recv())  # 接收两次      parent_conn.send( "微信" )   # 发送给子进程      p.join()

4.3、manger

manger可以完成数据间的共享。

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from multiprocessing import Process, Manager import os     def f(d, l):      d[os.getpid()] = os.getpid()      l.append(os.getpid())      print (l)   if __name__ = = '__main__' :      with Manager() as manager:          d = manager. dict ()   # 声明一个字典,这个字典是用manger声明的,不是用dict()声明的          # manger.dict()是用专门的语法生产一个可在多进程之间进行传递和共享的一个字典          l = manager. list ( range ( 5 ))  # 同样声明一个列表          p_list = []          for i in range ( 10 ):              p = Process(target = f, args = (d, l))              p.start()              p_list.append(p)          for res in p_list:              res.join()          print (d)          print (l)

线程修改同一份数据的时候需要加锁,进程修改数据呢:不用加锁,因为这个manger已经帮你加锁了,它就默认不允许两个进程同时修改一份数据。两个进程没有办法同时修改一份数据,进程之间是独立的,它自己也要加锁,因为它把自己的东西同时copy好几份,跟刚刚的那个Queue一样,copy10个字典最终合成一个字典

 

 

 

进程锁和进程池的使用

1、进程锁

通过multiprocessing中的Lock模块来实现进程锁

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 from multiprocessing import Process,Lock   # 导入进程锁     def f(l, i):      l.acquire()    # 加锁      try :          print ( "hello word" , i)      finally :          l.release()   # 释放锁   if __name__ = = "__main__" :      lock = Lock()     # 定义锁      for num in range ( 10 ):          Process(target = f, args = (lock, num,)).start()  # 把锁传入进程中

进程中不是相互独立的吗?为什么还要加锁:虽然每个进程都是独立运行的,但是问题来了,它们共享一块屏幕。这个锁存在的意义就是屏幕共享。如果进程1想着打印数据,而进程2想也想打印数据的情况,就有可能乱套了,然后通过这个锁来控制,去打印的时候,这个屏幕只有我独占,导致屏幕不会乱。

2、进程池apply和apply_saync

2.1、appley

同步执行,也就是串行执行的

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from multiprocessing import Pool  # 导入进程池模块pool import time,os     def foo(i):      time.sleep( 2 )      print ( "in process" , os.getpid())  # 打印进程号     if __name__ = = "__main__" :      pool = Pool(processes = 5 )   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行      for i in range ( 10 ):          pool. apply (func = foo, args = (i,))   # 同步执行挂起进程      print ( 'end' )      pool.close()      pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

2.2、apply_saync

异步执行,也就是并行执行。

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from multiprocessing import Pool  # 导入进程池模块pool import time,os     def foo(i):      time.sleep( 2 )      print ( "in process" , os.getpid())  # 打印进程号     if __name__ = = "__main__" :      pool = Pool(processes = 5 )   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行      for i in range ( 10 ):          pool.apply_async(func = foo, args = (i,))   # 采用异步方式执行foo函数      print ( 'end' )      pool.close()      pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

2.3、异步下回调函数

程序执行完毕之后,再回调过来执行这个Bar函数。

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from multiprocessing import Process,Pool import time,os     def foo(i):      time.sleep( 2 )      print ( "in process" , os.getpid())  # 打印子进程的进程号     def bar(arg):      print ( '-->exec done:' , arg, os.getpid())   # 打印进程号   if __name__ = = "__main__" :      pool = Pool(processes = 2 )      print ( "主进程" , os.getpid())   # 主进程的进程号      for i in range ( 3 ):          pool.apply_async(func = foo, args = (i,), callback = bar)   # 执行回调函数callback=Bar      print ( 'end' )      pool.close()      pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。   #执行结果 主进程 752 end in process 2348 - - > exec done: None 752 in process 8364 - - > exec done: None 752 in process 2348 - - > exec done: None 752

注:

回调函数说明fun=Foo干不完就不执行bar函数,等Foo执行完就去执行Bar这个回调函数是主进程去调用的,而不是每个子进程去调用的。

回调函数的用处:

  比如说你从各个机器上备份完毕,在回调函数中自动写一个脚本,说备份完毕

回调函数是主进程调用的原因?

  如果是子进程去调用这个回调函数,有多少个子进程就有多少个连接,如果是主进程的话,只需要一次长连接就可以了,这个效率就高了

  

 

最新回复(0)