You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
29 lines
1.2 KiB
29 lines
1.2 KiB
import time
|
|
import requests
|
|
import socket
|
|
|
|
|
|
|
|
def wait_for_server(url, timeout=10, retry_interval=0.5):
|
|
"""
|
|
Waits for a web server to become available by polling its URL.
|
|
"""
|
|
|
|
start_time = time.monotonic()
|
|
while time.monotonic() - start_time < timeout:
|
|
try:
|
|
# First, try a simple TCP connection to check if the port is open
|
|
hostname, port = url.split("//")[1].split(":")
|
|
port = int(port)
|
|
with socket.create_connection((hostname, port), timeout=retry_interval):
|
|
pass # If the connection succeeds, continue to the HTTP check
|
|
|
|
# Then, make an HTTP request to ensure the server is responding correctly
|
|
response = requests.get(url, timeout=retry_interval)
|
|
response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
|
|
return # Server is up and responding correctly
|
|
except (requests.exceptions.RequestException, socket.error) as e:
|
|
print(f"Server not yet available: {e}. Retrying in {retry_interval} seconds...")
|
|
time.sleep(retry_interval)
|
|
|
|
raise TimeoutError(f"Server at {url} did not become available within {timeout} seconds.")
|
|
|