WebSockets client

Перевод официальной документации aiohttp WebSockets client

Для Python3.5+

Клиент вебсокетов

aiohttp работает с вебсокетами с коробки.
Клиент должен использовать метод aiohttp.ClientSession.ws_connect() для установления вебсокет-соединения. Он принимает в качестве первого обязательного параметра урл сервера и возвращает ClientWebSocketResponse объект с помощью которого Вы можете общаться с сервером используя методы этого объекта:
session = aiohttp.ClientSession()
async with session.ws_connect('http://example.org/websocket') as ws:

    async for msg in ws:
        if msg.tp == aiohttp.MsgType.text:
            if msg.data == 'close cmd':
                await ws.close()
                break
            else:
                ws.send_str(msg.data + '/answer')
        elif msg.tp == aiohttp.MsgType.closed:
            break
        elif msg.tp == aiohttp.MsgType.error:
            break

Вы также можете установить вебсокет-соединения без инстанса ClientSession с помощью aiohttp.ws_connect():
async with aiohttp.ws_connect('http://example.org/websocket') as ws:
    …

Как для чтения (например await ws.receive() или async for msg in ws:) так и для записи Вы должны использовать только одну задачу, но асинхронная отправка данных (например ws.send_str('data')) может быть в нескольких задачах

ws_connect

Для подключения к вебсокет серверу нужно использовать aiohttp.ws_connect () или aiohttp.ClientSession.ws_connect().
coroutine aiohttp.ws_connect(
    url, *, protocols=(), timeout=10.0, connector=None,
    auth=None, ws_response_class=ClientWebSocketResponse,
    autoclose=True, autoping=True, loop=None, origin=None)

Эта функция создает вебсокет-соединение, проверяет ответ и возвращает объект ClientWebSocketResponse. В случае ошибки будет вызвано WSServerHandshakeError исключение.
Параметры:
  • url (str) – урл вебсокет-сервера
  • protocols (tuple) – протоколы вебсокета
  • timeout (float) – таймаут для чтения вебсокета. По-умолчанию 10 секунд
  • connector (obj) – коннектор - объект TCPConnector
  • ws_response_class – класс реализации получения ответа. По-умолчанию ClientWebSocketResponse
  • autoclose (bool) – автоматически закрыть вебсокет соединение при получении сообщения о закрытии. Если установлено в False, то нужно обрабатывать вручную. По-умолчанию True
import asyncio
import aiohttp

async def test(autoclose):
    ws = await aiohttp.ws_connect('ws://127.0.0.1:8888', autoclose=autoclose)
    ws.send_str('close me') # отправляем серверу 'close me'
    # на сервере прописана такая логика
    # tornado websocket server
    # if message == 'close me':
    #     self.close()
    r = await ws.receive() # ждем ответа
    print(autoclose, ws.closed)
    if not ws.closed and r.tp == 8: # если соединение не закрыто 
        #и тип сообщение равен 8 (8 говорит о MSG_CLOSE)
        print("autoclose = False")
        await ws.close()

loop = asyncio.get_event_loop()
tasks = [
    test(True),
    test(False)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

  • autoping (bool) – автоматически отправлять понг на пинг от сервера
  • auth (aiohttp.helpers.BasicAuth) – BasicAuth кортеж с именованными полями который представляет HTTP Basic Authorization
  • loop – событийный цикл используемый для обработки HTTP запросов. Если loop=None, то будет вызвано asyncio.get_event_loop() для получения событийного цикла по-умолчанию, но мы настоятельно рекомендуем явно указывать событийный цикл.
  • origin (str) – Заголовки происхождения (источника) которые будут переданы на сервер

ClientWebSocketResponse

class aiohttp.ClientWebSocketResponse
Класс для обработки вебсокетов на стороне клиента
  • closed — свойство только для чтения. True если закрыто соединение (было вызвано close), иначе — False. Смотреть предыдущий пример
  • protocol — субпротоколы вебсокет-соединения выбранные после вызова start(). None если сервер и клиент не используют одинаковые подпротоколы
  • exception() - возвращает исключение если оно было при соединении, иначе None
Давайте рассмотрим пример где на стороне сервера произошла какая-то ошибка, например
. . . # tornado
    self.write_message(1/0)
. . .

при этом соединение не будет разорвано и клиент будет ожидать ответа далее, поэтому  чтобы отлавливать ошибки и ghfdbkmyj обрабатывать их, (например, aiohttp.errors.ServerDisconnectedError) можно делать так:
async def test():
    session = aiohttp.ClientSession()
    async with session.ws_connect('ws://127.0.0.1:8888') as ws:
        ws.send_str('hello')
        async for msg in ws:
            raise ws.exception()


  • ping(message=b'') — отправить пинг (MSG_PING). message — опциональный параметр для указания полезной нагрузки пинг-сообщения. Можно передавать str (будет побайтно закодирован в UTF-8) или bytes.
  • send_str(data) — отправить информацию как MSG_TEXT сообщение. Информацию нужно передавать в метод первым неименованным параметром строкой (тип str). Если будет передана не строка будет вызвано исключение TypeError.
  • send_bytes(data) -  отправить информацию как MSG_BYTE сообщение. Информацию нужно передавать в метод первым неименованным параметром с типом bytes, bytearray или memoryview. Если будет передан объект иного типа будет вызвано исключение TypeError.
  • coroutine close(*, code=1000, message=b'') — coroutine что инициирует закрытие соединения отправит  MSG_CLOSE сообщение и ждет ответа от сервера.
Рассмотрим пример где мы подключаемся к серверу и он, после первого сообщения уходит в бесконечный цикл
# tornado
    def on_message(self, message):
            while True:
                self.write_message(':-||')

так как метод close() ждет ответа от сервера, то мы никогда не закроем соединение. Для того чтобы избежать такой ситуации мы можем обернуть close() в asyncio.wait() или asyncio.wait_for():
import asyncio
import aiohttp
@asyncio.coroutine
def test():
    session = aiohttp.ClientSession()
    ws = yield from (session.ws_connect('ws://127.0.0.1:8888'))
    # подключимся к серверу который после первого сообщения уходит в бесконечный цикл
    # так как метод close() ждет подтверждения, то мы никога не закроем соединение
    # 
    ws.send_bytes(bytes([0]))
    try:
        yield from asyncio.wait_for(ws.close(), 2) # ждем две секунды
    except asyncio.TimeoutError:
        print("Timeout")
    finally:
        session.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(test())
loop.close()

Метод поддерживает два необязательных параметра:
  1. code (int) — код закрытия соединения
  1. message() - указания полезной нагрузки понг-сообщения. Можно передавать str (будет побайтно закодирован в UTF-8) или bytes.
  • coroutine receive() - ждет входящего сообщения и возвращает объект типа Message. Неявно обрабатывает MSG_PING, MSG_PONG и MSG_CLOSE ничего не возвращая. Это пинг-понг процесс что выполняет закрытие соединения.

Комментариев нет:

Отправить комментарий