importing.functions ¤

Attributes¤

one_second = np.timedelta64(1, 's') module-attribute ¤

unix_epoch = np.datetime64(0, 's') module-attribute ¤
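A minimal sketch of how two constants like these are commonly combined: subtracting the epoch from a `np.datetime64` and dividing by one second yields the Unix timestamp in seconds. The helper name `to_unix_seconds` is an assumption made for illustration and is not part of the module.

```python
import numpy as np

one_second = np.timedelta64(1, "s")
unix_epoch = np.datetime64(0, "s")


def to_unix_seconds(timestamp: np.datetime64) -> float:
    """Convert a numpy datetime into seconds since the Unix epoch.

    Hypothetical helper, shown only to illustrate the two constants.
    """
    # datetime64 - datetime64 gives a timedelta64; dividing by one second
    # turns it into a plain number of seconds.
    return float((timestamp - unix_epoch) / one_second)


seconds = to_unix_seconds(np.datetime64("1970-01-01T00:01:00"))
```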

Classes¤

Classification ¤

Bases: PermissionsBase

Contains instructions on how to classify the data into a specific variable.

In particular, it links a format to a variable and provides the column indices for the value, maximum, and minimum columns, as well as for the validator columns. It also records whether the data is accumulated or incremental, and the resolution of the data.

Attributes:

Name Type Description
cls_id AutoField

Primary key.

format ForeignKey

The format of the data file.

variable ForeignKey

The variable to which the data belongs.

value PositiveSmallIntegerField

Index of the value column, starting at 0.

maximum PositiveSmallIntegerField

Index of the maximum value column, starting at 0.

minimum PositiveSmallIntegerField

Index of the minimum value column, starting at 0.

value_validator_column PositiveSmallIntegerField

Index of the value validator column, starting at 0.

value_validator_text CharField

Value validator text.

maximum_validator_column PositiveSmallIntegerField

Index of the maximum value validator column, starting at 0.

maximum_validator_text CharField

Maximum value validator text.

minimum_validator_column PositiveSmallIntegerField

Index of the minimum value validator column, starting at 0.

minimum_validator_text CharField

Minimum value validator text.

accumulate PositiveSmallIntegerField

If set to a number of minutes, the data will be accumulated over that period.

resolution DecimalField

Resolution of the data. Only used if it is to be accumulated.

incremental BooleanField

Whether the data is an incremental counter. If it is, any value below the previous one will be removed.

decimal_comma BooleanField

Whether the data uses a comma as a decimal separator.
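
The handling of incremental counters can be sketched with plain Python lists. This is an illustration of the idea only: `construct_matrix` does the equivalent with pandas (`diff()` followed by dropping negative steps), and the function name `incremental_to_increments` is hypothetical.

```python
def incremental_to_increments(readings: list[float]) -> list[float]:
    """Turn counter readings into per-step increments.

    Hypothetical helper. The first reading has no predecessor, so it yields
    no increment, and any step where the counter goes down (for example after
    a counter reset) is discarded.
    """
    increments = []
    for previous, current in zip(readings, readings[1:]):
        step = current - previous
        if step >= 0:
            increments.append(step)
    return increments


# The counter resets between 15 and 3, so that step is removed.
increments = incremental_to_increments([10, 12, 15, 3, 4])
```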

Functions¤
__str__() ¤

Return the string representation of the object.

Source code in formatting/models.py
def __str__(self) -> str:
    """Return the string representation of the object."""
    return str(self.cls_id)
clean() ¤

Validate the model instance.

It checks that the column indices are different, and that the accumulation period is greater than zero if it is set. It also checks that the resolution is set if the data is accumulated.

Source code in formatting/models.py
def clean(self) -> None:
    """Validate the model instance.

    It checks that the column indices are different, and that the accumulation
    period is greater than zero if it is set. It also checks that the resolution is
    set if the data is accumulated.
    """
    if self.accumulate and self.resolution is None:
        raise ValidationError(
            {
                "resolution": "The resolution must be set if the data is "
                "accumulated."
            }
        )

    col_names = [
        "value",
        "maximum",
        "minimum",
        "value_validator_column",
        "maximum_validator_column",
        "minimum_validator_column",
    ]
    unique = defaultdict(list)
    for name in col_names:
        if getattr(self, name) is not None:
            unique[getattr(self, name)].append(name)
    for _, names in unique.items():
        if len(names) != 1:
            msg = "The columns must be different."
            raise ValidationError({field: msg for field in names})
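
The duplicate-column check can be tried outside Django with a small standalone function. This is a sketch under the assumption that the column indices are available as a plain dictionary; `find_shared_columns` is a hypothetical name, not part of the model.

```python
from collections import defaultdict
from typing import Optional


def find_shared_columns(columns: dict[str, Optional[int]]) -> dict[int, list[str]]:
    """Group field names by column index and keep the indices used more than once.

    Hypothetical helper mirroring the grouping done in Classification.clean().
    """
    by_index: dict[int, list[str]] = defaultdict(list)
    for name, index in columns.items():
        if index is not None:  # unset columns are ignored
            by_index[index].append(name)
    return {index: names for index, names in by_index.items() if len(names) > 1}


clashes = find_shared_columns(
    {"value": 1, "maximum": 1, "minimum": None, "value_validator_column": 2}
)
```

An empty result means every configured column index is used by exactly one field.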
get_absolute_url() ¤

Get the absolute URL of the object.

Source code in formatting/models.py
def get_absolute_url(self) -> str:
    """Get the absolute URL of the object."""
    return reverse("formatting:classification_detail", kwargs={"pk": self.pk})

DataImport ¤

Bases: PermissionsBase

Model to store the data imports.

This model stores the data imports, which are files with data that are uploaded to the system. The data is then processed asynchronously and stored in the database.

Attributes:

Name Type Description
station ForeignKey

Station to which the data belongs.

format ForeignKey

Format of the data.

rawfile FileField

File with the data to be imported.

date DateTimeField

Date of submission of the data.

start_date DateTimeField

Start date of the data.

end_date DateTimeField

End date of the data.

records IntegerField

Number of records in the data.

observations TextField

Notes or observations about the data.

status TextField

Status of the import.

log TextField

Log of the data ingestion, indicating any errors.

reprocess BooleanField

If checked, the data will be reprocessed.

Functions¤
clean() ¤

Validate information and upload the measurement data.

Source code in importing/models.py
def clean(self) -> None:
    """Validate information and upload the measurement data."""
    tz = self.station.timezone
    if not tz:
        raise ValidationError("Station must have a timezone set.")

    # If the file has changed, we reprocess the data
    if self.pk and self.rawfile != self.__class__.objects.get(pk=self.pk).rawfile:
        self.reprocess = True

    if self.reprocess:
        self.status = "N"
        self.reprocess = False

Format ¤

Bases: PermissionsBase

Details of the data file format, describing how to read the file.

It combines several properties, such as the file extension, the delimiter, the date and time formats, and the column indices for the date and time columns, instructing how to read the data file and parse the dates. It is mostly used to ingest data from text files, like CSV.

Attributes:

Name Type Description
format_id AutoField

Primary key.

name CharField

Short name of the format entry.

description TextField

Description of the format.

extension ForeignKey

The extension of the data file.

delimiter ForeignKey

The delimiter between columns in the data file. Only required for text files.

first_row PositiveSmallIntegerField

Index of the first row with data, starting at 0.

footer_rows PositiveSmallIntegerField

Number of footer rows to be ignored at the end.

date ForeignKey

Format for the date column. Only required for text files.

date_column PositiveSmallIntegerField

Index of the date column, starting at 0.

time ForeignKey

Format for the time column. Only required for text files.

time_column PositiveSmallIntegerField

Index of the time column, starting at 0.

Attributes¤
datetime_format: str property ¤

Obtain the datetime format string.

Functions¤
__str__() ¤

Return the string representation of the object.

Source code in formatting/models.py
def __str__(self) -> str:
    """Return the string representation of the object."""
    return str(self.name)
datetime_columns(delimiter) ¤

Column indices that correspond to the date and time columns in the dataset.

Parameters:

Name Type Description Default
delimiter str

The delimiter used to split the date and time codes.

required

Returns:

Type Description
list[int]

list[int]: A list of column indices.

Source code in formatting/models.py
def datetime_columns(self, delimiter: str) -> list[int]:
    """Column indices that correspond to the date and time columns in the dataset.

    Args:
        delimiter (str): The delimiter used to split the date and time codes.

    Returns:
        list[int]: A list of column indices.
    """
    date_items = self.date.code.split(delimiter)
    date_cols = list(range(self.date_column, self.date_column + len(date_items)))
    time_items = self.time.code.split(delimiter)
    time_cols = list(range(self.time_column, self.time_column + len(time_items)))
    return date_cols + time_cols
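
To see what `datetime_columns` returns, the same logic can be written as a free function that takes the format codes directly. This is a sketch for illustration: the function name and the example codes are assumptions, not values taken from the database.

```python
def datetime_columns_sketch(
    date_code: str,
    date_column: int,
    time_code: str,
    time_column: int,
    delimiter: str,
) -> list[int]:
    """Return the column indices spanned by the date and time fields.

    Hypothetical standalone version: when a format code contains the file
    delimiter, the field is spread over several consecutive columns.
    """
    date_items = date_code.split(delimiter)
    date_cols = list(range(date_column, date_column + len(date_items)))
    time_items = time_code.split(delimiter)
    time_cols = list(range(time_column, time_column + len(time_items)))
    return date_cols + time_cols


# The date is split over three comma-separated columns; the time uses one.
columns = datetime_columns_sketch("%Y,%m,%d", 0, "%H:%M:%S", 3, ",")
```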
get_absolute_url() ¤

Get the absolute URL of the object.

Source code in formatting/models.py
def get_absolute_url(self) -> str:
    """Get the absolute URL of the object."""
    return reverse("formatting:format_detail", kwargs={"pk": self.pk})

Measurement ¤

Bases: MeasurementBase

Class to store the measurements and their validation status.

This class holds the value of a given variable and station at a specific time, as well as auxiliary information such as maximum and minimum values, depth and direction, for vector quantities. All of these have a raw version where a backup of the original data is kept, should this change at any point.

Flags to monitor its validation status, whether the data is active (and can therefore be used for reporting), and whether it has actually been used for that purpose are also included.

Attributes:

Name Type Description
depth int

Depth of the measurement.

direction Decimal

Direction of the measurement, useful for vector quantities.

raw_value Decimal

Original value of the measurement.

raw_maximum Decimal

Original maximum value of the measurement.

raw_minimum Decimal

Original minimum value of the measurement.

raw_direction Decimal

Original direction of the measurement.

raw_depth int

Original depth of the measurement.

is_validated bool

Flag to indicate if the measurement has been validated.

is_active bool

Flag to indicate if the measurement is active. An inactive measurement is not used for reporting.

Attributes¤
overwritten: bool property ¤

Indicates if any of the values associated to the entry have been overwritten.

Returns:

Name Type Description
bool bool

True if any raw field is different to the corresponding standard field.

raws: tuple[str, ...] property ¤

Return the raw fields of the measurement.

Returns:

Type Description
tuple[str, ...]

tuple[str]: Tuple with the names of the raw fields of the measurement.

Functions¤
clean() ¤

Check consistency of validation and reporting, and back up values.

Source code in measurement/models.py
def clean(self) -> None:
    """Check consistency of validation and reporting, and back up values."""
    # Check consistency of validation
    if not self.is_validated and not self.is_active:
        raise ValidationError("Only validated entries can be declared as inactive.")

    # Backup values to raws, if needed
    for r in self.raws:
        value = getattr(self, r.removeprefix("raw_"))
        if value and not getattr(self, r):
            setattr(self, r, value)
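
The backup step can be illustrated without a database by using a simple namespace in place of the model instance. This is a sketch only: `backup_raws` and the two-field `raws` tuple are assumptions chosen to keep the example short.

```python
from types import SimpleNamespace


def backup_raws(entry, raws=("raw_value", "raw_maximum")):
    """Copy each current value into its raw field if the raw field is empty.

    Hypothetical helper mirroring the loop in Measurement.clean().
    """
    for raw_name in raws:
        value = getattr(entry, raw_name.removeprefix("raw_"))
        if value and not getattr(entry, raw_name):
            setattr(entry, raw_name, value)
    return entry


entry = SimpleNamespace(value=3.2, raw_value=None, maximum=5.0, raw_maximum=4.0)
backup_raws(entry)
```

Here `raw_value` is filled from `value`, while `raw_maximum` already holds the original reading and is left alone. Because the check relies on truthiness, a value of 0 is not copied.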

Report ¤

Bases: MeasurementBase

Holds the different reporting data.

It also keeps track of which data has already been used when creating the reports.

Attributes:

Name Type Description
report_type str

Type of report. It can be hourly, daily or monthly.

completeness Decimal

Completeness of the report. E.g., a daily report with 24 hourly measurements would have a completeness of 100%.
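
As a worked example of the completeness figure, the percentage can be computed as the ratio of available to expected measurements. How the project actually computes it is not shown on this page, so the function below is an assumption for illustration.

```python
def completeness(available: int, expected: int) -> float:
    """Percentage of expected measurements that are present.

    Hypothetical helper: the ratio-based definition is an assumption.
    """
    if expected <= 0:
        raise ValueError("The expected number of measurements must be positive.")
    return 100.0 * available / expected


full_day = completeness(24, 24)
partial_day = completeness(18, 24)
```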

Functions¤
clean() ¤

Validate that the report type and use of the data is consistent.

Source code in measurement/models.py
def clean(self) -> None:
    """Validate that the report type and use of the data is consistent."""
    if self.report_type == ReportType.HOURLY:
        self.time = self.time.replace(minute=0, second=0, microsecond=0)
    elif self.report_type == ReportType.DAILY:
        self.time = self.time.replace(hour=0, minute=0, second=0, microsecond=0)
    elif self.report_type == ReportType.MONTLY:
        self.time = self.time.replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        )
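
The effect of this truncation can be checked with plain `datetime` objects. The sketch below uses string labels for the report type; those labels and the function name are assumptions, since the `ReportType` values are not shown here.

```python
from datetime import datetime


def truncate_to_period(time: datetime, report_type: str) -> datetime:
    """Snap a timestamp to the start of its hour, day or month.

    Hypothetical helper; the report type labels are assumed.
    """
    if report_type == "hourly":
        return time.replace(minute=0, second=0, microsecond=0)
    if report_type == "daily":
        return time.replace(hour=0, minute=0, second=0, microsecond=0)
    if report_type == "monthly":
        return time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    raise ValueError(f"Unknown report type: {report_type}")


stamp = datetime(2024, 3, 15, 13, 47, 12)
hourly = truncate_to_period(stamp, "hourly")
monthly = truncate_to_period(stamp, "monthly")
```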

Station ¤

Bases: PermissionsBase

Main representation of a station, including its metadata.

Attributes:

Name Type Description
visibility str

Visibility level of the object, including an "internal" option.

station_id int

Primary key.

station_code str

Unique code for the station.

station_name str

Brief description of the station.

station_type StationType

Type of the station.

country Country

Country where the station is located.

region Region

Region within the Country where the station is located.

ecosystem Ecosystem

Ecosystem associated with the station.

institution Institution

Institutional partner responsible for the station.

place_basin PlaceBasin

Place-Basin association.

station_state bool

Is the station operational?

timezone str

Timezone of the station.

station_latitude Decimal

Latitude of the station, in degrees [-90 to 90].

station_longitude Decimal

Longitude of the station, in degrees [-180 to 180].

station_altitude int

Altitude of the station.

influence_km Decimal

Area of influence in km².

station_file ImageField

Photograph of the station.

station_external bool

Is the station external?

variables str

Comma-separated list of variables measured by the station.

Attributes¤
variables_list: list[str] property ¤

Return the list of variables measured by the station.

Only variables with data in the database are returned.

Returns:

Type Description
list[str]

list[str]: List of variables measured by the station.

Functions¤
__str__() ¤

Return the station code.

Source code in station/models.py
def __str__(self) -> str:
    """Return the station code."""
    return str(self.station_code)
get_absolute_url() ¤

Return the absolute url of the station.

Source code in station/models.py
def get_absolute_url(self) -> str:
    """Return the absolute url of the station."""
    return reverse("station:station_detail", kwargs={"pk": self.pk})
set_object_permissions() ¤

Set object-level permissions.

This method is called by the save method of the model to set the object-level permissions based on the visibility level of the object. In addition to the standard permissions for the station, the view_measurements permission is set which controls who can view the measurements associated to the station.

Source code in station/models.py
def set_object_permissions(self) -> None:
    """Set object-level permissions.

    This method is called by the save method of the model to set the object-level
    permissions based on the visibility level of the object. In addition to the
    standard permissions for the station, the view_measurements permission is set
    which controls who can view the measurements associated to the station.
    """
    super().set_object_permissions()

    standard_group = Group.objects.get(name="Standard")
    anonymous_user = get_anonymous_user()

    # Assign view_measurements permission based on permissions level
    if self.visibility == "public":
        assign_perm("view_measurements", standard_group, self)
        assign_perm("view_measurements", anonymous_user, self)
        if self.owner:
            remove_perm("view_measurements", self.owner, self)
    elif self.visibility == "internal":
        assign_perm("view_measurements", standard_group, self)
        remove_perm("view_measurements", anonymous_user, self)
        if self.owner:
            remove_perm("view_measurements", self.owner, self)
    elif self.visibility == "private":
        remove_perm("view_measurements", standard_group, self)
        remove_perm("view_measurements", anonymous_user, self)
        if self.owner:
            assign_perm("view_measurements", self.owner, self)
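
The three branches above amount to a small lookup table of who is explicitly granted `view_measurements` for each visibility level. The sketch below restates that table in plain Python; the function name and the dictionary keys are illustrative assumptions and no permissions backend is involved.

```python
def view_measurements_holders(visibility: str) -> dict[str, bool]:
    """Return who is explicitly assigned view_measurements for a visibility level.

    Hypothetical summary of the branches in set_object_permissions():
    True means the permission is assigned, False means it is removed.
    """
    table = {
        "public": {"standard_group": True, "anonymous_user": True, "owner": False},
        "internal": {"standard_group": True, "anonymous_user": False, "owner": False},
        "private": {"standard_group": False, "anonymous_user": False, "owner": True},
    }
    return table[visibility]


private_rules = view_measurements_holders("private")
```

Only for private stations does the owner receive an explicit object-level permission; in the other two cases it is removed from the owner.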

Functions¤

construct_matrix(matrix_source, file_format, station, data_import) ¤

Construct the "matrix" or results table. Performs cleaning and simple transformations depending on the date format and the type of data (accumulated, incremental...), and deals with NaNs.

Parameters:

Name Type Description Default
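matrix_source FileField

Raw data file path.

required
file_format Format

A formatting.Format object.

required
station Station

The station the data belongs to.

required
data_import DataImport

The data import the data is linked to.

required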

Returns: List of dataframes for results (one for each variable type in the raw data file). TODO: Probably refactor into smaller chunks.

Source code in importing/functions.py
def construct_matrix(
    matrix_source: FileField,
    file_format: Format,
    station: Station,
    data_import: DataImport,
) -> list[pd.DataFrame]:
    """Construct the "matrix" or results table. Does various cleaning / simple
    transformations depending on the date format, type of data (accumulated,
    incremental...) and deals with NANs.

    Args:
        matrix_source: raw data file path
        file_format: a formatting.Format object.
        station: the station the data belongs to.
        data_import: the data import the data is linked to.
    Returns: List of dataframes for results (one for each variable type in the raw
        data file).
    TODO: Probably refactor into smaller chunks.
    """
    # Get the "preformatted matrix" sorted by date col
    matrix = read_data_to_import(matrix_source, file_format, station.timezone)
    # Find start and end dates from top and bottom row
    start_date = matrix["date"].iloc[0]
    end_date = matrix["date"].iloc[-1]

    classifications = list(Classification.objects.filter(format=file_format))

    if len(classifications) == 0:
        msg = "No classifications found for this format. Please add some."
        raise ValueError(msg)

    max_cols = max([c.value for c in classifications])
    ncols = len(matrix.columns)
    if max_cols >= ncols:
        msg = (
            f"The number of columns in the file {ncols} is less than the maximum column"
            f" number specified in the classifications {max_cols}. Please check the "
            "file and the classifications for this format."
        )
        raise ValueError(msg)

    to_ingest = []
    for classification in classifications:
        columns = []
        columns.append(("date", "date"))

        # Validation of values
        columns.append((classification.value, "value"))
        if classification.value_validator_column:
            matrix.loc[
                matrix[classification.value_validator_column]
                != classification.value_validator_text,
                classification.value,
            ] = np.nan

        # Validation of maximum
        if classification.maximum:
            columns.append((classification.maximum, "maximum"))
            if classification.maximum_validator_column:
                matrix.loc[
                    matrix[classification.maximum_validator_column]
                    != classification.maximum_validator_text,
                    classification.maximum,
                ] = np.nan

        # Validation of minimum
        if classification.minimum:
            columns.append((classification.minimum, "minimum"))
            if classification.minimum_validator_column:
                matrix.loc[
                    matrix[classification.minimum_validator_column]
                    != classification.minimum_validator_text,
                    classification.minimum,
                ] = np.nan

        data = matrix.loc[:, [v[0] for v in columns]].rename(columns=dict(columns))

        # More data cleaning, column by column, deal with decimal comma vs point.
        for col in data:
            if col == "date":
                continue
            if classification.decimal_comma:
                data[col] = pd.Series(
                    [standardise_float_comma(val) for val in data[col].values],
                    index=matrix.index,
                )
            else:
                data[col] = pd.Series(
                    [standardise_float(val) for val in data[col].values],
                    index=matrix.index,
                )

        # Eliminate NAs
        data_columns = [column[1] for column in columns if column[1] != "date"]
        data = data.dropna(axis=0, how="all", subset=data_columns)
        if len(data) == 0:
            raise ValueError(
                f"Importing variable {classification.variable.name} from "
                f"column {classification.value} (starting in 0) results in no valid "
                "data."
            )

        # Deal with cumulative and incremental data
        if acc := classification.accumulate:
            # assumes that if incremental it only works with VALUE
            # (MAXIMUM and MINIMUM are excluded)
            if classification.incremental:
                data["value"] = data["value"].diff()
                data.loc[data["value"] < 0, "value"] = np.nan
                data = data.dropna()
            data["date"] = data["date"].apply(
                lambda x: x.replace(
                    minute=int(x.minute / acc) * acc,
                    second=0,
                    microsecond=0,
                    nanosecond=0,
                )
            )
            data["date"] = data["date"] + pd.Timedelta(minutes=acc)
            count = data.groupby("date")["value"].sum().to_frame()
            data = count["value"] * float(classification.resolution)

            start_date = start_date.replace(
                minute=int(start_date.minute / acc) * acc,
                second=0,
                microsecond=0,
                nanosecond=0,
            ) + pd.Timedelta(minutes=acc)
            end_date = end_date.replace(
                minute=int(end_date.minute / acc) * acc,
                second=0,
                microsecond=0,
                nanosecond=0,
            ) + pd.Timedelta(minutes=acc)
            table = pd.date_range(
                start_date, end_date, freq=f"{acc}min", name="date"
            ).to_frame()
            data = pd.concat([table, data], axis=1)
            data = data.fillna(0)

        # Deal with non cumulative but incremental data
        else:
            if classification.incremental:
                data["value"] = data["value"].diff()
                data.loc[data["value"] < 0, "value"] = np.nan
                data = data.dropna()
            if classification.resolution:
                data["value"] = data["value"] * float(classification.resolution)

        data["station_id"] = station.station_id
        data["variable_id"] = classification.variable.variable_id
        data["data_import_id"] = data_import.data_import_id

        # Add the data to the main list
        to_ingest.append(data)

    return to_ingest
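
The accumulation branch is the densest part of this function, so here is a reduced sketch of the same idea in plain Python: each timestamp is floored to its accumulation period, labelled with the end of that period, summed, and scaled by the resolution. The function name and the sample data are assumptions, and unlike the original this sketch does not fill empty periods with zeros.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def accumulate(
    records: list[tuple[datetime, float]], minutes: int, resolution: float
) -> dict[datetime, float]:
    """Sum counts per accumulation period and scale them by the resolution.

    Hypothetical helper: periods are labelled by their end time, as in
    construct_matrix.
    """
    totals: dict[datetime, float] = defaultdict(float)
    for time, count in records:
        period_start = time.replace(
            minute=(time.minute // minutes) * minutes, second=0, microsecond=0
        )
        totals[period_start + timedelta(minutes=minutes)] += count
    return {period: total * resolution for period, total in sorted(totals.items())}


# Rain gauge tips recorded at irregular times, accumulated every 5 minutes
# with an assumed resolution of 0.5 units per tip.
tips = [
    (datetime(2024, 1, 1, 10, 1), 2),
    (datetime(2024, 1, 1, 10, 4), 1),
    (datetime(2024, 1, 1, 10, 7), 4),
]
rainfall = accumulate(tips, minutes=5, resolution=0.5)
```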

get_last_uploaded_date(station_id, var_code) ¤

Get the last date of uploaded data for a given station ID and variable code.

Parameters:

Name Type Description Default
station_id int

The station ID.

required
var_code str

The variable code.

required

Returns:

Type Description
datetime | None

The last date that data was uploaded for the given station ID and variable code, or None if no data was found.

Source code in importing/functions.py
def get_last_uploaded_date(station_id: int, var_code: str) -> datetime | None:
    """Get the last date of uploaded data for a given station ID and variable code.

    Args:
        station_id: The station ID.
        var_code: The variable code.

    Returns:
        The last date that data was uploaded for the given station ID and variable code
        or None if no data was found.
    """
    query = (
        Measurement.timescale.filter(
            station__station_id=station_id, variable__variable_code=var_code
        )
        .order_by("time")
        .last()
    )
    if query:
        return query.time

    return None

process_datetime_columns(data, file_format, timezone) ¤

Process the datetime columns in a DataFrame.

Parameters:

Name Type Description Default
data DataFrame

The DataFrame to process.

required
file_format Format

The file format.

required
timezone str

The timezone to use.

required

Returns:

Type Description
DataFrame

The DataFrame with the datetime columns processed.

Source code in importing/functions.py
def process_datetime_columns(
    data: pd.DataFrame, file_format: Format, timezone: str
) -> pd.DataFrame:
    """Process the datetime columns in a DataFrame.

    Args:
        data: The DataFrame to process.
        file_format: The file format.
        timezone: The timezone to use.

    Returns:
        The DataFrame with the datetime columns processed.
    """
    tz = zoneinfo.ZoneInfo(timezone)
    dt_format = file_format.datetime_format
    if file_format.date_column == file_format.time_column:
        data["date"] = pd.Series(
            [
                standardise_datetime(row, dt_format).replace(tzinfo=tz)
                for row in data.iloc[:, file_format.date_column].values
            ],
            index=data.index,
        )
    else:
        cols = file_format.datetime_columns(file_format.delimiter.character)
        data["datetime_str"] = data.iloc[:, cols].agg(
            lambda row: " ".join([str(r) for r in row]), axis=1
        )
        data["date"] = data["datetime_str"].apply(
            lambda row: standardise_datetime(row, dt_format).replace(tzinfo=tz)
        )
        data.drop(columns=["datetime_str"], inplace=True)

    return data.sort_values("date").reset_index(drop=True)
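
The core of the datetime processing is parsing a string with the format's pattern and then attaching the station timezone. A minimal sketch of that step with the standard library follows; it uses `datetime.strptime` directly in place of `standardise_datetime`, and the function name and sample values are assumptions.

```python
from datetime import datetime
from zoneinfo import ZoneInfo


def parse_local(text: str, datetime_format: str, timezone: str) -> datetime:
    """Parse a naive timestamp and attach the given timezone to it.

    Hypothetical helper. replace(tzinfo=...) labels the wall-clock time with
    the timezone; it does not shift the time.
    """
    naive = datetime.strptime(text, datetime_format)
    return naive.replace(tzinfo=ZoneInfo(timezone))


parsed = parse_local("2024-01-15 12:30:00", "%Y-%m-%d %H:%M:%S", "America/Chicago")
offset_hours = parsed.utcoffset().total_seconds() / 3600
```

The hour stays at 12 because the file is assumed to record local station time; only the UTC offset is added.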

read_data_to_import(source_file, file_format, timezone) ¤

Reads the data from file into a pandas DataFrame.

Works out what sort of file is being read and adds standardised columns for datetime.

Parameters:

Name Type Description Default
source_file Any

Stream of data to be parsed.

required
file_format Format

Format of the data to be parsed.

required
timezone str

Timezone name, e.g. 'America/Chicago'.

required

Returns:

Type Description

DataFrame

A pandas DataFrame with the raw data read and extra column(s) for the correctly parsed datetime.

Source code in importing/functions.py
def read_data_to_import(
    source_file: Any, file_format: Format, timezone: str
) -> pd.DataFrame:
    """Reads the data from file into a pandas DataFrame.

    Works out what sort of file is being read and adds standardised columns for
    datetime.

    Args:
        source_file: Stream of data to be parsed.
        file_format: Format of the data to be parsed.
        timezone: Timezone name, e.g. 'America/Chicago'.

    Returns:
        Pandas.DataFrame with raw data read and extra column(s) for datetime
        correctly parsed.
    """
    if file_format.extension.value in ["xlsx", "xlx"]:
        data = read_file_excel(source_file, file_format)
    else:
        data = read_file_csv(source_file, file_format)

    return process_datetime_columns(data, file_format, timezone)

read_file_csv(source_file, file_format) ¤

Reads a CSV file into a pandas DataFrame.

Parameters:

Name Type Description Default
source_file Any

Stream of data to be parsed.

required
file_format Format

The file format.

required

Returns:

Type Description
DataFrame

A pandas DataFrame containing the data from the file.

Source code in importing/functions.py
def read_file_csv(source_file: Any, file_format: Format) -> pd.DataFrame:
    """Reads a CSV file into a pandas DataFrame.

    Args:
        source_file: Stream of data to be parsed.
        file_format: The file format.

    Returns:
        A pandas DataFrame containing the data from the file.
    """
    firstline = file_format.first_row if file_format.first_row else 0
    skipfooter = file_format.footer_rows if file_format.footer_rows else 0
    delimiter = file_format.delimiter.character

    skiprows: int | list[int] = firstline
    if not isinstance(source_file, str | Path):
        # The file was uploaded as binary
        lines = len(source_file.readlines())
        source_file.seek(0)
        skiprows = [i for i in range(0, firstline)] + [
            i - 1 for i in range(lines, lines - skipfooter, -1)
        ]
        skipfooter = 0

    # Deal with the delimiter
    if "\\x" in delimiter:
        # Hexadecimal escape such as "\x09": convert it to the actual character
        delim_hexcode = delimiter.replace("\\x", "")
        delimiter = chr(int(delim_hexcode, 16))
    elif delimiter == " ":
        delimiter = r"\s+"  # This is a regex for whitespace

    return pd.read_csv(
        source_file,
        sep=delimiter,
        header=None,
        index_col=False,
        skiprows=skiprows,
        skipfooter=skipfooter,
        encoding="ISO-8859-1",
    )
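
Two pieces of this function are easy to check in isolation: the translation of the stored delimiter and the list of rows to skip for uploaded streams. The sketch below extracts both as standalone functions; the names `resolve_delimiter` and `rows_to_skip` are hypothetical.

```python
def resolve_delimiter(delimiter: str) -> str:
    """Translate a stored delimiter into the separator passed to the CSV reader.

    Hypothetical helper: a literal backslash-x hex code becomes the matching
    character and a single space becomes a whitespace regular expression.
    """
    if "\\x" in delimiter:
        return chr(int(delimiter.replace("\\x", ""), 16))
    if delimiter == " ":
        return r"\s+"
    return delimiter


def rows_to_skip(total_lines: int, first_row: int, footer_rows: int) -> list[int]:
    """List the header and footer row indices to skip in an uploaded stream."""
    header = list(range(first_row))
    footer = [i - 1 for i in range(total_lines, total_lines - footer_rows, -1)]
    return header + footer


tab = resolve_delimiter("\\x09")
skipped = rows_to_skip(total_lines=10, first_row=2, footer_rows=2)
```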

read_file_excel(file_path, file_format) ¤

Reads an Excel file into a pandas DataFrame.

Parameters:

Name Type Description Default
file_path str

The path to the file to be read.

required
file_format Format

The file format.

required

Returns:

Type Description
DataFrame

A pandas DataFrame containing the data from the file.

Source code in importing/functions.py
def read_file_excel(file_path: str, file_format: Format) -> pd.DataFrame:
    """Reads an Excel file into a pandas DataFrame.

    Args:
        file_path: The path to the file to be read.
        file_format: The file format.

    Returns:
        A pandas DataFrame containing the data from the file.
    """
    firstline = file_format.first_row if file_format.first_row else 0
    skipfooter = file_format.footer_rows if file_format.footer_rows else 0
    # `error_bad_lines` is not an argument of `pd.read_excel`, so it is not passed.
    return pd.read_excel(
        file_path,
        header=None,
        skiprows=firstline - 1,
        skipfooter=skipfooter,
        engine=None,
        index_col=None,
    )

save_temp_data_to_permanent(data_import) ¤

Function to pass the temporary import to the final table.

Uses the data_import object only to get all required information from its fields.

This function carries out the following steps:

  • Bulk delete of existing data between two times on a given measurement table for the station in question.
  • Bulk create to add the new data from the uploaded file.

Parameters:

Name Type Description Default
data_import DataImport

The DataImport object describing the upload.

required
Source code in importing/functions.py
def save_temp_data_to_permanent(
    data_import: DataImport,
) -> tuple[datetime, datetime, int]:
    """Function to pass the temporary import to the final table.

    Uses the data_import object only to get all required information from its
    fields.

    This function carries out the following steps:

    - Bulk delete of existing data between two times on a given measurement table for
    the station in question.
    - Bulk create to add the new data from the uploaded file.

    Args:
        data_import: The DataImport object describing the upload.

    Returns:
        The start date, end date and number of records of the imported data.
    """
    station = data_import.station
    file_format = data_import.format
    file = data_import.rawfile

    # Delete existing measurements and reports for the same data_import_id
    Measurement.objects.filter(data_import_id=data_import.data_import_id).delete()
    Report.objects.filter(data_import_id=data_import.data_import_id).delete()

    all_data = construct_matrix(file, file_format, station, data_import)
    if not all_data:
        msg = "No data to import. Is the chosen format correct?"
        raise ValueError(msg)

    must_cols = ["data_import_id", "station_id", "variable_id", "date", "value"]
    start_date = all_data[0]["date"].iloc[0]
    end_date = all_data[0]["date"].iloc[-1]
    num_records = len(all_data[0])
    for table in all_data:
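        # _meta.fields holds Field objects, not strings, so compare against
        # the field names instead.
        field_names = {f.name for f in Measurement._meta.fields}
        cols = [c for c in table.columns if c in field_names or c in must_cols]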
        table = (
            table[cols]
            .dropna(axis=0, subset=must_cols)
            .rename(columns={"date": "time"})
        )
        records = table.to_dict("records")
        variable_id = table["variable_id"].iloc[0]

        # Delete existing data between the date ranges. Needed for data not linked
        # to a data_import_id. Both measurements and reports are deleted.
        Measurement.timescale.filter(
            time__range=[start_date, end_date],
            station_id=station.station_id,
            variable_id=variable_id,
        ).delete()
        Report.objects.filter(
            time__range=[start_date, end_date],
            station_id=station.station_id,
            variable_id=variable_id,
        ).delete()

        # Bulk add new data
        def create_and_clean(**record):
            instance = Measurement(**record)
            instance.clean()
            return instance

        model_instances = [create_and_clean(**record) for record in records]

        # WARNING: This is a bulk insert, so it will not call the save()
        # method nor send the pre_save or post_save signals for each instance.
        Measurement.objects.bulk_create(model_instances)

    return start_date, end_date, num_records

standardise_datetime(date_time, datetime_format) ¤

Returns a datetime object in the case that date_time is not already in that form.

Parameters:

Name Type Description Default
date_time Any

The date_time to be transformed.

required
datetime_format str

The format that date_time is in (to be passed to datetime.strptime()).

required

Returns:

Type Description
datetime

A datetime object.

Raises:

Type Description
ValueError

If date_time cannot be parsed with datetime_format.
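
As a rough illustration of the list and string branches, the sketch below joins date and time fragments with a space and parses the result with datetime.strptime(), re-raising any failure as a ValueError. The helper name parse_parts and the sample format are assumptions made for this example.

```python
from datetime import datetime


def parse_parts(parts, datetime_format):
    """Join date/time fragments with a space and parse them (hypothetical helper)."""
    text = " ".join(parts)
    try:
        return datetime.strptime(text, datetime_format)
    except ValueError as e:
        raise ValueError(f"Error parsing date: {text} - {e}") from e


# Date and time held in separate columns, as in many logger files.
parsed = parse_parts(["2024-03-01", "12:30:00"], "%Y-%m-%d %H:%M:%S")

# A value that does not match the format raises ValueError.
try:
    parse_parts(["01/03/2024", "12:30"], "%Y-%m-%d %H:%M:%S")
    failed = False
except ValueError:
    failed = True
```

A datetime parsed this way is naive unless the format contains %z, whereas the np.datetime64 branch of standardise_datetime returns a UTC-aware value.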

Source code in importing/functions.py
def standardise_datetime(date_time: Any, datetime_format: str) -> datetime:
    """Returns a datetime object in the case that date_time is not already in that form.

    Args:
        date_time: The date_time to be transformed.
        datetime_format: The format that date_time is in (to be passed to
            datetime.strptime()).

    Returns:
        A datetime object.

    Raises:
        ValueError: If date_time cannot be parsed with datetime_format.
    """
    if isinstance(date_time, datetime):
        return date_time
    elif isinstance(date_time, np.datetime64):
        date_time = datetime.fromtimestamp(
            float((date_time - unix_epoch) / one_second), tz=zoneinfo.ZoneInfo("UTC")
        )
        return date_time
    elif isinstance(date_time, str):
        pass
    elif isinstance(date_time, list):
        date_time = " ".join(date_time)
    elif isinstance(date_time, pd.Series):
        date_time = " ".join([str(val) for val in list(date_time[:])])
    else:
        date_time = ""

    # Now try converting the resulting string into a datetime obj
    try:
        _date_time = datetime.strptime(date_time, datetime_format)
    except Exception as e:
        raise ValueError(f"Error parsing date: {date_time} - {e}") from e
    return _date_time

standardise_float(val_str) ¤

Removes commas from strings for numbers that use a period as a decimal separator.

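Parameters:

Name Type Description Default
val_str

String or Number-like value to convert.

required

Returns:

Type Description

The value as a float, or None if the string cannot be parsed.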
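
A few inputs make the behaviour concrete. The function body below is copied from the source listing so that the snippet runs on its own; the sample values are illustrative only.

```python
from numbers import Number


def standardise_float(val_str):
    if isinstance(val_str, Number):
        return float(val_str)
    try:
        val_str = val_str.replace(",", "")
        val_num = float(val_str)
    except ValueError:
        val_num = None
    return val_num


thousands = standardise_float("1,234.56")  # comma is dropped -> 1234.56
plain = standardise_float(42)              # numbers go straight through float()
invalid = standardise_float("n/a")         # unparseable strings give None
```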

Source code in importing/functions.py
def standardise_float(val_str):
    """Removes commas from strings for numbers that use a period as a decimal separator.

    Args:
        val_str: string or Number-like

    Returns:
        The value as a float, or None if the string cannot be parsed.
    """
    if isinstance(val_str, Number):
        return float(val_str)
    try:
        val_str = val_str.replace(",", "")
        val_num = float(val_str)
    except ValueError:
        val_num = None
    return val_num

standardise_float_comma(val_str) ¤

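For strings representing numbers that use a comma as a decimal separator:

  • Removes full stops.
  • Replaces commas with full stops.

Parameters:

Name Type Description Default
val_str

String or Number-like value to convert.

required

Returns:

Type Description

The value as a float, or None if the string cannot be parsed.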
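
The same kind of check for the comma variant, again with the function body copied from the source listing and illustrative inputs. The last call shows a pitfall: a string that already uses a period as its decimal separator loses that period.

```python
from numbers import Number


def standardise_float_comma(val_str):
    if isinstance(val_str, Number):
        return float(val_str)
    try:
        val_str = val_str.replace(".", "")
        val_str = val_str.replace(",", ".")
        val_num = float(val_str)
    except ValueError:
        val_num = None
    return val_num


european = standardise_float_comma("1.234,56")  # -> 1234.56
fraction = standardise_float_comma("0,5")       # -> 0.5
misread = standardise_float_comma("1.5")        # period removed -> 15.0
```

This is why the choice between the two functions has to follow the file format, for example through the decimal_comma flag of the Classification.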

Source code in importing/functions.py
def standardise_float_comma(val_str):
    """For strings representing numbers that use a comma as a decimal separator:
    (i) Removes full stops
    (ii) Replaces commas with full stops

    Args:
        val_str: string or Number-like

    Returns:
        The value as a float, or None if the string cannot be parsed.
    """
    if isinstance(val_str, Number):
        return float(val_str)
    try:
        val_str = val_str.replace(".", "")
        val_str = val_str.replace(",", ".")
        val_num = float(val_str)
    except ValueError:
        val_num = None
    return val_num

validate_dates(data_import) ¤

Verify if there already exists data for the dates of the data being imported.

Parameters:

Name Type Description Default
data_import

DataImportFull or DataImportTemp object.

required

Returns:

Type Description

A tuple of:

  • result (list of dicts): one summary per classification for this file format. Each summary is a dict containing information on the variable, the last upload date and whether the data exists.
  • overwrite (bool): True if any of the data already exists.
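
The shape of the returned tuple can be sketched with in-memory data in place of database queries. Everything here is hypothetical: the stored list stands in for the measurement table, the variables list for the variables reached through the classifications, and taking the latest stored time as last_upload_date is an assumption about what get_last_uploaded_date returns.

```python
from datetime import datetime

# Hypothetical stand-ins for stored measurements: (variable_id, time) pairs.
stored = [
    (1, datetime(2024, 1, 1, 0, 0)),
    (1, datetime(2024, 1, 1, 1, 0)),
    (2, datetime(2023, 12, 31, 23, 0)),
]
# Hypothetical variables linked to the file format through its classifications.
variables = [
    {"variable_id": 1, "variable_code": "precip", "variable_name": "Precipitation"},
    {"variable_id": 2, "variable_code": "airtemp", "variable_name": "Air temperature"},
]


def summarise(variables, stored, start_date, end_date):
    """Build one summary per variable and an overall overwrite flag."""
    result = []
    overwrite = False
    for var in variables:
        times = [t for vid, t in stored if vid == var["variable_id"]]
        # Inclusive range check, like Django's time__range lookup.
        exists = any(start_date <= t <= end_date for t in times)
        overwrite = overwrite or exists
        result.append(
            {**var, "last_upload_date": max(times, default=None), "exists": exists}
        )
    return result, overwrite


result, overwrite = summarise(
    variables, stored, datetime(2024, 1, 1), datetime(2024, 1, 2)
)
```

Here only the first variable has data inside the import window, so its summary has exists set to True and overwrite is True for the import as a whole.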

Source code in importing/functions.py
def validate_dates(data_import):
    """Verify if there already exists data for the dates of the data being imported.

    Args:
        data_import: DataImportFull or DataImportTemp object.

    Returns:
        tuple of:
            result: (list of dicts): one summary per classification for this file
                format. Each summary is a dict containing information on the
                variable, the last upload date and whether the data exists.
            overwrite: (bool) True if any of the data already exists.
    """
    start_date = data_import.start_date
    end_date = data_import.end_date
    file_format = data_import.format_id
    station = data_import.station
    classifications = list(Classification.objects.filter(format=file_format))

    overwrite = False
    result = []
    for classification in classifications:
        # variable_code is used to select the measurement class
        var_code = str(classification.variable.variable_code)
        last_upload_date = get_last_uploaded_date(station.station_id, var_code)

        # Check if data exists between dates
        query = Measurement.timescale.filter(
            time__range=[start_date, end_date],
            station_id=station.station_id,
            variable_id=classification.variable.variable_id,
        )
        exists = query.exists()
        overwrite = overwrite or exists
        summary = {
            "variable_id": classification.variable.variable_id,
            "variable_code": classification.variable.variable_code,
            "variable_name": classification.variable.name,
            "last_upload_date": last_upload_date,
            "exists": exists,
        }
        result.append(summary)
    return result, overwrite