博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
爬虫与多线程
阅读量:6413 次
发布时间:2019-06-23

本文共 11041 字,大约阅读时间需要 36 分钟。

多线程和多进程爬虫

 

一.线程

1.什么是线程。

线程是操作系统能够进行运算调度的最小单位。它被包含在进程中,是进城中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个线程可以并发多个线程,每条线程执行不同的任务。

 

2.线程常用的方法

方法

说明

start()

线程准备就绪,等待CPU调度

setName()

为线程设置名称

getName()

获取线程名称

setDaemon()

设置为守护线程

Join()

逐个执行每个线程,执行完毕后继续往下执行

run()

线程被调度后会执行该方法,如果想自定义线程类,需要重写run()方法

 

3.Threading类

3.1 线程的普通创建方式

Threadding用于提供线程相关的操作,线程是应用程序中工作的最小单元。

import threadingimport timedef show(arg):    time.sleep(1)    print('thread' + str(arg))for i in range(10):    t = threading.Thread(target=show, args=(i,))    t.start()print('主线程结束')结果:主线程结束thread0thread1thread2thread5thread4thread3thread7thread6thread8thread9

 上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。

 

3.2 自定义线程类

继承自threading.Thread类来自定义线程类,但是其本质却是重构thread类中的run()方法。

 

import threading class myThread(threading.Thread):    def __init__(self, sum):        threading.Thread.__init__(self)        self.sum = sum def run(self):        print('对象数是:',self.sum)if __name__ == '__main__':     t1 = myThread(1)    t2 = myThread(2)    t1.start()    t2.start()

 

3.3 计算子线程执行的时间 

PS:sleep的时候是不会占用CPU的,操作系统会把线程挂起。

import threadingimport timedef show(n):    time.sleep(1)    print('thread' + str(n))start_time = time.time()obj_list = [] for i in range(5):    t = threading.Thread(target=show,args=(i,))    t.start()    obj_list.append(t)for obj in obj_list:    obj.join()print('花费的时间为:',time.time() - start_time)

 

3.4 守护线程 

线程的setDaemon(True)将线程变成主线程的守护线程,意思是当主进程结束后,子线程也会随之退出。意味着当主线程结束后,程序就结束了。

1 import threading 2 import time 3  4 def show(n): 5     time.sleep(1) 6     print('thread' + str(n)) 7  8 start_time = time.time() 9 obj_list = []10 11 for i in range(5):12     t = threading.Thread(target=show,args=(i,))13     t.setDaemon(True)14     t.start()15     obj_list.append(t)16 17 print('花费的时间为:',time.time() - start_time)

 

3.5 GIL(全局解释器锁) 

在Python的运行环境中,无论电脑是单核还是双核,操作系统同时只会执行一个线程。究其原因,是因为GIL(全局解释器锁)。

在Python中,一个线程要想要执行,必须要先拿到GIL。可以吧GIL想象成一个“通行证”,并且在一个进程中,GIL只有一个。没有通行证的线程就不会被执行。

Python多线程的工作过程:

  •  拿到公共数据
  • 申请GIL
  • Python解释器调用os的原生线程
  • os操作CPU执行运算
  • 当该线程的执行时间到了之后,无论是否执行完,GIL被释放
  • 其他线程重复上面的操作
  • 其他进程执行完成后,切换到原来的线程(从记录的上下文继续执行)

 

3.6 线程锁(Lock,RLock)

由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。

import threading,timedef run(n):    global num    num += 1num = 0obj_list = []for i in range(20000):    t = threading.Thread(target=run,args=(i,))    t.start()    obj_list.append(t)for obj in obj_list:    obj.join()print('num:',num)脏数据:19999

 

3.6.1  互斥锁(Lock) 

为了防止上面情况的发生,我们可以使用互斥锁(Lock)来解决。

import threading,timelock = threading.Lock() # 实例化一个锁对象def run(n):    lock.acquire()  # 获取锁    global num    num += 1    lock.release()   # 释放锁num = 0obj_list = []for i in range(20000):    t = threading.Thread(target=run,args=(i,))    t.start()    obj_list.append(t)for obj in obj_list:    obj.join()print('num:',num)

 

3.6.2  递归锁(RLock) 

RLock的用法和Lock一样,只是他支持嵌套。在多个锁没有释放的时候一般会使用Rlock类。

import threading,timelock = threading.RLock() # 实例化一个锁对象num = 0obj_list = []def run(n):    lock.acquire() # 获取锁     global num     num += 1     lock.release() # 释放锁 for i in range(20000):     t = threading.Thread(target=run,args=(i,))     t.start()     obj_list.append(t) for obj in obj_list:     obj.join() print('num:',num)

 

3.7 信号量Semaphore 

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

import threading,time  lock = threading.BoundedSemaphore(6) # 实例化一个锁对象  def run(n):    lock.acquire()  # 获取锁    time.sleep(1)    print('run the thread: %s' % n)    lock.release()   # 释放锁  num = 0  for i in range(200):    t = threading.Thread(target=run,args=(i,))    t.start()

 

3.8事件(Event)

python线程的事件用于主线程控制其他线程的执行,事件是一个简单的线程同步对象,主要提供了以下几种方法:

方法

说明

clear()

将flag设置为“false”

set()

将flag设置为“true”

is_set()

判断是否设置了flag

wait()

一直监听flag,没有检测到会一直处于阻塞状态

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞

 

import threading,timeevent = threading.Event()  # 创建事件对象def lighter():    count = 0    event.set()   #初始值为绿灯    while 1:        if 5 < count <= 10:            event.clear() #红灯,清楚标志位            print('\33[41;1mred light is on...\033[0m')        elif count > 10:            event.set()   # 绿灯,设置标志位            count = 0        else:            print('\33[41;1mred light is on...\033[0m')        time.sleep(1)        count += 1def car(name):    while True:        if event.is_set():  # 判断是否设置了标志位            print("[%s] 绿灯亮,请行驶..." % name)            time.sleep(1)        else:            print("[%s] 红灯亮,请等待..." % name)            event.wait()            print("[%s] 绿灯亮,开始行驶..." % name)light = threading.Thread(target=lighter,) car = threading.Thread(target=car, args=('test',))
light.start()  car.start()

 

 

3.9条件(Condition) 

使得线程等待,只有满足某条件时,才释放n个线程。

是最简单的线程同步机制,Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。

Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。

除了notify方法外,Condition对象还提供了notifyAll方法,可以通知waiting池中的所有线程尝试acquire内部锁。由于上述机制,处于waiting状态的线程只能通过notify方法唤醒,所以notifyAll的作用在于防止有线程永远处于沉默状态。

演示条件变量同步的经典问题是生产者与消费者问题:假设有一群生产者(Producer)和一群消费者(Consumer)通过一个市场来交互产品。生产者的”策略“是如果市场上剩余的产品少于1000个,那么就生产100个产品放到市场上;而消费者的”策略“是如果市场上剩余产品的数量多余100个,那么就消费3个产品。用Condition解决生产者与消费者问题的代码如下:

import threadingimport timeclass Producer(threading.Thread):    def run(self):        global count        while True:            if con.acquire():                if count > 1000:                    con.wait()                else:                    count = count+100                    msg = self.name+' produce 100, count=' + str(count)                    print msg                    con.notify()                con.release()                time.sleep(1)class Consumer(threading.Thread):    def run(self):        global count        while True:            if con.acquire():                if count < 100:                    con.wait()                else:                    count = count-3                    msg = self.name+' consume 3, count='+str(count)                    print msg                    con.notify()                con.release()                time.sleep(1)count = 500con = threading.Condition()def test():    for i in range(2):        p = Producer()        p.start()    for i in range(5):        c = Consumer()        c.start()if __name__ == '__main__':    test()

 

  

4.多线程的爬虫实例

'''使用多线程爬取1500个url中的图片''' import threading,requests,timelink_list = []with open('url.txt','r') as f:    for u in f.readlines():        url = u.split()[0].replace('\n','')        link_list.append(url)start_time = time.time()class myThread(threading.Thread):    def __init__(self,name,link_range):        threading.Thread.__init__(self)        self.name = name        self.link_range = link_range        self.userName = 0    def run(self):        print('开始:',self.name)        self.craw(self.name,self.link_range)        print('结束:',self.name)    def writeImages(self, ThreadName, url):        print("正在存储文件 %s ..." % ThreadName+str(self.userName))        path = r'D:\zhappian' + '\\' + ThreadName+str(self.userName) + '.png'        file = open(path, 'wb')        images = requests.get(url,timeout = 20).content        file.write(images)        file.close()        self.userName += 1    def craw(self,name,link_range):        for i in range(link_range[0],link_range[len(link_range)-1]):            self.writeImages(name,link_list[i])obj_list = []url_list = [(0,300),(301,600),(601,900),(901,1200),(1201,1500)]# 创建新线程for i in range(5):    t = myThread('Thread-'+str(i+1), url_list[i])    t.start()    obj_list.append(t)# 等待所有线程执行完成for url in url_list:    url.join()end_time = time.time()print('爬虫的运行时间为:',end_time - start_time)

 

5.队列(queue)

queue是python中的标准库,俗称队列。在python中,多个线程之间的数据是共享的,多个线程进行数据交换的时候,不能够保证数据的安全性和一致性,所以当多个线程需要进行数据交换的时候,队列就出现了,队列可以完美解决线程间的数据交换,保证线程间数据的安全性和一致性。

PS: 在python2.x中,模块名为Queue

queue模块有三种队列及构造函数

l  Python queue模块的FIFO队列先进先出。 queue.Queue(maxsize)

l  LIFO类似于堆,即先进后出。 queue.LifoQueue(maxsize)

l  还有一种是优先级队列级别越低越先出来。 queue.PriorityQueue(maxsize)

queue模块中的常用方法:

方法

说明

queue.qsize()

返回队列的大小

queue.empty()

如果队列为空,返回True,反之False

queue.full()

如果队列满了,返回True,反之False (queue.full 与 maxsize 大小对应)

queue.get([block[, timeout]])

获取队列,立即取出一个元素, timeout超时时间

queue.put(item[, timeout]])

写入队列,立即放入一个元素, timeout超时时间

queue.join()

阻塞调用线程,直到队列中的所有任务被处理掉, 实际上意味着等到队列为空,再执行别的操作

queue.task_done()

在完成一项工作之后,queue.task_done()函数向任务已经完成的队列发送一个信号

 

 

5.1代码实例

l   创建队列

import queueq = queue.Queue()

l   empty方法(如果队列为空,返回True)

import queueq = queue.Queue()print(q.empty())#输出:True

l  full方法(如果队列满了,返回True)

import queueq = queue.Queue(1) #指定队列大小q.put('a')print(q.full())#输出:True

l  put方法和get方法

import queueq = queue.Queue()q.put('a')q.put('b')print(q.get())#输出:aprint(q.get())#输出:b

l  qsize方法(返回队列里元素个数)

import queueq = queue.Queue()q.put('a')q.put('b')print(q.qsize())#输出:2

5.2  生产者消费者 

import threading,timeimport queueq = queue.Queue(maxsize=10)# 生产者def Producer(name):    count = 1    while True:        q.put("冠军%s" % count)        print("冠军",count)        count +=1        time.sleep(0.1)#消费者def  Consumer(name):    #while q.qsize()>0:    while True:        print("[%s] 取到[%s]..." %(name, q.get()))        time.sleep(1)p = threading.Thread(target=Producer,args=("IG",))c = threading.Thread(target=Consumer,args=("LPL",))c1 = threading.Thread(target=Consumer,args=("LCK",))p.start()c.start()c1.start()

 

6.多线程和爬虫实例。

import threading,requests,timeimport queuestart_time = time.time()obj_list = []work_queue = queue.Queue(1500)link_list = []with open('url.txt','r') as f:    for u in f.readlines():        url = u.split()[0].replace('\n','')        link_list.append(url)# 填充队列for url in link_list:    work_queue.put(url)class myThread(threading.Thread):    def __init__(self,name,q):        threading.Thread.__init__(self)        self.name = name        self.userName = 0        self.q = q    def run(self):        print('开始:',self.name)        while True:            try:                self.craw(self.name,self.q)            except Exception as e:                break        print('结束:',self.name)    def writeImages(self, ThreadName, url):        print("正在存储文件 %s ..." % ThreadName+str(self.userName))        path = r'D:\zhappian' + '\\' + ThreadName+str(self.userName) + '.png'        file = open(path, 'wb')        images = requests.get(url,timeout = 20).content        file.write(images)        file.close()        self.userName += 1    def craw(self,name,q):        url = q.get(timeout = 2)        try:            self.writeImages(name, url)        except Exception as e:            print(q.qsize(),url,e)# 创建新线程for i in range(5):    t = myThread('Thread-'+str(i+1), work_queue)    t.start()    obj_list.append(t)# 等待所有线程执行完成for url in obj_list:    url.join()end_time = time.time()print('爬虫的运行时间为:',end_time - start_time)

转载于:https://www.cnblogs.com/wl443587/p/9911721.html

你可能感兴趣的文章
揭秘天猫双11背后:20万商家600万张海报,背后只有一个鹿班
查看>>
重置mysq root密码脚本
查看>>
我的友情链接
查看>>
MHA配置参数
查看>>
深入理解Lock
查看>>
vim的块选择
查看>>
XenAPP And XenDesktop 7.6 新特性简介
查看>>
CentOS6.x和CentOS7.x字符界面安装图形界面方法
查看>>
vista系统释放电脑本身内存
查看>>
我的友情链接
查看>>
简明 Vim 练级攻略
查看>>
Windows下使用命令创建rar压缩包
查看>>
HTML --块
查看>>
在DLL中获取主进程窗口句柄
查看>>
基于消息队列的双向通信
查看>>
一个不错的loading效果
查看>>
高中学渣逆袭入“大学”:如今月收入达五位数
查看>>
Debian允许root用户登录
查看>>
C++ - this指针
查看>>
Google Test and Google Mock Introduction
查看>>