importing.tasks

Classes

DataImport

Bases: PermissionsBase

Model to store the data imports.

This model stores the data imports, which are files with data that are uploaded to the system. The data is then processed asynchronously and stored in the database.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| station | ForeignKey | Station to which the data belongs. |
| format | ForeignKey | Format of the data. |
| rawfile | FileField | File with the data to be imported. |
| date | DateTimeField | Date of submission of the data. |
| start_date | DateTimeField | Start date of the data. |
| end_date | DateTimeField | End date of the data. |
| records | IntegerField | Number of records in the data. |
| observations | TextField | Notes or observations about the data. |
| status | TextField | Status of the import. |
| log | TextField | Log of the data ingestion, indicating any errors. |
| reprocess | BooleanField | If checked, the data will be reprocessed. |

Functions

clean()

Validate information and upload the measurement data.

Source code in importing/models.py (lines 104-116):
def clean(self) -> None:
    """Validate information and uploads the measurement data."""
    tz = self.station.timezone
    if not tz:
        raise ValidationError("Station must have a timezone set.")

    # If the file has changed, we reprocess the data
    if self.pk and self.rawfile != self.__class__.objects.get(pk=self.pk).rawfile:
        self.reprocess = True

    if self.reprocess:
        self.status = "N"
        self.reprocess = False
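
The reprocessing logic in clean() can be restated as a plain function. This is a minimal sketch only; resolve_status and its argument names are hypothetical, not part of the model:

```python
def resolve_status(current_file, stored_file, reprocess, status):
    # Hypothetical re-statement of clean(): a changed file forces
    # reprocessing, and reprocessing resets the status to "N"
    # (not queued) so the import is picked up again.
    if stored_file is not None and current_file != stored_file:
        reprocess = True
    if reprocess:
        status = "N"
        reprocess = False
    return status, reprocess
```

Note that the reprocess flag is always cleared after being consumed, mirroring how the model resets it before saving.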

Functions

clear_cache()

Clear the cache.

Source code in importing/tasks.py (lines 44-49):
@task()
def clear_cache() -> None:
    """Clear the cache."""
    from measurement.reporting import get_report_data_from_db

    get_report_data_from_db.cache_clear()
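
clear_cache() relies on the cache_clear() method that functools.lru_cache attaches to decorated functions. A self-contained illustration of that mechanism, where get_report_data and the calls counter are stand-ins rather than the real measurement.reporting code:

```python
from functools import lru_cache

calls = 0

@lru_cache(maxsize=None)
def get_report_data(station_id):
    # Stand-in for a cached, expensive DB query.
    global calls
    calls += 1
    return f"report-{station_id}"

get_report_data(1)
get_report_data(1)             # cache hit: no recomputation
get_report_data.cache_clear()  # what clear_cache() does for the real function
get_report_data(1)             # recomputed after the clear
```

After this runs, the function body has executed twice: once for the initial call and once after the cache was cleared.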

ingest_data(data_import_pk)

Initiate the ingestion of data into the DB.

If the status of the data import is "not queued", the request is processed. The data is loaded and saved to the database, and the status is updated to completed or failed depending on the outcome.

Source code in importing/tasks.py (lines 11-41):
@on_commit_task()
def ingest_data(data_import_pk: int) -> None:
    """Initiate the ingestion of data into the DB.

    If the status of the data import is "not queued", the request is processed. The data
    is loaded and saved to the database. The status is updated to completed or failed
    depending on the outcome.
    """
    data_import = DataImport.objects.get(pk=data_import_pk)
    if data_import.status != "N":
        return

    data_import.status = "Q"
    data_import.save()

    try:
        getLogger("huey").info("Ingesting data for %s", data_import)
        data_import.start_date, data_import.end_date, data_import.records = (
            save_temp_data_to_permanent(data_import)
        )
        update_variables_for_station(data_import.station.station_code)
        data_import.status = "C"
        data_import.log = "Data ingestion completed successfully"
        getLogger("huey").info("Data ingestion for %s completed", data_import)
    except Exception as e:
        data_import.status = "F"
        data_import.log = str(e)
        getLogger("huey").exception("Error ingesting data for %s", data_import)
    finally:
        data_import.save()
        clear_cache()
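
The status transitions above can be modelled as a small pure function. This is a sketch only; ingest and its return convention are hypothetical, and the status codes "N"/"Q"/"C"/"F" are inferred from the source:

```python
def ingest(status, load_data):
    # Hypothetical model of ingest_data's status machine.
    # Codes inferred from the source: "N" = not queued, "Q" = queued,
    # "C" = completed, "F" = failed.
    if status != "N":
        return status, ""      # only not-queued imports are processed
    status = "Q"               # queued while the load runs
    try:
        load_data()
        return "C", "Data ingestion completed successfully"
    except Exception as exc:
        return "F", str(exc)   # the log field stores the error message
```

In the real task the intermediate "Q" state is saved to the database so concurrent workers skip the import, and the final status is persisted in the finally block.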

save_temp_data_to_permanent(data_import)

Function to pass the temporary import to the final table.

Uses the data_import object only to get all required information from its fields.

This function carries out the following steps:

  • Bulk delete of existing data between two times on a given measurement table for the station in question.
  • Bulk create to add the new data from the uploaded file.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data_import | DataImport | DataImport object describing the upload. | required |
Source code in importing/functions.py (lines 262-333):
def save_temp_data_to_permanent(
    data_import: DataImport,
) -> tuple[datetime, datetime, int]:
    """Function to pass the temporary import to the final table.

    Uses the data_import object only to get all required information from its
    fields.

    This function carries out the following steps:

    - Bulk delete of existing data between two times on a given measurement table for
    the station in question.
    - Bulk create to add the new data from the uploaded file.

    Args:
        data_import: DataImport object describing the upload.
    """
    station = data_import.station
    file_format = data_import.format
    file = data_import.rawfile

    # Delete existing measurements and reports for the same data_import_id
    Measurement.objects.filter(data_import_id=data_import.data_import_id).delete()
    Report.objects.filter(data_import_id=data_import.data_import_id).delete()

    all_data = construct_matrix(file, file_format, station, data_import)
    if not all_data:
        msg = "No data to import. Is the chosen format correct?"
        raise ValueError(msg)

    must_cols = ["data_import_id", "station_id", "variable_id", "date", "value"]
    start_date = all_data[0]["date"].iloc[0]
    end_date = all_data[0]["date"].iloc[-1]
    num_records = len(all_data[0])
    for table in all_data:
        cols = [
            # Compare against field *names*: Measurement._meta.fields yields
            # Field objects, which never compare equal to plain strings.
            c
            for c in table.columns
            if c in {f.name for f in Measurement._meta.fields} or c in must_cols
        ]
        table = (
            table[cols]
            .dropna(axis=0, subset=must_cols)
            .rename(columns={"date": "time"})
        )
        records = table.to_dict("records")
        variable_id = table["variable_id"].iloc[0]

        # Delete existing data between the date ranges. Needed for data not linked
        # to a data_import_id. Both measurements and reports are deleted.
        Measurement.timescale.filter(
            time__range=[start_date, end_date],
            station_id=station.station_id,
            variable_id=variable_id,
        ).delete()
        Report.objects.filter(
            time__range=[start_date, end_date],
            station_id=station.station_id,
            variable_id=variable_id,
        ).delete()

        # Bulk add new data
        def create_and_clean(**record):
            instance = Measurement(**record)
            instance.clean()
            return instance

        model_instances = [create_and_clean(**record) for record in records]

        # WARNING: This is a bulk insert, so it will not call the save()
        # method nor send the pre_save or post_save signals for each instance.
        Measurement.objects.bulk_create(model_instances)

    return start_date, end_date, num_records
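
The per-variable delete-then-bulk-insert pattern above can be sketched without Django by treating the measurement table as a list of dicts. replace_range is a hypothetical helper, not project code:

```python
def replace_range(rows, new_rows, start, end, station_id, variable_id):
    # Keep every row outside the incoming time range, or belonging to a
    # different station/variable, then bulk-append the new rows.
    kept = [
        r for r in rows
        if not (
            start <= r["time"] <= end
            and r["station_id"] == station_id
            and r["variable_id"] == variable_id
        )
    ]
    return kept + list(new_rows)

old = [
    {"time": 1, "station_id": 1, "variable_id": 7, "value": 0.1},
    {"time": 2, "station_id": 1, "variable_id": 7, "value": 0.2},
    {"time": 2, "station_id": 2, "variable_id": 7, "value": 0.9},
]
new = [{"time": 2, "station_id": 1, "variable_id": 7, "value": 0.5}]
merged = replace_range(old, new, 2, 3, 1, 7)
```

Only the overlapping row for station 1 / variable 7 is dropped; data for other stations and variables in the same time range survives, which is why the real code filters on station_id and variable_id as well as the time range.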

update_variables_for_station(*station_codes)

Update the variables for the given station codes.

The variables are updated based on the measurements associated with the station. The variables are saved as a comma-separated string in the variables field of the station model.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| station_codes | tuple[str] | Station codes for which to update the variables. If not provided, all station codes with measurements are considered. | () |
Source code in station/functions.py (lines 6-38):
def update_variables_for_station(*station_codes) -> None:
    """Update the variables for the given station codes.

    The variables are updated based on the measurements associated with the station.
    The variables are saved as a comma-separated string in the variables field of the
    station model.

    Args:
        station_codes (tuple[str]): Station codes for which to update the variables.
            If not provided, all station codes with measurements are considered.
    """

    # We get the station codes from the Measurement model if not provided
    # Only station codes with measurements are considered
    station_codes = (
        station_codes
        or Measurement.objects.values_list(
            "station__station_code", flat=True
        ).distinct()
    )

    # Get the variables for each station and save them as a comma-separated string
    for station_code in station_codes:
        variables = (
            Measurement.objects.filter(station__station_code=station_code)
            .values_list("variable__variable_code", flat=True)
            .distinct()
        )
        if variables:
            station = Station.objects.get(station_code=station_code)
            station.variables = ",".join(variables)
            station.full_clean()
            station.save()
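
The grouping performed by the queries above can be sketched with plain dictionaries. variables_by_station is a hypothetical stand-in for the ORM queries, not project code:

```python
def variables_by_station(measurements):
    # Collect distinct variable codes per station, preserving first-seen
    # order, and join them into the comma-separated string that the
    # station model stores in its variables field.
    seen = {}
    for m in measurements:
        codes = seen.setdefault(m["station_code"], [])
        if m["variable_code"] not in codes:
            codes.append(m["variable_code"])
    return {station: ",".join(codes) for station, codes in seen.items()}
```

Stations without any measurements never appear in the result, matching the guard in the real code that only saves when the variables queryset is non-empty.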