BugBug

Асинхронное программирование на Python с применением библиотеки asyncio

Модуль ayncio появился в Python версии 3.4 и согласно документации обеспечивает возможность написания параллельного однопоточного кода с применением Coroutine, и асинхронного доступа к вводу\выводу по сокетам и другим ресурсам, запуску сетевых клиентов и серверов. В этой статье дается вводная информация по использованию данного модуля.

Что значит асинхронно?

Модуль asyncio основан на цикле событий. Он в основном ожидает, что что-то произойдет, а затем реагирует на событие. Он отвечает за обработку операций ввода\вывода и системные события.

asyncio реализует цикл несколькими способами. Модуль по умолчанию наиболее эффективен для используемой ОС, однако есть возможность выбрать цикл явно, если вы этого хотите. Цикл событий работает по следующему принципу: когда случится событие А, выполни функцию В.

Можно провести аналогию с сервером, который ожидает запросов. Если сайт не пользуется популярностью, сервер простаивает в течении длительного времени. Но когда он получает запрос, сервер должен реагировать. Эта и будет обработка событий. Когда клиент запрашивает веб-ресурс, сервер вызывает один или несколько обработчиков событий. В модуле asyncio для этих целей используются Coroutine.

Coroutine - это специфическая функция, которая может передать ход выполнения программы другой функции при этом не теряя своего состояния. Coroutine по сути являться расширенным генератором. Одно из больших преимуществ потоков заключается в том, что они не используют много памяти для выполнения. Запомните, что когда вы вызываете функцию Coroutine, она фактически не выполняется, а возвращает объект coroutine, который возможно передать в цикл событий, чтобы он выполнился либо сразу, либо позже.

Еще необходимый термин при использовании asyncio - это Future. Future - это объект, который представляет результат не завершенной работы. Цикл может просматривать Future-объекты и ожидать их окончания. Как только Future-объект прекратит работу, цикл будет выполнен. Asyncio также использует светофоры и блокировки.

async и await

async и await появились в Python 3.5 и используются для инициализаций Coroutine.


import asyncio

async def coroutine():
    await func()
				

Определив Coroutine таким образом, вы не можете использовать yield внутри функции. Вместо этого применяйте return либо await, использующиеся для возврата управления циклу событий. Запомните что await может использоваться только внутри async def функции. Наверное к этому моменту вы совсем запутались, поэтому давайте разберем все на примере.

Плохой пример асинхронной программы

Начнем с простого примера и загрузим несколько файлов из интернета асинхронно. Напишем несколько Coroutine, которые могут это сделать.


import asyncio
import os
from urllib.request import urlopen

async def coroutine_download(url):
    #Coroutine для загрузки отдельного URL
    request=urlopen(url)
    file_name=os.path.basename((url))

    with open(file_name, 'wb') as file:
        while True:
            chuck=request.read(1024)
            if not chuck:
                break
            file.write(chuck)
    message='{file_name} успешно скачен'.format(file_name=file_name)
    return message

async def main(urls):
    #Создаем группу Coroutine и ждем их завершения

    coroutines=[coroutine_download(url) for url in urls]
    finish, pending= await asyncio.wait(coroutines)
    for item in finish:
        print(item.result())

if __name__=='__main__':
    urls=["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
    event_loop=asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(urls))
    finally:
        event_loop.close()
        

Давайте разберемся что здесь происходит. В коде две функции Coroutine. Первая, coroutine_download использует Python библиотеку urlib для загрузки любого URL-адреса, передаваемого ей. Когда файлы будут загружены выводиться строка подтверждающая успешную загрузку.

Вторая функция, main, Coroutine - основная. Она принимает список URL-адресов и ставит их в очередь. Мы используем wait функцию asyncio для ожидания завершения выполнения функции coroutine_download.

Что бы фактически запустить наши Coroutine их нужно добавить в цикл событий. Сделано это в конце, где создается цикл событий, а затем вызывается метод run_until_complete и передается Coroutine main аргументом. Это запускает главную Coroutine-функцию, которая запускает вторую Coroutine-функцию. Это называется цепной Coroutine-функцией.

Почему это плохой пример? В действительности это не совсем асинхронное выполнение так, как urllib не является асинхронной, а функция download_coroutine не использует await или yield. Лучшим способом сделать это использование пакета aiohttp.

Хороший пример асинхронной программы.

Пакет aiohttp предназначен для асинхронной работы протокола HTTP. Устанавливаем его используя PIP.

pip install aiohttp

Теперь мы можем выполнить нашу задачу с использованием данного пакета.


import asyncio
import os
import async_timeout
import aiohttp

async def corutine_download(session,url):
    with async_timeout.timeout(10):
        print(1)
        async with session.get(url) as response:
            file_name=os.path.basename(url)
            with open(file_name,'wb') as file:
                while True:
                    chunk=await response.content.read(1024)
                    if not chunk:
                        break
                    file.write(chunk)
                print(file_name)
            return await response.release()

async def main(loop):
    urls=["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks=[corutine_download(session,url) for url in urls]
        await asyncio.gather(*tasks)

if __name__=='__main__':
    loop=asyncio.get_event_loop()
    loop.run_until_complete(main(loop))

Мы добавили новые библиотеки aiohttp и async_timeout. Последняя из них необходима для корректной работы aiohttp и позволяет создавать диспетчер тайм-аута.

Начнем разбор кода с конца. Мы запускаем асинхронный цикл событий и вызываем главную функцию. В ней создается объект ClientSession, который передаем нашей функции загрузки для каждого URL-адреса. В функции corutine_download мы создаем async_timeout.timeout() диспетчер контекста, создающий таймер на X секунд. Когда таймер подходит к концу, менеджер контекста заканчивается или истекает. После мы вызываем метод get сессии, который возвращает ответ. В дальнейшем, когда мы используем атрибут ответа content, нам возвращается экземпляр aiohttp.SrteamReader, позволяющий нам загружать файлы частями любого размера. После сохранения файла на диск вызывается метод release() завершающий обработку ответа.

Согласно документации aiohttp созданный в диспетчере контекстов объект ответа неявно вызывает метод release(), но согласно философии Python его лучше вызвать явно.

Этот код тоже не идеален, поскольку запись файла на диск у нас не асинхронна. Для того, что бы это исправить можно воспользоваться библиотекой aiofiles.

Планирование вызовов

Возможно планирование вызовов регулярных функций используя цикл событий asyncio. Первый метод который мы рассмотрим: call_soon. Его работа похожа на очередь FIFO поэтому если функция в цикле требует время на выполнения, остальные будут отложены до ее завершения. Разберем пример:

	
import asyncio
import functools

def handler_event(loop,stop=False):
    print("Вызов функции")
    if stop:
        print('Остановка цикла')
        loop.stop()

if __name__=='__main__':
    loop=asyncio.get_event_loop()
    try:
        loop.call_soon(functools.partial(handler_event,loop))
        print('Начало цикла')
        loop.call_soon(functools.partial(handler_event,loop,stop=True))
        loop.run_forever()
    finally:
        print('Закрываем цикл')
        loop.close()
			

Большинству asyncio функций нельзя передать параметры, поэтому нужна библиотека functools, для того что бы передать параметры в обработчик событий. Регулярная функция будет выводить текст при каждом вызове. Для остановки цикла можно передать параметр stop=True. Если запустить этот код, вы увидите следующие:


Начало цикла
Вызов функции
Вызов функции
Остановка цикла
Закрываем цикл
		

Другой метод: call_soon_threadsafe. Работает он так же как и call_soon, но он потокобезопасен.

Для выполнения функции с задержкой следует использовать функцию call_later:


loop.call_later(1, handler_event, loop)
			

В данном случаи задержка составит 1 секунду. После чего цикл передаст его на выполнение в качестве первого параметра.

Имеется возможность запуска функции в определенное время с помощью функции call_at:


time_current=loop.time()
loop.call_at(time_current+600, handler_event, loop)
		

Задания

Задания являются подклассом Future и оберткой Coroutine. Они дают возможность отслеживать окончания обработки. Так как они являются подклассом Future, Сoroutine могут ожидать задания, либо получать результат их выполнения по окончанию их обработки. На примере это будет легче понять:


async def task(seconds):
    #задание выполняется в течение нескольких секунд
    print('Задание выполняется в течении {} секунд'.format(seconds))
    await asyncio.sleep(seconds)
    return 'задание завершено'
if __name__=='__main__':
    event_loop=asyncio.get_event_loop()
    try:
        print('Начало выполнения задания')
        obj_task=event_loop.create_task(task(seconds=5))
        event_loop.run_until_complete(obj_task)
    finally:
        event_loop.close()
    print('Результат выполнения задания: {}'.format(obj_task.result()))
	

В примере рассматривается асинхронная функция, которой передается в качестве аргумента количество секунд необходимое для ее выполнения. После реализуется цикл событий и объект задач, вызывающий функцию create_task объекта цикла. Аргументом create_task принимает функцию которую мы хотим превратить в задание. Указываем, что цикл событий выполняется до завершения задания. И в конце получаем результат его выполнения.

Заключение

В этой статье изложены основы использования библиотеки asyncio, которые помогут вам сделать первые шаги в создании асинхронных программ. Надеюсь она будет вам полезна в изучении программирования на языке Python. Для более подробной информации обращайтесь к официальной документации