"""
Routines related to PollyXT files
"""
from pathlib import Path
from typing import Iterable, Tuple, Union
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
from netCDF4 import Dataset
from pollyxt_pipelines.polly_to_scc.exceptions import (
NoFilesFound,
NoMeasurementsInTimePeriod,
BadMeasurementTime,
)
from pollyxt_pipelines.locations import Location
[docs]def polly_date_to_datetime(timestamp: Tuple[int, int]) -> datetime:
"""
Converts a PollyXT Date to a Python object.
Parameters:
timestamp: PollyXT timestamp in two-integer format: 1) date as YYYYMMDD 2) seconds since start of day
Returns:
A datetime object
"""
day, seconds = timestamp
day_str = str(day)
date = datetime.strptime(day_str, "%Y%m%d")
return date + timedelta(seconds=int(seconds))
[docs]def get_measurement_period(
input: Union[Path, Dataset, np.ndarray]
) -> Tuple[datetime, datetime]:
"""
Return the measurement time (i.e. start and end times) from a PollyXT file.
Parameters:
input: Either a path to a PollyXT netCDF file, an opened netCDF dataset or the
`measurement_time` variable.
Returns:
A tuple containing the start and end dates.
"""
# Read `measurement_time` variable, a bit different for each source
if isinstance(input, Path):
nc = Dataset(input, "r")
measurement_time = nc["measurement_time"][:]
nc.close()
elif isinstance(input, Dataset):
measurement_time = input["measurement_time"][:]
elif isinstance(input, np.ndarray):
measurement_time = input
else:
raise ValueError(
f"Parameter `input` must be a Path, a Dataset (netCDF) or a numpy array, not {type(input)}"
)
# Do some sanity checks on its shape
shape = measurement_time.shape
assert len(shape) == 2
assert shape[1] == 2
# Parse start/end times
day_str = str(measurement_time[0, 0])
date = datetime.strptime(day_str, "%Y%m%d")
start = date + timedelta(seconds=int(measurement_time[0, 1]))
end = date + timedelta(seconds=int(measurement_time[-1, 1]))
return start, end
[docs]def find_time_indices(
measurement_time: np.ndarray, start: datetime, end: datetime
) -> Tuple[int, int]:
"""
Given the `measurement_time` array from a PollyXT netCDF file and a time period (`start` and
`end` in HH:MM format), this function returns the indices of the time period in the array.
The `measurement_time` array has two columns, the first contains the date in YYYYMMDD format
and the second column contains each measurement's delta from the date, in seconds (!).
"""
measurement_start, measurement_end = get_measurement_period(measurement_time)
# Do some validation on the dates
if start > end:
raise ValueError(f"Selected start ({start}) is after selected end ({end})!")
if start < measurement_start:
mstart = measurement_start.strftime("%H:%M")
raise ValueError(f"Selected start ({start}) is before file start ({mstart})!")
if start > measurement_end:
mend = measurement_end.strftime("%H:%M")
raise ValueError(f"Selected end ({end}) is after file end ({mend})!")
# Find indices
dt1 = (start - measurement_start).seconds
dt2 = (end - measurement_start).seconds
index_start = dt1 // 30
index_end = dt2 // 30
return (index_start, index_end)
[docs]class PollyXTRepository:
"""
Represents a collection of PollyXT netCDF files. Provides facilities for reading data from such
files, even across single-file boundaries.
"""
def __init__(self, path: Path, location: Location):
"""
Create a repository
Parameters
path: Where are the PollyXT netCDF files stored. Can either be a directory of a single file
"""
# Create a list of files to include in the repository
self.path = path
# Keep location for later use
self.location = location
if self.path.is_dir():
self.files = list(self.path.glob("*.nc"))
elif self.path.is_file():
self.files = [self.path]
else:
raise ValueError(
f"Path {self.path} doesn't seem to be either a file or a directory"
)
if len(self.files) == 0:
raise NoFilesFound(self.path)
# Create the index table
rows = []
for path in self.files:
with Dataset(path, "r") as nc:
measurement_time = nc["measurement_time"][:]
depol_cal_angle = nc["depol_cal_angle"][:]
for i, (timestamp, dcv) in enumerate(
zip(measurement_time, depol_cal_angle)
):
# Parse date
try:
timestamp = polly_date_to_datetime(timestamp)
except ValueError:
raise BadMeasurementTime(path, timestamp)
rows.append(
{
"timestamp": timestamp,
"index": i,
"path": path,
"depol_cal_angle": dcv,
}
)
self.index = pd.DataFrame(rows)
self.index = self.index.sort_values("timestamp", ascending=True)
self.index["calibration"] = (
self.index["depol_cal_angle"] != location.depol_calibration_zero_state
)
[docs] def get_time_period(self) -> Tuple[datetime, datetime]:
"""
Returns the time period available in this repository
Returns:
A tuple containing the first and last available timestamps
"""
start = self.index.iloc[0]["timestamp"]
end = self.index.iloc[-1]["timestamp"]
return start, end
[docs] def get_calibration_periods(self) -> Iterable[Tuple[datetime, datetime]]:
"""
Returns a list of periods that refer to calibration times.
This function checks when the `depol_cal_angle` variable is not 0 and returns
the time periods that this occures.
Returns:
List[Tuple[datetime, datetime]]: A list containing periods (ie. tuples of start-time and end-time)
"""
groups = (self.index.calibration != self.index.calibration.shift()).cumsum()
for i, g in self.index.groupby(groups):
if (g.depol_cal_angle != self.location.depol_calibration_zero_state).all():
yield g.iloc[0]["timestamp"], g.iloc[-1]["timestamp"]
[docs] def get_pollyxt_file(self, time_start: datetime, time_end: datetime):
"""
Create a PollyXTFile for the given time range.
Parameters:
time_start: First measurement to include
time_end: Last measurement to include
Returns:
The PollyXTFile file for the requested period.
"""
# Filter index for given time range
mask = (self.index["timestamp"] >= time_start) & (
self.index["timestamp"] <= time_end
)
targets = self.index[mask]
if targets.shape[0] == 0:
raise NoMeasurementsInTimePeriod()
# Read all files and concat into the requested
polly_files = []
for path in targets["path"].unique():
targets_filtered = targets[targets["path"] == path]
start_index = targets_filtered["index"].min()
end_index = targets_filtered["index"].max()
polly_files.append(
PollyXTFile(
path, location=self.location, start=start_index, end=end_index
)
)
# Concatenate data into one file
pollyxt_file = polly_files[0]
pollyxt_file.raw_signal = np.concatenate([x.raw_signal for x in polly_files])
pollyxt_file.raw_signal_swap = np.concatenate(
[x.raw_signal_swap for x in polly_files]
)
t_len = pollyxt_file.raw_signal.shape[0]
pollyxt_file.measurement_time = np.arange(0, t_len * 30, 30)
pollyxt_file.measurement_shots = np.concatenate(
[x.measurement_shots for x in polly_files]
)
try:
pollyxt_file.zenith_angle = np.concatenate(
[x.zenith_angle for x in polly_files]
)
except ValueError:
# Sometimes these arrays are empty, this is not a problem
pass
pollyxt_file.depol_cal_angle = np.concatenate(
[x.depol_cal_angle for x in polly_files]
)
pollyxt_file.calibration_mask = np.concatenate(
[x.calibration_mask for x in polly_files]
)
pollyxt_file.end_date = polly_files[-1].end_date
return pollyxt_file
[docs]class PollyXTFile:
"""
Reads the variables of interest from a PollyXT netCDF file.
"""
start_date: datetime
start_index: int
end_index: int
end_date: datetime
raw_signal: np.ndarray
raw_signal_swap: np.ndarray
measurement_time: np.ndarray
measurement_shots: np.ndarray
zenith_angle: np.ndarray
location_coordinates: np.ndarray
depol_cal_angle: np.ndarray
# True where the depol_cal_angle is not 0, ie. where the system is doing a calibration
# Same size as `raw_signal*`, `measurement_time` and `measurement_shots`
calibration_mask: np.ndarray
def __init__(
self, input_path: Path, location: Location, start: int = None, end: int = None
):
"""
Read a PollyXT netcdf file
Parameters
input_path: Which file to read
start: Optionally, trim the file from this index
end: Optionally, trim file until this index
"""
# Read the file
nc = Dataset(input_path, "r")
# Read measurement time and trim accoarding to the user provided indices
self.measurement_time = nc["measurement_time"][:]
if start is None:
start = 0
if end is None:
end = self.measurement_time.shape[0]
self.measurement_time = self.measurement_time[start : end + 1]
# Read the rest of the variables
# raw_signal is converted to float64 because it is required by SCC
self.raw_signal = nc["raw_signal"][start : end + 1, :, :]
self.raw_signal = np.array(self.raw_signal, dtype=np.float64)
self.raw_signal_swap = np.swapaxes(self.raw_signal, 1, 2)
self.measurement_shots = nc["measurement_shots"][start : end + 1, :]
self.zenith_angle = nc["zenithangle"][:]
self.location_coordinates = nc["location_coordinates"][:]
self.depol_cal_angle = nc["depol_cal_angle"][start : end + 1]
nc.close()
self.calibration_mask = (
self.depol_cal_angle != location.depol_calibration_zero_state
)
# Store some variables for easy access
self.start_index = start
self.end_index = end
self.start_date = polly_date_to_datetime(self.measurement_time[0, :])
self.end_date = polly_date_to_datetime(self.measurement_time[-1, :])