D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
wmt
/
Filename :
main.py
back
Copy
#!/opt/cloudlinux/venv/bin/python3 -bb
"""
wmt_scanner: periodically pings every configured domain, stores the results,
re-pings failing domains and sends alert / license-farewell notifications.
"""
import asyncio
import concurrent.futures
import ipaddress
import os
import signal
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional

import aiohttp
from sqlalchemy import or_
from wmt.db import (
    ScrapeResult,
    DomainAlerts,
    setup_database,
    session_scope,
    cleanup_old_data,
)
from wmt.common.utils import get_domains, setup_logger, save_pid_and_lock, intersect
from clsentry import init_sentry_client
from clsentry.utils import get_pkg_version
from wmt.common.const import (
    PING_TIMEOUT_STATUS_CODE,
    SENTRY_DNS,
    ERROR_DOMAINS_PING_RETRY_INTERVAL,
    ERROR_DOMAINS_ALERT_INTERVAL,
    WMT_LOCK_FILE,
    PING_CONNECTIONS,
    LICENSE_EXPIRED_FAREWELL_LETTER_MARKER,
    LICENSE_CHECK_PAUSE,
)
from wmt.common.notification import Notifier, SupportedNotificationTypes
from wmt.common.report import ErrorReport
from wmt.common import cfg
from cllicense import CloudlinuxLicenseLib

logger = setup_logger('wmt_scanner')

# SSRF filter: non-routable address ranges the scanner must never connect to.
# `is_unspecified` (0.0.0.0 / ::) and `is_multicast` are listed explicitly as
# defense in depth, so the filter does not depend on which ranges a particular
# Python release happens to fold into `is_private` (see the CGNAT note below).
_BLOCKED_IP_PROPERTIES = frozenset([
    'is_loopback',
    'is_private',
    'is_link_local',
    'is_reserved',
    'is_unspecified',
    'is_multicast',
])
# RFC 6598 CGNAT / shared address space. Python 3.11's IPv4Address.is_private
# returns False for this range (fixed upstream only in 3.12.4), so check it explicitly.
_CGNAT_NETWORK = ipaddress.ip_network('100.64.0.0/10')


def _is_blocked_ip(host) -> bool:
    """
    True if `host` is a literal IP in a non-routable / blocked range.

    Hostnames return False -- they are filtered after resolution by _SafeResolver.
    """
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        return False
    # Normalise IPv4-mapped IPv6 (e.g. ::ffff:100.64.1.1) to its embedded IPv4 so
    # the IPv4-only range checks below apply -- closes the IPv4-mapped CGNAT bypass.
    mapped = getattr(ip, 'ipv4_mapped', None)
    if mapped is not None:
        ip = mapped
    if any(getattr(ip, prop) for prop in _BLOCKED_IP_PROPERTIES):
        return True
    if ip.version == 4 and ip in _CGNAT_NETWORK:
        return True
    return False


class _SafeResolver(aiohttp.abc.AbstractResolver):
    """
    Wraps aiohttp's ThreadedResolver and drops any resolved address that falls
    in a blocked range, closing the DNS-rebinding-to-internal-host vector.
    """

    def __init__(self, *args, **kwargs):
        self._resolver = aiohttp.ThreadedResolver(*args, **kwargs)

    async def resolve(self, host, port=0, family=None):
        """
        Resolve `host` and return only the entries whose address is not blocked.

        Raises OSError when every resolved address is blocked (or none resolved).
        """
        if family is None:
            hosts = await self._resolver.resolve(host, port)
        else:
            hosts = await self._resolver.resolve(host, port, family)
        safe = [h for h in hosts if not _is_blocked_ip(h['host'])]
        if not safe:
            raise OSError(f'No routable address for {host}')
        return safe

    async def close(self):
        """Close the wrapped ThreadedResolver."""
        await self._resolver.close()


class _SafeTCPConnector(aiohttp.TCPConnector):
    """
    Rejects IP-literal hosts in a blocked range before connecting.

    aiohttp's stock _resolve_host short-circuits IP literals and skips the
    resolver, which would otherwise leave redirect targets like
    http://169.254.169.254/ unfiltered.
    """

    async def _resolve_host(self, host, port, traces=None):
        if _is_blocked_ip(host):
            raise OSError(f'Blocked non-routable address: {host}')
        return await super()._resolve_host(host, port, traces=traces)


def reload_conf(sig_number, frame):
    """
    SIGUSR1 handler: reload the config and log it (without `report_email`).
    """
    cfg.reload()
    config_log = {k: v for k, v in cfg.to_dict().items() if k != 'report_email'}
    logger.info('Reloading config: %s', str(config_log))


def shutdown(sig_number, frame):
    """
    Shutdown to call finally block to close all fds, remove lock and file
    see: save_pid_and_lock()
    """
    sys.exit(0)


@dataclass
class ScrapeCoroResult:
    """Outcome of pinging one domain."""
    # URL that was actually requested (or the final URL after redirects on success)
    url: str
    # HTTP status, PING_TIMEOUT_STATUS_CODE on timeout, 523 when unreachable
    response_code: Optional[int] = None
    response_time_ms: Optional[int] = None
    # the domain exactly as it came from get_domains(); set by ping()
    original_domain: Optional[str] = None


def get_connection_limit() -> int:
    """
    Return the configured concurrent-connection limit (`ping_connections`).

    Falls back to PING_CONNECTIONS when the option is missing or not an
    integer, and never returns less than 1.
    """
    value = getattr(cfg.cfg, 'ping_connections', PING_CONNECTIONS)
    try:
        value = int(value)
    except (TypeError, ValueError):
        value = PING_CONNECTIONS
    return max(1, value)


async def ping(url, ping_timeout, session, semaphore):
    """
    Main 'pinger'
    1. Requests domains (HTTPS first, falling back to HTTP when HTTPS is not 200)
     - if domain responded - keep status code
     - if no response for timeout - keep Timeout status code
     - if unreachable (ConnectionError or so) - keep 523 status code
     (same logic as go implementation)

    The returned result always carries `original_domain == url`.
    """
    # Explicit ClientTimeout: passing a bare number as `timeout=` is the legacy form.
    request_timeout = aiohttp.ClientTimeout(total=ping_timeout)

    async with semaphore:
        async def _fetch(target_url):
            # monotonic clock: latency must not be skewed by wall-clock adjustments
            start = time.monotonic()
            try:
                async with session.get(target_url, timeout=request_timeout) as resp:
                    # Use the final URL after redirects to preserve actual scheme (http/https)
                    final_url = str(resp.url) if hasattr(resp, 'url') else target_url
                    return ScrapeCoroResult(
                        final_url,
                        response_code=resp.status,
                        response_time_ms=int(1000 * (time.monotonic() - start))
                    )
            # aiohttp signals timeouts with asyncio.TimeoutError; it is only the
            # same class as concurrent.futures.TimeoutError on Python >= 3.11.
            # Must stay before ClientError: ServerTimeoutError subclasses both.
            except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
                return ScrapeCoroResult(target_url,
                                        response_code=PING_TIMEOUT_STATUS_CODE,
                                        response_time_ms=int(ping_timeout * 1000))
            except aiohttp.client_exceptions.ClientError:
                # 523 is code for unreachable resource
                # same logic as in go implementation
                return ScrapeCoroResult(target_url, response_code=523)

        # Try HTTPS first, then fallback to HTTP
        if url.startswith('https://'):
            https_url = url
            http_url = 'http://' + url[len('https://'):]
        elif url.startswith('http://'):
            https_url = 'https://' + url[len('http://'):]
            http_url = url
        else:
            https_url = f'https://{url}'
            http_url = f'http://{url}'

        https_result = await _fetch(https_url)
        https_result.original_domain = url
        if https_result.response_code == 200:
            return https_result

        http_result = await _fetch(http_url)
        http_result.original_domain = url
        return http_result


def executors(ping_timeout, session, semaphore, ping_target_domains=None):
    """
    Yield one ping() coroutine per configured, non-ignored domain.

    ping_timeout: specified in config timeout time (s) for request
    semaphore: semaphore obj to handle asyncio tasks
    ping_target_domains: mostly needed for re-pinging error domains; when given,
        only domains present both here and in get_domains() are pinged
    """
    domains = get_domains()
    if ping_target_domains is not None:
        # new set instead of `&=`: never mutate the collection get_domains() returned
        domains = set(domains) & set(ping_target_domains)
    logger.debug('Those domains will be pinged: %s', str(domains))
    for domain in domains:
        if not cfg.is_domain_ignored(domain):
            yield ping(domain, ping_timeout, session, semaphore)


async def scrape_sites(ping_site_timeout, ping_interval, session, semaphore,
                       ping_target_domains=None) -> List[ScrapeCoroResult]:
    """
    Ping all target domains concurrently, bounded by `ping_interval` seconds.

    If the overall deadline passes, unfinished pings are cancelled and only the
    results of the pings that completed are returned.
    """
    coroutines = list(executors(ping_site_timeout, session, semaphore, ping_target_domains))
    if len(coroutines) == 0:
        return []
    tasks = [asyncio.create_task(coro) for coro in coroutines]
    try:
        return await asyncio.wait_for(asyncio.gather(*tasks), timeout=ping_interval)
    except asyncio.TimeoutError:
        results: List[ScrapeCoroResult] = []
        for task in tasks:
            if not task.done():
                task.cancel()
        for task in tasks:
            try:
                results.append(await task)
            except asyncio.CancelledError:
                continue
        return results


def manage_ping_results(engine, pinged, ping_target_domains=None):
    """
    - obtains all scrape coro results from asyncio tasks
    - saves ping results to ScrapeResult table (with the URL actually requested)
    - updates 'is_resolved' field in DomainAlerts table, in case error domain`s
      status code was changed to 200
    - records expected-but-missing domains as unfinished ScrapeResult rows
    - returns {domain: status_code} for domains with non-200 status code
    """
    finished_domains = set()
    errors_domains = {}
    resolved = []
    with session_scope(engine) as session:
        for result in pinged:
            session.add(ScrapeResult(
                website=result.url,
                is_finished=True,
                response_code=result.response_code,
                response_time_ms=result.response_time_ms
            ))
            # result.url may be the https/http variant or the post-redirect URL.
            # Everything compared against get_domains() (re-ping targets) or
            # DomainAlerts.website must use the domain as originally requested,
            # otherwise re-pings select nothing and alerts never resolve.
            domain = result.original_domain or result.url
            finished_domains.add(domain)
            if result.response_code != 200:
                errors_domains[domain] = result.response_code
            else:
                resolved.append(domain)

        # mark no-more failing domains as resolved
        session.query(DomainAlerts) \
            .filter(DomainAlerts.website.in_(resolved)) \
            .update(dict(is_resolved=True), synchronize_session=False)

    domains = get_domains()
    if ping_target_domains is not None:
        # new set instead of `&=`: never mutate the collection get_domains() returned
        domains = set(domains) & set(ping_target_domains)
    unfinished_domains = set(domains) - finished_domains
    with session_scope(engine) as session:
        for unfinished in unfinished_domains:
            session.add(ScrapeResult(
                website=unfinished,
                is_finished=False
            ))
    return errors_domains


def get_recent_alerts(engine, alert_domains):
    """
    - gets websites that must NOT be included in alert email:
      less than ERROR_DOMAINS_ALERT_INTERVAL passed or is_resolved marker was
      not changed from last alerting
    """
    repeat_interval = datetime.now() - timedelta(hours=ERROR_DOMAINS_ALERT_INTERVAL)
    with session_scope(engine) as session:
        recently_alerted = session.query(DomainAlerts.website) \
            .filter(DomainAlerts.website.in_(list(alert_domains.keys())),
                    or_(DomainAlerts.alert_time > repeat_interval,
                        DomainAlerts.is_resolved == False))  # noqa: E712 (SQL expression)
        # Query is lazy: materialise it while the session is still inside its
        # scope instead of after session_scope() has finished with it.
        return [row.website for row in recently_alerted]


def alert(domains_data):
    """
    prepares needed error report object with error domains to be alerted
    and sends this mail

    returns alerted domains
    """
    logger.info('Alerts will be sent for %s', str(list(domains_data.keys())))
    error_report = [
        ErrorReport(
            url=domain,
            code=', '.join(map(str, set(codes))),
            count_errors=len(codes)
        )
        for domain, codes in domains_data.items()
    ]
    Notifier(
        target_email=cfg.target_email,
        from_email=cfg.from_email,
        report={
            'error_report': error_report
        },
        notification_type=SupportedNotificationTypes.ALERT).notify()
    return domains_data


def flush_alerts(engine, alert_domains):
    """
    - gets recently alerted domains (those that must not be alerted again)
      and does not include them for alerting
    - calls alerting for left domains
    - updates DomainAlerts table:
        if website was not alerted -> adds new record
        if website was alerted before -> updates alert time and is_resolved marker
    """
    recently_alerted = set(get_recent_alerts(engine, alert_domains))
    domains_to_alert = {k: v for k, v in alert_domains.items() if k not in recently_alerted}
    if not domains_to_alert:
        logger.info('All domains "%s" were alerted or still not resolved in last %d hours',
                    str(list(alert_domains.keys())), ERROR_DOMAINS_ALERT_INTERVAL)
        return

    alert(domains_to_alert)
    now = datetime.now()
    with session_scope(engine) as session:
        websites = session.query(DomainAlerts) \
            .with_entities(DomainAlerts.website) \
            .all()
        # set: O(1) membership test per alerted domain
        known_websites = {row.website for row in websites}
        for domain in domains_to_alert:
            if domain in known_websites:
                session.query(DomainAlerts) \
                    .filter(DomainAlerts.website == domain) \
                    .update(dict(alert_time=now, is_resolved=False))
            else:
                session.add(DomainAlerts(website=domain, alert_time=now))


def should_be_repinged(error_domains):
    """True when there are failing domains and alert notifications are enabled."""
    if error_domains and cfg.cfg.alert_notifications_enabled:
        return True
    return False


def cleanup_farewell_letter_marker():
    """Remove the expired-license marker file (if present) once the license is valid."""
    if os.path.exists(LICENSE_EXPIRED_FAREWELL_LETTER_MARKER):
        logger.info('CloudLinux license was updated')
        os.remove(LICENSE_EXPIRED_FAREWELL_LETTER_MARKER)


def manage_license_farewell():
    """
    Sends farewell letter once (if it was not sent before)

    "Sent before" is judged by the existence of LICENSE_EXPIRED_FAREWELL_LETTER_MARKER;
    this function does not create that marker itself. Any error is logged and
    swallowed so the main loop keeps running.
    """
    try:
        if not os.path.exists(LICENSE_EXPIRED_FAREWELL_LETTER_MARKER):
            logger.warning('Going to send last email about expired license!')
            Notifier(
                target_email=cfg.target_email,
                from_email=cfg.from_email,
                report={},
                notification_type=SupportedNotificationTypes.FAREWELL).notify()
    except Exception:
        logger.exception('Error while managing farewell letter')


async def scrape_iteration(previously_errored, engine):
    """
    Scanner logic:
    1. Scrapes domains and obtains ping results;
    2. Manage ping results (e.g: saving to DB)
    3. In case error domains found -> start re-pinging
        Re-pinging:
        - in min(ping_interval, 5 mins)
        - flush alerts if needed
    4. Sleep for ping_interval until next ping iteration

    Returns the error domains to carry into the next iteration.
    """
    # monotonic clock: interval arithmetic must not be skewed by wall-clock jumps
    start = time.monotonic()
    # ping interval parameter stored in minutes in config
    ping_interval_seconds = cfg.cfg.ping_interval * 60
    try:
        cleanup_farewell_letter_marker()
        cleanup_old_data(engine)
        connections_limit = get_connection_limit()
        if getattr(cfg.cfg, 'allow_private_targets', False):
            connector = aiohttp.TCPConnector(limit=connections_limit)
        else:
            connector = _SafeTCPConnector(limit=connections_limit, resolver=_SafeResolver())
        async with aiohttp.ClientSession(connector=connector) as session:
            semaphore = asyncio.Semaphore(connections_limit)
            ping_result = await scrape_sites(
                cfg.cfg.ping_timeout, ping_interval_seconds, session, semaphore
            )
            error_domains = manage_ping_results(engine, ping_result)
            # re-ping
            if should_be_repinged(error_domains):
                logger.info('Those domains are unsuccessful: %s \n Try to re-ping them',
                            str(error_domains))
                # let`s re-ping in ERROR_DOMAINS_PING_RETRY_INTERVAL time
                while True:
                    ping_interval_seconds = cfg.cfg.ping_interval * 60
                    if ping_interval_seconds > ERROR_DOMAINS_PING_RETRY_INTERVAL:
                        elapsed_for_ping = time.monotonic() - start
                        # sleep in <= 10 s slices so a config reload is picked up
                        await asyncio.sleep(
                            min(max(ERROR_DOMAINS_PING_RETRY_INTERVAL - elapsed_for_ping, 0), 10)
                        )
                        if elapsed_for_ping > ERROR_DOMAINS_PING_RETRY_INTERVAL:
                            # re-ping during current ping iteration
                            ping_retry_result = await scrape_sites(
                                cfg.cfg.ping_timeout, ping_interval_seconds, session, semaphore,
                                ping_target_domains=error_domains.keys()
                            )
                            retry_errors = manage_ping_results(
                                engine, ping_retry_result, error_domains.keys()
                            )
                            alert_domains = intersect(error_domains, retry_errors)
                            break
                    else:
                        # error domains will be re-pinged together with other domains
                        alert_domains = intersect(previously_errored, error_domains)
                        previously_errored = error_domains
                        break

                if alert_domains:
                    logger.info('Domains with unsuccessful status code found: "%s"',
                                str(list(alert_domains.keys())))
                    flush_alerts(engine, alert_domains)
            else:
                # clean up, no error domains during current iteration
                previously_errored = []
    except Exception:
        logger.exception('Error during ping iteration!')
    finally:
        while True:
            ping_interval_seconds = cfg.cfg.ping_interval * 60  # it can be modified with reload
            elapsed = time.monotonic() - start
            sleep_time = min(max(ping_interval_seconds - elapsed, 0), 10)  # "10" to check "reload"
            await asyncio.sleep(sleep_time)
            if elapsed > ping_interval_seconds:
                break
    return previously_errored


async def scrape_loop():
    """
    Main loop for wmt_scanner_solo service
    each 'while: True' iteration returns errored domains
    (domains that responded with non-200 status code)

    While the license check fails, no scanning happens; after 5 consecutive
    failed checks the farewell letter is attempted and the counter restarts.
    """
    engine = setup_database()
    previously_errored = []
    license_attempt = 0
    while True:
        if CloudlinuxLicenseLib().get_license_status():
            license_attempt = 0
            previously_errored = await scrape_iteration(previously_errored, engine)
        else:
            license_attempt += 1
            if license_attempt == 1:
                logger.warning('Seems your CloudLinux license is expired!')
            # let`s do several attempts to be really
            # sure it is not false-positives from CLN or similar
            if license_attempt >= 5:
                manage_license_farewell()
                license_attempt = 0
            ping_interval_seconds = cfg.cfg.ping_interval * 60
            sleep_time = min(ping_interval_seconds, LICENSE_CHECK_PAUSE)
            await asyncio.sleep(sleep_time)


if __name__ == '__main__':
    pid = str(os.getpid())
    logger.info("PID: %s", pid)
    with save_pid_and_lock(WMT_LOCK_FILE, pid):
        init_sentry_client('web-monitoring-tool',
                           get_pkg_version('cl-web-monitoring-tool'),
                           SENTRY_DNS)
        signal.signal(signal.SIGUSR1, reload_conf)
        signal.signal(signal.SIGTERM, shutdown)
        # asyncio.get_event_loop() with no loop set is deprecated; create and
        # install the loop explicitly (run_until_complete semantics unchanged).
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(scrape_loop())