Skip to content

prefect_managedfiletransfer.transfer_files_flow

Classes

Functions

transfer_files_flow async

Transfers files from a source to a destination based on the provided matchers and mapping. Args: source_block: The source block to transfer files from. destination_block: The destination block to transfer files to. source_file_matchers: List of file matching patterns to find and filter files in the source. path_mapping: List of file-to-folder mappings for transferring files. destination_folder: The path of the folder in destination_block where files will be transferred. update_only_if_newer_mode: If true, skip files that are newer on the destination. overwrite: If true, overwrite existing files in the destination. check_for_space: If true, check if there is enough space on the destination before transferring. check_for_space_overhead: Amount of extra space to reserve on the destination (in bytes). mode: Copy or Move transfer mode. reference_date: defaults to now() in UTC - used to filter files based on modification time, and for pattern replacement in file names Returns: A list of the Paths of transferred files.

Source code in prefect_managedfiletransfer/transfer_files_flow.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
@flow(
    name=CONSTANTS.FLOW_NAMES.TRANSFER_FILES,
    log_prints=True,
    flow_run_name=_generate_flow_run_name,
    retries=2,
    retry_delay_seconds=60 * 20,  # wait 20 minutes between retries
    timeout_seconds=60 * 30,  # give up on a run after 30 minutes
)
async def transfer_files_flow(
    source_block: TransferBlockType,
    destination_block: TransferBlockType,
    source_file_matchers: list[FileMatcher] = [FileMatcher()],
    path_mapping: list[FileToFolderMapping] = [],
    destination_folder: Path = Path("."),
    update_only_if_newer_mode: bool = False,
    overwrite: bool = False,
    check_for_space: bool = True,
    check_for_space_overhead: int = 2 * 1024 * 1024 * 1024,  # 2GB overhead
    mode: TransferType = TransferType.Copy,
    reference_date: datetime | None = None,
) -> list:
    """
    Copy or move the files selected by the matchers from a source block to a destination block.

    Every matcher is listed against the source, the combined listing is mapped to
    target paths below the destination folder, and each file is then transferred
    one at a time: downloaded when the destination is local, uploaded otherwise.

    Args:
        source_block: The block files are read from.
        destination_block: The block files are written to.
        source_file_matchers: File matching patterns used to find and filter files in the source. Must not be empty.
        path_mapping: File-to-folder mappings applied when working out each file's target path.
        destination_folder: Folder inside destination_block that receives the files.
        update_only_if_newer_mode: If true, skip files that are newer on the destination.
        overwrite: If true, overwrite files that already exist in the destination.
        check_for_space: If true, check for sufficient free space on the destination before transferring.
        check_for_space_overhead: Extra space, in bytes, to keep free on the destination.
        mode: Copy or Move transfer mode.
        reference_date: Used to filter files by modification time and for pattern replacement in file names. Defaults to the current time in UTC.

    Returns:
        A list with one entry per processed file, in processing order, holding
        what the download/upload task returned (documented as the Paths of the
        transferred files).

    Raises:
        ValueError: If no source file matchers are given, or if neither block is a LocalFileSystem.
    """
    if not source_file_matchers:
        raise ValueError("No source file matchers provided")

    if reference_date is None:
        reference_date = datetime.now(timezone.utc)

    # At least one side must be a local file system; remote-to-remote is rejected.
    one_side_is_local = isinstance(source_block, LocalFileSystem) or isinstance(
        destination_block, LocalFileSystem
    )
    if not one_side_is_local:
        raise ValueError(
            "Cannot transfer files between two remote file systems. One must be local."
        )

    source_type = map_block_to_remote_type(source_block)
    destination_type = map_block_to_remote_type(destination_block)

    # An rclone config wrapper is only built for the side(s) backed by an rclone block.
    rclone_source_config = None
    if isinstance(source_block, RCloneConfigFileBlock):
        rclone_source_config = RCloneConfigSavedInPrefect(source_block)

    rclone_destination_config = None
    if isinstance(destination_block, RCloneConfigFileBlock):
        rclone_destination_config = RCloneConfigSavedInPrefect(destination_block)

    # Gather the listing of every matcher, one after the other, into a single list.
    source_files: list[RemoteAsset] = []
    for file_matcher in source_file_matchers:
        source_files += await list_remote_files_task(
            source_block,
            source_type,
            file_matcher,
            rclone_source_config,
            reference_date,
        )

    # Not every block type has a basepath; fall back to None when it is absent.
    destination_basepath = getattr(destination_block, "basepath", None)
    destination_root = PathUtil.resolve_path(
        destination_type, destination_basepath, destination_folder
    )

    source_destination_pairs = FileToFolderMapping.apply_mappings(
        path_mapping, source_files, destination_root
    )

    # The direction is the same for every file: a local destination means download.
    destination_is_local = destination_type == RemoteConnectionType.LOCAL

    transferred = []
    for asset, target_path in source_destination_pairs:
        if destination_is_local:
            outcome = await download_file_task(
                source_block,
                source_type,
                asset,
                target_path,
                update_only_if_newer_mode,
                overwrite,
                mode,
                rclone_source_config,
                check_for_space,
                check_for_space_overhead,
                reference_date,
            )
        else:
            outcome = await upload_file_task(
                asset,
                destination_block,
                destination_type,
                target_path,
                update_only_if_newer_mode,
                overwrite,
                mode,
                rclone_config=rclone_destination_config,
                check_for_space=check_for_space,
                check_for_space_overhead=check_for_space_overhead,
                reference_date=reference_date,
            )
        transferred.append(outcome)

    logger.info(f"Transfer completed. {len(transferred)} files processed")

    return transferred

prefect_managedfiletransfer.upload_file_flow

Classes

Functions

upload_file_flow async

Publish a single file to a destination, e.g. upload an image to a website or copy a file to a local shared public folder

Parameters:

Name Type Description Default
source_folder Path

The folder where the file to upload is located.

required
pattern_to_upload str

The pattern of the file to upload, e.g. "*.jpg".

required
destination_file Path

The destination file path where the file will be uploaded.

required
destination_block_or_blockname ServerWithBasicAuthBlock | ServerWithPublicKeyAuthBlock | LocalFileSystem | RCloneConfigFileBlock | str

The destination block or block name where the file will be uploaded.

required
update_only_if_newer_mode bool

If true, skip files that are newer on the destination.

False
mode TransferType

The transfer mode to use, e.g. Copy or Move.

Copy
overwrite bool

If true, overwrite the file if it already exists at the destination.

False
Source code in prefect_managedfiletransfer/upload_file_flow.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
@flow(
    name=CONSTANTS.FLOW_NAMES.UPLOAD_FILE,
    log_prints=True,
    flow_run_name=generate_flow_run_name,
    retries=2,
    retry_delay_seconds=60 * 20,  # wait 20 minutes between retries
    timeout_seconds=60 * 30,  # give up on a run after 30 minutes
)
async def upload_file_flow(
    source_folder: Path,
    pattern_to_upload: str,
    destination_file: Path,
    destination_block_or_blockname: (
        ServerWithBasicAuthBlock
        | ServerWithPublicKeyAuthBlock
        | LocalFileSystem
        | RCloneConfigFileBlock
        | str
    ),
    update_only_if_newer_mode: bool = False,  # if true skip files that are newer on the destination
    mode: TransferType = TransferType.Copy,
    overwrite: bool = False,
):
    """
    Publish a single file to a destination, e.g. upload an image to a website or copy a file to a local shared public folder.

    The destination may be given as a block or as a block name (a string), which
    is resolved to a block first. The block type selects the transport: SFTP with
    a password, SFTP with a key file, the local file system, or rclone.

    Args:
        source_folder (Path): The folder where the file to upload is located.
        pattern_to_upload (str): The pattern of the file to upload, e.g. "*.jpg".
        destination_file (Path): The destination file path. For a LocalFileSystem block it is joined onto the block's basepath.
        destination_block_or_blockname (ServerWithBasicAuthBlock | ServerWithPublicKeyAuthBlock | LocalFileSystem | RCloneConfigFileBlock | str): The destination block, or the name of the block to look up.
        update_only_if_newer_mode (bool): If true, skip files that are newer on the destination.
        mode (TransferType): The transfer mode to use, e.g. Copy or Move.
        overwrite (bool): If true, overwrite the file if it already exists at the destination.

    Returns:
        None: Nothing is returned; failure is reported by raising.

    Raises:
        ValueError: If the destination is None, the SFTP block fails validation, a LocalFileSystem block has no basepath, or no upload result was produced (e.g. the block is of none of the handled types).
        RuntimeError: If the upload finishes with a non-zero exit code.
        TypeError, ImportError, FileNotFoundError, ConnectionError, FileExistsError:
            Listed by the earlier documentation of this flow (unsupported block type,
            missing SFTP/RClone libraries, missing source, failed connection, existing
            destination without overwrite). They are not raised in this function
            itself; presumably they propagate from the block lookup or upload_asset.
    """

    logger = get_run_logger()

    if destination_block_or_blockname is None:
        raise ValueError("Destination block or blockname is missing")

    # A string is a block name that has to be resolved; anything else is already a block.
    destination_block = (
        await try_fetch_upload_destination(destination_block_or_blockname)
        if isinstance(destination_block_or_blockname, str)
        else destination_block_or_blockname
    )

    # Keyword arguments shared by every upload_asset call below.
    common_upload_kwargs = {
        "source_folder": source_folder,
        "pattern_to_upload": pattern_to_upload,
        "update_only_if_newer_mode": update_only_if_newer_mode,
        "overwrite": overwrite,
        "mode": mode,
    }

    # Stays None when no branch below handles the block type.
    result = None

    if isinstance(destination_block, ServerWithBasicAuthBlock):
        # SFTP with username/password authentication.
        sftp_details = destination_block
        if not sftp_details.isValid():
            raise ValueError("One or more SFTP server details are missing")
        logger.info(
            f"Uploading {pattern_to_upload} to {destination_file} on {sftp_details.host}"
        )
        result = await upload_asset(
            destination_file=destination_file,
            destination_type=RemoteConnectionType.SFTP,
            host=sftp_details.host,
            port=sftp_details.port,
            username=sftp_details.username,
            password=sftp_details.password.get_secret_value(),
            **common_upload_kwargs,
        )
    elif isinstance(destination_block, ServerWithPublicKeyAuthBlock):
        # SFTP with key authentication; the key file is only available inside the `with`.
        sftp_details_public_key = destination_block
        if not sftp_details_public_key.is_valid():
            raise ValueError("One or more SFTP server details are missing")
        logger.info(
            f"Uploading {pattern_to_upload} to {destination_file} on {sftp_details_public_key.host} with pub/private keys"
        )
        with sftp_details_public_key.get_temp_key_file() as temp_key_file:
            result = await upload_asset(
                destination_file=destination_file,
                destination_type=RemoteConnectionType.SFTP,
                host=sftp_details_public_key.host,
                port=sftp_details_public_key.port,
                username=sftp_details_public_key.username,
                private_key_path=temp_key_file.get_path(),
                **common_upload_kwargs,
            )
    elif isinstance(destination_block, LocalFileSystem):
        # Local copy: the destination path is taken relative to the block's basepath.
        local_details = destination_block
        if not local_details.basepath:
            raise ValueError("LocalFileSystem.basepath details are missing")
        logger.debug(f"Using basepath from local filesystem: {local_details.basepath}")
        destination_file = Path(local_details.basepath) / destination_file
        logger.info(f"Uploading to local filesystem: {destination_file}")
        result = await upload_asset(
            destination_file=destination_file,
            destination_type=RemoteConnectionType.LOCAL,
            **common_upload_kwargs,
        )
    elif isinstance(destination_block, RCloneConfigFileBlock):
        # rclone: the block is wrapped in a config object, and the run logger is passed along.
        rclone_details_prefect_block = destination_block
        logger.info(
            f"Uploading {pattern_to_upload} to {destination_file} using rclone block for remote {rclone_details_prefect_block.remote_name}"
        )
        result = await upload_asset(
            destination_file=destination_file,
            destination_type=RemoteConnectionType.RCLONE,
            rclone_config=RCloneConfigSavedInPrefect(rclone_details_prefect_block),
            logger=logger,
            **common_upload_kwargs,
        )

    if result is None:
        raise ValueError("Destination details are missing")
    if result != 0:
        raise RuntimeError(
            f"Failed to upload {pattern_to_upload} to {destination_file}. Exit code {result}"
        )