"""The CheckExternalLinksBuilder class."""

from __future__ import annotations

import contextlib
import json
import re
import socket
import time
import warnings
from html.parser import HTMLParser
from os import path
from queue import PriorityQueue, Queue
from threading import Thread
from typing import TYPE_CHECKING, NamedTuple, cast
from urllib.parse import unquote, urlparse, urlsplit, urlunparse

from docutils import nodes
from requests.exceptions import ConnectionError, HTTPError, SSLError, TooManyRedirects
from requests.exceptions import Timeout as RequestTimeout

from sphinx.builders.dummy import DummyBuilder
from sphinx.deprecation import RemovedInSphinx80Warning
from sphinx.locale import __
from sphinx.transforms.post_transforms import SphinxPostTransform
from sphinx.util import encode_uri, logging, requests
from sphinx.util.console import darkgray, darkgreen, purple, red, turquoise
from sphinx.util.http_date import rfc1123_to_epoch
from sphinx.util.nodes import get_node_line

if TYPE_CHECKING:
    from collections.abc import Iterator
    from typing import Any, Callable

    from requests import Response

    from sphinx.application import Sphinx
    from sphinx.config import Config
    from sphinx.util.typing import ExtensionMetadata

logger = logging.getLogger(__name__)

uri_re = re.compile('([a-z]+:)?//')  # matches foo:// and // (a protocol-relative URL)

DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml;q=0.9,*/*;q=0.8',
}
CHECK_IMMEDIATELY = 0
QUEUE_POLL_SECS = 1
DEFAULT_DELAY = 60.0


class CheckExternalLinksBuilder(DummyBuilder):
    """
    Checks for broken external links.
    """

    name = 'linkcheck'
    epilog = __('Look for any errors in the above output or in '
                '%(outdir)s/output.txt')

    def init(self) -> None:
        self.broken_hyperlinks = 0
        self.timed_out_hyperlinks = 0
        self.hyperlinks: dict[str, Hyperlink] = {}
        # set a timeout for non-responding servers
        socket.setdefaulttimeout(5.0)

        if not self.config.linkcheck_allow_unauthorized:
            deprecation_msg = (
                "The default value for 'linkcheck_allow_unauthorized' will change "
                "from `True` in Sphinx 7.3+ to `False`, meaning that HTTP 401 "
                "unauthorized responses will be reported as broken by default. "
                "See https://github.com/sphinx-doc/sphinx/issues/11433 for details."
            )
            warnings.warn(deprecation_msg, RemovedInSphinx80Warning, stacklevel=1)

        if self.config.linkcheck_report_timeouts_as_broken:
            deprecation_msg = (
                "The default value for 'linkcheck_report_timeouts_as_broken' will change "
                'to False in Sphinx 8, meaning that request timeouts '
                "will be reported with a new 'timeout' status, instead of as 'broken'. "
                'This is intended to provide more detail as to the failure mode. '
                'See https://github.com/sphinx-doc/sphinx/issues/11868 for details.'
            )
            warnings.warn(deprecation_msg, RemovedInSphinx80Warning, stacklevel=1)

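    # ``finish`` is where the network checking actually happens: the URIs
    # collected during the build are handed to a HyperlinkAvailabilityChecker,
    # and every result is recorded in output.txt and output.json under outdir.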
    def finish(self) -> None:
        checker = HyperlinkAvailabilityChecker(self.config)
        logger.info('')

        output_text = path.join(self.outdir, 'output.txt')
        output_json = path.join(self.outdir, 'output.json')
        with open(output_text, 'w', encoding='utf-8') as self.txt_outfile, \
             open(output_json, 'w', encoding='utf-8') as self.json_outfile:
            for result in checker.check(self.hyperlinks):
                self.process_result(result)

        if self.broken_hyperlinks or self.timed_out_hyperlinks:
            self.app.statuscode = 1

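    # Each CheckResult is written to the reports and logged to the console;
    # broken and timed-out links are counted so that ``finish`` can set a
    # non-zero exit status.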
    def process_result(self, result: CheckResult) -> None:
        filename = self.env.doc2path(result.docname, False)

        linkstat = {'filename': filename, 'lineno': result.lineno,
                    'status': result.status, 'code': result.code, 'uri': result.uri,
                    'info': result.message}
        self.write_linkstat(linkstat)

        if result.status == 'unchecked':
            return
        if result.status == 'working' and result.message == 'old':
            return
        if result.lineno:
            logger.info('(%16s: line %4d) ', result.docname, result.lineno, nonl=True)
        if result.status == 'ignored':
            if result.message:
                logger.info(darkgray('-ignored- ') + result.uri + ': ' + result.message)
            else:
                logger.info(darkgray('-ignored- ') + result.uri)
        elif result.status == 'local':
            logger.info(darkgray('-local- ') + result.uri)
            self.write_entry('local', result.docname, filename, result.lineno, result.uri)
        elif result.status == 'working':
            logger.info(darkgreen('ok ') + result.uri + result.message)
        elif result.status == 'timeout':
            if self.app.quiet or self.app.warningiserror:
                logger.warning('timeout ' + result.uri + result.message,
                               location=(result.docname, result.lineno))
            else:
                logger.info(red('timeout ') + result.uri + red(' - ' + result.message))
            self.write_entry('timeout', result.docname, filename, result.lineno,
                             result.uri + ': ' + result.message)
            self.timed_out_hyperlinks += 1
        elif result.status == 'broken':
            if self.app.quiet or self.app.warningiserror:
                logger.warning(__('broken link: %s (%s)'), result.uri, result.message,
                               location=(result.docname, result.lineno))
            else:
                logger.info(red('broken ') + result.uri + red(' - ' + result.message))
            self.write_entry('broken', result.docname, filename, result.lineno,
                             result.uri + ': ' + result.message)
            self.broken_hyperlinks += 1
        elif result.status == 'redirected':
            try:
                text, color = {
                    301: ('permanently', purple),
                    302: ('with Found', purple),
                    303: ('with See Other', purple),
                    307: ('temporarily', turquoise),
                    308: ('permanently', purple),
                }[result.code]
            except KeyError:
                text, color = ('with unknown code', purple)
            linkstat['text'] = text
            if self.config.linkcheck_allowed_redirects:
                logger.warning('redirect ' + result.uri + ' - ' + text + ' to ' +
                               result.message, location=(result.docname, result.lineno))
            else:
                logger.info(color('redirect ') + result.uri +
                            color(' - ' + text + ' to ' + result.message))
            self.write_entry('redirected ' + text, result.docname, filename,
                             result.lineno, result.uri + ' to ' + result.message)
        else:
            raise ValueError('Unknown status %s.' % result.status)

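    # Report writers: one JSON object per line goes to output.json, and a
    # compact "filename:line: [status] uri" record goes to output.txt.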
    def write_linkstat(self, data: dict) -> None:
        self.json_outfile.write(json.dumps(data))
        self.json_outfile.write('\n')

    def write_entry(self, what: str, docname: str, filename: str, line: int,
                    uri: str) -> None:
        self.txt_outfile.write(f'{filename}:{line}: [{what}] {uri}\n')


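# Post-transform registered only for the linkcheck builder: it walks every
# resolved doctree and records external URIs found in reference, image and
# raw nodes into the builder's ``hyperlinks`` mapping.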
class HyperlinkCollector(SphinxPostTransform):
    builders = ('linkcheck',)
    default_priority = 800

    def run(self, **kwargs: Any) -> None:
        builder = cast(CheckExternalLinksBuilder, self.app.builder)
        hyperlinks = builder.hyperlinks
        docname = self.env.docname

        # reference nodes
        for refnode in self.document.findall(nodes.reference):
            if 'refuri' in refnode:
                uri = refnode['refuri']
                _add_uri(self.app, uri, refnode, hyperlinks, docname)

        # image nodes
        for imgnode in self.document.findall(nodes.image):
            uri = imgnode['candidates'].get('?')
            if uri and '://' in uri:
                _add_uri(self.app, uri, imgnode, hyperlinks, docname)

        # raw nodes
        for rawnode in self.document.findall(nodes.raw):
            uri = rawnode.get('source')
            if uri and '://' in uri:
                _add_uri(self.app, uri, rawnode, hyperlinks, docname)


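# The 'linkcheck-process-uri' event lets extensions rewrite a URI before it
# is queued; only the first occurrence of each URI is stored, along with the
# document and line where it appeared.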
def _add_uri(app: Sphinx, uri: str, node: nodes.Element,
             hyperlinks: dict[str, Hyperlink], docname: str) -> None:
    if newuri := app.emit_firstresult('linkcheck-process-uri', uri):
        uri = newuri

    try:
        lineno = get_node_line(node)
    except ValueError:
        lineno = -1

    if uri not in hyperlinks:
        hyperlinks[uri] = Hyperlink(uri, docname, app.env.doc2path(docname), lineno)


class Hyperlink(NamedTuple):
    uri: str
    docname: str
    docpath: str
    lineno: int


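# Dispatches the collected hyperlinks to a pool of worker threads via a
# priority queue (ordered by next-check time) and yields CheckResults back
# from a shared result queue as they arrive.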
class HyperlinkAvailabilityChecker:
    def __init__(self, config: Config) -> None:
        self.config = config
        self.rate_limits: dict[str, RateLimit] = {}
        self.rqueue: Queue[CheckResult] = Queue()
        self.workers: list[Thread] = []
        self.wqueue: PriorityQueue[CheckRequest] = PriorityQueue()
        self.num_workers: int = config.linkcheck_workers

        self.to_ignore: list[re.Pattern[str]] = list(map(re.compile,
                                                         self.config.linkcheck_ignore))

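    # URIs matching linkcheck_ignore are reported as 'ignored' without being
    # queued; everything else is queued for the workers, and results are
    # yielded until every queued link has been answered.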
    def check(self, hyperlinks: dict[str, Hyperlink]) -> Iterator[CheckResult]:
        self.invoke_threads()

        total_links = 0
        for hyperlink in hyperlinks.values():
            if self.is_ignored_uri(hyperlink.uri):
                yield CheckResult(hyperlink.uri, hyperlink.docname, hyperlink.lineno,
                                  'ignored', '', 0)
            else:
                self.wqueue.put(CheckRequest(CHECK_IMMEDIATELY, hyperlink), False)
                total_links += 1

        done = 0
        while done < total_links:
            yield self.rqueue.get()
            done += 1

        self.shutdown_threads()

    def invoke_threads(self) -> None:
        for _i in range(self.num_workers):
            thread = HyperlinkAvailabilityCheckWorker(self.config,
                                                      self.rqueue, self.wqueue,
                                                      self.rate_limits)
            thread.start()
            self.workers.append(thread)

    def shutdown_threads(self) -> None:
        self.wqueue.join()
        for _worker in self.workers:
            self.wqueue.put(CheckRequest(CHECK_IMMEDIATELY, None), False)

    def is_ignored_uri(self, uri: str) -> bool:
        return any(pat.match(uri) for pat in self.to_ignore)


class CheckRequest(NamedTuple):
    next_check: float
    hyperlink: Hyperlink | None


class CheckResult(NamedTuple):
    uri: str
    docname: str
    lineno: int
    status: str
    message: str
    code: int


class HyperlinkAvailabilityCheckWorker(Thread):
    """A worker class for checking the availability of hyperlinks."""

    def __init__(self, config: Config,
                 rqueue: Queue[CheckResult],
                 wqueue: Queue[CheckRequest],
                 rate_limits: dict[str, RateLimit]) -> None:
        self.rate_limits = rate_limits
        self.rqueue = rqueue
        self.wqueue = wqueue

        self.anchors_ignore: list[re.Pattern[str]] = list(
            map(re.compile, config.linkcheck_anchors_ignore))
        self.anchors_ignore_for_url: list[re.Pattern[str]] = list(
            map(re.compile, config.linkcheck_anchors_ignore_for_url))
        self.documents_exclude: list[re.Pattern[str]] = list(
            map(re.compile, config.linkcheck_exclude_documents))
        self.auth = [(re.compile(pattern), auth_info) for pattern, auth_info
                     in config.linkcheck_auth]

        self.timeout: int | float | None = config.linkcheck_timeout
        self.request_headers: dict[str, dict[str, str]] = config.linkcheck_request_headers
        self.check_anchors: bool = config.linkcheck_anchors
        self.allowed_redirects: dict[re.Pattern[str], re.Pattern[str]]
        self.allowed_redirects = config.linkcheck_allowed_redirects
        self.retries: int = config.linkcheck_retries
        self.rate_limit_timeout = config.linkcheck_rate_limit_timeout
        self._allow_unauthorized = config.linkcheck_allow_unauthorized
        if config.linkcheck_report_timeouts_as_broken:
            self._timeout_status = 'broken'
        else:
            self._timeout_status = 'timeout'

        self.user_agent = config.user_agent
        self.tls_verify = config.tls_verify
        self.tls_cacerts = config.tls_cacerts

        self._session = requests._Session()

        super().__init__(daemon=True)

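    # Worker loop: pull requests from the work queue, postpone those whose
    # host is still rate-limited, check the rest, and push results onto the
    # result queue until the shutdown sentinel (a request with no hyperlink)
    # arrives.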
    def run(self) -> None:
        while True:
            next_check, hyperlink = self.wqueue.get()
            if hyperlink is None:
                # An empty hyperlink is a signal to shut down the worker;
                # clean up resources here
                self._session.close()
                break

            uri, docname, _docpath, lineno = hyperlink
            if uri is None:
                break

            netloc = urlsplit(uri).netloc
            with contextlib.suppress(KeyError):
                # Refresh rate limit.
                # When there are many links in the queue, workers are all stuck waiting
                # for responses, but the builder keeps queuing. Links in the queue may
                # have been queued before rate limits were discovered.
                next_check = self.rate_limits[netloc].next_check
            if next_check > time.time():
                # Sleep before putting message back in the queue to avoid
                # waking up other threads.
                time.sleep(QUEUE_POLL_SECS)
                self.wqueue.put(CheckRequest(next_check, hyperlink), False)
                self.wqueue.task_done()
                continue
            status, info, code = self._check(docname, uri, hyperlink)
            if status == 'rate-limited':
                logger.info(darkgray('-rate limited- ') + uri + darkgray(' | sleeping...'))
            else:
                self.rqueue.put(CheckResult(uri, docname, lineno, status, info, code))
            self.wqueue.task_done()

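    # Cheap local checks come first (excluded documents, unsupported schemes,
    # references to local files); only genuine http/https URIs fall through
    # to _check_uri, which is attempted up to linkcheck_retries times while
    # the result is 'broken'.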
    def _check(self, docname: str, uri: str, hyperlink: Hyperlink) -> tuple[str, str, int]:
        # check for various conditions without bothering the network

        for doc_matcher in self.documents_exclude:
            if doc_matcher.match(docname):
                info = (
                    f'{docname} matched {doc_matcher.pattern} from '
                    'linkcheck_exclude_documents'
                )
                return 'ignored', info, 0

        if len(uri) == 0 or uri.startswith(('#', 'mailto:', 'tel:')):
            return 'unchecked', '', 0
        if not uri.startswith(('http:', 'https:')):
            if uri_re.match(uri):
                # Non-supported URI schemes (ex. ftp)
                return 'unchecked', '', 0

            src_dir = path.dirname(hyperlink.docpath)
            if path.exists(path.join(src_dir, uri)):
                return 'working', '', 0
            return 'broken', '', 0

        # need to actually check the URI
        status, info, code = '', '', 0
        for _ in range(self.retries):
            status, info, code = self._check_uri(uri, hyperlink)
            if status != 'broken':
                break

        return status, info, code

    def _retrieval_methods(self,
                           check_anchors: bool,
                           anchor: str) -> Iterator[tuple[Callable, dict]]:
        if not check_anchors or not anchor:
            yield self._session.head, {'allow_redirects': True}
        yield self._session.get, {'stream': True}

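    # Splits off the '#anchor' fragment, applies per-URL auth and request
    # headers, then tries each retrieval method (HEAD, then GET) in turn and
    # classifies the outcome as working, redirected, broken, timeout or
    # rate-limited.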
    def _check_uri(self, uri: str, hyperlink: Hyperlink) -> tuple[str, str, int]:
        req_url, delimiter, anchor = uri.partition('#')
        if delimiter and anchor:
            for rex in self.anchors_ignore:
                if rex.match(anchor):
                    anchor = ''
                    break
            else:
                for rex in self.anchors_ignore_for_url:
                    if rex.match(req_url):
                        anchor = ''
                        break

        # handle non-ASCII URIs
        try:
            req_url.encode('ascii')
        except UnicodeError:
            req_url = encode_uri(req_url)

        # Get auth info, if any
        for pattern, auth_info in self.auth:  # NoQA: B007 (false positive)
            if pattern.match(uri):
                break
        else:
            auth_info = None

        # update request headers for the URL
        headers = _get_request_headers(uri, self.request_headers)

        # Linkcheck HTTP request logic:
        #
        # - Attempt HTTP HEAD before HTTP GET unless page content is required.
        # - Follow server-issued HTTP redirects.
        # - Respect server-issued HTTP 429 back-offs.
        error_message = ''
        status_code = -1
        response_url = retry_after = ''
        for retrieval_method, kwargs in self._retrieval_methods(self.check_anchors, anchor):
            try:
                with retrieval_method(
                    url=req_url, auth=auth_info,
                    headers=headers,
                    timeout=self.timeout,
                    **kwargs,
                    _user_agent=self.user_agent,
                    _tls_info=(self.tls_verify, self.tls_cacerts),
                ) as response:
                    if (self.check_anchors and response.ok and anchor
                            and not contains_anchor(response, anchor)):
                        raise Exception(__(f'Anchor {anchor!r} not found'))

                # Copy data we need from the (closed) response
                status_code = response.status_code
                redirect_status_code = response.history[-1].status_code if response.history else None  # NoQA: E501
                retry_after = response.headers.get('Retry-After')
                response_url = f'{response.url}'
                response.raise_for_status()
                del response
                break

            except RequestTimeout as err:
                return self._timeout_status, str(err), 0

            except SSLError as err:
                # SSL failure; report that the link is broken.
                return 'broken', str(err), 0

            except (ConnectionError, TooManyRedirects) as err:
                # Servers drop the connection on HEAD requests, causing
                # ConnectionError.
                error_message = str(err)
                continue

            except HTTPError as err:
                error_message = str(err)

                # Unauthorized: the client did not provide required credentials
                if status_code == 401:
                    if self._allow_unauthorized:
                        deprecation_msg = (
                            "\n---\n"
                            "The linkcheck builder encountered an HTTP 401 "
                            "(unauthorized) response, and will report it as "
                            "'working' in this version of Sphinx to maintain "
                            "backwards-compatibility."
                            "\n"
                            "This logic will change in Sphinx 8.0 which will "
                            "report the hyperlink as 'broken'."
                            "\n"
                            "To explicitly continue treating unauthorized "
                            "hyperlink responses as 'working', set the "
                            "'linkcheck_allow_unauthorized' config option to "
                            "``True``."
                            "\n"
                            "See https://github.com/sphinx-doc/sphinx/issues/11433 "
                            "for details."
                            "\n---"
                        )
                        warnings.warn(deprecation_msg, RemovedInSphinx80Warning, stacklevel=1)
                    status = 'working' if self._allow_unauthorized else 'broken'
                    return status, 'unauthorized', 0

                # Rate limiting; back-off if allowed, or report failure otherwise
                if status_code == 429:
                    if next_check := self.limit_rate(response_url, retry_after):
                        self.wqueue.put(CheckRequest(next_check, hyperlink), False)
                        return 'rate-limited', '', 0
                    return 'broken', error_message, 0

                # Don't claim success/failure during server-side outages
                if status_code == 503:
                    return 'ignored', 'service unavailable', 0

                # For most HTTP failures, continue attempting alternate retrieval methods
                continue

            except Exception as err:
                # Unhandled exception (intermittent or permanent); report that
                # the link is broken.
                return 'broken', str(err), 0

        else:
            # All available retrieval methods have been exhausted; report
            # that the link is broken.
            return 'broken', error_message, 0

        # Success; clear rate limits for the origin
        netloc = urlsplit(req_url).netloc
        self.rate_limits.pop(netloc, None)

        if ((response_url.rstrip('/') == req_url.rstrip('/'))
                or _allowed_redirect(req_url, response_url,
                                     self.allowed_redirects)):
            return 'working', '', 0
        elif redirect_status_code is not None:
            return 'redirected', response_url, redirect_status_code
        else:
            return 'redirected', response_url, 0

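    # Back-off policy for HTTP 429 responses: honour a Retry-After header
    # (either a number of seconds or an HTTP-date) when present, otherwise
    # double the previous delay starting from DEFAULT_DELAY, giving up once
    # the delay would exceed linkcheck_rate_limit_timeout.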
    def limit_rate(self, response_url: str, retry_after: str) -> float | None:
        delay = DEFAULT_DELAY
        next_check = None
        if retry_after:
            try:
                # Integer: time to wait before next attempt.
                delay = float(retry_after)
            except ValueError:
                try:
                    # An HTTP-date: time of next attempt.
                    next_check = rfc1123_to_epoch(retry_after)
                except (ValueError, TypeError):
                    # TypeError: Invalid date format.
                    # ValueError: Invalid date, e.g. Oct 52th.
                    pass
                else:
                    delay = next_check - time.time()
            else:
                next_check = time.time() + delay
        netloc = urlsplit(response_url).netloc
        if next_check is None:
            max_delay = self.rate_limit_timeout
            try:
                rate_limit = self.rate_limits[netloc]
            except KeyError:
                delay = DEFAULT_DELAY
            else:
                last_wait_time = rate_limit.delay
                delay = 2.0 * last_wait_time
                if delay > max_delay > last_wait_time:
                    delay = max_delay
            if delay > max_delay:
                return None
            next_check = time.time() + delay
        self.rate_limits[netloc] = RateLimit(delay, next_check)
        return next_check


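# linkcheck_request_headers entries may be keyed by the URL origin (with or
# without a trailing slash), the exact URI, or '*' as a catch-all; the first
# matching entry is merged over the default Accept header.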
def _get_request_headers(
    uri: str,
    request_headers: dict[str, dict[str, str]],
) -> dict[str, str]:
    url = urlsplit(uri)
    candidates = (f'{url.scheme}://{url.netloc}',
                  f'{url.scheme}://{url.netloc}/',
                  uri,
                  '*')

    for u in candidates:
        if u in request_headers:
            return {**DEFAULT_REQUEST_HEADERS, **request_headers[u]}
    return {}


def contains_anchor(response: Response, anchor: str) -> bool:
    """Determine if an anchor is contained within an HTTP response."""
    parser = AnchorCheckParser(unquote(anchor))
    # Read the document in chunks. If we find a matching anchor, we break
    # the loop early in the hope of not having to download the whole thing.
    for chunk in response.iter_content(chunk_size=4096, decode_unicode=True):
        if isinstance(chunk, bytes):  # requests failed to decode
            chunk = chunk.decode()  # manually try to decode it

        parser.feed(chunk)
        if parser.found:
            break
    parser.close()
    return parser.found


class AnchorCheckParser(HTMLParser):
    """Specialised HTML parser that looks for a specific anchor."""

    def __init__(self, search_anchor: str) -> None:
        super().__init__()

        self.search_anchor = search_anchor
        self.found = False

    def handle_starttag(self, tag: Any, attrs: Any) -> None:
        for key, value in attrs:
            if key in ('id', 'name') and value == self.search_anchor:
                self.found = True
                break


def _allowed_redirect(url: str, new_url: str,
                      allowed_redirects: dict[re.Pattern[str], re.Pattern[str]]) -> bool:
    return any(
        from_url.match(url) and to_url.match(new_url)
        for from_url, to_url
        in allowed_redirects.items()
    )


class RateLimit(NamedTuple):
    delay: float
    next_check: float


def rewrite_github_anchor(app: Sphinx, uri: str) -> str | None:
    """Rewrite the anchor name of a hyperlink to github.com.

    The hyperlink anchors on github.com are dynamically generated. This rewrites
    them before checking so that they can be compared against the page content.
    """
    parsed = urlparse(uri)
    if parsed.hostname == 'github.com' and parsed.fragment:
        prefixed = parsed.fragment.startswith('user-content-')
        if not prefixed:
            fragment = f'user-content-{parsed.fragment}'
            return urlunparse(parsed._replace(fragment=fragment))
    return None


def compile_linkcheck_allowed_redirects(app: Sphinx, config: Config) -> None:
    """Compile the patterns in linkcheck_allowed_redirects into regexp objects."""
    for url, pattern in list(app.config.linkcheck_allowed_redirects.items()):
        try:
            app.config.linkcheck_allowed_redirects[re.compile(url)] = re.compile(pattern)
        except re.error as exc:
            logger.warning(__('Failed to compile regex in linkcheck_allowed_redirects: %r %s'),
                           exc.pattern, exc.msg)
        finally:
            # Remove the original regexp-string
            app.config.linkcheck_allowed_redirects.pop(url)


def setup(app: Sphinx) -> ExtensionMetadata:
    app.add_builder(CheckExternalLinksBuilder)
    app.add_post_transform(HyperlinkCollector)

    app.add_config_value('linkcheck_ignore', [], '')
    app.add_config_value('linkcheck_exclude_documents', [], '')
    app.add_config_value('linkcheck_allowed_redirects', {}, '')
    app.add_config_value('linkcheck_auth', [], '')
    app.add_config_value('linkcheck_request_headers', {}, '')
    app.add_config_value('linkcheck_retries', 1, '')
    app.add_config_value('linkcheck_timeout', 30, '', (int, float))
    app.add_config_value('linkcheck_workers', 5, '')
    app.add_config_value('linkcheck_anchors', True, '')
    # Anchors starting with ! are ignored since they are
    # commonly used for dynamic pages
    app.add_config_value('linkcheck_anchors_ignore', ['^!'], '')
    app.add_config_value('linkcheck_anchors_ignore_for_url', (), '', (tuple, list))
    app.add_config_value('linkcheck_rate_limit_timeout', 300.0, '')
    app.add_config_value('linkcheck_allow_unauthorized', True, '')
    app.add_config_value('linkcheck_report_timeouts_as_broken', True, '', bool)

    app.add_event('linkcheck-process-uri')

    app.connect('config-inited', compile_linkcheck_allowed_redirects, priority=800)

    # FIXME: Disable URL rewrite handler for github.com temporarily.
    # ref: https://github.com/sphinx-doc/sphinx/issues/9435
    # app.connect('linkcheck-process-uri', rewrite_github_anchor)

    return {
        'version': 'builtin',
        'parallel_read_safe': True,
        'parallel_write_safe': True,
    }