Skip to content

prefect_managedfiletransfer.download_file_task

Classes

Functions

download_file_task async

Task to download a single file from a remote source (SFTP, RClone, or LocalFileSystem) to a local path.

Args:

- source_block (TransferBlockType): The block representing the source connection.
- source_type (RemoteConnectionType): The type of the source connection.
- remote_asset (RemoteAsset): The remote asset to download.
- target_file_path (Path): The path where the file should be downloaded.
- update_only_if_newer_mode (bool): If true, skip files that are newer on the destination.
- overwrite (bool): If true, overwrite the file if it exists.
- mode (str): The transfer mode to use (e.g., Copy, Move).
- rclone_config (RCloneConfigSavedInPrefect | None): The RClone configuration to use for the download, if applicable.
- check_for_space (bool): If true, check if there is enough space on the destination.
- check_for_space_overhead (int): The overhead space to consider when checking for space in bytes.
- reference_date (datetime | None): The reference date to use for checking file modification times. Defaults to None, which uses the current time.

Returns:

- AssetDownloadResult: The result of the download operation, including success status and any error messages.

Source code in prefect_managedfiletransfer/download_file_task.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@task(
    retries=2,
    retry_delay_seconds=60,
    timeout_seconds=60 * 30,
    task_run_name="download-{remote_asset.path}",
)
async def download_file_task(
    source_block: TransferBlockType,
    source_type: RemoteConnectionType,
    remote_asset: RemoteAsset,
    target_file_path: Path,
    update_only_if_newer_mode: bool,
    overwrite: bool,
    mode: str,
    rclone_config: RCloneConfigSavedInPrefect | None,
    check_for_space: bool,
    check_for_space_overhead: int,
    reference_date: datetime | None = None,
) -> AssetDownloadResult:
    """
    Task to download a single file from a remote source (SFTP, RClone, or LocalFileSystem) to a local path.

    Args:
        source_block (TransferBlockType): The block representing the source connection.
        source_type (RemoteConnectionType): The type of the source connection.
        remote_asset (RemoteAsset): The remote asset to download.
        target_file_path (Path): The path where the file should be downloaded.
        update_only_if_newer_mode (bool): If true, skip files that are newer on the destination.
        overwrite (bool): If true, overwrite the file if it exists.
        mode (str): The transfer mode to use (e.g., Copy, Move).
        rclone_config (RCloneConfigSavedInPrefect | None): The RClone configuration to use for the download, if applicable.
        check_for_space (bool): If true, check if there is enough space on the destination.
        check_for_space_overhead (int): The overhead space to consider when checking for space in bytes.
        reference_date (datetime | None): The reference date to use for checking file modification times. Defaults to None, which uses the current time.

    Returns:
        AssetDownloadResult: The result of the download operation, including success status and any error messages.

    Raises:
        Exception: If the download result reports failure. The exception is built from the result's error.
    """

    logger.info(f"Start download {remote_asset.path} to {target_file_path}")

    # Explicit None check, matching list_remote_files_task.
    if reference_date is None:
        reference_date = datetime.now(timezone.utc)

    # Only some block types expose get_temp_key_file(); for the others
    # nullcontext() yields None, so no private key path is passed on.
    key_file_context = (
        source_block.get_temp_key_file()
        if hasattr(source_block, "get_temp_key_file")
        else nullcontext()
    )

    with key_file_context as temp_key_file:
        # Connection attributes are optional on the block; any that are absent
        # are forwarded as None.
        download_result = await download_asset(
            file=remote_asset,
            destination_path=target_file_path,
            remote_type=source_type,
            host=getattr(source_block, "host", None),
            port=getattr(source_block, "port", None),
            username=getattr(source_block, "username", None),
            password=getattr(source_block, "password", None),
            private_key_path=(temp_key_file.get_path() if temp_key_file else None),
            rclone_config=rclone_config,
            update_only_if_newer_mode=update_only_if_newer_mode,
            overwrite=overwrite,
            check_for_space=check_for_space,
            check_for_space_overhead=check_for_space_overhead,
            mode=mode,
            reference_date=reference_date,
        )
        if not download_result.success:
            logger.error(
                f"Failed to download {remote_asset.path}: {download_result.error}"
            )
            # Raise rather than return the failed result, so the failure
            # surfaces as an exception from the task.
            raise Exception(download_result.error)

        return download_result

prefect_managedfiletransfer.list_remote_files_task

Classes

Functions

list_remote_files_task async

Task to list remote files based on a matcher. Remote can be SFTP, RClone, or LocalFileSystem.

Parameters:

Name Type Description Default
source_block TransferBlockType

The block representing the source connection.

required
source_type RemoteConnectionType

The type of the source connection.

required
matcher FileMatcher

The matcher defining the pattern to match files.

required
rclone_config RCloneConfigSavedInPrefect | None

The RClone configuration to use for the remote connection, if applicable.

None
reference_date datetime | None

The reference date to use for filtering files. Defaults to None, which uses the current time.

None

Returns:

Type Description
list[RemoteAsset]

list[RemoteAsset]: A list of RemoteAsset objects that match the given pattern.

Source code in prefect_managedfiletransfer/list_remote_files_task.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@task(task_run_name="list_{matcher.source_folder}_{matcher.pattern_to_match}")
async def list_remote_files_task(
    source_block: TransferBlockType,
    source_type: RemoteConnectionType,
    matcher: FileMatcher,
    rclone_config: RCloneConfigSavedInPrefect | None = None,
    reference_date: datetime | None = None,
) -> list[RemoteAsset]:
    """
    List the files on a remote (SFTP, RClone, or LocalFileSystem) that satisfy a matcher.

    Args:
        source_block (TransferBlockType): Block describing the source connection.
        source_type (RemoteConnectionType): Kind of connection the source uses.
        matcher (FileMatcher): Supplies the source folder, the pattern to match, and the age, sort, skip and take settings forwarded to the listing call.
        rclone_config (RCloneConfigSavedInPrefect | None): RClone configuration for the remote connection, if applicable.
        reference_date (datetime | None): Reference date used for filtering files. When None, the current UTC time is used.

    Returns:
        list[RemoteAsset]: The RemoteAsset objects that match the given pattern.
    """

    # The block's basepath is optional; None is passed when it has none.
    remote_source_path = PathUtil.resolve_path(
        source_type,
        getattr(source_block, "basepath", None),
        matcher.source_folder,
    )

    if reference_date is None:
        reference_date = datetime.now(timezone.utc)

    logger.debug(f"Listing remote files in {remote_source_path} with matcher {matcher}")

    # Blocks without get_temp_key_file() get a nullcontext(), which yields
    # None and therefore results in no private key path.
    if hasattr(source_block, "get_temp_key_file"):
        key_file_context = source_block.get_temp_key_file()
    else:
        key_file_context = nullcontext()

    with key_file_context as temp_key_file:
        key_path = temp_key_file.get_path() if temp_key_file else None
        files: list[RemoteAsset] = list(
            await list_remote_assets(
                remote_folder=remote_source_path,
                pattern_to_match=matcher.pattern_to_match,
                remote_type=source_type,
                host=getattr(source_block, "host", None),
                port=getattr(source_block, "port", None),
                username=getattr(source_block, "username", None),
                password=getattr(source_block, "password", None),
                private_key_path=key_path,
                rclone_config=rclone_config,
                minimum_age=matcher.minimum_age,
                maximum_age=matcher.maximum_age,
                sort=matcher.sort,
                skip=matcher.skip,
                take=matcher.take,
                reference_date=reference_date,
            )
        )

    logger.info(
        f"Found {len(files)} remote files in {remote_source_path} with matcher {matcher}"
    )

    return files

prefect_managedfiletransfer.upload_file_task

Classes

Functions

upload_file_task async

Task to upload a single file to a remote destination (local/SFTP/RClone remote).

Parameters:

Name Type Description Default
source_remote_asset RemoteAsset

The remote asset to upload.

required
destination_block ServerWithBasicAuthBlock | ServerWithPublicKeyAuthBlock | LocalFileSystem | RCloneConfigFileBlock

The block representing the destination.

required
destination_type RemoteConnectionType

The type of the destination connection.

required
target_file_path Path

The path where the file should be uploaded.

required
update_only_if_newer_mode bool

If true, skip files that are newer on the destination.

required
overwrite bool

If true, overwrite the file if it exists.

required
mode TransferType

The transfer mode to use (e.g., Copy, Move).

required
rclone_config RCloneConfigFileBlock

The RClone configuration block to use for the upload.

required
check_for_space bool

If true, check if there is enough space on the destination.

required
check_for_space_overhead int

The overhead space to consider when checking for space in bytes.

required
reference_date datetime

The reference date to use for checking file modification times; normally the current time in UTC. Can be overridden in tests.

required
Source code in prefect_managedfiletransfer/upload_file_task.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@task(
    retries=2,
    retry_delay_seconds=60,
    timeout_seconds=60 * 30,
    task_run_name="upload-{target_file_path}",
)
async def upload_file_task(
    source_remote_asset: RemoteAsset,
    destination_block: (
        ServerWithBasicAuthBlock
        | ServerWithPublicKeyAuthBlock
        | LocalFileSystem
        | RCloneConfigFileBlock
    ),
    destination_type: RemoteConnectionType,
    target_file_path: Path,
    update_only_if_newer_mode: bool,
    overwrite: bool,
    mode: TransferType,
    rclone_config: RCloneConfigFileBlock,
    check_for_space: bool,
    check_for_space_overhead: int,
    reference_date: datetime,
) -> Path:
    """
    Task to upload a single file to a remote destination (local/SFTP/RClone remote).

    Args:
        source_remote_asset (RemoteAsset): The remote asset to upload.
        destination_block (ServerWithBasicAuthBlock | ServerWithPublicKeyAuthBlock | LocalFileSystem | RCloneConfigFileBlock): The block representing the destination.
        destination_type (RemoteConnectionType): The type of the destination connection.
        target_file_path (Path): The path where the file should be uploaded.
        update_only_if_newer_mode (bool): If true, skip files that are newer on the destination.
        overwrite (bool): If true, overwrite the file if it exists.
        mode (TransferType): The transfer mode to use (e.g., Copy, Move).
        rclone_config (RCloneConfigFileBlock): The RClone configuration block to use for the upload.
        check_for_space (bool): If true, check if there is enough space on the destination.
        check_for_space_overhead (int): The overhead space to consider when checking for space in bytes.
        reference_date (datetime): The reference date to use for checking file modification times; normally the current time in UTC. Can be overridden in tests.

    Returns:
        Path: The path to the uploaded file.

    Raises:
        Exception: If the upload returns a non-zero exit code.
    """

    logger.info(f"Start upload {source_remote_asset.path} to {target_file_path}")

    # Only some block types expose get_temp_key_file(); for the others
    # nullcontext() yields None, so no private key path is passed on.
    key_file_context = (
        destination_block.get_temp_key_file()
        if hasattr(destination_block, "get_temp_key_file")
        else nullcontext()
    )

    with key_file_context as temp_key_file:
        # The asset's parent folder and file name are passed separately as
        # source_folder and pattern_to_upload. Connection attributes the block
        # does not have are forwarded as None.
        upload_result = await upload_asset(
            source_folder=source_remote_asset.path.parent,
            pattern_to_upload=source_remote_asset.path.name,
            destination_file=target_file_path,
            destination_type=destination_type,
            host=getattr(destination_block, "host", None),
            port=getattr(destination_block, "port", None),
            username=getattr(destination_block, "username", None),
            password=getattr(destination_block, "password", None),
            private_key_path=(temp_key_file.get_path() if temp_key_file else None),
            rclone_config=rclone_config,
            update_only_if_newer_mode=update_only_if_newer_mode,
            overwrite=overwrite,
            mode=mode,
            check_for_space=check_for_space,
            check_for_space_overhead=check_for_space_overhead,
            reference_datetime=reference_date,
        )
        # Any non-zero result is treated as failure and logged as an exit code.
        if upload_result != 0:
            logger.error(
                f"Failed to upload {source_remote_asset.path} to {target_file_path}. Exit code {upload_result}"
            )
            raise Exception("Upload failed")

        return target_file_path