from threading import Thread import time def say(name): time.sleep(2) print("%s hello"%name) if __name__ =="__main__": t = Thread(target=say,args=("alex",)) t.start() print("主线程") 方式一 from threading import Thread import time class say(Thread): def __init__ (self,name): super().__init__() self.name = name def run(self): time.sleep(2) print("%s say hello" %self.name) if __name__ == "__main__": t = say("alex") t.start() print("主线程") 方式二
基于套接字实现多线程
from socket import * from threading import Thread def communicate(conn): while True: try: data = conn.recv(1024) if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip,port): server = socket(AF_INET,SOCK_STREAM) server.bind((ip,port)) server.listen(5) while True: conn,addr = server.accept() t = Thread(target=communicate,args=(conn,)) t.start() server.close() if __name__ == "__main__": server("127.0.0.1",8080) server from socket import * client = socket(AF_INET,SOCK_STREAM) client.connect(("127.0.0.1",8080)) while True: msg = input(">>: ").strip() if not msg:continue client.send(msg.encode("utf-8")) data = client.recv(1024) print(data.decode("utf-8")) client.close() client
区别:
2 同一个进程内多线程它们的PID都是一样的,多进程的PID 不同
from threading import Thread import os def work(): print("hello",os.getpid()) if __name__ == "__main__": t1 = Thread(target=work) t2 = Thread(target=work) t1.start() t2.start() print("end..") ''' hello 2684 hello 2684 end.. '''
from multiprocessing import Process import os def work(): print("hello",os.getpid()) if __name__ == "__main__": t1 = Process(target=work) t2 = Process(target=work) t1.start() t2.start() print("end..") ''' end.. hello 16372 hello 8456 '''
3 同一个进程内的线程共享进程的数据, 进程之间地址空间是隔离的
from multiprocessing import Process def work(): global n n = 10 if __name__ == "__main__": n = 100 p = Process(target=work) p.start() p.join() print("主",n) ''' 主 100 '''
from threading import Thread def work(): global n n = 10 if __name__ == "__main__": n = 100 p = Thread(target=work) p.start() p.join() print("主",n) ''' 主 10 '''
''' thread 实例对象的方法 isAlive() 返回线程是否活动的 getName() 返回线程名 setName() 设置线程名 threadming 模块提供的一些方法 threading.currentThread() 返回当前线程变量 threading.enumerate() 返回一个包含正在运行的线程的list。正在运行指线程启动后,结束前,不包括启动前和终止后的进程 threading.activeCount() 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果 '''
from threading import Thread import threading import time def work(): time.sleep(2) threading.currentThread().setName("wwwww") # 设置线程名 print(threading.currentThread().getName()) if __name__ == "__main__": t = Thread(target=work) t.start() print(threading.current_thread().getName()) # MainThread print(threading.current_thread()) # <_MainThread(MainThread, started 12296)> print(threading.enumerate()) # [<_MainThread(MainThread, started 12296)>, <Thread(Thread-1, started 3420)>] print(threading.active_count()) # 2 print('主线程/主进程') # 主线程/主进程 ''' MainThread <_MainThread(MainThread, started 5860)> [<_MainThread(MainThread, started 5860)>, <Thread(Thread-1, started 16080)>] 2 主线程/主进程 wwwww '''
主线程等待子线程结束
from threading import Thread import time def say(name): time.sleep(2) print("%s say hello" %name) if __name__== "__main__": t = Thread(target=say,args=("egon",)) t.start() t.join() print("end") print(t.is_alive()) ''' egon say hello end False '''
1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束, 2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(2) print("end456") if __name__ == "__main__": t1 = Thread(target=foo) t2 = Thread(target=bar) t1.daemon = True t1.start() t2.start() print("----------------------") ''' 123 456 ---------------------- end123 end456 '''
GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。 保护不同的数据的安全,就应该加不同的锁。
解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据100,可能线程1执行x=100的同时,而垃圾回收执行的是回收100的操作,解决这种问题没有什么高明的方法,就是加锁处理,如下图的GIL,保证python解释器同一时间只能执行一个任务的代码
1、100个线程去抢GIL锁,即抢执行权限 2、肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire() 3、极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL 4、直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程
有了GIL的存在,同一时刻同一进程只有一个线程被执行
现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
from multiprocessing import Process from threading import Thread import os,time def work(): res = 0 for i in range(10000000): res *=i if __name__ == "__main__": l = [] start = time.time() for i in range(4): # 4 核 p = Process(target=work) #p = Thread(target=work) l.append(p) p.start() for p in l: p.join() stop = time.time() print("run time is %s "%(stop - start )) ''' 多线程用于IO密集型,如socket,爬虫,web 多进程用于计算密集型,如金融分析 '''
from threading import Thread,Event,currentThread import time event = Event() def conn(): n = 0 while not event.is_set(): if n ==3: print("%s try too many times" %currentThread().getName()) return print("%s try %s" %(currentThread().getName(),n)) event.wait(2) n +=1 print("%s is connected" %currentThread().getName()) def check(): print("%s is checking" %currentThread().getName()) time.sleep(3) event.set() if __name__ == "__main__": for i in range(3): t = Thread(target=conn) t.start() t = Thread(target=check) t.start()
import time from threading import Event,Thread event = Event() class Boss(Thread): def run(self): print("BOSS: 今晚大家都要加班到22:00") event.set() time.sleep(5) print("Boss: <22:00>可以下班") event.set() class Worker(Thread): def run(self): event.wait() print("worker: 哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("worker OhYeah!") if __name__ == "__main__": threads = [] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join() print("ending...")
基本方法 ''' 1. submit(fn,*args,**kwargs) 异步提交任务 2. map(func,*iterables,timeout=None,chunksize=1) 取代for循环submit的操作 3. shutdown(wait=True) 相当于进程池的pool.close()+pool.join() 操作 wait=True 等待池内所有任务执行完毕回收完资源后,才继续 wait=False 立刻返回 并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map 必须在shutdown之前 4. result(timeout=None) 取得结果 5. add_done_callback(fn) 回调函数 '''
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(3): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print(future.result())
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time,random from threading import currentThread def task(): print("name :%s pid: %s run" %(currentThread().getName(),os.getpid())) time.sleep(random.randint(1,3)) if __name__ == "__main__": pool = ThreadPoolExecutor(5) for i in range(10): pool.submit(task,) pool.shutdown(wait=True) print("主")
转载于:https://www.cnblogs.com/augustyang/p/11044332.html
