1
0
Fork 0

Adding upstream version 0.6.3.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-05 10:27:41 +01:00
parent 4a6dd5e474
commit 341025cc97
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
11 changed files with 1078 additions and 0 deletions

56
aioeapi/aio_portcheck.py Normal file
View file

@ -0,0 +1,56 @@
# -----------------------------------------------------------------------------
# System Imports
# -----------------------------------------------------------------------------
from typing import Optional
import socket
import asyncio
# -----------------------------------------------------------------------------
# Public Imports
# -----------------------------------------------------------------------------
from httpx import URL
# -----------------------------------------------------------------------------
# Exports
# -----------------------------------------------------------------------------
__all__ = ["port_check_url"]
# -----------------------------------------------------------------------------
#
# CODE BEGINS
#
# -----------------------------------------------------------------------------
async def port_check_url(url: URL, timeout: Optional[int] = 5) -> bool:
"""
This function attempts to open the port designated by the URL given the
timeout in seconds. If the port is avaialble then return True; False
otherwise.
Parameters
----------
url:
The URL that provides the target system
timeout: optional, default is 5 seonds
Time to await for the port to open in seconds
"""
port = url.port or socket.getservbyname(url.scheme)
try:
wr: asyncio.StreamWriter
_, wr = await asyncio.wait_for(
asyncio.open_connection(host=url.host, port=port), timeout=timeout
)
# MUST close if opened!
wr.close()
return True
except Exception: # noqa
return False