git: 20d2e961ee73 - main - devel/py-proxmoxer: Fix tests

From: Nuno Teixeira <eduardo_at_FreeBSD.org>
Date: Tue, 24 Dec 2024 08:49:25 UTC
The branch main has been updated by eduardo:

URL: https://cgit.FreeBSD.org/ports/commit/?id=20d2e961ee73f108e45838473147bcccea0c5a7b

commit 20d2e961ee73f108e45838473147bcccea0c5a7b
Author:     Nuno Teixeira <eduardo@FreeBSD.org>
AuthorDate: 2024-12-24 08:47:09 +0000
Commit:     Nuno Teixeira <eduardo@FreeBSD.org>
CommitDate: 2024-12-24 08:47:09 +0000

    devel/py-proxmoxer: Fix tests
    
    Sync PYPI tests with GH released version until fixed upstream
    
    See also:       https://github.com/proxmoxer/proxmoxer/issues/195
    PR:             283360
---
 devel/py-proxmoxer/Makefile                |    6 +-
 devel/py-proxmoxer/files/extra-patch-tests | 1117 ++++++++++++++++++++++++++++
 2 files changed, 1122 insertions(+), 1 deletion(-)

diff --git a/devel/py-proxmoxer/Makefile b/devel/py-proxmoxer/Makefile
index 46b164ef0e90..38863903dec8 100644
--- a/devel/py-proxmoxer/Makefile
+++ b/devel/py-proxmoxer/Makefile
@@ -17,12 +17,16 @@ RUN_DEPENDS=	${PYTHON_PKGNAMEPREFIX}requests>=2.0.0:www/py-requests@${PY_FLAVOR}
 TEST_DEPENDS=	${PYTHON_PKGNAMEPREFIX}coveralls>0:devel/py-coveralls@${PY_FLAVOR} \
 		${PYTHON_PKGNAMEPREFIX}openssh-wrapper>0:security/py-openssh-wrapper@${PY_FLAVOR} \
 		${PYTHON_PKGNAMEPREFIX}paramiko>0:security/py-paramiko@${PY_FLAVOR} \
+		${PYTHON_PKGNAMEPREFIX}requests-toolbelt>0:www/py-requests-toolbelt@${PY_FLAVOR} \
 		${PYTHON_PKGNAMEPREFIX}responses>0:devel/py-responses@${PY_FLAVOR}
 
 USES=		python
 USE_PYTHON=	autoplist pep517 pytest
+BINARY_ALIAS=	python3=${PYTHON_CMD}
 
-TESTING_UNSAFE=	https://github.com/proxmoxer/proxmoxer/issues/195
+# Sync PYPI tests with GH released version
+# See also: https://github.com/proxmoxer/proxmoxer/issues/195
+EXTRA_PATCHES=	${FILESDIR}/extra-patch-tests
 
 NO_ARCH=	yes
 
diff --git a/devel/py-proxmoxer/files/extra-patch-tests b/devel/py-proxmoxer/files/extra-patch-tests
new file mode 100644
index 000000000000..e2a5bbf58fb1
--- /dev/null
+++ b/devel/py-proxmoxer/files/extra-patch-tests
@@ -0,0 +1,1117 @@
+Sync PYPI tests with GH released version
+
+diff -ruN tests/__init__.py proxmoxer-2.2.0/tests/__init__.py
+--- tests/__init__.py	1970-01-01 01:00:00.000000000 +0100
++++ proxmoxer-2.2.0/tests/__init__.py	2024-12-15 02:12:42.000000000 +0000
+@@ -0,0 +1,3 @@
++__author__ = "John Hollowell"
++__copyright__ = "(c) John Hollowell 2022"
++__license__ = "MIT"
+diff -ruN tests/api_mock.py proxmoxer-2.2.0/tests/api_mock.py
+--- tests/api_mock.py	1970-01-01 01:00:00.000000000 +0100
++++ proxmoxer-2.2.0/tests/api_mock.py	2024-12-15 02:12:42.000000000 +0000
+@@ -0,0 +1,360 @@
++__author__ = "John Hollowell"
++__copyright__ = "(c) John Hollowell 2022"
++__license__ = "MIT"
++
++import json
++import re
++from urllib.parse import parse_qsl, urlparse
++
++import pytest
++import responses
++from requests_toolbelt import MultipartEncoder
++
++
++@pytest.fixture()
++def mock_pve():
++    with responses.RequestsMock(registry=PVERegistry, assert_all_requests_are_fired=False) as rsps:
++        yield rsps
++
++
++class PVERegistry(responses.registries.FirstMatchRegistry):
++    base_url = "https://1.2.3.4:1234/api2/json"
++
++    common_headers = {
++        "Cache-Control": "max-age=0",
++        "Connection": "close, Keep-Alive",
++        "Pragma": "no-cache",
++        "Server": "pve-api-daemon/3.0",
++        "Content-Type": "application/json;charset=UTF-8",
++    }
++
++    def __init__(self):
++        super().__init__()
++        for resp in self._generate_static_responses():
++            self.add(resp)
++
++        for resp in self._generate_dynamic_responses():
++            self.add(resp)
++
++    def _generate_static_responses(self):
++        resps = []
++
++        # Basic GET requests
++        resps.append(
++            responses.Response(
++                method="GET",
++                url=self.base_url + "/version",
++                json={"data": {"version": "7.2-3", "release": "7.2", "repoid": "c743d6c1"}},
++            )
++        )
++
++        resps.append(
++            responses.Response(
++                method="POST",
++                url=re.compile(self.base_url + r"/nodes/[^/]+/storage/[^/]+/download-url"),
++                # "done" added to UPID so polling will terminate (status checking is tested elsewhere)
++                json={
++                    "data": "UPID:node:003094EA:095F1EFE:63E88772:download:file.iso:root@pam:done",
++                    "success": 1,
++                },
++            )
++        )
++
++        resps.append(
++            responses.Response(
++                method="POST",
++                url=re.compile(self.base_url + r"/nodes/[^/]+/storage/storage1/upload"),
++                # "done" added to UPID so polling will terminate (status checking is tested elsewhere)
++                json={"data": "UPID:node:0017C594:0ADB2769:63EC5455:imgcopy::root@pam:done"},
++            )
++        )
++        resps.append(
++            responses.Response(
++                method="POST",
++                url=re.compile(self.base_url + r"/nodes/[^/]+/storage/missing/upload"),
++                status=500,
++                body="storage 'missing' does not exist",
++            )
++        )
++
++        return resps
++
++    def _generate_dynamic_responses(self):
++        resps = []
++
++        # Authentication
++        resps.append(
++            responses.CallbackResponse(
++                method="POST",
++                url=self.base_url + "/access/ticket",
++                callback=self._cb_password_auth,
++            )
++        )
++
++        # Session testing
++        resps.append(
++            responses.CallbackResponse(
++                method="GET",
++                url=self.base_url + "/fake/echo",
++                callback=self._cb_echo,
++            )
++        )
++
++        resps.append(
++            responses.CallbackResponse(
++                method="GET",
++                url=re.compile(self.base_url + r"/nodes/[^/]+/qemu/[^/]+/agent/exec"),
++                callback=self._cb_echo,
++            )
++        )
++
++        resps.append(
++            responses.CallbackResponse(
++                method="GET",
++                url=re.compile(self.base_url + r"/nodes/[^/]+/qemu/[^/]+/monitor"),
++                callback=self._cb_qemu_monitor,
++            )
++        )
++
++        resps.append(
++            responses.CallbackResponse(
++                method="GET",
++                url=re.compile(self.base_url + r"/nodes/[^/]+/tasks/[^/]+/status"),
++                callback=self._cb_task_status,
++            )
++        )
++
++        resps.append(
++            responses.CallbackResponse(
++                method="GET",
++                url=re.compile(self.base_url + r"/nodes/[^/]+/query-url-metadata.*"),
++                callback=self._cb_url_metadata,
++            )
++        )
++
++        return resps
++
++    ###################################
++    # Callbacks for Dynamic Responses #
++    ###################################
++
++    def _cb_echo(self, request):
++        body = request.body
++        if body is not None:
++            if isinstance(body, MultipartEncoder):
++                body = body.to_string()  # really, to byte string
++            body = body if isinstance(body, str) else str(body, "utf-8")
++
++        resp = {
++            "method": request.method,
++            "url": request.url,
++            "headers": dict(request.headers),
++            "cookies": request._cookies.get_dict(),
++            "body": body,
++            # "body_json": dict(parse_qsl(request.body)),
++        }
++        return (200, self.common_headers, json.dumps(resp))
++
++    def _cb_password_auth(self, request):
++        form_data_dict = dict(parse_qsl(request.body))
++
++        # if this user should not be authenticated
++        if form_data_dict.get("username") == "bad_auth":
++            return (
++                401,
++                self.common_headers,
++                json.dumps({"data": None}),
++            )
++        # if this user requires OTP and it is not included
++        if form_data_dict.get("username") == "otp" and form_data_dict.get("otp") is None:
++            return (
++                200,
++                self.common_headers,
++                json.dumps(
++                    {
++                        "data": {
++                            "ticket": "otp_ticket",
++                            "CSRFPreventionToken": "CSRFPreventionToken",
++                            "NeedTFA": 1,
++                        }
++                    }
++                ),
++            )
++
++        # if this is the first ticket
++        if form_data_dict.get("password") != "ticket":
++            return (
++                200,
++                self.common_headers,
++                json.dumps(
++                    {"data": {"ticket": "ticket", "CSRFPreventionToken": "CSRFPreventionToken"}}
++                ),
++            )
++        # if this is refreshing the ticket, return new ticket
++        else:
++            return (
++                200,
++                self.common_headers,
++                json.dumps(
++                    {
++                        "data": {
++                            "ticket": "new_ticket",
++                            "CSRFPreventionToken": "CSRFPreventionToken_2",
++                        }
++                    }
++                ),
++            )
++
++    def _cb_task_status(self, request):
++        resp = {}
++        if "keep-running" in request.url:
++            resp = {
++                "data": {
++                    "id": "110",
++                    "pid": 1044989,
++                    "node": "node1",
++                    "pstart": 284768076,
++                    "status": "running",
++                    "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running",
++                    "starttime": 1661825068,
++                    "user": "root@pam",
++                    "type": "vzdump",
++                }
++            }
++
++        elif "stopped" in request.url:
++            resp = {
++                "data": {
++                    "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped",
++                    "starttime": 1661825068,
++                    "user": "root@pam",
++                    "type": "vzdump",
++                    "pstart": 284768076,
++                    "status": "stopped",
++                    "exitstatus": "interrupted by signal",
++                    "pid": 1044989,
++                    "id": "110",
++                    "node": "node1",
++                }
++            }
++
++        elif "done" in request.url:
++            resp = {
++                "data": {
++                    "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
++                    "starttime": 1661825068,
++                    "user": "root@pam",
++                    "type": "vzdump",
++                    "pstart": 284768076,
++                    "status": "stopped",
++                    "exitstatus": "OK",
++                    "pid": 1044989,
++                    "id": "110",
++                    "node": "node1",
++                }
++            }
++
++        elif "comment" in request.url:
++            resp = {
++                "data": {
++                    "upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment",
++                    "node": "node",
++                    "pid": 0,
++                    "pstart": 0,
++                    "starttime": 0,
++                    "type": "task",
++                    "id": "id",
++                    "user": "root@pam",
++                    "status": "stopped",
++                    "exitstatus": "OK",
++                }
++            }
++
++        return (200, self.common_headers, json.dumps(resp))
++
++    def _cb_url_metadata(self, request):
++        form_data_dict = dict(parse_qsl((urlparse(request.url)).query))
++
++        if "file.iso" in form_data_dict.get("url", ""):
++            return (
++                200,
++                self.common_headers,
++                json.dumps(
++                    {
++                        "data": {
++                            "size": 123456,
++                            "filename": "file.iso",
++                            "mimetype": "application/x-iso9660-image",
++                            # "mimetype": "application/octet-stream",
++                        },
++                        "success": 1,
++                    }
++                ),
++            )
++        elif "invalid.iso" in form_data_dict.get("url", ""):
++            return (
++                500,
++                self.common_headers,
++                json.dumps(
++                    {
++                        "status": 500,
++                        "message": "invalid server response: '500 Can't connect to sub.domain.tld:443 (certificate verify failed)'\n",
++                        "success": 0,
++                        "data": None,
++                    }
++                ),
++            )
++        elif "missing.iso" in form_data_dict.get("url", ""):
++            return (
++                500,
++                self.common_headers,
++                json.dumps(
++                    {
++                        "status": 500,
++                        "success": 0,
++                        "message": "invalid server response: '404 Not Found'\n",
++                        "data": None,
++                    }
++                ),
++            )
++
++        elif "index.html" in form_data_dict.get("url", ""):
++            return (
++                200,
++                self.common_headers,
++                json.dumps(
++                    {
++                        "success": 1,
++                        "data": {"filename": "index.html", "mimetype": "text/html", "size": 17664},
++                    }
++                ),
++            )
++
++    def _cb_qemu_monitor(self, request):
++        body = request.body
++        if body is not None:
++            body = body if isinstance(body, str) else str(body, "utf-8")
++
++        # if the command is an array, throw the type error PVE would throw
++        if "&" in body:
++            return (
++                400,
++                self.common_headers,
++                json.dumps(
++                    {
++                        "data": None,
++                        "errors": {"command": "type check ('string') failed - got ARRAY"},
++                    }
++                ),
++            )
++        else:
++            resp = {
++                "method": request.method,
++                "url": request.url,
++                "headers": dict(request.headers),
++                "cookies": request._cookies.get_dict(),
++                "body": body,
++                # "body_json": dict(parse_qsl(request.body)),
++            }
++            print(resp)
++            return (200, self.common_headers, json.dumps(resp))
+diff -ruN tests/files_mock.py proxmoxer-2.2.0/tests/files_mock.py
+--- tests/files_mock.py	1970-01-01 01:00:00.000000000 +0100
++++ proxmoxer-2.2.0/tests/files_mock.py	2024-12-15 02:12:42.000000000 +0000
+@@ -0,0 +1,127 @@
++__author__ = "John Hollowell"
++__copyright__ = "(c) John Hollowell 2022"
++__license__ = "MIT"
++
++import re
++
++import pytest
++import responses
++from requests import exceptions
++
++from .api_mock import PVERegistry
++
++
++@pytest.fixture()
++def mock_files():
++    with responses.RequestsMock(
++        registry=FilesRegistry, assert_all_requests_are_fired=False
++    ) as rsps:
++        yield rsps
++
++
++class FilesRegistry(responses.registries.FirstMatchRegistry):
++    base_url = "https://sub.domain.tld"
++
++    common_headers = {
++        "Cache-Control": "max-age=0",
++        "Connection": "close, Keep-Alive",
++        "Pragma": "no-cache",
++        "Server": "pve-api-daemon/3.0",
++        "Content-Type": "application/json;charset=UTF-8",
++    }
++
++    def __init__(self):
++        super().__init__()
++        for resp in self._generate_static_responses():
++            self.add(resp)
++
++    def _generate_static_responses(self):
++        resps = []
++
++        # Basic GET requests
++        resps.append(responses.Response(method="GET", url=self.base_url, body="hello world"))
++        resps.append(
++            responses.Response(method="GET", url=self.base_url + "/file.iso", body="CONTENTS")
++        )
++
++        # sibling
++        resps.append(
++            responses.Response(
++                method="GET", url=self.base_url + "/sibling/file.iso", body="CONTENTS\n"
++            )
++        )
++        resps.append(
++            responses.Response(
++                method="GET",
++                url=self.base_url + "/sibling/TESTINGSUMS",
++                body="this_is_the_hash  file.iso",
++            )
++        )
++
++        # extension
++        resps.append(
++            responses.Response(
++                method="GET", url=self.base_url + "/extension/file.iso", body="CONTENTS\n"
++            )
++        )
++        resps.append(
++            responses.Response(
++                method="GET",
++                url=self.base_url + "/extension/file.iso.testing",
++                body="this_is_the_hash  file.iso",
++            )
++        )
++        resps.append(
++            responses.Response(
++                method="GET",
++                url=self.base_url + "/extension/connectionerror.iso.testing",
++                body=exceptions.ConnectionError(),
++            )
++        )
++        resps.append(
++            responses.Response(
++                method="GET",
++                url=self.base_url + "/extension/readtimeout.iso.testing",
++                body=exceptions.ReadTimeout(),
++            )
++        )
++
++        # extension upper
++        resps.append(
++            responses.Response(
++                method="GET", url=self.base_url + "/upper/file.iso", body="CONTENTS\n"
++            )
++        )
++        resps.append(
++            responses.Response(
++                method="GET",
++                url=self.base_url + "/upper/file.iso.TESTING",
++                body="this_is_the_hash  file.iso",
++            )
++        )
++
++        resps.append(
++            responses.Response(
++                method="GET",
++                url=re.compile(self.base_url + r"/checksums/file.iso.\w+"),
++                body="1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890 file.iso",
++            )
++        )
++
++        return resps
++
++
++@pytest.fixture()
++def mock_files_and_pve():
++    with responses.RequestsMock(registry=BothRegistry, assert_all_requests_are_fired=False) as rsps:
++        yield rsps
++
++
++class BothRegistry(responses.registries.FirstMatchRegistry):
++    def __init__(self):
++        super().__init__()
++        registries = [FilesRegistry(), PVERegistry()]
++
++        for reg in registries:
++            for resp in reg.registered:
++                self.add(resp)
+diff -ruN tests/tools/__init__.py proxmoxer-2.2.0/tests/tools/__init__.py
+--- tests/tools/__init__.py	1970-01-01 01:00:00.000000000 +0100
++++ proxmoxer-2.2.0/tests/tools/__init__.py	2024-12-15 02:12:42.000000000 +0000
+@@ -0,0 +1,3 @@
++__author__ = "John Hollowell"
++__copyright__ = "(c) John Hollowell 2022"
++__license__ = "MIT"
+diff -ruN tests/tools/test_files.py proxmoxer-2.2.0/tests/tools/test_files.py
+--- tests/tools/test_files.py	1970-01-01 01:00:00.000000000 +0100
++++ proxmoxer-2.2.0/tests/tools/test_files.py	2024-12-15 02:12:42.000000000 +0000
+@@ -0,0 +1,375 @@
++__author__ = "John Hollowell"
++__copyright__ = "(c) John Hollowell 2023"
++__license__ = "MIT"
++
++import logging
++import tempfile
++from unittest import mock
++
++import pytest
++
++from proxmoxer import ProxmoxAPI, core
++from proxmoxer.tools import ChecksumInfo, Files, SupportedChecksums
++
++from ..api_mock import mock_pve  # pylint: disable=unused-import # noqa: F401
++from ..files_mock import (  # pylint: disable=unused-import # noqa: F401
++    mock_files,
++    mock_files_and_pve,
++)
++
++MODULE_LOGGER_NAME = "proxmoxer.tools.files"
++
++
++class TestChecksumInfo:
++    def test_basic(self):
++        info = ChecksumInfo("name", 123)
++
++        assert info.name == "name"
++        assert info.hex_size == 123
++
++    def test_str(self):
++        info = ChecksumInfo("name", 123)
++
++        assert str(info) == "name"
++
++    def test_repr(self):
++        info = ChecksumInfo("name", 123)
++
++        assert repr(info) == "name (123 digits)"
++
++
++class TestGetChecksum:
++    def test_get_checksum_from_sibling_file_success(self, mock_files):
++        url = "https://sub.domain.tld/sibling/file.iso"
++        exp_hash = "this_is_the_hash"
++        info = ChecksumInfo("testing", 16)
++        res1 = Files._get_checksum_from_sibling_file(url, checksum_info=info)
++        res2 = Files._get_checksum_from_sibling_file(url, checksum_info=info, filename="file.iso")
++
++        assert res1 == exp_hash
++        assert res2 == exp_hash
++
++    def test_get_checksum_from_sibling_file_fail(self, mock_files):
++        url = "https://sub.domain.tld/sibling/missing.iso"
++        info = ChecksumInfo("testing", 16)
++        res1 = Files._get_checksum_from_sibling_file(url, checksum_info=info)
++        res2 = Files._get_checksum_from_sibling_file(
++            url, checksum_info=info, filename="missing.iso"
++        )
++
++        assert res1 is None
++        assert res2 is None
++
++    def test_get_checksum_from_extension_success(self, mock_files):
++        url = "https://sub.domain.tld/extension/file.iso"
++        exp_hash = "this_is_the_hash"
++        info = ChecksumInfo("testing", 16)
++        res1 = Files._get_checksum_from_extension(url, checksum_info=info)
++        res2 = Files._get_checksum_from_extension(url, checksum_info=info, filename="file.iso")
++
++        assert res1 == exp_hash
++        assert res2 == exp_hash
++
++    def test_get_checksum_from_extension_fail(self, mock_files):
++        url = "https://sub.domain.tld/extension/missing.iso"
++
++        info = ChecksumInfo("testing", 16)
++        res1 = Files._get_checksum_from_extension(url, checksum_info=info)
++        res2 = Files._get_checksum_from_extension(
++            url, checksum_info=info, filename="connectionerror.iso"
++        )
++        res3 = Files._get_checksum_from_extension(
++            url, checksum_info=info, filename="readtimeout.iso"
++        )
++
++        assert res1 is None
++        assert res2 is None
++        assert res3 is None
++
++    def test_get_checksum_from_extension_upper_success(self, mock_files):
++        url = "https://sub.domain.tld/upper/file.iso"
++        exp_hash = "this_is_the_hash"
++        info = ChecksumInfo("testing", 16)
++        res1 = Files._get_checksum_from_extension_upper(url, checksum_info=info)
++        res2 = Files._get_checksum_from_extension_upper(
++            url, checksum_info=info, filename="file.iso"
++        )
++
++        assert res1 == exp_hash
++        assert res2 == exp_hash
++
++    def test_get_checksum_from_extension_upper_fail(self, mock_files):
++        url = "https://sub.domain.tld/upper/missing.iso"
++        info = ChecksumInfo("testing", 16)
++        res1 = Files._get_checksum_from_extension_upper(url, checksum_info=info)
++        res2 = Files._get_checksum_from_extension_upper(
++            url, checksum_info=info, filename="missing.iso"
++        )
++
++        assert res1 is None
++        assert res2 is None
++
++    def test_get_checksums_from_file_url_all_checksums(self, mock_files):
++        base_url = "https://sub.domain.tld/checksums/file.iso"
++        full_checksum_string = "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
++        for types_enum in SupportedChecksums:
++            checksum_info = types_enum.value
++
++            data = Files.get_checksums_from_file_url(base_url, preferred_type=checksum_info)
++
++            assert data[0] == full_checksum_string[0 : checksum_info.hex_size]
++            assert data[1] == checksum_info
++
++    def test_get_checksums_from_file_url_missing(self, mock_files):
++        url = "https://sub.domain.tld/missing.iso"
++
++        data = Files.get_checksums_from_file_url(url)
++
++        assert data[0] is None
++        assert data[1] is None
++
++
++class TestFiles:
++    prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")
++
++    def test_init_basic(self):
++        f = Files(self.prox, "node1", "storage1")
++
++        assert f._prox == self.prox
++        assert f._node == "node1"
++        assert f._storage == "storage1"
++
++    def test_repr(self):
++        f = Files(self.prox, "node1", "storage1")
++        assert (
++            repr(f)
++            == "Files (node1/storage1 at ProxmoxAPI (https backend for https://1.2.3.4:1234/api2/json))"
++        )
++
++    def test_get_file_info_pass(self, mock_pve):
++        f = Files(self.prox, "node1", "storage1")
++        info = f.get_file_info("https://sub.domain.tld/file.iso")
++
++        assert info["filename"] == "file.iso"
++        assert info["mimetype"] == "application/x-iso9660-image"
++        assert info["size"] == 123456
++
++    def test_get_file_info_fail(self, mock_pve):
++        f = Files(self.prox, "node1", "storage1")
++        info = f.get_file_info("https://sub.domain.tld/invalid.iso")
++
++        assert info is None
++
++
++class TestFilesDownload:
++    prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")
++    f = Files(prox, "node1", "storage1")
++
++    def test_download_discover_checksum(self, mock_files_and_pve, caplog):
++        status = self.f.download_file_to_storage("https://sub.domain.tld/checksums/file.iso")
++
++        # this is the default "done" task mock information
++        assert status == {
++            "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
++            "starttime": 1661825068,
++            "user": "root@pam",
++            "type": "vzdump",
++            "pstart": 284768076,
++            "status": "stopped",
++            "exitstatus": "OK",
++            "pid": 1044989,
++            "id": "110",
++            "node": "node1",
++        }
++        assert caplog.record_tuples == []
++
++    def test_download_no_blocking(self, mock_files_and_pve, caplog):
++        status = self.f.download_file_to_storage(
++            "https://sub.domain.tld/checksums/file.iso", blocking_status=False
++        )
++
++        # this is the default "done" task mock information
++        assert status == {
++            "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
++            "starttime": 1661825068,
++            "user": "root@pam",
++            "type": "vzdump",
++            "pstart": 284768076,
++            "status": "stopped",
++            "exitstatus": "OK",
++            "pid": 1044989,
++            "id": "110",
++            "node": "node1",
++        }
++        assert caplog.record_tuples == []
++
++    def test_download_no_discover_checksum(self, mock_files_and_pve, caplog):
++        caplog.set_level(logging.WARNING, logger=MODULE_LOGGER_NAME)
++
++        status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso")
++
++        # this is the default "stopped" task mock information
++        assert status == {
++            "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
++            "starttime": 1661825068,
++            "user": "root@pam",
++            "type": "vzdump",
++            "pstart": 284768076,
++            "status": "stopped",
++            "exitstatus": "OK",
++            "pid": 1044989,
++            "id": "110",
++            "node": "node1",
++        }
++        assert caplog.record_tuples == [
++            (
++                MODULE_LOGGER_NAME,
++                logging.WARNING,
++                "Unable to discover checksum. Will not do checksum validation",
++            ),
++        ]
++
++    def test_uneven_checksum(self, caplog, mock_files_and_pve):
++        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
++        status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso", checksum="asdf")
++
++        assert status is None
++
++        assert caplog.record_tuples == [
++            (
++                MODULE_LOGGER_NAME,
++                logging.ERROR,
++                "Must pass both checksum and checksum_type or leave both None for auto-discovery",
++            ),
++        ]
++
++    def test_uneven_checksum_type(self, caplog, mock_files_and_pve):
++        caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
++        status = self.f.download_file_to_storage(
++            "https://sub.domain.tld/file.iso", checksum_type="asdf"
++        )
++
++        assert status is None
++
++        assert caplog.record_tuples == [
++            (
++                MODULE_LOGGER_NAME,
++                logging.ERROR,
++                "Must pass both checksum and checksum_type or leave both None for auto-discovery",
++            ),
++        ]
++
++    def test_get_file_info_missing(self, mock_pve):
++        f = Files(self.prox, "node1", "storage1")
++        info = f.get_file_info("https://sub.domain.tld/missing.iso")
++
++        assert info is None
++
++    def test_get_file_info_non_iso(self, mock_pve):
++        f = Files(self.prox, "node1", "storage1")
++        info = f.get_file_info("https://sub.domain.tld/index.html")
++
++        assert info["filename"] == "index.html"
++        assert info["mimetype"] == "text/html"
++
++
++class TestFilesUpload:
++    prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")
++    f = Files(prox, "node1", "storage1")
++
++    def test_upload_no_file(self, mock_files_and_pve, caplog):
++        status = self.f.upload_local_file_to_storage("/does-not-exist.iso")
++
++        assert status is None
++        assert caplog.record_tuples == [
++            (
++                MODULE_LOGGER_NAME,
++                logging.ERROR,
++                '"/does-not-exist.iso" does not exist or is not a file',
++            ),
++        ]
++
++    def test_upload_dir(self, mock_files_and_pve, caplog):
++        with tempfile.TemporaryDirectory() as tmp_dir:
++            status = self.f.upload_local_file_to_storage(tmp_dir)
++
++            assert status is None
++            assert caplog.record_tuples == [
++                (
++                    MODULE_LOGGER_NAME,
++                    logging.ERROR,
++                    f'"{tmp_dir}" does not exist or is not a file',
++                ),
++            ]
++
++    def test_upload_empty_file(self, mock_files_and_pve, caplog):
++        with tempfile.NamedTemporaryFile("rb") as f_obj:
++            status = self.f.upload_local_file_to_storage(filename=f_obj.name)
++
++            assert status is not None
++            assert caplog.record_tuples == []
++
++    def test_upload_non_empty_file(self, mock_files_and_pve, caplog):
++        with tempfile.NamedTemporaryFile("w+b") as f_obj:
++            f_obj.write(b"a" * 100)
++            f_obj.seek(0)
++            status = self.f.upload_local_file_to_storage(filename=f_obj.name)
++
++            assert status is not None
++            assert caplog.record_tuples == []
++
++    def test_upload_no_checksum(self, mock_files_and_pve, caplog):
++        with tempfile.NamedTemporaryFile("rb") as f_obj:
++            status = self.f.upload_local_file_to_storage(
++                filename=f_obj.name, do_checksum_check=False
++            )
++
++            assert status is not None
++            assert caplog.record_tuples == []
++
++    def test_upload_checksum_unavailable(self, mock_files_and_pve, caplog, apply_no_checksums):
++        with tempfile.NamedTemporaryFile("rb") as f_obj:
++            status = self.f.upload_local_file_to_storage(filename=f_obj.name)
++
++            assert status is not None
++            assert caplog.record_tuples == [
++                (
++                    MODULE_LOGGER_NAME,
++                    logging.WARNING,
++                    "There are no Proxmox supported checksums which are supported by hashlib. Skipping checksum validation",
++                )
++            ]
++
++    def test_upload_non_blocking(self, mock_files_and_pve, caplog):
++        with tempfile.NamedTemporaryFile("rb") as f_obj:
++            status = self.f.upload_local_file_to_storage(filename=f_obj.name, blocking_status=False)
++
++            assert status is not None
++            assert caplog.record_tuples == []
++
++    def test_upload_proxmox_error(self, mock_files_and_pve, caplog):
++        with tempfile.NamedTemporaryFile("rb") as f_obj:
++            f_copy = Files(self.f._prox, self.f._node, "missing")
++
++            with pytest.raises(core.ResourceException) as exc_info:
++                f_copy.upload_local_file_to_storage(filename=f_obj.name)
++
++            assert exc_info.value.status_code == 500
++            assert exc_info.value.status_message == "Internal Server Error"
++            # assert exc_info.value.content == "storage 'missing' does not exist"
++
++    def test_upload_io_error(self, mock_files_and_pve, caplog):
++        with tempfile.NamedTemporaryFile("rb") as f_obj:
++            mo = mock.mock_open()
++            mo.side_effect = IOError("ERROR MESSAGE")
++            with mock.patch("builtins.open", mo):
++                status = self.f.upload_local_file_to_storage(filename=f_obj.name)
++
++            assert status is None
++            assert caplog.record_tuples == [(MODULE_LOGGER_NAME, logging.ERROR, "ERROR MESSAGE")]
++
++
++@pytest.fixture
++def apply_no_checksums():
++    with mock.patch("hashlib.algorithms_available", set()):
++        yield
+diff -ruN tests/tools/test_tasks.py proxmoxer-2.2.0/tests/tools/test_tasks.py
+--- tests/tools/test_tasks.py	1970-01-01 01:00:00.000000000 +0100
++++ proxmoxer-2.2.0/tests/tools/test_tasks.py	2024-12-15 02:12:42.000000000 +0000
+@@ -0,0 +1,223 @@
++__author__ = "John Hollowell"
++__copyright__ = "(c) John Hollowell 2022"
++__license__ = "MIT"
++
++import logging
++
++import pytest
++
++from proxmoxer import ProxmoxAPI
++from proxmoxer.tools import Tasks
++
++from ..api_mock import mock_pve  # pylint: disable=unused-import # noqa: F401
++
++
++class TestBlockingStatus:
++    def test_basic(self, mocked_prox, caplog):
++        caplog.set_level(logging.DEBUG, logger="proxmoxer.core")
++
++        status = Tasks.blocking_status(
++            mocked_prox, "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done"
++        )
++
++        assert status == {
++            "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
++            "starttime": 1661825068,
++            "user": "root@pam",
++            "type": "vzdump",
++            "pstart": 284768076,
++            "status": "stopped",
++            "exitstatus": "OK",
++            "pid": 1044989,
++            "id": "110",
++            "node": "node1",
++        }
++        assert caplog.record_tuples == [
++            (
++                "proxmoxer.core",
++                20,
++                "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done/status",
++            ),
++            (
++                "proxmoxer.core",
++                10,
++                'Status code: 200, output: b\'{"data": {"upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "OK", "pid": 1044989, "id": "110", "node": "node1"}}\'',
*** 179 LINES SKIPPED ***