python 多线程编程之threading 模块

发布于 2020-05-06  669 次阅读


Python的线程操作在旧版本中使用的是thread模块,在Python27和Python3中引入了threading模块,同时thread模块在Python3中改名为_thread模块,threading模块相较于thread模块,对于线程的操作更加的丰富,而且threading模块本身也是相当于对thread模块的进一步封装而成,thread模块有的功能threading模块也都有,所以涉及到对线程的操作,推荐使用threading模块。

一、threading函数

  • threading.currentThread(): 返回当前的线程变量。
  • threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

二、线程对象threading.Thread

threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

  • group:应该设为None,即不用设置,使用默认值就好,因为这个参数是为了以后实现ThreadGroup类而保留的。
  • target:在run方法中调用的可调用对象,即需要开启线程的可调用对象,比如函数或方法。
  • name:线程名称,默认为“Thread-N”形式的名称,N为较小的十进制数。
  • args:在参数target中传入的可调用对象的参数元组,默认为空元组()。
  • kwargs:在参数target中传入的可调用对象的关键字参数字典,默认为空字典{}。
  • daemon:默认为None,即继承当前调用者线程(即开启线程的线程,一般就是主线程)的守护模式属性,如果不为None,则无论该线程是否为守护模式,都会被设置为“守护模式”。

THREAD类方法

  • run(): 用以表示线程活动的方法。
  • start():启动线程活动。
  • join([timeout=None]):让当前调用者线程(即开启线程的线程,一般就是主线程)等待,直到线程结束(无论它是什么原因结束的),timeout参数是以秒为单位的浮点数,用于设置操作超时的时间,返回值为None。如果想要判断线程是否超时,只能通过线程的is_alive方法来进行判断。join方法可以被调用多次。如果对当前线程使用join方法(即线程在内部调用自己的join方法),或者在线程没有开始前使用join方法,都会报RuntimeError错误。
  • isAlive(): 返回线程是否活动的。
  • getName(): 返回线程名。
  • setName(): 设置线程名。

三、使用threading模块创建线程

使用Thread类,可以有多种方法创建线程:

  • 创建Thread类的实例,传递一个函数
  • 创建Thread类的实例,传递一个可调用的类实例
  • 派生Thread类的子类,并创建子类的实例

一般的,我们会采用第一种或者第三种方法。如果需要一个更加符合面向对象的接口时,倾向于选择第三种方法,否则就用第一种方法吧。

1.创建Thread类,传递一个参数

import time
import threading

def run_thread(params='Hi', sleep=3):
    """线程执行函数"""
    time.sleep(sleep)
    print(params)

def main():
    t1 = threading.Thread(target=run_thread, args=())
    t2 = threading.Thread(target=run_thread, args=('hello', 1))

    t1.start()
    t2.start()
    # t1.join()
    # t2.join()
    print('主线程执行完!')

if __name__ =='__main__':
    main()

2.创建Thread实例,传递一个可调用的类实例

import time
import threading

class ThreadFuc():
    def __init__(self,func,args, name=''):
        self.func = func
        self.args = args
        self.name = name

    def __call__(self):
        self.func(*self.args)

def run_thread(params='Hi', sleep=3):
    """线程执行函数"""
    time.sleep(sleep)
    print(params)

def main():
    t1 = threading.Thread(target=ThreadFuc(run_thread,('nihao',2)))
    t2 = threading.Thread(target=ThreadFuc(run_thread,('hello', 1)))

    t1.start()
    t2.start()
    # t1.join()
    # t2.join()
    print('主线程执行完!')

if __name__ =='__main__':
    main()

3.使用threading.Thread类的子类创建线程

import time
import threading

class ThreadFuc():
    def __init__(self,func,args, name=''):
        self.func = func
        self.args = args
        self.name = name

    def __call__(self):
        self.func(*self.args)

class MyThread(threading.Thread):
    def __init__(self, func, parmas):
        # threading.Thread.__init__(self) #手动调用父类方法,重写threading.Thread的__init__方法,确保在所有操作之前先调用threading.Thread.__init__方法
        super(MyThread, self).__init__() #super自动找到父类,帮助调用父类方法
        self.func = func
        self.parmas = parmas

    def run(self):
        self.func(*self.parmas)


def run_thread(params='Hi', sleep=3):
    """线程执行函数"""
    time.sleep(sleep)
    print(params)

def main():
    t1 = MyThread(run_thread, ('Hello',1))
    t2 = MyThread(run_thread,('你怎么这么久才执行完', 5))
    t1.start()
    t2.start()
    # t1.join()
    # t2.join()
    print('主线程执行完!')

if __name__ =='__main__':
    main()

4.join方法使用

def run_thread(params='Hi', sleep=3):
    """线程执行函数"""
    time.sleep(sleep)
    print(params)

def main():

    # 创建线程
    t1 = threading.Thread(target=run_thread,args=('Hello', 2))
    t2 = threading.Thread(target=run_thread,args=('World', 10))

    # 启动线程
    t1.start()
    t2.start()
    print('马上执行join方法了')
    # 执行join方法会阻塞调用线程(主线程),直到调用join方法的线程(t1)结束
    t1.join()
    print('线程t1已结束')
    t2.join()
    print('主线程执行完!')

if __name__ =='__main__':
    main()

5.线程同步

如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。

使用 Thread 对象的 Lock 和 Rlock 可以实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间。

当锁在被锁定时,它并不属于某一个特定的线程。

锁只有“锁定”和“非锁定”两种状态,当锁被创建时,是处于“非锁定”状态的。当锁已经被锁定时,其他线程再次调用acquire()方法会被阻塞执行,直到锁被获得锁的线程调用release()方法释放掉锁并将其状态改为“非锁定”。

同一个线程获取锁后,如果在释放锁之前再次获取锁会导致当前线程阻塞,除非有另外的线程来释放锁,如果只有一个线程,并且发生了这种情况,会导致这个线程一直阻塞下去,即形成了死锁。所以在获取锁时需要保证锁已经被释放掉了,或者使用递归锁来解决这种情况。

  • acquire(blocking=True, timeout=-1):获取锁,并将锁的状态改为“锁定”,成功返回True,失败返回False。当一个线程获得锁时,会阻塞其他尝试获取锁的线程,直到这个锁被释放掉。timeout默认值为-1,即将无限阻塞等待直到获得锁,如果设为其他的值时(单位为秒的浮点数),将最多阻塞等待timeout指定的秒数。当blocking为False时,timeout参数被忽略,即没有获得锁也不进行阻塞。
  • release():释放一个锁,并将其状态改为“非锁定”,需要注意的是任何线程都可以释放锁,不只是获得锁的线程(因为锁不属于特定的线程)。release()方法只能在锁处于“锁定”状态时调用,如果在“非锁定”状态时调用则会报RuntimeError错误。

多线程的优势在于可以同时运行多个任务(至少感觉起来是这样)。但是当线程需要共享数据时,可能存在数据不同步的问题。

import threading
import time

# 创建锁
lock = threading.Lock()

resource = [None] * 5

def run_thread(params='Hi', sleep=3):
    """线程执行函数"""

    # 上锁
    lock.acquire()
    global resource
    for i in range(len(resource)):
        resource[i] = params
        time.sleep(sleep)

    print('resource为-----------------------',resource)
    # 释放锁
    lock.release()

def main():
    # 创建线程
    t1 = threading.Thread(target=run_thread, args=('Hello', 2))
    t2 = threading.Thread(target=run_thread, args=('nihao', 1))

    t1.start()
    t2.start()

    print('主线程执行结束------')

if __name__ == '__main__':
    main()


主线程执行结束------
resource为----------------------- ['Hello', 'Hello', 'Hello', 'Hello', 'Hello']
resource为----------------------- ['nihao', 'nihao', 'nihao', 'nihao', 'nihao']

6.线程优先级队列(Queue)

Python 的 Queue 模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。

这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步。

Queue 模块中的常用方法:

  • Queue.qsize() 返回队列的大小
  • Queue.empty() 如果队列为空,返回True,反之False
  • Queue.full() 如果队列满了,返回True,反之False
  • Queue.full 与 maxsize 大小对应
  • Queue.get([block[, timeout]])获取队列,timeout等待时间
  • Queue.get_nowait() 相当Queue.get(False)
  • Queue.put(item) 写入队列,timeout等待时间
  • Queue.put_nowait(item) 相当Queue.put(item, False)
  • Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
  • Queue.join() 实际上意味着等到队列为空,再执行别的操作
import threading
import queue #优先级队列
import time

exitFlag = 0

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]

# 创建锁
queueLock = threading.Lock()
# 定义队列
workQueue = queue.Queue(10)

threads = []
threadID = 1

class MyThread (threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        print ("开启线程:" + self.name)
        process_data(self.name, self.q)
        print ("退出线程:" + self.name)

def process_data(threadName, q):
    """线程执行函数"""
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            data = q.get()
            queueLock.release()
            print ("%s processing %s" % (threadName, data))
        else:
            queueLock.release()
        time.sleep(1)

def main():

    global threadID
    global threadList
    global nameList
    global exitFlag

    # 创建新线程
    for tName in threadList:
        thread = MyThread(threadID, tName, workQueue)
        thread.start()
        threads.append(thread)
        threadID += 1

    # 填充队列
    queueLock.acquire()
    for word in nameList:
        workQueue.put(word)
    queueLock.release()

    # 等待队列清空
    while not workQueue.empty():
        pass

    # 通知线程是时候退出
    exitFlag = 1

    # 等待所有线程完成
    for t in threads:
        t.join()
    print ("退出主线程")

if __name__ == '__main__':
        main()


开启线程:Thread-1
开启线程:Thread-2
开启线程:Thread-3
Thread-3 processing One
Thread-1 processing Two
Thread-2 processing Three
Thread-3 processing Four
Thread-3 processing Five
退出线程:Thread-3
退出线程:Thread-1
退出线程:Thread-2
退出主线程


一名测试工作者,专注接口测试、自动化测试、性能测试、Python技术。