from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from threading import current_thread
import time
import os
'''
池子中创建的进程/线程创建一次就不会再创建了,这样的话节省了反复开辟进程/
线程的资源
'''
# pool = ProcessPoolExecutor(
5) # 默认是当前计算机cpu的个数
pool = ThreadPoolExecutor(
5) # 括号内可以传参数指定线程池内的线程个数,不传默认是当前所在计算机的cpu个数乘5
def test(n):
# print(n,os.getpid()) # 查看当前进程号
print(n,current_thread().name) # 查看当前线程号
time.sleep(1)
return n**
2
def call_back(n):
print('haha',n.result())
"""
提交任务的方式
同步:提交任务之后 原地等待任务的返回结果 期间不做任何事
异步:提交任务之后 不等待任务的返回结果,直接执行下一行代码
"""# pool.submit(task,1) # 朝线程池中提交任务 异步提交# print('主')
# 异步回调机制:当异步提交的任务有返回结果之后,会自动触发回调函数的执行
if __name__ ==
'__main__':
for i
in range(
20):
pool.submit(test,i).add_done_callback(call_back) # 提交任务的时候 绑定一个回调函数 一旦该任务有结果 立刻执行对于的回调函数
pool.shutdown() #关闭池子 等待池子中所有的任务执行完毕之后 才会往下运行代码
协程
#串行执行 0.8540799617767334
mport time
def func1():
for i
in range(
1000000):
i +
1
def func2():
for i
in range(
1000000):
i +
1
start =
time.time()
func1()
func2()
stop =
time.time()
print(stop-
start)
#基于yield并发执行
import time
def func1():
while True:
10000000+
1
yield
def func2():
g=
func1()
for i
in range(
5):
#time.sleep(1) # 模拟IO,yield并不会捕捉到并自动切换
i+
1
next(g)
start=
time.time()
func2()
stop=
time.time()
print(stop-start)
gevent模块是能够识别IO的一个工具
from gevent import monkey;monkey.patch_all() # 能够识别IO
from gevent import spawn # 切换cpu
import time
def ha():
print('ha')
time.sleep(1)
print('ha')
def hei():
print('hei')
time.sleep(1)
print('hei')
start =
time.time()
g1 =
spawn(ha) #spawn 会检测所有任务
g2 =
spawn(hei)
g1.join()
g2.join()
print(time.time()-start)
通过协程实现并发
客户端
import socket
from threading import Thread,current_thread
def test():
client =
socket.socket()
client.connect(('127.0.0.1',
8080))
n =
0
while True:
data =
'%s %s'%
(current_thread().name,n)
client.send(data.encode('utf-8'))
res = client.recv(
1024)
print(res.decode('utf-8'))
n +=
1
for i
in range(
400):
t = Thread(target=
test)
t.start()
服务端
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn
server =
socket.socket()
server.bind(('127.0.0.1',
8080))
server.listen(5)
def task(conn):
while True:
try:
res = conn.recv(
1024)
if len(res) ==
0:
break
print(res.decode('utf-8'))
conn.send(res.upper())
except ConnectionResetError as e:
print(e)
break
conn.close()
def server1():
while True:
conn, addr =
server.accept()
spawn(task,conn)
if __name__ ==
'__main__':
g =
spawn(server1)
g.join()
转载于:https://www.cnblogs.com/zrh-960906/p/11358661.html