importing.tasks

Classes

DataImport

Bases: PermissionsBase

Model to store the data imports.

This model stores the data imports: files with data uploaded to the system. The data is then processed asynchronously and stored in the database.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| station | ForeignKey | Station to which the data belongs. |
| format | ForeignKey | Format of the data. |
| rawfile | FileField | File with the data to be imported. |
| date | DateTimeField | Date of submission of the data. |
| start_date | DateTimeField | Start date of the data. |
| end_date | DateTimeField | End date of the data. |
| records | IntegerField | Number of records in the data. |
| observations | TextField | Notes or observations about the data. |
| status | TextField | Status of the import. |
| log | TextField | Log of the data ingestion, indicating any errors. |
| reprocess | BooleanField | If checked, the data will be reprocessed. |
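
As an illustration, a minimal sketch of creating an import record; the `station`, `fmt`, and `uploaded_file` objects are assumed to already exist, and `full_clean()` runs the validation in `clean()` below:

```python
# Hedged sketch: creating a DataImport. Saving stores the raw file; the
# actual parsing happens asynchronously in the ingest_data task below.
from importing.models import DataImport

data_import = DataImport(
    station=station,        # Station owning the measurements
    format=fmt,             # Format describing the raw file layout
    rawfile=uploaded_file,  # file with the data to be imported
    observations="First upload for this station",
)
data_import.full_clean()  # calls clean(); fails if the station lacks a timezone
data_import.save()
```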

Functions

clean()

Validate the information and upload the measurement data.

Source code in importing/models.py
```python
def clean(self) -> None:
    """Validate the information and upload the measurement data."""
    tz = self.station.timezone
    if not tz:
        raise ValidationError("Station must have a timezone set.")

    if self.reprocess:
        self.status = "N"
        self.reprocess = False
```
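
A hedged sketch of the reprocess flow implied above: setting the flag and re-validating resets the status to "N" (not queued), so a subsequent ingestion run will pick the file up again:

```python
# Assumed reprocessing flow: clean() clears the flag and resets the status.
data_import.reprocess = True
data_import.full_clean()  # status becomes "N", reprocess flips back to False
data_import.save()
```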

Functions

ingest_data(data_import_pk)

Initiate the ingestion of data into the DB.

If the status of the data import is "not queued", the request is processed: the data is loaded and saved to the database, and the status is updated to completed or failed depending on the outcome.

Source code in importing/tasks.py
```python
@on_commit_task()
def ingest_data(data_import_pk: int) -> None:
    """Initiate the ingestion of data into the DB.

    If the status of the data import is "not queued", the request is processed.
    The data is loaded and saved to the database. The status is updated to
    completed or failed depending on the outcome.
    """
    data_import = DataImport.objects.get(pk=data_import_pk)
    if data_import.status != "N":
        return

    data_import.status = "Q"
    data_import.save()

    try:
        getLogger("huey").info("Ingesting data for %s", data_import)
        data_import.start_date, data_import.end_date, data_import.records = (
            save_temp_data_to_permanent(
                data_import.station, data_import.format, data_import.rawfile
            )
        )
        data_import.status = "C"
        data_import.log = "Data ingestion completed successfully"
        getLogger("huey").info("Data ingestion for %s completed", data_import)
    except Exception as e:
        data_import.status = "F"
        data_import.log = str(e)
        getLogger("huey").exception("Error ingesting data for %s", data_import)
    finally:
        data_import.save()
```
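
The status codes trace an import's life cycle: "N" (not queued), "Q" (queued), then "C" (completed) or "F" (failed). As a rough sketch, assuming the on_commit_task decorator wraps a Huey task so that calling the decorated function enqueues it once the surrounding transaction commits:

```python
# Hedged sketch: scheduling ingestion for a saved import. The exact enqueue
# semantics depend on how on_commit_task wraps Huey; calling the decorated
# function is assumed to schedule the task rather than run it inline.
from django.db import transaction

with transaction.atomic():
    data_import.full_clean()
    data_import.save()
    ingest_data(data_import.pk)  # worker runs it once the transaction commits
```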

save_temp_data_to_permanent(station, file_format, file)

Pass the temporary import to the final table.

Uses the station, file format and uploaded file to get all required information.

This function carries out the following steps:

  • Bulk delete of existing data between two times on a given measurement table for the station in question.
  • Bulk create to add the new data from the uploaded file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| station | Station | Station to which the data belongs. | required |
| file_format | Format | Format of the data. | required |
| file | FileField | File with the data to be imported. | required |
Source code in importing/functions.py
```python
def save_temp_data_to_permanent(
    station: Station, file_format: Format, file: FileField
) -> tuple[datetime, datetime, int]:
    """Pass the temporary import to the final table.

    Uses the station, file format and uploaded file to get all required
    information.

    This function carries out the following steps:

    - Bulk delete of existing data between two times on a given measurement table
    for the station in question.
    - Bulk create to add the new data from the uploaded file.

    Args:
        station: Station to which the data belongs.
        file_format: Format of the data.
        file: File with the data to be imported.

    Returns:
        Tuple with the start date, end date and number of records imported.
    """
    all_data = construct_matrix(file, file_format, station)
    if not all_data:
        msg = "No data to import. Is the chosen format correct?"
        getLogger().error(msg)
        raise ValidationError(msg)

    must_cols = ["station_id", "variable_id", "date", "value"]
    # Field names of the Measurement model, used to filter relevant columns.
    field_names = {f.name for f in Measurement._meta.fields}
    start_date = all_data[0]["date"].iloc[0]
    end_date = all_data[0]["date"].iloc[-1]
    num_records = len(all_data[0])
    for table in all_data:
        cols = [c for c in table.columns if c in field_names or c in must_cols]
        table = (
            table[cols]
            .dropna(axis=0, subset=must_cols)
            .rename(columns={"date": "time"})
        )
        records = table.to_dict("records")
        variable_id = table["variable_id"].iloc[0]

        # Delete existing data between the date ranges
        Measurement.timescale.filter(
            time__range=[start_date, end_date],
            station_id=station.station_id,
            variable_id=variable_id,
        ).delete()

        # Bulk add new data
        model_instances = [Measurement(**record) for record in records]

        # Validate each instance; a plain map() would be lazy and never run.
        for instance in model_instances:
            instance.clean()

        # WARNING: This is a bulk insert, so it will not call the save()
        # method nor send the pre_save or post_save signals for each instance.
        Measurement.objects.bulk_create(model_instances)

    return start_date, end_date, num_records
```
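
For reference, a hedged sketch of the input shape this function assumes: `construct_matrix` is taken to return one pandas DataFrame per variable, each carrying at least the four required columns, with rows sorted by date:

```python
# Hypothetical illustration of the expected shape; construct_matrix is
# assumed to return a list with one such DataFrame per variable.
import pandas as pd

frame = pd.DataFrame(
    {
        "station_id": [42, 42],
        "variable_id": [7, 7],
        "date": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:05"]),
        "value": [12.3, 12.6],
    }
)
all_data = [frame]  # what construct_matrix is assumed to produce
```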