import logging
import os
import socket
import tempfile
import threading
import time
import unittest
from functools import partial
from typing import Any, Dict, Type, Union
from faker import Faker
from parametrize import parametrize
from ..providers.txt_file import TxtFileProvider
from ..registry import FILE_REGISTRY
from ..storages.sftp_storage import SFTPStorage
from .sftp_server import SFTPServerManager, start_server
from .utils import AutoFreePortInt
__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2022-2023 Artur Barseghyan"
__license__ = "MIT"
__all__ = ("TestSFTPStorageTestCase",)
SFTP_USER = os.environ.get("SFTP_USER", "foo")
SFTP_PASS = os.environ.get("SFTP_PASS", "pass")
SFTP_HOST = os.environ.get("SFTP_HOST", "127.0.0.1")
SFTP_PORT = int(os.environ.get("SFTP_PORT", AutoFreePortInt(host=SFTP_HOST)))
SFTP_ROOT_PATH = os.environ.get("SFTP_ROOT_PATH", "/upload")
LOGGER = logging.getLogger(__name__)
FAKER = Faker()
FAKER.add_provider(TxtFileProvider)
[docs]class TestSFTPStorageTestCase(unittest.TestCase):
"""Test SFTP storage."""
server_manager: SFTPServerManager
server_thread: threading.Thread
sftp_host: str = SFTP_HOST
sftp_port: int = int(AutoFreePortInt(host=SFTP_HOST))
sftp_user: str = SFTP_USER
sftp_pass: str = SFTP_PASS
sftp_root_path: str = SFTP_ROOT_PATH
# Maximum number of retries to check if the port is free
max_port_retry_limit: int = 10
[docs] @staticmethod
def is_port_in_use(host: str, port: int) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex((host, port)) == 0
[docs] @classmethod
def free_port(cls: "TestSFTPStorageTestCase") -> None:
"""Check if the port is in use and wait until it is free."""
retry_count = 0
while cls.is_port_in_use(cls.sftp_host, cls.sftp_port):
if retry_count >= cls.max_port_retry_limit:
LOGGER.warning(
f"Maximum retries reached. Port {cls.sftp_port} on "
f"host {cls.sftp_host} may not be free soon."
)
retry_count = 0 # Reset the retry count
# Assign a new port
cls.sftp_port = int(AutoFreePortInt(host=SFTP_HOST))
LOGGER.warning(
f"Assigned a new port {cls.sftp_port} on "
f"host {cls.sftp_host}. Restarting attempts..."
)
continue
LOGGER.info(
f"Port {cls.sftp_port} in use on host {cls.sftp_host}, "
f"waiting..."
)
time.sleep(1)
retry_count += 1
[docs] def tearDown(self) -> None:
super().tearDown()
# FILE_REGISTRY.clean_up()
[docs] @classmethod
def setUpClass(cls):
# Free port
cls.free_port()
os.makedirs(
os.path.join(tempfile.gettempdir(), "upload", "sub"),
exist_ok=True,
)
# Create a partial function with custom arguments
start_server_partial = partial(
start_server, host=cls.sftp_host, port=cls.sftp_port
)
# Start the server in a separate thread
cls.server_thread = threading.Thread(target=start_server_partial)
# Daemonize the thread, so it exits when the main thread exits
cls.server_thread.daemon = True
cls.server_thread.start()
time.sleep(2)
@parametrize(
"storage_cls, kwargs, prefix, basename, extension",
[
# SFTPStorage
(
SFTPStorage,
{
"host": sftp_host,
"port": sftp_port,
"username": sftp_user,
"password": sftp_pass,
"root_path": sftp_root_path,
},
"zzz",
None,
"docx",
),
(
SFTPStorage,
{
"host": sftp_host,
"port": sftp_port,
"username": sftp_user,
"password": sftp_pass,
"root_path": sftp_root_path,
},
None,
"my_zzz_filename",
"docx",
),
],
)
def test_storage(
self: "TestSFTPStorageTestCase",
storage_cls: Type[SFTPStorage],
kwargs: Dict[str, Any],
prefix: Union[str, None],
basename: Union[str, None],
extension: str,
) -> None:
"""Test storage."""
storage = storage_cls(**kwargs)
# Text file
file_text = storage.generate_filename(
basename=basename, prefix=prefix, extension=extension
)
# Write to the text file
text_result = storage.write_text(file_text, "Lorem ipsum")
# Check if file exists
self.assertTrue(storage.exists(file_text))
# Assert correct return value
self.assertIsInstance(text_result, int)
# Bytes
file_bytes = storage.generate_filename(
basename=basename, prefix=prefix, extension=extension
)
# Write to bytes file
bytes_result = storage.write_bytes(file_bytes, b"Lorem ipsum")
# Check if file exists
self.assertTrue(storage.exists(file_bytes))
# Assert correct return value
self.assertIsInstance(bytes_result, int)
# Clean up
storage.unlink(file_text)
storage.unlink(file_bytes)
@parametrize(
"storage_cls, kwargs, prefix, extension",
[
# FileSystemStorage
(
SFTPStorage,
{
"host": sftp_host,
"port": sftp_port,
"username": sftp_user,
"password": sftp_pass,
"root_path": sftp_root_path,
},
"zzz",
"",
),
],
)
def test_storage_generate_filename_exceptions(
self: "TestSFTPStorageTestCase",
storage_cls: Type[SFTPStorage],
kwargs: Dict[str, Any],
prefix: str,
extension: str,
) -> None:
"""Test storage `generate_filename` exceptions."""
storage = storage_cls(**kwargs)
with self.assertRaises(Exception):
# Generate filename
storage.generate_filename(prefix=prefix, extension=extension)
with self.assertRaises(Exception):
# Generate filename
storage.generate_filename(basename=prefix, extension=extension)
[docs] def test_file_system_storage_abspath(
self: "TestSFTPStorageTestCase",
) -> None:
"""Test `FileSystemStorage` `abspath`."""
storage = SFTPStorage(
host=self.sftp_host,
port=self.sftp_port,
username=self.sftp_user,
password=self.sftp_pass,
root_path=self.sftp_root_path,
)
filename = storage.generate_filename(prefix="", extension="tmp")
self.assertTrue(
storage.abspath(filename).startswith(self.sftp_root_path)
)
[docs] def test_integration(self: "TestSFTPStorageTestCase") -> None:
storage = SFTPStorage(
host=self.sftp_host,
port=self.sftp_port,
username=self.sftp_user,
password=self.sftp_pass,
root_path=self.sftp_root_path,
)
file = FAKER.txt_file(storage=storage)
self.assertTrue(storage.exists(file.data["filename"]))
self.assertTrue(
storage.abspath(file.data["filename"]).startswith(
self.sftp_root_path
)
)
FILE_REGISTRY.clean_up() # Clean up storage files
[docs] def test_integration_sub_dir(self: "TestSFTPStorageTestCase") -> None:
storage = SFTPStorage(
host=self.sftp_host,
port=self.sftp_port,
username=self.sftp_user,
password=self.sftp_pass,
root_path=self.sftp_root_path,
rel_path="sub",
)
file = FAKER.txt_file(storage=storage)
self.assertTrue(storage.exists(file.data["filename"]))
self.assertTrue(
storage.abspath(file.data["filename"]).startswith(
os.path.join(self.sftp_root_path, "sub")
)
)
FILE_REGISTRY.clean_up() # Clean up storage files
@parametrize(
"kwargs",
[
# Wrong username, key
(
{
"host": sftp_host,
"port": sftp_port,
"username": "wrong",
"key": "wrong",
},
),
# Wrong username, password
(
{
"host": sftp_host,
"port": sftp_port,
"username": "wrong",
"password": "wrong",
},
),
],
)
def test_storage_initialization_exceptions(
self: "TestSFTPStorageTestCase",
kwargs: Dict[str, Any],
) -> None:
with self.assertRaises(Exception):
# Initialize the storage
SFTPStorage(**kwargs)
[docs] def test_storage_write_text_exceptions(self) -> None:
storage = SFTPStorage(
host=self.sftp_host,
port=self.sftp_port,
username=self.sftp_user,
password=self.sftp_pass,
root_path=self.sftp_root_path,
)
val = storage.write_text(
filename=FAKER.file_name(),
data=1, # type: ignore
)
self.assertEqual(val, -1)
[docs] def test_storage_write_bytes_exceptions(self) -> None:
storage = SFTPStorage(
host=self.sftp_host,
port=self.sftp_port,
username=self.sftp_user,
password=self.sftp_pass,
root_path=self.sftp_root_path,
)
val = storage.write_bytes(
filename=FAKER.file_name(),
data=1, # type: ignore
)
self.assertEqual(val, -1)
[docs] def test_storage_exists_exceptions(self) -> None:
storage = SFTPStorage(
host=self.sftp_host,
port=self.sftp_port,
username=self.sftp_user,
password=self.sftp_pass,
root_path=self.sftp_root_path,
)
val = storage.exists(
filename=FAKER.file_name(),
)
self.assertFalse(val)