多线程和多进程爬虫
一.线程
1.什么是线程。
线程是操作系统能够进行运算调度的最小单位。它被包含在进程中,是进城中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个线程可以并发多个线程,每条线程执行不同的任务。
2.线程常用的方法
方法 | 说明 |
start() | 线程准备就绪,等待CPU调度 |
setName() | 为线程设置名称 |
getName() | 获取线程名称 |
setDaemon() | 设置为守护线程 |
Join() | 逐个执行每个线程,执行完毕后继续往下执行 |
run() | 线程被调度后会执行该方法,如果想自定义线程类,需要重写run()方法 |
3.Threading类
3.1 线程的普通创建方式
Threadding用于提供线程相关的操作,线程是应用程序中工作的最小单元。
import threadingimport timedef show(arg): time.sleep(1) print('thread' + str(arg))for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start()print('主线程结束')结果:主线程结束thread0thread1thread2thread5thread4thread3thread7thread6thread8thread9
上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。
3.2 自定义线程类
继承自threading.Thread类来自定义线程类,但是其本质却是重构thread类中的run()方法。
import threading class myThread(threading.Thread): def __init__(self, sum): threading.Thread.__init__(self) self.sum = sum def run(self): print('对象数是:',self.sum)if __name__ == '__main__': t1 = myThread(1) t2 = myThread(2) t1.start() t2.start()
3.3 计算子线程执行的时间
PS:sleep的时候是不会占用CPU的,操作系统会把线程挂起。
import threadingimport timedef show(n): time.sleep(1) print('thread' + str(n))start_time = time.time()obj_list = [] for i in range(5): t = threading.Thread(target=show,args=(i,)) t.start() obj_list.append(t)for obj in obj_list: obj.join()print('花费的时间为:',time.time() - start_time)
3.4 守护线程
线程的setDaemon(True)将线程变成主线程的守护线程,意思是当主进程结束后,子线程也会随之退出。意味着当主线程结束后,程序就结束了。
1 import threading 2 import time 3 4 def show(n): 5 time.sleep(1) 6 print('thread' + str(n)) 7 8 start_time = time.time() 9 obj_list = []10 11 for i in range(5):12 t = threading.Thread(target=show,args=(i,))13 t.setDaemon(True)14 t.start()15 obj_list.append(t)16 17 print('花费的时间为:',time.time() - start_time)
3.5 GIL(全局解释器锁)
在Python的运行环境中,无论电脑是单核还是双核,操作系统同时只会执行一个线程。究其原因,是因为GIL(全局解释器锁)。
在Python中,一个线程要想要执行,必须要先拿到GIL。可以吧GIL想象成一个“通行证”,并且在一个进程中,GIL只有一个。没有通行证的线程就不会被执行。
Python多线程的工作过程:
- 拿到公共数据
- 申请GIL
- Python解释器调用os的原生线程
- os操作CPU执行运算
- 当该线程的执行时间到了之后,无论是否执行完,GIL被释放
- 其他线程重复上面的操作
- 其他进程执行完成后,切换到原来的线程(从记录的上下文继续执行)
3.6 线程锁(Lock,RLock)
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。
import threading,timedef run(n): global num num += 1num = 0obj_list = []for i in range(20000): t = threading.Thread(target=run,args=(i,)) t.start() obj_list.append(t)for obj in obj_list: obj.join()print('num:',num)脏数据:19999
3.6.1 互斥锁(Lock)
为了防止上面情况的发生,我们可以使用互斥锁(Lock)来解决。
import threading,timelock = threading.Lock() # 实例化一个锁对象def run(n): lock.acquire() # 获取锁 global num num += 1 lock.release() # 释放锁num = 0obj_list = []for i in range(20000): t = threading.Thread(target=run,args=(i,)) t.start() obj_list.append(t)for obj in obj_list: obj.join()print('num:',num)
3.6.2 递归锁(RLock)
RLock的用法和Lock一样,只是他支持嵌套。在多个锁没有释放的时候一般会使用Rlock类。
import threading,timelock = threading.RLock() # 实例化一个锁对象num = 0obj_list = []def run(n): lock.acquire() # 获取锁 global num num += 1 lock.release() # 释放锁 for i in range(20000): t = threading.Thread(target=run,args=(i,)) t.start() obj_list.append(t) for obj in obj_list: obj.join() print('num:',num)
3.7 信号量(Semaphore)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
import threading,time lock = threading.BoundedSemaphore(6) # 实例化一个锁对象 def run(n): lock.acquire() # 获取锁 time.sleep(1) print('run the thread: %s' % n) lock.release() # 释放锁 num = 0 for i in range(200): t = threading.Thread(target=run,args=(i,)) t.start()
3.8事件(Event)
python线程的事件用于主线程控制其他线程的执行,事件是一个简单的线程同步对象,主要提供了以下几种方法:
方法 | 说明 |
clear() | 将flag设置为“false” |
set() | 将flag设置为“true” |
is_set() | 判断是否设置了flag |
wait() | 一直监听flag,没有检测到会一直处于阻塞状态 |
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
import threading,timeevent = threading.Event() # 创建事件对象def lighter(): count = 0 event.set() #初始值为绿灯 while 1: if 5 < count <= 10: event.clear() #红灯,清楚标志位 print('\33[41;1mred light is on...\033[0m') elif count > 10: event.set() # 绿灯,设置标志位 count = 0 else: print('\33[41;1mred light is on...\033[0m') time.sleep(1) count += 1def car(name): while True: if event.is_set(): # 判断是否设置了标志位 print("[%s] 绿灯亮,请行驶..." % name) time.sleep(1) else: print("[%s] 红灯亮,请等待..." % name) event.wait() print("[%s] 绿灯亮,开始行驶..." % name)light = threading.Thread(target=lighter,) car = threading.Thread(target=car, args=('test',))
light.start() car.start()
3.9条件(Condition)
使得线程等待,只有满足某条件时,才释放n个线程。
是最简单的线程同步机制,Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。
Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。
除了notify方法外,Condition对象还提供了notifyAll方法,可以通知waiting池中的所有线程尝试acquire内部锁。由于上述机制,处于waiting状态的线程只能通过notify方法唤醒,所以notifyAll的作用在于防止有线程永远处于沉默状态。
演示条件变量同步的经典问题是生产者与消费者问题:假设有一群生产者(Producer)和一群消费者(Consumer)通过一个市场来交互产品。生产者的”策略“是如果市场上剩余的产品少于1000个,那么就生产100个产品放到市场上;而消费者的”策略“是如果市场上剩余产品的数量多余100个,那么就消费3个产品。用Condition解决生产者与消费者问题的代码如下:
import threadingimport timeclass Producer(threading.Thread): def run(self): global count while True: if con.acquire(): if count > 1000: con.wait() else: count = count+100 msg = self.name+' produce 100, count=' + str(count) print msg con.notify() con.release() time.sleep(1)class Consumer(threading.Thread): def run(self): global count while True: if con.acquire(): if count < 100: con.wait() else: count = count-3 msg = self.name+' consume 3, count='+str(count) print msg con.notify() con.release() time.sleep(1)count = 500con = threading.Condition()def test(): for i in range(2): p = Producer() p.start() for i in range(5): c = Consumer() c.start()if __name__ == '__main__': test()
4.多线程的爬虫实例
'''使用多线程爬取1500个url中的图片''' import threading,requests,timelink_list = []with open('url.txt','r') as f: for u in f.readlines(): url = u.split()[0].replace('\n','') link_list.append(url)start_time = time.time()class myThread(threading.Thread): def __init__(self,name,link_range): threading.Thread.__init__(self) self.name = name self.link_range = link_range self.userName = 0 def run(self): print('开始:',self.name) self.craw(self.name,self.link_range) print('结束:',self.name) def writeImages(self, ThreadName, url): print("正在存储文件 %s ..." % ThreadName+str(self.userName)) path = r'D:\zhappian' + '\\' + ThreadName+str(self.userName) + '.png' file = open(path, 'wb') images = requests.get(url,timeout = 20).content file.write(images) file.close() self.userName += 1 def craw(self,name,link_range): for i in range(link_range[0],link_range[len(link_range)-1]): self.writeImages(name,link_list[i])obj_list = []url_list = [(0,300),(301,600),(601,900),(901,1200),(1201,1500)]# 创建新线程for i in range(5): t = myThread('Thread-'+str(i+1), url_list[i]) t.start() obj_list.append(t)# 等待所有线程执行完成for url in url_list: url.join()end_time = time.time()print('爬虫的运行时间为:',end_time - start_time)
5.队列(queue)
queue是python中的标准库,俗称队列。在python中,多个线程之间的数据是共享的,多个线程进行数据交换的时候,不能够保证数据的安全性和一致性,所以当多个线程需要进行数据交换的时候,队列就出现了,队列可以完美解决线程间的数据交换,保证线程间数据的安全性和一致性。
PS: 在python2.x中,模块名为Queue
queue模块有三种队列及构造函数
l Python queue模块的FIFO队列先进先出。 queue.Queue(maxsize)
l LIFO类似于堆,即先进后出。 queue.LifoQueue(maxsize)
l 还有一种是优先级队列级别越低越先出来。 queue.PriorityQueue(maxsize)
queue模块中的常用方法:
方法 | 说明 |
queue.qsize() | 返回队列的大小 |
queue.empty() | 如果队列为空,返回True,反之False |
queue.full() | 如果队列满了,返回True,反之False (queue.full 与 maxsize 大小对应) |
queue.get([block[, timeout]]) | 获取队列,立即取出一个元素, timeout超时时间 |
queue.put(item[, timeout]]) | 写入队列,立即放入一个元素, timeout超时时间 |
queue.join() | 阻塞调用线程,直到队列中的所有任务被处理掉, 实际上意味着等到队列为空,再执行别的操作 |
queue.task_done() | 在完成一项工作之后,queue.task_done()函数向任务已经完成的队列发送一个信号
|
5.1代码实例
l 创建队列
import queueq = queue.Queue()
l empty方法(如果队列为空,返回True)
import queueq = queue.Queue()print(q.empty())#输出:True
l full方法(如果队列满了,返回True)
import queueq = queue.Queue(1) #指定队列大小q.put('a')print(q.full())#输出:True
l put方法和get方法
import queueq = queue.Queue()q.put('a')q.put('b')print(q.get())#输出:aprint(q.get())#输出:b
l qsize方法(返回队列里元素个数)
import queueq = queue.Queue()q.put('a')q.put('b')print(q.qsize())#输出:2
5.2 生产者消费者
import threading,timeimport queueq = queue.Queue(maxsize=10)# 生产者def Producer(name): count = 1 while True: q.put("冠军%s" % count) print("冠军",count) count +=1 time.sleep(0.1)#消费者def Consumer(name): #while q.qsize()>0: while True: print("[%s] 取到[%s]..." %(name, q.get())) time.sleep(1)p = threading.Thread(target=Producer,args=("IG",))c = threading.Thread(target=Consumer,args=("LPL",))c1 = threading.Thread(target=Consumer,args=("LCK",))p.start()c.start()c1.start()
6.多线程和爬虫实例。
import threading,requests,timeimport queuestart_time = time.time()obj_list = []work_queue = queue.Queue(1500)link_list = []with open('url.txt','r') as f: for u in f.readlines(): url = u.split()[0].replace('\n','') link_list.append(url)# 填充队列for url in link_list: work_queue.put(url)class myThread(threading.Thread): def __init__(self,name,q): threading.Thread.__init__(self) self.name = name self.userName = 0 self.q = q def run(self): print('开始:',self.name) while True: try: self.craw(self.name,self.q) except Exception as e: break print('结束:',self.name) def writeImages(self, ThreadName, url): print("正在存储文件 %s ..." % ThreadName+str(self.userName)) path = r'D:\zhappian' + '\\' + ThreadName+str(self.userName) + '.png' file = open(path, 'wb') images = requests.get(url,timeout = 20).content file.write(images) file.close() self.userName += 1 def craw(self,name,q): url = q.get(timeout = 2) try: self.writeImages(name, url) except Exception as e: print(q.qsize(),url,e)# 创建新线程for i in range(5): t = myThread('Thread-'+str(i+1), work_queue) t.start() obj_list.append(t)# 等待所有线程执行完成for url in obj_list: url.join()end_time = time.time()print('爬虫的运行时间为:',end_time - start_time)