handlers

importing.signals.handlers

Attributes

User = get_user_model() module-attribute

Classes

DataImport

Bases: PermissionsBase

Model to store the data imports.

This model stores the data imports, which are files with data that are uploaded to the system. The data is then processed asynchronously and stored in the database.

Attributes:

| Name | Type | Description |
|---|---|---|
| station | ForeignKey | Station to which the data belongs. |
| format | ForeignKey | Format of the data. |
| rawfile | FileField | File with the data to be imported. |
| date | DateTimeField | Date of submission of the data. |
| start_date | DateTimeField | Start date of the data. |
| end_date | DateTimeField | End date of the data. |
| records | IntegerField | Number of records in the data. |
| observations | TextField | Notes or observations about the data. |
| status | TextField | Status of the import. |
| log | TextField | Log of the data ingestion, indicating any errors. |
| reprocess | BooleanField | If checked, the data will be reprocessed. |

Functions

clean()

Validate the information and upload the measurement data.

Source code in importing/models.py
def clean(self) -> None:
    """Validate the information and upload the measurement data."""
    tz = self.station.timezone
    if not tz:
        raise ValidationError("Station must have a timezone set.")

    # If the file has changed, we reprocess the data
    if self.pk and self.rawfile != self.__class__.objects.get(pk=self.pk).rawfile:
        self.reprocess = True

    if self.reprocess:
        self.status = "N"
        self.reprocess = False
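
The reprocess rules in clean() can be tried out without Django. The following is a minimal sketch, not the project's code: `ImportState` and `apply_clean_rules` are hypothetical names, and `stored` stands in for the row that clean() fetches from the database. The reading of "N" as "not queued" is taken from the ingest_data docstring.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ImportState:
    """Hypothetical stand-in for the fields that clean() reads and writes."""

    rawfile: str
    status: str = "C"
    reprocess: bool = False


def apply_clean_rules(
    current: ImportState, stored: Optional[ImportState]
) -> ImportState:
    """Mirror the reprocess logic of clean() on plain objects.

    `stored` plays the role of the saved database row (None for a new record).
    """
    # A changed file forces reprocessing, as in the original method.
    if stored is not None and current.rawfile != stored.rawfile:
        current.reprocess = True

    # The flag is consumed: status goes back to "N" and the flag is cleared.
    if current.reprocess:
        current.status = "N"
        current.reprocess = False
    return current


saved = ImportState(rawfile="a.csv", status="C")
edited = apply_clean_rules(ImportState(rawfile="b.csv", status="C"), saved)
unchanged = apply_clean_rules(ImportState(rawfile="a.csv", status="C"), saved)
```

In this sketch, replacing the file resets a completed import to "N", while saving the same file again leaves the status alone.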

Functions

ingest_data(data_import_pk)

Initiate the ingestion of data into the DB.

If the status of the data import is "not queued", the request is processed. The data is loaded and saved to the database. The status is updated to completed or failed depending on the outcome.

Source code in importing/tasks.py
@on_commit_task()
def ingest_data(data_import_pk: int) -> None:
    """Initiate the ingestion of data into the DB.

    If the status of the data import is "not queued", the request is processed. The data
    is loaded and saved to the database. The status is updated to completed or failed
    depending on the outcome.
    """
    data_import = DataImport.objects.get(pk=data_import_pk)
    if data_import.status != "N":
        return

    data_import.status = "Q"
    data_import.save()

    try:
        getLogger("huey").info("Ingesting data for %s", data_import)
        data_import.start_date, data_import.end_date, data_import.records = (
            save_temp_data_to_permanent(data_import)
        )
        update_variables_for_station(data_import.station.station_code)
        data_import.status = "C"
        data_import.log = "Data ingestion completed successfully"
        getLogger("huey").info("Data ingestion for %s completed", data_import)
    except Exception as e:
        data_import.status = "F"
        data_import.log = str(e)
        getLogger("huey").exception("Error ingesting data for %s", data_import)
    finally:
        data_import.save()
        clear_cache()
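
The status transitions in ingest_data can be summarised as N, then Q, then C or F. Below is a minimal sketch of that flow using a plain dictionary instead of the model. `run_ingestion` and the `load` callback are hypothetical names; `load` stands in for save_temp_data_to_permanent, and reading "Q", "C" and "F" as queued, completed and failed is an assumption based on the docstring.

```python
from typing import Callable, Dict, Tuple


def run_ingestion(
    record: Dict[str, object], load: Callable[[], Tuple[str, str, int]]
) -> Dict[str, object]:
    """Replay the status transitions of ingest_data on a plain dict."""
    # Guard: only imports that are "not queued" are processed.
    if record["status"] != "N":
        return record

    record["status"] = "Q"
    try:
        # `load` is assumed to return (start_date, end_date, records).
        record["start_date"], record["end_date"], record["records"] = load()
        record["status"] = "C"
        record["log"] = "Data ingestion completed successfully"
    except Exception as exc:
        # Any failure is recorded in the log instead of propagating.
        record["status"] = "F"
        record["log"] = str(exc)
    return record


def broken_loader() -> Tuple[str, str, int]:
    raise ValueError("bad header")


ok = run_ingestion({"status": "N"}, lambda: ("2024-01-01", "2024-01-31", 744))
failed = run_ingestion({"status": "N"}, broken_loader)
skipped = run_ingestion({"status": "C"}, broken_loader)
```

Here `ok` ends in "C" with the dates and record count filled in, `failed` ends in "F" with the exception message as its log, and `skipped` is returned untouched because of the guard.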

process_data_ingestion(sender, instance, **kwargs)

Schedules the data ingestion task.

Source code in importing/signals/handlers.py
@receiver(post_save, sender=DataImport)
def process_data_ingestion(sender, instance: PermissionsBase, **kwargs):
    """Schedules the data ingestion task."""
    ingest_data(instance.pk)
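
Because this handler runs on every save, and ingest_data itself saves the record, the task is requested again while an ingestion is in progress. The status guard in ingest_data is what keeps those repeated requests harmless. The sketch below illustrates the idea with a hand-rolled receiver list instead of Django signals. `Record`, `receivers` and `ingest` are hypothetical names, and everything runs synchronously here, whereas the real task is decorated with on_commit_task and is presumably deferred to a worker.

```python
from typing import Callable, List

# Hypothetical, simplified replacement for Django's post_save dispatch.
receivers: List[Callable[["Record"], None]] = []


class Record:
    """Hypothetical stand-in for a DataImport row."""

    def __init__(self) -> None:
        self.status = "N"
        self.handler_calls = 0  # how often the handler was triggered
        self.ingestions = 0  # how often data was actually ingested

    def save(self) -> None:
        # Every save notifies all receivers, like post_save.
        for receiver in list(receivers):
            receiver(self)


def ingest(record: Record) -> None:
    record.handler_calls += 1
    if record.status != "N":
        return  # guard: nothing to do unless the import is "not queued"
    record.ingestions += 1
    record.status = "Q"
    record.save()  # re-enters ingest, which stops at the guard
    record.status = "C"
    record.save()  # same again


receivers.append(ingest)
record = Record()
record.save()
```

In this sketch a single save triggers the handler three times, but the data is ingested only once.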

set_model_permissions(sender, **kwargs)

Set model-level permissions.

Source code in importing/signals/handlers.py
@receiver(post_migrate)
def set_model_permissions(sender, **kwargs):
    """Set model-level permissions."""
    DataImport.set_model_permissions()

set_object_permissions(sender, instance, **kwargs)

Set object-level permissions.

Source code in importing/signals/handlers.py
@receiver(post_save, sender=DataImport)
def set_object_permissions(sender, instance: PermissionsBase, **kwargs):
    """Set object-level permissions."""
    instance.set_object_permissions()