
functions

importing.functions

Attributes

one_second = np.timedelta64(1, 's') module-attribute

unix_epoch = np.datetime64(0, 's') module-attribute
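This section does not show where one_second and unix_epoch are used. A common use for such a pair is converting between datetime64 values and Unix timestamps; the sketch below assumes that purpose, and the helper names to_unix_seconds and from_unix_seconds are hypothetical.

```python
import numpy as np

# Constants with the same definitions as the module attributes above
one_second = np.timedelta64(1, "s")
unix_epoch = np.datetime64(0, "s")


def to_unix_seconds(times: np.ndarray) -> np.ndarray:
    """Convert datetime64 values to whole seconds since the Unix epoch."""
    # times - unix_epoch is a timedelta64 array; floor-dividing it by one second
    # yields plain integers
    return (times - unix_epoch) // one_second


def from_unix_seconds(seconds: np.ndarray) -> np.ndarray:
    """Convert whole seconds since the Unix epoch back to datetime64[s] values."""
    return unix_epoch + seconds * one_second
```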

Classes

Classification

Bases: PermissionsBase

Contains instructions on how to classify the data into a specific variable.

In particular, it links a format to a variable and provides the indices of the value, maximum and minimum columns, as well as of the validator columns. It also specifies whether the data is accumulated or incremental, and its resolution. For Thingsboard imports, only the format, variable, accumulate, resolution and incremental fields are applicable.

Attributes:

Name Type Description
cls_id AutoField

Primary key.

format ForeignKey

The format of the data file.

variable ForeignKey

The variable to which the data belongs.

value PositiveSmallIntegerField

Index of the value column, starting at 0.

maximum PositiveSmallIntegerField

Index of the maximum value column, starting at 0.

minimum PositiveSmallIntegerField

Index of the minimum value column, starting at 0.

value_validator_column PositiveSmallIntegerField

Index of the value validator column, starting at 0.

value_validator_text CharField

Value validator text.

maximum_validator_column PositiveSmallIntegerField

Index of the maximum value validator column, starting at 0.

maximum_validator_text CharField

Maximum value validator text.

minimum_validator_column PositiveSmallIntegerField

Index of the minimum value validator column, starting at 0.

minimum_validator_text CharField

Minimum value validator text.

accumulate PositiveSmallIntegerField

If set to a number of minutes, the data will be accumulated over that period.

resolution DecimalField

Resolution of the data; if set, the values are multiplied by it. Required if the data is accumulated.

incremental BooleanField

Whether the data is an incremental counter. If it is, each value is replaced by its difference from the previous value, and any decrease of the counter is removed.

decimal_comma BooleanField

Whether the data uses a comma as a decimal separator.

Functions
__str__()

Return the string representation of the object.

Source code in formatting/models.py
def __str__(self) -> str:
    """Return the string representation of the object."""
    return str(self.cls_id)
clean()

Validate the model instance.

It checks that the resolution is set if the data is accumulated, that the column indices are all different, and that the value column is set if the format is not a Thingsboard format.

Source code in formatting/models.py
def clean(self) -> None:
    """Validate the model instance.

    It checks that the resolution is set if the data is accumulated, that the
    column indices are all different, and that the value column is set if the
    format is not a Thingsboard format.
    """
    if self.accumulate and self.resolution is None:
        raise ValidationError(
            {"resolution": "The resolution must be set if the data is accumulated."}
        )

    col_names = [
        "value",
        "maximum",
        "minimum",
        "value_validator_column",
        "maximum_validator_column",
        "minimum_validator_column",
    ]
    unique = defaultdict(list)
    for name in col_names:
        if getattr(self, name) is not None:
            unique[getattr(self, name)].append(name)
    for _, names in unique.items():
        if len(names) != 1:
            msg = "The columns must be different."
            raise ValidationError({field: msg for field in names})

    # for non-Thingsboard classifications
    if not self.format.thingsboard and self.value is None:
        raise ValidationError(
            {
                "value": (
                    "A value column must be specified for non-Thingsboard formats."
                )
            }
        )
get_absolute_url()

Get the absolute URL of the object.

Source code in formatting/models.py
def get_absolute_url(self) -> str:
    """Get the absolute URL of the object."""
    return reverse("formatting:classification_detail", kwargs={"pk": self.pk})

DataImport

Bases: PermissionsBase

Model to store the data imports.

This model stores the data imports, which are usually files with data uploaded to the system. The data is then processed asynchronously and stored in the database.

Attributes:

Name Type Description
station ForeignKey

Station to which the data belongs.

format ForeignKey

Format of the data.

rawfile FileField

File with the data to be imported.

date DateTimeField

Date of submission of the data.

start_date DateTimeField

Start date of the data.

end_date DateTimeField

End date of the data.

records IntegerField

Number of records in the data.

observations TextField

Notes or observations about the data.

status TextField

Status of the import.

log TextField

Log of the data ingestion, indicating any errors.

Functions
clean()

Validate the import: the station must have a timezone set, and Thingsboard imports must use a Thingsboard-specific format.

Source code in importing/models.py
def clean(self) -> None:
    """Validate the station timezone and, for Thingsboard imports, the format."""
    tz = self.station.timezone
    if not tz:
        raise ValidationError("Station must have a timezone set.")

    if self.origin.origin == "Thingsboard" and not self.format.thingsboard:
        raise ValidationError("Ensure a Thingsboard-specific format is specified.")

Format

Bases: PermissionsBase

Details of the data file format, describing how to read the file.

It combines several properties, such as the file extension, the delimiter, the date and time formats, and the column indices for the date and time columns, instructing how to read the data file and parse the dates. It is mostly used to ingest data from text files, like CSV. For Thingsboard imports, only the name, description and thingsboard fields are applicable.

Attributes:

Name Type Description
format_id AutoField

Primary key.

name CharField

Short name of the format entry.

description TextField

Description of the format.

extension ForeignKey

The extension of the data file.

delimiter ForeignKey

The delimiter between columns in the data file. Only required for text files.

first_row PositiveSmallIntegerField

Index of the first row with data, starting at 0.

footer_rows PositiveSmallIntegerField

Number of footer rows to be ignored at the end.

date ForeignKey

Format for the date column. Only required for text files.

date_column PositiveSmallIntegerField

Index of the date column, starting at 0.

time ForeignKey

Format for the time column. Only required for text files.

time_column PositiveSmallIntegerField

Index of the time column, starting at 0.

thingsboard BooleanField

Whether the data is being imported from Thingsboard.

Attributes
datetime_format property

Obtain the datetime format string.

Functions
__str__()

Return the string representation of the object.

Source code in formatting/models.py
def __str__(self) -> str:
    """Return the string representation of the object."""
    return str(self.name)
clean()

Validate the model instance.

Checks that the required fields for non-Thingsboard data are provided.

Source code in formatting/models.py
def clean(self) -> None:
    """Validate the model instance.

    Checks that the required fields for non-Thingsboard data are provided.
    """
    super().clean()
    errors = {}
    if not self.thingsboard:
        required_fields = (
            "extension",
            "first_row",
            "footer_rows",
            "date_column",
            "time_column",
        )
        for field in required_fields:
            if getattr(self, field) is None:
                errors[field] = "Field is required for non-Thingsboard data."

    if errors:
        raise ValidationError(errors)
datetime_columns(delimiter)

Column indices that correspond to the date and time columns in the dataset.

Parameters:

Name Type Description Default
delimiter str

The delimiter used to split the date and time codes.

required

Returns:

Type Description
list[int]

A list of column indices.

Source code in formatting/models.py
def datetime_columns(self, delimiter: str) -> list[int]:
    """Column indices that correspond to the date and time columns in the dataset.

    Args:
        delimiter (str): The delimiter used to split the date and time codes.

    Returns:
        list[int]: A list of column indices.
    """
    date_items = self.date.code.split(delimiter)
    date_cols = list(range(self.date_column, self.date_column + len(date_items)))
    time_items = self.time.code.split(delimiter)
    time_cols = list(range(self.time_column, self.time_column + len(time_items)))
    return date_cols + time_cols
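To illustrate datetime_columns without a database, the sketch below takes the date and time codes and the column indices as plain arguments. It assumes the codes are strftime-style strings such as "%Y %m %d"; the actual codes stored in the date and time formats are not shown in this section.

```python
def datetime_columns(
    date_code: str, date_column: int, time_code: str, time_column: int, delimiter: str
) -> list[int]:
    """Standalone version of Format.datetime_columns using plain arguments."""
    # Each piece of the date code separated by the file delimiter occupies its
    # own column, starting at date_column; the same applies to the time code
    date_items = date_code.split(delimiter)
    date_cols = list(range(date_column, date_column + len(date_items)))
    time_items = time_code.split(delimiter)
    time_cols = list(range(time_column, time_column + len(time_items)))
    return date_cols + time_cols
```

For example, in a space-delimited file whose rows start with `2024 01 31 12:00:00`, the date code "%Y %m %d" spans columns 0 to 2 and the time occupies column 3.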
get_absolute_url()

Get the absolute URL of the object.

Source code in formatting/models.py
def get_absolute_url(self) -> str:
    """Get the absolute URL of the object."""
    return reverse("formatting:format_detail", kwargs={"pk": self.pk})

Measurement

Bases: MeasurementBase

Class to store the measurements and their validation status.

This class holds the value of a given variable at a given station and time, as well as auxiliary information such as the maximum and minimum values, the depth and, for vector quantities, the direction. Each of these has a raw version that keeps a backup of the original data, in case it is changed later.

It also includes flags that track its validation status, whether the data is active (and can therefore be used for reporting), and whether it has actually been used for that purpose.

Attributes:

Name Type Description
depth int

Depth of the measurement.

direction Decimal

Direction of the measurement, useful for vector quantities.

raw_value Decimal

Original value of the measurement.

raw_maximum Decimal

Original maximum value of the measurement.

raw_minimum Decimal

Original minimum value of the measurement.

raw_direction Decimal

Original direction of the measurement.

raw_depth int

Original depth of the measurement.

is_validated bool

Flag to indicate if the measurement has been validated.

is_active bool

Flag to indicate if the measurement is active. An inactive measurement is not used for reporting.

Attributes
overwritten property

Indicates whether any of the values associated with the entry have been overwritten.

Returns:

Name Type Description
bool bool

True if any raw field differs from the corresponding standard field.

raws property

Return the raw fields of the measurement.

Returns:

Type Description
tuple[str, ...]

Tuple with the names of the raw fields of the measurement.

Functions
clean()

Check the consistency of the validation and reporting flags, and back up the values to their raw fields.

Source code in measurement/models.py
def clean(self) -> None:
    """Check consistency of validation and reporting, and back up values."""
    # Check consistency of validation
    if not self.is_validated and not self.is_active:
        raise ValidationError("Only validated entries can be declared as inactive.")

    # Back up values to the raw fields, if needed. Compare against None so that
    # legitimate zero values are also backed up.
    for r in self.raws:
        value = getattr(self, r.removeprefix("raw_"))
        if value is not None and getattr(self, r) is None:
            setattr(self, r, value)

Report

Bases: MeasurementBase

Holds the different reporting data.

It also keeps track of which data has already been used when creating the reports.

Attributes:

Name Type Description
report_type str

Type of report. It can be hourly, daily or monthly.

completeness Decimal

Completeness of the report. For example, a daily report with 24 hourly measurements would have a completeness of 100%.

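Functions
clean()

Normalise the report time to the start of its period according to the report type: the start of the hour for hourly reports, the start of the day for daily reports and the first day of the month for monthly reports.

Source code in measurement/models.py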
def clean(self) -> None:
    """Truncate the report time to the start of its period, based on the type."""
    if self.report_type == ReportType.HOURLY:
        self.time = self.time.replace(minute=0, second=0, microsecond=0)
    elif self.report_type == ReportType.DAILY:
        self.time = self.time.replace(hour=0, minute=0, second=0, microsecond=0)
    elif self.report_type == ReportType.MONTLY:
        self.time = self.time.replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        )
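The truncation performed by Report.clean() can be reproduced with the standard library alone. In this sketch, the Period enum is a hypothetical stand-in for ReportType, whose definition is not shown in this section:

```python
from datetime import datetime
from enum import Enum


class Period(Enum):
    """Hypothetical stand-in for ReportType."""

    HOURLY = "hourly"
    DAILY = "daily"
    MONTHLY = "monthly"


def truncate(time: datetime, period: Period) -> datetime:
    """Return the start of the hour, day or month that contains `time`."""
    if period is Period.HOURLY:
        return time.replace(minute=0, second=0, microsecond=0)
    if period is Period.DAILY:
        return time.replace(hour=0, minute=0, second=0, microsecond=0)
    # Monthly: first day of the month at midnight
    return time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
```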

Functions

construct_matrix(data_import)

Creates dataframes containing the processed data for each variable.

Checks that classifications exist for the file format and that there are enough columns in the data file.

Parameters:

Name Type Description Default
data_import DataImport

The DataImport object.

required

Returns:

Type Description
tuple[Timestamp, Timestamp, list[tuple[int, DataFrame]]]

The start and end dates and a list of tuples containing the variable ID and the associated dataframe containing the variable data.

Source code in importing/functions.py
def construct_matrix(
    data_import: DataImport,
) -> tuple[pd.Timestamp, pd.Timestamp, list[tuple[int, pd.DataFrame]]]:
    """Creates dataframes containing the processed data for each variable.

    Checks that classifications exist for the file format and that there are
    enough columns in the data file.

    Args:
        data_import: The DataImport object.

    Returns:
        The start and end dates and a list of tuples containing the variable ID and the
            associated dataframe containing the variable data.
    """
    # Get the "preformatted matrix" sorted by date col
    is_thingsboard = data_import.origin.origin == "Thingsboard"
    if is_thingsboard:
        matrix = read_thingsboard_data_to_import(
            data_import.rawfile, data_import.station.timezone
        )
    else:
        matrix = read_data_to_import(
            data_import.rawfile, data_import.format, data_import.station.timezone
        )

    # Find start and end dates from top and bottom row
    start_date = matrix["date"].iloc[0]
    end_date = matrix["date"].iloc[-1]

    classifications = list(Classification.objects.filter(format=data_import.format))

    if len(classifications) == 0:
        msg = "No classifications found for this format. Please add some."
        raise ValueError(msg)

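    if not is_thingsboard:
        # Consider every column index the classifications read from the file,
        # not only the value column
        column_fields = (
            "value",
            "maximum",
            "minimum",
            "value_validator_column",
            "maximum_validator_column",
            "minimum_validator_column",
        )
        max_cols = max(
            getattr(c, field)
            for c in classifications
            for field in column_fields
            if getattr(c, field) is not None
        )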
        ncols = len(matrix.columns)
        if max_cols >= ncols:
            msg = (
                f"The number of columns in the file {ncols} is less than the maximum "
                f"column number specified in the classifications {max_cols}. Please "
                "check the file and the classifications for this format."
            )
            raise ValueError(msg)

    to_ingest = []
    for classification in classifications:
        data = get_processed_variable_data(
            matrix, classification, start_date, end_date, is_thingsboard
        )
        to_ingest.append((classification.variable.variable_id, data))

    return start_date, end_date, to_ingest

get_processed_variable_data(matrix, classification, start_date, end_date, thingsboard=False)

Returns the data table for a given variable, performing necessary validation and data processing steps.

Parameters:

Name Type Description Default
matrix DataFrame

the preformatted matrix containing the raw data.

required
classification Classification

a formatting.Classification object.

required
start_date Timestamp

the start date of the data being imported.

required
end_date Timestamp

the end date of the data being imported.

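required
thingsboard bool

Whether the data comes from Thingsboard, in which case the value column is parsed with parse_thingsboard_values instead of going through validate_values, standardise_floats and remove_nan_rows.

False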

Returns:

Type Description
DataFrame

The processed dataframe for the given classification.

Source code in importing/functions.py
def get_processed_variable_data(
    matrix: pd.DataFrame,
    classification: Classification,
    start_date: pd.Timestamp,
    end_date: pd.Timestamp,
    thingsboard: bool = False,
) -> pd.DataFrame:
    """Returns the data table for a given variable, performing necessary validation
    and data processing steps.

    Args:
        matrix: the preformatted matrix containing the raw data.
        classification: a formatting.Classification object.
        start_date: the start date of the data being imported.
        end_date: the end date of the data being imported.
        thingsboard: whether the data comes from Thingsboard.

    Returns:
        The processed dataframe for the given classification.
    """
    if not thingsboard:
        data, columns = validate_values(matrix, classification)
        data = standardise_floats(data, classification)
        data = remove_nan_rows(data, classification, columns)
    else:
        data = parse_thingsboard_values(matrix)

    if classification.incremental:
        data = process_incremental_data(data)

    if acc := classification.accumulate:
        data = process_cumulative_data(data, classification, acc, start_date, end_date)
    else:
        if classification.resolution:
            data["value"] = data["value"] * float(classification.resolution)

    return data

parse_thingsboard_values(data)

Parse the value column of a Thingsboard dataframe.

Parameters:

Name Type Description Default
data DataFrame

the dataframe containing data to parse.

required

Returns:

Type Description
DataFrame

The dataframe with a numeric value column.

Source code in importing/functions.py
def parse_thingsboard_values(data: pd.DataFrame) -> pd.DataFrame:
    """Parse the value column of a Thingsboard dataframe.

    Args:
        data: the dataframe containing data to parse.

    Returns:
        The dataframe with a numeric value column.
    """
    try:
        data["value"] = pd.to_numeric(data["value"])
    except ValueError as exc:
        raise ValueError(
            "Failed to parse value column for Thingsboard data. Check that numerical"
            " data are provided."
        ) from exc
    return data

process_cumulative_data(data, classification, acc, start_date, end_date)

Aggregates cumulative time series data over periods of acc minutes.

Parameters:

Name Type Description Default
data DataFrame

Dataframe containing validated data to be processed.

required
classification Classification

a formatting.Classification object.

required
acc int

The accumulation period in minutes.

required
start_date Timestamp

the start date of the data being imported.

required
end_date Timestamp

the end date of the data being imported.

required

Returns:

Type Description
DataFrame

The processed dataframe with cumulative data.

Source code in importing/functions.py
def process_cumulative_data(
    data: pd.DataFrame,
    classification: Classification,
    acc: int,
    start_date: pd.Timestamp,
    end_date: pd.Timestamp,
) -> pd.DataFrame:
    """Aggregates cumulative time series data over periods of acc minutes.

    Args:
        data: Dataframe containing validated data to be processed.
        classification: a formatting.Classification object.
        acc: The accumulation period in minutes.
        start_date: the start date of the data being imported.
        end_date: the end date of the data being imported.

    Returns:
        The processed dataframe with cumulative data.
    """
    # Round timestamps down to the nearest multiple of acc minutes, then shift them
    # forward so each period is labelled with its end time
    data["date"] = data["date"].dt.floor(f"{acc}min")
    data["date"] = data["date"] + pd.Timedelta(minutes=acc)
    count = data.groupby("date")["value"].sum().to_frame()  # Group and aggregate
    data = count["value"] * float(classification.resolution)

    # Create the full range of period end times. Flooring with the same frequency
    # as above keeps the range aligned with the data, also for periods over an hour
    start_date = start_date.floor(f"{acc}min") + pd.Timedelta(minutes=acc)
    end_date = end_date.floor(f"{acc}min") + pd.Timedelta(minutes=acc)
    table = pd.date_range(
        start_date, end_date, freq=f"{acc}min", name="date"
    ).to_frame()
    data = pd.concat([table, data], axis=1)

    return data.fillna(0)  # Fill missing values
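The steps in process_cumulative_data can be sketched without the Classification model. In this simplified version, the resolution is passed as a float, the full range of periods spans only the first to the last reading (the function above uses the import's start and end dates instead), and the function name and sample counts are hypothetical.

```python
import pandas as pd


def accumulate(data: pd.DataFrame, acc: int, resolution: float) -> pd.DataFrame:
    """Sum counts into acc-minute periods labelled by each period's end time.

    `data` has a 'date' column of timestamps and a 'value' column of counts.
    """
    period = pd.Timedelta(minutes=acc)
    # Label each reading with the end of the period that contains it
    ends = data["date"].dt.floor(f"{acc}min") + period
    totals = data.groupby(ends)["value"].sum() * resolution
    # Include periods without readings as explicit zeros
    full_range = pd.date_range(ends.min(), ends.max(), freq=f"{acc}min", name="date")
    return totals.reindex(full_range, fill_value=0).rename("value").reset_index()
```

With acc=10, readings at 00:02, 00:04 and 00:21 fall into the periods ending at 00:10 and 00:30, and the empty period ending at 00:20 is reported as zero.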

process_datetime_columns(data, file_format, timezone)

Combine the date and time columns of a DataFrame into a single timezone-aware 'date' column, and sort the rows by it.

Parameters:

Name Type Description Default
data DataFrame

The DataFrame to process.

required
file_format Format

The file format.

required
timezone str

The timezone to use.

required

Returns:

Type Description
DataFrame

The DataFrame with the datetime columns processed.

Source code in importing/functions.py
def process_datetime_columns(
    data: pd.DataFrame, file_format: Format, timezone: str
) -> pd.DataFrame:
    """Process the datetime columns in a DataFrame.

    Args:
        data: The DataFrame to process.
        file_format: The file format.
        timezone: The timezone to use.

    Returns:
        The DataFrame with the datetime columns processed.
    """
    tz = zoneinfo.ZoneInfo(timezone)
    dt_format = file_format.datetime_format

    # Join columns if date and time are separate
    if file_format.date_column != file_format.time_column:
        cols = file_format.datetime_columns(file_format.delimiter.character)
        data["date"] = data.iloc[:, cols].astype(str).agg(" ".join, axis=1)
    else:  # If single column, rename to 'date'
        data = data.rename(columns={data.columns[file_format.date_column]: "date"})

    # Invalid dates will cause an error
    try:
        data["date"] = pd.to_datetime(data["date"], format=dt_format).dt.tz_localize(tz)
    except ValueError as exc:
        raise ValueError(
            "Failed to process datetime column(s). Ensure datetimes are provided in the"
            f" correct format: {dt_format}."
        ) from exc

    return data.sort_values("date").reset_index(drop=True)
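When the date and time are in separate columns, process_datetime_columns joins them with spaces, parses the result with the format's datetime string and localises it. A minimal sketch with hypothetical sample data, passing the column indices and format string directly, and using a timezone name string instead of a ZoneInfo object:

```python
import pandas as pd


def combine_datetime(
    data: pd.DataFrame, cols: list[int], dt_format: str, timezone: str
) -> pd.DataFrame:
    """Join the given columns into a timezone-aware 'date' column and sort by it."""
    data = data.copy()
    # Join the pieces of each row with spaces, e.g. "02/01/2024" + "13:00"
    data["date"] = data.iloc[:, cols].astype(str).agg(" ".join, axis=1)
    # Parse with an explicit format, then attach the station timezone
    data["date"] = pd.to_datetime(data["date"], format=dt_format).dt.tz_localize(
        timezone
    )
    return data.sort_values("date").reset_index(drop=True)
```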

process_incremental_data(data)

Processes incremental time series data.

Only the 'value' column is processed; the maximum and minimum columns are left unchanged.

Parameters:

Name Type Description Default
data DataFrame

the dataframe containing validated data to be processed.

required

Returns:

Type Description
DataFrame

The processed dataframe with incremental data.

Source code in importing/functions.py
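def process_incremental_data(data: pd.DataFrame) -> pd.DataFrame:
    """Processes incremental time series data.

    Only the 'value' column is processed; the maximum and minimum columns are left
    unchanged.

    Args:
        data: the dataframe containing validated data to be processed.

    Returns:
        The processed dataframe with incremental data.
    """
    # Convert the counter readings into differences between consecutive readings
    data["value"] = data["value"].diff()
    # A decrease of the counter (for example after a reset) is not a valid increment
    data.loc[data["value"] < 0, "value"] = np.nan
    # Only drop rows based on 'value', so gaps in other columns do not remove rows
    return data.dropna(subset=["value"])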
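A small worked example helps show what the incremental processing does to a counter. This sketch copies the input rather than modifying it in place, and the function name and readings are hypothetical:

```python
import numpy as np
import pandas as pd


def counter_to_increments(data: pd.DataFrame) -> pd.DataFrame:
    """Turn a cumulative counter in 'value' into per-interval increments."""
    data = data.copy()
    data["value"] = data["value"].diff()  # the first row becomes NaN
    # A drop in the counter gives a negative difference, which is discarded
    data.loc[data["value"] < 0, "value"] = np.nan
    return data.dropna(subset=["value"])
```

For the readings 10, 12, 15, 3, 5 the differences are NaN, 2, 3, -12, 2; the first and the negative entries are dropped, leaving the increments 2, 3 and 2.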

read_data_to_import(source_file, file_format, timezone)

Reads the data from file into a pandas DataFrame.

Chooses the reader from the format's file extension (Excel or delimited text) and adds a standardised, timezone-aware 'date' column.

Parameters:

Name Type Description Default
source_file Any

Stream of data to be parsed.

required
file_format Format

Format of the data to be parsed.

required
timezone str

Timezone name, e.g. 'America/Chicago'.

required

Returns:

Type Description
DataFrame

The DataFrame with raw data read and extra column(s) for datetime correctly parsed.

Source code in importing/functions.py
def read_data_to_import(
    source_file: Any, file_format: Format, timezone: str
) -> pd.DataFrame:
    """Reads the data from file into a pandas DataFrame.

    Chooses the reader from the format's file extension (Excel or delimited text)
    and adds a standardised 'date' column.

    Args:
        source_file: Stream of data to be parsed.
        file_format: Format of the data to be parsed.
        timezone: Timezone name, e.g. 'America/Chicago'.

    Returns:
        The DataFrame with raw data read and extra column(s) for datetime
            correctly parsed.
    """
    if file_format.extension.value in ["xlsx", "xls"]:
        data = read_file_excel(source_file, file_format)
    else:
        data = read_file_csv(source_file, file_format)

    return process_datetime_columns(data, file_format, timezone)

read_file_csv(source_file, file_format)

Reads a CSV file into a pandas DataFrame.

Parameters:

Name Type Description Default
source_file Any

Stream of data to be parsed.

required
file_format Format

The file format.

required

Returns:

Type Description
DataFrame

A pandas DataFrame containing the data from the file.

Source code in importing/functions.py
def read_file_csv(source_file: Any, file_format: Format) -> pd.DataFrame:
    """Reads a CSV file into a pandas DataFrame.

    Args:
        source_file: Stream of data to be parsed.
        file_format: The file format.

    Returns:
        A pandas DataFrame containing the data from the file.
    """
    firstline = file_format.first_row if file_format.first_row else 0
    skipfooter = file_format.footer_rows if file_format.footer_rows else 0
    delimiter = file_format.delimiter.character

    skiprows: int | list[int] = firstline
    if not isinstance(source_file, str | Path):
        # The file was uploaded as binary
        lines = sum(1 for _ in source_file)
        source_file.seek(0)
        skiprows = list(range(0, firstline)) + list(range(lines - skipfooter, lines))
        skipfooter = 0

    # Deal with the delimiter. Hex codes such as "\x09" are converted to the
    # character they represent, without resorting to eval
    if "\\x" in delimiter:
        delim_hexcode = delimiter.replace("\\x", "")
        delimiter = chr(int(delim_hexcode, 16))
    elif delimiter == " ":
        delimiter = r"\s+"  # This is a regex for whitespace

    return pd.read_csv(
        source_file,
        sep=delimiter,
        header=None,
        index_col=False,
        skiprows=skiprows,
        skipfooter=skipfooter,
        encoding="ISO-8859-1",
    )
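For uploaded files, read_file_csv counts the lines and turns the header and footer counts into an explicit list of row indices to skip, and it decodes delimiters stored as escaped hex codes. Both steps can be sketched as small standalone helpers (the names rows_to_skip and decode_delimiter are hypothetical); here int(code, 16) parses the hex digits directly.

```python
def rows_to_skip(total_lines: int, first_row: int, footer_rows: int) -> list[int]:
    """Row indices to skip: the header block plus the footer block."""
    header = list(range(0, first_row))
    footer = list(range(total_lines - footer_rows, total_lines))
    return header + footer


def decode_delimiter(delimiter: str) -> str:
    r"""Turn an escaped hex code such as '\x09' into the character it names."""
    if "\\x" in delimiter:
        return chr(int(delimiter.replace("\\x", ""), 16))
    return delimiter
```

For a 10-line upload with two header rows and one footer row, rows 0, 1 and 9 are skipped.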

read_file_excel(file_path, file_format)

Reads an Excel file into a pandas DataFrame.

Parameters:

Name Type Description Default
file_path str

The path to the file to be read.

required
file_format Format

The file format.

required

Returns:

Type Description
DataFrame

A pandas DataFrame containing the data from the file.

Source code in importing/functions.py
def read_file_excel(file_path: str, file_format: Format) -> pd.DataFrame:
    """Reads an Excel file into a pandas DataFrame.

    Args:
        file_path: The path to the file to be read.
        file_format: The file format.

    Returns:
        A pandas DataFrame containing the data from the file.
    """
    firstline = file_format.first_row if file_format.first_row else 0
    skipfooter = file_format.footer_rows if file_format.footer_rows else 0
    return pd.read_excel(
        file_path,
        header=None,
        skiprows=firstline,
        skipfooter=skipfooter,
        engine=None,
        index_col=None,
    )

read_thingsboard_data_to_import(source_file, timezone)

Reads the data from a Thingsboard JSON file into a pandas DataFrame.

Parameters:

Name Type Description Default
source_file FieldFile

The uploaded JSON file; it is opened through its path.

required
timezone str

The station timezone.

required

Returns:

Type Description
DataFrame

The DataFrame with raw data read and datetime parsed.

Source code in importing/functions.py
def read_thingsboard_data_to_import(
    source_file: FieldFile, timezone: str
) -> pd.DataFrame:
    """Reads the data from a Thingsboard JSON file into a pandas DataFrame.

    Args:
        source_file: The uploaded JSON file; it is opened through its path.
        timezone: The station timezone.

    Returns:
        The DataFrame with raw data read and datetime parsed.
    """
    with open(source_file.path, encoding="utf-8") as f:
        raw_data = json.load(f)

    thingsboard_variable = next(iter(raw_data))
    data = pd.DataFrame(raw_data[thingsboard_variable])

    # Set the date column
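    # Thingsboard timestamps are epoch milliseconds in UTC, so parse them as UTC
    # and convert them to the station timezone
    tz = zoneinfo.ZoneInfo(timezone)
    data["date"] = pd.to_datetime(data["ts"], unit="ms", utc=True).dt.tz_convert(tz)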
    data = data.drop(columns="ts").sort_values("date").reset_index(drop=True)
    return data
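The JSON layout that read_thingsboard_data_to_import expects can be inferred from the code: an object whose first key maps to a list of records, each with a ts field in epoch milliseconds and a value field. The sketch below parses such a string, assuming the ts values are UTC epoch times, so it parses them as UTC and converts them to the station timezone. The function name and sample payload are hypothetical.

```python
import json

import pandas as pd


def thingsboard_json_to_frame(raw: str, timezone: str) -> pd.DataFrame:
    """Parse Thingsboard-style telemetry JSON into a 'date'/'value' DataFrame.

    Assumes the layout {"<key>": [{"ts": <epoch ms>, "value": ...}, ...]}.
    """
    payload = json.loads(raw)
    key = next(iter(payload))  # only the first telemetry key is used
    data = pd.DataFrame(payload[key])
    # Epoch milliseconds are UTC instants: parse as UTC, then convert
    data["date"] = pd.to_datetime(data["ts"], unit="ms", utc=True).dt.tz_convert(
        timezone
    )
    return data.drop(columns="ts").sort_values("date").reset_index(drop=True)
```

The values stay as strings at this stage; converting them to numbers is left to parse_thingsboard_values.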

remove_nan_rows(data, classification, columns)

Cleans the dataframe by removing rows in which all the data columns are NaN.

Parameters:

Name Type Description Default
data DataFrame

the dataframe to be cleaned.

required
classification Classification

a formatting.Classification object.

required
columns list[tuple[str, str]]

A mapping for the validated columns.

required

Returns:

Type Description
DataFrame

The cleaned dataframe.

Source code in importing/functions.py
def remove_nan_rows(
    data: pd.DataFrame, classification: Classification, columns: list[tuple[str, str]]
) -> pd.DataFrame:
    """Cleans the dataframe by removing rows in which all the data columns are NaN.

    Args:
        data: the dataframe to be cleaned.
        classification: a formatting.Classification object.
        columns: A mapping for the validated columns.

    Returns:
        The cleaned dataframe.
    """
    # Eliminate NAs if all values in row are nan
    data_columns = [column[1] for column in columns if column[1] != "date"]
    data = data.dropna(axis=0, how="all", subset=data_columns)
    if len(data) == 0:
        raise ValueError(
            f"Importing variable {classification.variable.name} from "
            f"column {classification.value} (starting at 0) results in no valid "
            "data."
        )
    return data
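remove_nan_rows drops a row only when every data column in it is NaN, which amounts to dropna(how="all") restricted to those columns. A standalone sketch follows; the function name is hypothetical, and the first element of each pair in the test mapping is a placeholder, because its meaning is not shown in this section.

```python
import pandas as pd


def drop_empty_rows(
    data: pd.DataFrame, columns: list[tuple[str, str]]
) -> pd.DataFrame:
    """Drop rows whose data columns are all NaN; fail if no rows remain."""
    # As in remove_nan_rows, the second element of each pair is the column name
    # in `data`; the date column is never used to decide whether a row is empty
    data_columns = [new for _, new in columns if new != "date"]
    cleaned = data.dropna(axis=0, how="all", subset=data_columns)
    if cleaned.empty:
        raise ValueError("No valid data left after removing empty rows.")
    return cleaned
```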

save_temp_data_to_permanent(data_import)

Transfer the data of a temporary import to the final table.

This function carries out the following steps:

- Bulk delete of existing data between two times on a given measurement table for the station in question.
- Bulk create to add the new data from the uploaded file.

Parameters:

Name Type Description Default
data_import DataImport

The DataImport object.

required

Returns:

Type Description
tuple[datetime, datetime, int]

A tuple containing the start date, end date and number of records inserted.

Source code in importing/functions.py
def save_temp_data_to_permanent(
    data_import: DataImport,
) -> tuple[datetime, datetime, int]:
    """Function to pass the temporary import to the final table.

    This function carries out the following steps:
    - Bulk delete of existing data between two times on a given measurement table for
    the station in question.
    - Bulk create to add the new data from the uploaded file.

    Args:
        data_import: The DataImport object.

    Returns:
        A tuple containing the start date, end date and number of records inserted.
    """
    # Delete existing measurements and reports for the same data_import_id
    Measurement.objects.filter(data_import_id=data_import.data_import_id).delete()
    Report.objects.filter(data_import_id=data_import.data_import_id).delete()

    start_date, end_date, all_data = construct_matrix(data_import)

    if not all_data:
        msg = "No data to import. Is the chosen format correct?"
        raise ValueError(msg)

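    must_cols = ["date", "value"]
    # _meta.fields holds Field objects, so membership is tested on their names
    field_names = {field.name for field in Measurement._meta.fields}
    num_records = len(all_data[0][1])
    for variable_id, table in all_data:
        cols = [c for c in table.columns if c in field_names or c in must_cols]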
        table = (
            table[cols]
            .dropna(axis=0, subset=must_cols)
            .rename(columns={"date": "time"})
        )

        # Delete existing data between the date ranges. Needed for data not linked
        # to a data_import_id. Both measurements and reports are deleted.
        station_id = data_import.station.station_id
        Measurement.timescale.filter(
            time__range=[start_date, end_date],
            station_id=station_id,
            variable_id=variable_id,
        ).delete()
        Report.objects.filter(
            time__range=[start_date, end_date],
            station_id=station_id,
            variable_id=variable_id,
        ).delete()

        # Add data to the database in batches
        for start in range(0, len(table), settings.IMPORT_BATCH_SIZE):
            batch = table.iloc[start : start + settings.IMPORT_BATCH_SIZE]

            model_instances = []
            for row in batch.itertuples(index=False):
                instance = Measurement(
                    **row._asdict(),
                    station_id=station_id,
                    variable_id=variable_id,
                    data_import_id=data_import.data_import_id,
                )
                instance.clean()
                model_instances.append(instance)

            # WARNING: This is a bulk insert, so it will not call the save()
            # method nor send the pre_save or post_save signals for each instance.
            Measurement.objects.bulk_create(model_instances)

    return start_date, end_date, num_records
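The batching loop above can be sketched without Django to show how the rows are split and turned into keyword arguments. BATCH_SIZE is a hypothetical stand-in for settings.IMPORT_BATCH_SIZE, and the dictionaries are collected instead of being passed to Measurement(...).

```python
import pandas as pd

BATCH_SIZE = 2  # hypothetical stand-in for settings.IMPORT_BATCH_SIZE

table = pd.DataFrame(
    {
        "time": pd.date_range("2024-01-01", periods=5),
        "value": [0.1, 0.2, 0.3, 0.4, 0.5],
    }
)

batches = []
for start in range(0, len(table), BATCH_SIZE):
    # Slicing past the end with iloc is safe; the last batch is just shorter.
    batch = table.iloc[start : start + BATCH_SIZE]
    # itertuples(index=False) yields namedtuples; _asdict() turns each one into
    # keyword arguments such as {"time": ..., "value": ...}.
    records = [row._asdict() for row in batch.itertuples(index=False)]
    batches.append(records)

print([len(b) for b in batches])  # [2, 2, 1]
```

itertuples replaces column names that are not valid Python identifiers with positional names, which is one reason the columns need their standard names (for example "time" rather than an integer index) before rows are expanded into model keyword arguments.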

standardise_floats(data, classification)

Standardises decimal separators and converts the data columns to numeric type.

If a period is used as the decimal separator, commas are treated as thousands separators and removed. If a comma is used, periods are removed and commas are replaced with periods. Every column except "date" is then converted to a numeric type. Note: this assumes that all values in the file are formatted in the same way.
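The two conversion rules can be sketched on a single Series. parse_numbers is a hypothetical helper written for illustration; it applies the same string replacements as standardise_floats but is not part of the module. regex=False is passed explicitly so that "." is matched literally rather than as a regex wildcard.

```python
import pandas as pd


def parse_numbers(values: pd.Series, decimal_comma: bool) -> pd.Series:
    """Hypothetical helper mirroring the separator rules described above."""
    text = values.astype(str)
    if decimal_comma:
        # "1.234,5" -> "1234,5" -> "1234.5": periods are thousands separators.
        text = text.str.replace(".", "", regex=False).str.replace(
            ",", ".", regex=False
        )
    else:
        # "1,234.5" -> "1234.5": commas are thousands separators.
        text = text.str.replace(",", "", regex=False)
    return pd.to_numeric(text)


print(parse_numbers(pd.Series(["1.234,5", "0,25"]), decimal_comma=True).tolist())
# [1234.5, 0.25]
print(parse_numbers(pd.Series(["1,234.5", "0.25"]), decimal_comma=False).tolist())
# [1234.5, 0.25]
```

The note about consistent formatting matters: a stray "0.25" in a file flagged as using decimal commas becomes "025" and is silently read as 25 rather than raising an error.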

Parameters:

Name            Type            Description                                     Default
data            DataFrame       The dataframe containing data to standardise.   required
classification  Classification  A formatting.Classification object.             required

Returns:

Type       Description
DataFrame  The dataframe with data now standardised.

Source code in importing/functions.py
def standardise_floats(
    data: pd.DataFrame, classification: Classification
) -> pd.DataFrame:
    """Standardises floats and commas.

    If a period is used as a decimal separator, commas are removed. If a comma is used,
    periods are removed and commas replaced with periods. Columns are then converted
    to numeric type. Note: this assumes that all values are formatted in the same way.

    Args:
        data: the dataframe containing data to standardise.
        classification: a formatting.Classification object.

    Returns:
        The dataframe with data now standardised.
    """
    for col in data:
        if col == "date":
            continue
        if classification.decimal_comma:  # comma used as decimal separator
            try:
                # regex=False so "." is matched literally, not as a regex
                # wildcard that matches any character
                data[col] = pd.to_numeric(
                    data[col]
                    .astype(str)
                    .str.replace(".", "", regex=False)
                    .str.replace(",", ".", regex=False),
                )
            except ValueError as exc:
                raise ValueError(
                    "Failed to parse value column. Expected values formatted with "
                    "commas as decimal separators."
                ) from exc
        else:  # period used as decimal separator
            try:
                data[col] = pd.to_numeric(
                    data[col].astype(str).str.replace(",", "", regex=False)
                )
            except ValueError as exc:
                raise ValueError(
                    "Failed to parse value column. Expected values formatted with "
                    "periods as decimal separators."
                ) from exc
    return data

validate_values(matrix, classification)

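Validates the values, maxima and minima according to the classification model, and renames the columns to standard names. Wherever a validator column is configured, entries whose validator cell does not equal the corresponding validator text are set to NaN.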

Parameters:

Name            Type            Description                                        Default
matrix          DataFrame       The preformatted matrix containing the raw data.   required
classification  Classification  A formatting.Classification object.                required

Returns:

Type                                     Description
tuple[DataFrame, list[tuple[str, str]]]  A tuple of the validated DataFrame and a list of mappings for the columns that have been validated, to be used in renaming.

Source code in importing/functions.py
def validate_values(
    matrix: pd.DataFrame, classification: Classification
) -> tuple[pd.DataFrame, list[tuple[str, str]]]:
    """Validates the values, maxima and minima according to the classification model,
    and renames the columns to standard names.

    Args:
        matrix: the preformatted matrix containing the raw data.
        classification: a formatting.Classification object.

    Returns:
        A tuple of the validated DataFrame and a list of mappings for the columns that
            have been validated, to be used in renaming.
    """
    columns = [("date", "date"), (classification.value, "value")]

    # Validation of values; non-validated values are set to np.nan
    if classification.value_validator_column:
        matrix.loc[
            matrix[classification.value_validator_column]
            != classification.value_validator_text,
            classification.value,
        ] = np.nan

    # Validation of maximum
    if classification.maximum:
        columns.append((classification.maximum, "maximum"))
        if classification.maximum_validator_column:
            matrix.loc[
                matrix[classification.maximum_validator_column]
                != classification.maximum_validator_text,
                classification.maximum,
            ] = np.nan

    # Validation of minimum
    if classification.minimum:
        columns.append((classification.minimum, "minimum"))
        if classification.minimum_validator_column:
            matrix.loc[
                matrix[classification.minimum_validator_column]
                != classification.minimum_validator_text,
                classification.minimum,
            ] = np.nan

    # Rename validated columns
    data = matrix.loc[:, [v[0] for v in columns]].rename(columns=dict(columns))
    return data, columns
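The masking and renaming can be reproduced on a small hypothetical matrix. As in the function, the raw columns are labelled by integer index alongside "date"; the column indices and the validator text "OK" are illustrative assumptions, not values from any real format.

```python
import numpy as np
import pandas as pd

# Hypothetical raw matrix: integer column labels from the source file plus "date".
matrix = pd.DataFrame(
    {
        "date": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03"]),
        1: [10.0, 11.0, 12.0],  # value column
        2: ["OK", "BAD", "OK"],  # value validator column
    }
)
value_col, validator_col, validator_text = 1, 2, "OK"

# Values whose validator cell differs from the expected text become NaN.
matrix.loc[matrix[validator_col] != validator_text, value_col] = np.nan

# Keep only the mapped columns and rename them to standard names.
columns = [("date", "date"), (value_col, "value")]
data = matrix.loc[:, [src for src, _ in columns]].rename(columns=dict(columns))

print(data["value"].tolist())  # [10.0, nan, 12.0]
```

One detail visible in the function itself: its checks such as "if classification.maximum:" test truthiness, so a maximum, minimum or validator column configured at index 0 is treated the same as an unset one.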