
Tasks

Collection of tasks that can be used to run Data Quality checks using Soda Core.

soda_scan_execute(data_source_name, configuration, checks, variables, scan_results_file=None, verbose=False, return_scan_result_file_content=False, shell_env=None) async

Task that executes a Soda scan. The scan is first created and configured from the provided configuration, checks, and other options, and is then executed against the given data source.
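To make the "configure, then execute" step concrete, here is a minimal sketch of how the `soda scan` invocation is assembled. It follows the flag order used in the source listing on this page: `-d`, `-c`, one `-v` per variable, `-srf`, `-V`, and the checks file last. `build_soda_command` is a hypothetical helper, not part of prefect_soda_core, and the two YAML paths stand in for `configuration.configuration_yaml_path` and `checks.sodacl_yaml_path`. The task itself builds a single shell string. This sketch returns an argument list instead, which avoids shell-quoting problems with paths and variable values.

```python
import shlex
from typing import Dict, List, Optional


def build_soda_command(
    data_source_name: str,
    configuration_yaml_path: str,
    sodacl_yaml_path: str,
    variables: Optional[Dict[str, str]] = None,
    scan_results_file: Optional[str] = None,
    verbose: bool = False,
) -> List[str]:
    """Hypothetical helper: return `soda scan` arguments in the task's flag order."""
    # Base command: target data source and configuration file
    args = ["soda", "scan", "-d", data_source_name, "-c", configuration_yaml_path]
    # One `-v name=value` pair per variable referenced in the checks
    for name, value in (variables or {}).items():
        args += ["-v", f"{name}={value}"]
    # Write a JSON scan results file only when a path is given
    if scan_results_file is not None:
        args += ["-srf", scan_results_file]
    # Verbose logging
    if verbose:
        args.append("-V")
    # The SodaCL checks file always comes last
    args.append(sodacl_yaml_path)
    return args


cmd = build_soda_command(
    "datasource",
    "configuration.yml",
    "checks.yml",
    variables={"date": "2023-01-01"},
    verbose=True,
)
print(shlex.join(cmd))
# soda scan -d datasource -c configuration.yml -v date=2023-01-01 -V checks.yml
```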

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
data_source_name | str | The name of the data source against which the checks are executed. It must match one of the data sources defined in the configuration object. | required
configuration | SodaConfiguration | SodaConfiguration object used to configure the scan before it is executed. | required
checks | SodaCLCheck | SodaCLCheck object used, together with configuration, to configure the scan before it is executed. | required
variables | Optional[Dict[str, str]] | A Dict[str, str] mapping each variable referenced in the checks to its value. Although typed as Optional, it has no default and must be passed explicitly; pass None if the checks reference no variables. | required
scan_results_file | Optional[str] | The path of the file where the scan results are stored. It is only used when return_scan_result_file_content is True; in that case, if no path is provided, a file name is derived from the task run start time and name. When return_scan_result_file_content is False, no results file is written and the task returns the output of the soda shell task. | None
verbose | bool | Whether to run the checks with verbose logging (the -V option). | False
return_scan_result_file_content | bool | Controls what the task returns. If True, the task returns the parsed JSON content of the scan results file; otherwise it returns the output lines of the soda shell task. | False
shell_env | Optional[Dict[str, str]] | A Dict[str, str] of environment variables passed to the soda shell task. | None
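The interplay between scan_results_file and return_scan_result_file_content is easy to miss. In the source listing, `-srf` is only added when the flag is True, so a path passed with the flag set to False is ignored. When the flag is True and no path is given, the name is built as `<start time>--<task run name>.json`. A minimal sketch of that resolution logic, using a hypothetical `resolve_scan_results_file` helper with the Prefect run context replaced by plain arguments. Formatting the time with `isoformat()` is an assumption made to keep the output deterministic; the task interpolates the start time directly in an f-string.

```python
from datetime import datetime, timezone
from typing import Optional


def resolve_scan_results_file(
    scan_results_file: Optional[str],
    return_scan_result_file_content: bool,
    task_run_name: str,
    task_run_start_time: datetime,
) -> Optional[str]:
    """Hypothetical helper: which results file (if any) the task would pass via -srf."""
    # Without the flag, `-srf` is never added, so any given path is ignored
    if not return_scan_result_file_content:
        return None
    # A caller-supplied path takes precedence
    if scan_results_file is not None:
        return scan_results_file
    # Otherwise derive "<start time>--<task run name>.json"
    return f"{task_run_start_time.isoformat()}--{task_run_name}.json"


start = datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
print(resolve_scan_results_file(None, True, "soda_scan_execute-0", start))
# 2023-01-01T12:00:00+00:00--soda_scan_execute-0.json
```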

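Returns:

Type | Description
--- | ---
Union[List, str] | The output lines produced by the soda scan CLI command or, when return_scan_result_file_content is True, the parsed JSON content of the scan results file.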
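Because the return value changes shape with return_scan_result_file_content, it can help to see the final step in isolation. The sketch below mirrors the last lines of the source listing: return the captured output lines by default, or load the results file with `json.load` when the flag is set. `collect_output` is a hypothetical helper, and the JSON written in the usage example is a placeholder, not Soda's actual results schema.

```python
import json
import os
import tempfile
from typing import Any, List, Optional


def collect_output(
    log_lines: List[str],
    return_scan_result_file_content: bool,
    scan_results_file: Optional[str] = None,
) -> Any:
    """Hypothetical helper: choose the task's return value."""
    if not return_scan_result_file_content:
        # Default: the output lines captured from `soda scan`
        return log_lines
    # Otherwise: parse the JSON file written by `soda scan -srf`
    with open(scan_results_file, "r") as f:
        return json.load(f)


# Placeholder content only; real Soda results files have their own schema
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "scan_results.json")
    with open(path, "w") as f:
        json.dump({"placeholder": True}, f)
    print(collect_output(["line 1"], False, path))  # ['line 1']
    print(collect_output(["line 1"], True, path))  # {'placeholder': True}
```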

Example
from prefect import flow

from prefect_soda_core.soda_configuration import SodaConfiguration
from prefect_soda_core.sodacl_check import SodaCLCheck
from prefect_soda_core.tasks import soda_scan_execute

# Load previously saved blocks by name
sodacl_check_block = SodaCLCheck.load("SODACL_CHECK_BLOCK_NAME")
soda_configuration_block = SodaConfiguration.load("SODA_CONF_BLOCK_NAME")

# soda_scan_execute is an async task: call it from an async flow and await it
@flow
async def run_soda_scan():
    return await soda_scan_execute(
        data_source_name="datasource",
        configuration=soda_configuration_block,
        checks=sodacl_check_block,
        variables={"key": "value"},
        # scan_results_file is only used when return_scan_result_file_content=True
        scan_results_file="scan_results.json",
        verbose=False,
        return_scan_result_file_content=True,
        shell_env={"SNOWFLAKE_PASSWORD": "********"},
    )
Source code in prefect_soda_core/tasks.py
@task
async def soda_scan_execute(
    data_source_name: str,
    configuration: SodaConfiguration,
    checks: SodaCLCheck,
    variables: Optional[Dict[str, str]],
    scan_results_file: Optional[str] = None,
    verbose: bool = False,
    return_scan_result_file_content: bool = False,
    shell_env: Optional[Dict[str, str]] = None,
) -> Union[List, str]:
    """
    Task that executes a Soda scan.
    First, the scan is created and configured using the provided
    configuration, checks, and other options; then
    it is executed against the provided data source.

    Args:
        data_source_name: The name of the data source against
            which the checks will be executed. The data source name
            must match one of the data sources provided in the
            `configuration` object.
        configuration: `SodaConfiguration` object that will be used
            to configure the scan before its execution.
        checks: `SodaCLCheck` object that will be used, together with
            `configuration`, to configure the scan before its execution.
        variables: A `Dict[str, str]` that maps each variable referenced
            within the checks to its value. Pass `None` if the checks
            do not reference any variables.
        scan_results_file: The path to the file where the scan results
            will be stored. Only used when `return_scan_result_file_content`
            is `True`; in that case, if no path is provided, one is derived
            from the task run start time and name.
        verbose: Whether to run the checks with a verbose log or not.
            Defaults to `False`.
        return_scan_result_file_content: Controls the return of the task.
            If `True`, the task will return the parsed content of the scan
            results file; otherwise it will return the output lines of the
            soda shell task. Defaults to `False`.
        shell_env: A `Dict[str, str]` that contains all environment variables
            that will be passed to the soda shell task.

    Raises:
        `RuntimeError` in case `soda scan` fails with an exit code other
            than 2 (exit code 2, raised when the scan runs but a check
            fails, is tolerated).

    Returns:
        Logs produced by running the `soda scan` CLI command, or the parsed
            content of the scan results file if
            `return_scan_result_file_content` is `True`.

    Example:
        ```python
        from prefect import flow

        from prefect_soda_core.soda_configuration import SodaConfiguration
        from prefect_soda_core.sodacl_check import SodaCLCheck
        from prefect_soda_core.tasks import soda_scan_execute

        sodacl_check_block = SodaCLCheck.load("SODACL_CHECK_BLOCK_NAME")
        soda_configuration_block = SodaConfiguration.load("SODA_CONF_BLOCK_NAME")

        # `soda_scan_execute` is async: await it from an async flow
        @flow
        async def run_soda_scan():
            return await soda_scan_execute(
                data_source_name="datasource",
                configuration=soda_configuration_block,
                checks=sodacl_check_block,
                variables={"key": "value"},
                # Only used because return_scan_result_file_content=True
                scan_results_file="scan_results.json",
                verbose=False,
                return_scan_result_file_content=True,
                shell_env={"SNOWFLAKE_PASSWORD": "********"},
            )
        ```
    """
    # Persist the configuration on the file system, if necessary
    configuration.persist_configuration()

    # Persist checks on the file system, if necessary
    checks.persist_checks()

    # Soda command initial definition
    command = (
        f"soda scan -d {data_source_name} -c {configuration.configuration_yaml_path}"
    )

    # If variables are provided, add them to the Soda command
    if variables:
        var_str = "".join(
            [
                f'-v "{var_name}={var_value}" '
                for var_name, var_value in variables.items()
            ]
        )

        command = f"{command} {var_str}"

    # If return_scan_result_file_content is True, save the output of the scan to a file
    if return_scan_result_file_content is True:
        # Implicitly use task run name and time to store
        #   the JSON-based scan results file
        if scan_results_file is None:
            task_run_name = get_run_context().task_run.name
            task_run_start_time = get_run_context().task_run.start_time
            scan_results_file = f"{task_run_start_time}--{task_run_name}.json"

        command = f"{command} -srf {scan_results_file}"

    # If verbose logging is requested, add corresponding option to Soda command
    if verbose:
        command = f"{command} -V"

    # Build final Soda command
    command = f"{command} {checks.sodacl_yaml_path}"

    # Log the Soda command for debugging purposes
    get_run_logger().debug(f"Soda requested command is: {command}")

    # Init soda_logs
    soda_logs = []
    try:
        # Execute Soda command
        soda_logs = await shell_run_command.fn(
            command=command, env=shell_env, return_all=True
        )
    except RuntimeError as e:
        # Ignoring the Runtime Error with code 2 that is raised
        #   when the soda test runs successfully but the check fails
        #   causing the flow to break.
        if not str(e).startswith("Command failed with exit code 2:"):
            raise e

    if return_scan_result_file_content is True:
        # Get logs from scan result file
        with open(scan_results_file, "r") as f:
            soda_logs = json.load(f)

    return soda_logs
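The `except RuntimeError` branch above tolerates exit code 2, which the source comment describes as the scan running successfully while a check fails. The sketch below expresses the same policy with a swapped-in technique: it inspects the return code from `subprocess.run` directly instead of matching the error message prefix. `run_tolerating_failed_checks` is a hypothetical helper. There is one deliberate difference. In the task, when exit code 2 is swallowed, `soda_logs` keeps the empty list assigned before the `try`, so the scan output is not returned unless `return_scan_result_file_content` is `True`. The sketch keeps the captured stdout instead.

```python
import subprocess
import sys
from typing import List


def run_tolerating_failed_checks(args: List[str]) -> List[str]:
    """Hypothetical helper: run a command, accepting exit codes 0 and 2."""
    completed = subprocess.run(args, capture_output=True, text=True)
    # Exit code 2 is treated as "scan ran, but some checks failed"
    if completed.returncode not in (0, 2):
        raise RuntimeError(
            f"Command failed with exit code {completed.returncode}: {completed.stderr}"
        )
    # Keep the captured output even when checks failed
    return completed.stdout.splitlines()


# Stand-in for `soda scan`: a Python one-liner that exits with code 2
lines = run_tolerating_failed_checks(
    [sys.executable, "-c", "import sys; print('checks failed'); sys.exit(2)"]
)
print(lines)  # ['checks failed']
```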