tasks

importing.tasks

Classes

DataImport

Bases: PermissionsBase

Model to store the data imports.

This model stores data imports: typically files uploaded to the system, whose data is then processed asynchronously and stored in the database.

Attributes:

station (ForeignKey): Station to which the data belongs.
format (ForeignKey): Format of the data.
rawfile (FileField): File with the data to be imported.
date (DateTimeField): Date of submission of the data.
start_date (DateTimeField): Start date of the data.
end_date (DateTimeField): End date of the data.
records (IntegerField): Number of records in the data.
observations (TextField): Notes or observations about the data.
status (TextField): Status of the import.
log (TextField): Log of the data ingestion, indicating any errors.

Functions

clean()

Validate information and upload the measurement data.

Source code in importing/models.py
def clean(self) -> None:
    """Validate information and uploads the measurement data."""
    tz = self.station.timezone
    if not tz:
        raise ValidationError("Station must have a timezone set.")

    if self.origin.origin == "Thingsboard" and not self.format.thingsboard:
        raise ValidationError("Ensure a Thingsboard-specific format is specified.")

Functions

clear_cache()

Clear the cache.

Source code in importing/tasks.py
@task()
def clear_cache() -> None:
    """Clear the cache."""
    from measurement.reporting import get_report_data_from_db

    get_report_data_from_db.cache_clear()
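
The call to `cache_clear()` suggests that `get_report_data_from_db` is memoised, for example with `functools.lru_cache`, although the source shown here does not confirm how it is decorated. A minimal sketch under that assumption, using a hypothetical stand-in function `get_report` instead of the real query:

```python
from functools import lru_cache

calls = []  # records every time the "database" is actually hit


@lru_cache(maxsize=None)
def get_report(station: str) -> str:
    """Hypothetical stand-in for a cached database query."""
    calls.append(station)
    return f"report for {station}"


get_report("ST01")
get_report("ST01")  # served from the cache, no second "query"
hits_before_clear = len(calls)

get_report.cache_clear()  # the same call clear_cache() makes on the real function
get_report("ST01")  # recomputed because the cache is now empty
hits_after_clear = len(calls)
```

Clearing the cache after every import means that reports requested afterwards are rebuilt from the freshly ingested data rather than from stale cached results.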

ingest_data(data_import_pk)

Initiate the ingestion of data into the DB.

If the status of the data import is "not queued", the request is processed: the data is loaded and saved to the database. The status is then updated to completed or failed, depending on the outcome.

Source code in importing/tasks.py
@on_commit_task()
def ingest_data(data_import_pk: int) -> None:
    """Initiate the ingestion of data into the DB.

    If the status of the data import is "not queued", the request is processed. The data
    is loaded and saved to the database. The status is updated to completed or failed
    depending on the outcome.
    """
    data_import = DataImport.objects.get(pk=data_import_pk)
    if data_import.status != "N":
        return

    data_import.status = "Q"
    data_import.save()

    try:
        getLogger("huey").info("Ingesting data for %s", data_import)
        data_import.start_date, data_import.end_date, data_import.records = (
            save_temp_data_to_permanent(data_import)
        )
        update_variables_for_station(data_import.station.station_code)
        data_import.status = "C"
        data_import.log = "Data ingestion completed successfully"
        getLogger("huey").info("Data ingestion for %s completed", data_import)
    except Exception as e:
        data_import.status = "F"
        data_import.log = str(e)
        getLogger("huey").exception("Error ingesting data for %s", data_import)
    finally:
        data_import.save()
        clear_cache()
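
The status handling above can be read as a small state machine: "N" (not queued) moves to "Q", and then to "C" or "F". The sketch below mirrors only that logic. `FakeImport`, `ingest` and `failing_loader` are hypothetical names for illustration, there is no database or Huey involved, and reading "Q" as "queued" is an assumption.

```python
from dataclasses import dataclass


@dataclass
class FakeImport:
    """Hypothetical in-memory stand-in for a DataImport row."""

    status: str = "N"
    log: str = ""


def ingest(data_import: FakeImport, loader) -> None:
    """Mirror the status transitions of ingest_data for one import."""
    if data_import.status != "N":
        return  # anything other than "not queued" is left untouched
    data_import.status = "Q"
    try:
        loader()  # stands in for save_temp_data_to_permanent
        data_import.status = "C"
        data_import.log = "Data ingestion completed successfully"
    except Exception as e:
        data_import.status = "F"
        data_import.log = str(e)


def failing_loader() -> None:
    raise ValueError("No data to import. Is the chosen format correct?")


ok = FakeImport()
ingest(ok, lambda: None)

failed = FakeImport()
ingest(failed, failing_loader)

skipped = FakeImport(status="C", log="done earlier")
ingest(skipped, failing_loader)
```

Because the early return only lets "N" through, calling the task again for an import that has already completed or failed is a no-op.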

save_temp_data_to_permanent(data_import)

Function to pass the temporary import to the final table.

This function carries out the following steps:

- Bulk delete of existing data between two times on a given measurement table for the station in question.
- Bulk create to add the new data from the uploaded file.

Parameters:

data_import (DataImport): The DataImport object. Required.

Returns:

tuple[datetime, datetime, int]: A tuple containing the start date, end date and number of records inserted.

Source code in importing/functions.py
def save_temp_data_to_permanent(
    data_import: DataImport,
) -> tuple[datetime, datetime, int]:
    """Function to pass the temporary import to the final table.

    This function carries out the following steps:
    - Bulk delete of existing data between two times on a given measurement table for
    the station in question.
    - Bulk create to add the new data from the uploaded file.

    Args:
        data_import: The DataImport object.

    Returns:
        A tuple containing the start date, end date and number of records inserted.
    """
    # Delete existing measurements and reports for the same data_import_id
    Measurement.objects.filter(data_import_id=data_import.data_import_id).delete()
    Report.objects.filter(data_import_id=data_import.data_import_id).delete()

    start_date, end_date, all_data = construct_matrix(data_import)

    if not all_data:
        msg = "No data to import. Is the chosen format correct?"
        raise ValueError(msg)

    must_cols = ["date", "value"]
    num_records = len(all_data[0][1])
    for variable_id, table in all_data:
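        # _meta.fields holds Field objects, so compare against the field names.
        field_names = {f.name for f in Measurement._meta.fields}
        cols = [c for c in table.columns if c in field_names or c in must_cols]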
        table = (
            table[cols]
            .dropna(axis=0, subset=must_cols)
            .rename(columns={"date": "time"})
        )

        # Delete existing data between the date ranges. Needed for data not linked
        # to a data_import_id. Both measurements and reports are deleted.
        station_id = data_import.station.station_id
        Measurement.timescale.filter(
            time__range=[start_date, end_date],
            station_id=station_id,
            variable_id=variable_id,
        ).delete()
        Report.objects.filter(
            time__range=[start_date, end_date],
            station_id=station_id,
            variable_id=variable_id,
        ).delete()

        # Add data to the database in batches
        for start in range(0, len(table), settings.IMPORT_BATCH_SIZE):
            batch = table.iloc[start : start + settings.IMPORT_BATCH_SIZE]

            model_instances = []
            for row in batch.itertuples(index=False):
                instance = Measurement(
                    **row._asdict(),
                    station_id=station_id,
                    variable_id=variable_id,
                    data_import_id=data_import.data_import_id,
                )
                instance.clean()
                model_instances.append(instance)

            # WARNING: This is a bulk insert, so it will not call the save()
            # method nor send the pre_save or post_save signals for each instance.
            Measurement.objects.bulk_create(model_instances)

    return start_date, end_date, num_records
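
The delete-then-insert pattern and the batching loop can be illustrated without Django or pandas. In this sketch a plain list of dicts plays the role of the measurement table. `batches` and `replace_range` are hypothetical helpers, and the inclusive range check mirrors the `time__range` lookup.

```python
from typing import Iterator, Sequence


def batches(rows: Sequence[dict], batch_size: int) -> Iterator[Sequence[dict]]:
    """Yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start : start + batch_size]


def replace_range(
    table: list[dict], new_rows: list[dict], batch_size: int
) -> list[dict]:
    """Drop rows inside the time range of new_rows, then add new_rows in batches."""
    start, end = new_rows[0]["time"], new_rows[-1]["time"]
    # Inclusive on both ends, like a SQL BETWEEN.
    kept = [r for r in table if not (start <= r["time"] <= end)]
    for batch in batches(new_rows, batch_size):
        kept.extend(batch)  # stands in for one bulk_create call
    return kept


existing = [{"time": t, "value": -1.0} for t in (0, 3, 10)]
new_rows = [{"time": t, "value": float(t)} for t in range(2, 6)]

sizes = [len(b) for b in batches(new_rows, batch_size=3)]
result = replace_range(existing, new_rows, batch_size=3)
```

Batching keeps the number of model instances held in memory bounded by the batch size, at the cost of one insert statement per batch.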

update_variables_for_station(*station_codes)

Update the variables for the given station codes.

The variables are updated based on the measurements associated with the station. The variables are saved as a comma-separated string in the variables field of the station model.

Parameters:

station_codes (tuple[str]): Station codes for which to update the variables. If not provided, all station codes with measurements are considered. Default: ().

Source code in station/functions.py
def update_variables_for_station(*station_codes) -> None:
    """Update the variables for the given station codes.

    The variables are updated based on the measurements associated with the station.
    The variables are saved as a comma-separated string in the variables field of the
    station model.

    Args:
        station_codes (tuple[str]): Station codes for which to update the variables.
            If not provided, all station codes with measurements are considered.
    """

    # We get the station codes from the Measurement model if not provided
    # Only station codes with measurements are considered
    station_codes = (
        station_codes
        or Measurement.objects.values_list(
            "station__station_code", flat=True
        ).distinct()
    )

    # Get the variables for each station and save them as a comma-separated string
    for station_code in station_codes:
        variables = (
            Measurement.objects.filter(station__station_code=station_code)
            .values_list("variable__variable_code", flat=True)
            .distinct()
        )
        if variables:
            station = Station.objects.get(station_code=station_code)
            station.variables = ",".join(variables)
            station.full_clean()
            station.save()
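
As a rough illustration of the resulting value, the sketch below builds the comma-separated string from hypothetical (station code, variable code) pairs held in memory. The station and variable codes are made up. Unlike the `distinct()` query, whose ordering is not specified here, this sketch keeps the codes in first-seen order.

```python
from collections import defaultdict

# Hypothetical (station_code, variable_code) pairs, one per measurement.
measurements = [
    ("ST01", "temp"),
    ("ST01", "rain"),
    ("ST01", "temp"),
    ("ST02", "wind"),
]

# Collect the distinct variable codes seen for each station.
codes_by_station: dict[str, list[str]] = defaultdict(list)
for station_code, variable_code in measurements:
    if variable_code not in codes_by_station[station_code]:
        codes_by_station[station_code].append(variable_code)

# Join them into the comma-separated form stored on the station.
variables = {s: ",".join(codes) for s, codes in codes_by_station.items()}
```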