搜索
您的当前位置:首页正文

简述Python并发编程----多线程

来源:步旅网

Python中实现并发编程的三种方案:多线程、多进程和异步I/O。并发编程的好处在于可以提升程序的执行效率以及改善用户体验;坏处在于并发的程序不容易开发和调试,同时对其他程序来说它并不友好。

1. 提升执行性能 - 让程序中没有因果关系的部分可以并发的执行
2. 改善用户体验 - 让耗时间的操作不会造成程序的假死

进程 - 操作系统分配内存资源的基本单位 - 一个进程可以包含一个或多个线程
线程 - 操作系统分配调度CPU的基本单位


多线程:

Python中提供了Thread类并辅以Lock、Condition、Event、Semaphore和Barrier。Python中有GIL来防止多个线程同时执行本地字节码,这个锁对于CPython是必须的,因为CPython的内存管理并不是线程安全的,因为GIL的存在多线程并不能发挥CPU的多核特性。

"""
多线程程序如果没有竞争资源处理起来通常也比较简单
当多个线程竞争临界资源的时候如果缺乏必要的保护措施就会导致数据错乱
说明:临界资源就是被多个线程竞争的资源
"""
import time
import threading

from concurrent.futures import ThreadPoolExecutor


class Account(object):
    """银行账户"""

    def __init__(self):
        self.balance = 0.0
        self.lock = threading.Lock()

    def deposit(self, money):
        # 通过锁保护临界资源
        with self.lock:
            new_balance = self.balance + money
            time.sleep(0.001)
            self.balance = new_balance


class AddMoneyThread(threading.Thread):
    """自定义线程类"""

    def __init__(self, account, money):
        self.account = account
        self.money = money
        # 自定义线程的初始化方法中必须调用父类的初始化方法
        super().__init__()

    def run(self):
        # 线程启动之后要执行的操作
        self.account.deposit(self.money)

def main():
    """主函数"""
    account = Account()
    # 创建线程池
    pool = ThreadPoolExecutor(max_workers=10)
    futures = []
    for _ in range(100):
        # 创建线程的第1种方式
        # threading.Thread(
        #     target=account.deposit, args=(1, )
        # ).start()
        # 创建线程的第2种方式
        # AddMoneyThread(account, 1).start()
        # 创建线程的第3种方式
        # 调用线程池中的线程来执行特定的任务
        future = pool.submit(account.deposit, 1)
        futures.append(future)
    # 关闭线程池
    pool.shutdown()
    for future in futures:
        future.result()
    print(account.balance)


if __name__ == '__main__':
    main()

多个线程竞争资源的情况

import glob
import os
import threading

from PIL import Image

PREFIX = 'thumbnails'


def generate_thumbnail(infile, size, format='PNG'):
    """生成指定图片文件的缩略图"""
	file, ext = os.path.splitext(infile)
	file = file[file.rfind('/') + 1:]
	outfile = f'{PREFIX}/{file}_{size[0]}_{size[1]}.{ext}'
	img = Image.open(infile)
	img.thumbnail(size, Image.ANTIALIAS)
	img.save(outfile, format)


def main():
    """主函数"""
	if not os.path.exists(PREFIX):
		os.mkdir(PREFIX)
	for infile in glob.glob('images/*.png'):
		for size in (32, 64, 128):
            # 创建并启动线程
			threading.Thread(
				target=generate_thumbnail, 
				args=(infile, (size, size))
			).start()
			

if __name__ == '__main__':
	main()

修改上面的程序,启动5个线程向账户中存钱,5个线程从账户中取钱,取钱时如果余额不足就暂停线程进行等待。为了达到上述目标,需要对存钱和取钱的线程进行调度,在余额不足时取钱的线程暂停并释放锁,而存钱的线程将钱存入后要通知取钱的线程,使其从暂停状态被唤醒。可以使用threading模块的Condition来实现线程调度,该对象也是基于锁来创建的,代码如下所示:

"""
多个线程竞争一个资源 - 保护临界资源 - 锁(Lock/RLock)
多个线程竞争多个资源(线程数>资源数) - 信号量(Semaphore)
多个线程的调度 - 暂停线程执行/唤醒等待中的线程 - Condition
"""
from concurrent.futures import ThreadPoolExecutor
from random import randint
from time import sleep

import threading


class Account():
    """银行账户"""

    def __init__(self, balance=0):
        self.balance = balance
        lock = threading.Lock()
        self.condition = threading.Condition(lock)

    def withdraw(self, money):
        """取钱"""
        with self.condition:
            while money > self.balance:
                self.condition.wait()
            new_balance = self.balance - money
            sleep(0.001)
            self.balance = new_balance

    def deposit(self, money):
        """存钱"""
        with self.condition:
            new_balance = self.balance + money
            sleep(0.001)
            self.balance = new_balance
            self.condition.notify_all()


def add_money(account):
    while True:
        money = randint(5, 10)
        account.deposit(money)
        print(threading.current_thread().name, 
              ':', money, '====>', account.balance)
        sleep(0.5)


def sub_money(account):
    while True:
        money = randint(10, 30)
        account.withdraw(money)
        print(threading.current_thread().name, 
              ':', money, '<====', account.balance)
        sleep(1)


def main():
    account = Account()
    with ThreadPoolExecutor(max_workers=10) as pool:
        for _ in range(5):
            pool.submit(add_money, account)
            pool.submit(sub_money, account)


if __name__ == '__main__':
    main()

Python多线程与多线程中join()的用法

Python多线程与多进程中join()方法的效果是相同的。

下面仅以多线程为例:

首先需要明确几个概念:

知识点一:
当一个进程启动之后,会默认产生一个主线程,因为线程是程序执行流的最小单元,当设置多线程时,主线程会创建多个子线程,在python中,默认情况下(其实就是setDaemon(False)),主线程执行完自己的任务以后,就退出了,此时子线程会继续执行自己的任务,直到自己的任务结束,例子见下面一。

知识点二:
当我们使用setDaemon(True)方法,设置子线程为守护线程时,主线程一旦执行结束,则全部线程全部被终止执行,可能出现的情况就是,子线程的任务还没有完全执行结束,就被迫停止,例子见下面二。

知识点三:
此时join的作用就凸显出来了,join所完成的工作就是线程同步,即主线程任务结束之后,进入阻塞状态,一直等待其他的子线程执行结束之后,主线程在终止,例子见下面三。

知识点四:
join有一个timeout参数:

一:Python多线程的默认情况

import threading
import time

def run():
    time.sleep(2)
    print('当前线程的名字是: ', threading.current_thread().name)
    time.sleep(2)


if __name__ == '__main__':

    start_time = time.time()

    print('这是主线程:', threading.current_thread().name)
    thread_list = []
    for i in range(5):
        t = threading.Thread(target=run)
        thread_list.append(t)

    for t in thread_list:
        t.start()

    print('主线程结束!' , threading.current_thread().name)
    print('一共用时:', time.time()-start_time)

其执行结果如下

关键点:

  1. 我们的计时是对主线程计时,主线程结束,计时随之结束,打印出主线程的用时。
  2. 主线程的任务完成之后,主线程随之结束,子线程继续执行自己的任务,直到全部的子线程的任务全部结束,程序结束。

二:设置守护线程

import threading
import time

def run():

    time.sleep(2)
    print('当前线程的名字是: ', threading.current_thread().name)
    time.sleep(2)


if __name__ == '__main__':

    start_time = time.time()

    print('这是主线程:', threading.current_thread().name)
    thread_list = []
    for i in range(5):
        t = threading.Thread(target=run)
        thread_list.append(t)

    for t in thread_list:
        t.setDaemon(True)
        t.start()

    print('主线程结束了!' , threading.current_thread().name)
    print('一共用时:', time.time()-start_time)

其执行结果如下,注意请确保setDaemon()在start()之前。

关键点:

  1. 非常明显的看到,主线程结束以后,子线程还没有来得及执行,整个程序就退出了。

三:join的作用

import threading
import time

def run():

    time.sleep(2)
    print('当前线程的名字是: ', threading.current_thread().name)
    time.sleep(2)


if __name__ == '__main__':

    start_time = time.time()

    print('这是主线程:', threading.current_thread().name)
    thread_list = []
    for i in range(5):
        t = threading.Thread(target=run)
        thread_list.append(t)

    for t in thread_list:
        t.setDaemon(True)
        t.start()

    for t in thread_list:
        t.join()

    print('主线程结束了!' , threading.current_thread().name)
    print('一共用时:', time.time()-start_time)

其执行结果如下:

关键点:

  1. 可以看到,主线程一直等待全部的子线程结束之后,主线程自身才结束,程序退出。

因篇幅问题不能全部显示,请点此查看更多更全内容

Top