"""
Tools for communicating with the SCC backend
Author: Thanasis Georgiou <ageorgiou@noa.gr>
Based on `scc-access` by Iannis Binietoglou <i.binietoglou@impworks.gr>: https://repositories.imaa.cnr.it/public/scc_access
"""
from datetime import date
import contextlib
from typing import Union, List, Tuple
from pathlib import Path
import shutil
import requests
from bs4 import BeautifulSoup
from pollyxt_pipelines.console import console
from pollyxt_pipelines.locations import Location
from pollyxt_pipelines.config import Config
from pollyxt_pipelines.scc_access import constants, exceptions
from pollyxt_pipelines.scc_access.types import APIObject, LidarConstant, Measurement
class SCC_Credentials:
    """
    Contains all required credentials to authenticate with SCC

    Two username/password pairs are held:

    * `http_auth_user` / `http_auth_password`: read from the `http` section of the
      config; `SCC` installs them as the `auth` tuple of its requests session.
    * `username` / `password`: read from the `auth` section of the config; `SCC.login()`
      submits them in the body of the login form.
    """

    http_auth_user: str
    http_auth_password: str
    username: str
    password: str

    def __init__(self, config: Config):
        """
        Reads the credentials from the application config

        Parameters:
            config: Config object, must contain `username` and `password` entries in both
                    the `http` and the `auth` sections

        Raises:
            Whatever the config object raises for a missing section or key.
        """
        self.http_auth_user = config["http"]["username"]
        self.http_auth_password = config["http"]["password"]
        self.username = config["auth"]["username"]
        self.password = config["auth"]["password"]
class SCC:
    """
    Represents a session with SCC.

    Before making any calls, the user should login using `login()`!
    It's recommended to use the `scc_session()` context manager, which handles logging in and out.
    """

    def __init__(self, credentials: SCC_Credentials):
        """
        Prepares (but does not log in) a session

        Parameters:
            credentials: HTTP auth and SCC account credentials to use for this session
        """
        self.credentials = credentials

        # Create requests session. The HTTP auth pair is attached to the session so it is
        # sent with every request; the SCC account itself is used later, in `login()`.
        self.session = requests.Session()
        self.session.auth = (credentials.http_auth_user, credentials.http_auth_password)
        self.session.verify = True

    def login(self):
        """
        Login to SCC

        This function starts a session with the SCC backend, storing the authentication
        cookies so they can be used by the rest of the methods. Remember to call `logout()`!

        Raises:
            exceptions.PageNotAccessible: The login page could not be fetched
            exceptions.WrongCredentialsException: SCC rejected the username/password
        """
        # Get login form (for csrf token)
        login_page = self.session.get(constants.login_url)
        if not login_page.ok:
            raise exceptions.PageNotAccessible(
                constants.login_url, login_page.status_code
            )

        # Submit login form
        body = {
            "username": self.credentials.username,
            "password": self.credentials.password,
        }
        headers = {
            "X-CSRFToken": login_page.cookies["csrftoken"],
            "referer": constants.login_url,
        }
        logon_request = self.session.post(
            constants.login_url, data=body, headers=headers
        )

        # Do some basic checking on the response
        if "Wrong username or password" in logon_request.text:
            raise exceptions.WrongCredentialsException()

    def logout(self):
        """Logout of SCC"""
        # NOTE(review): this requests the *login* URL; confirm this is really how the
        # backend ends a session (a dedicated logout URL may have been intended).
        self.session.get(constants.login_url)

    def download_file(self, url: str, path: Path):
        """
        Downloads a single file from SCC to the given path

        Parameters:
            url: Which URL to download the file from
            path: Where to store the downloaded file

        Raises:
            exceptions.PageNotAccessible: The server answered with an error status
        """
        with self.session.get(url, stream=True) as r:
            # Refuse to store an error page on disk as if it were the requested file
            if not r.ok:
                raise exceptions.PageNotAccessible(url, r.status_code)

            with open(path, "wb") as file:
                shutil.copyfileobj(r.raw, file)

    @staticmethod
    def _count_pages(body) -> int:
        """
        Reads the number of result pages from a parsed listing page.

        Returns 1 when the page has no pagination bar or no "last page" link.
        """
        pagination = body.find("nav", class_="grp-pagination")
        if pagination is None:
            return 1

        last_page = pagination.find("a", class_="end")
        if last_page is None:
            return 1

        return int(last_page.text)

    def query_measurements(
        self, date_start: date, date_end: date, location: Union[Location, None], page=1
    ) -> Tuple[int, List[Measurement]]:
        """
        Searches SCC for uploaded measurements

        Parameters:
            date_start: First day of results
            date_end: Last day of results
            location: Optionally, filter results by a location
            page: Which page to return (starts from 1, default value is 1)

        Returns:
            The number of pages and the list of measurements

        Raises:
            ValueError: `page` is smaller than 1
            exceptions.UnexpectedResponse: The server answered with an error status
        """
        if page < 1:
            raise ValueError("Page numbers start at 1!")

        params = {
            "start__gte": date_start.strftime("%Y-%m-%d %H:%M:%S"),
            "start__lt": date_end.strftime("%Y-%m-%d %H:%M:%S"),
            # The backend is queried with zero-based page numbers
            "p": page - 1,
        }
        if location is not None:
            params["station_id"] = location.scc_code

        results = self.session.get(constants.list_measurements_url, params=params)
        if not results.ok:
            raise exceptions.UnexpectedResponse()

        # Parse body to find measurements and page count
        body = BeautifulSoup(results.text, "html.parser")
        pages = self._count_pages(body)
        measurements = [
            Measurement.from_table_row(tr)
            for tr in body.find_all("tr", {"class": "grp-row"})
        ]

        return pages, measurements

    def download_products(
        self,
        measurement_id: str,
        download_path: Path,
        hirelpp=True,
        cloudmask=True,
        elpp=True,
        optical=True,
        elic=True,
    ):
        """
        Downloads products for a given measurement (ID) to the given path.

        This function is a generator, yielding the filename of each downloaded file.
        Files that fail to download are reported on the console and skipped. Being a
        generator, nothing happens (including argument validation) until it is iterated.

        Parameters:
            measurement_id: Which measurement to download products for
            download_path: Where to store the downloaded products
            hirelpp: Whether to download HiRELPP files
            cloudmask: Whether to download Cloudmask files
            elpp: Whether to download ELPP files
            optical: Whether to download optical (ELDA or ELDEC) files
            elic: Whether to download ELIC files

        Raises:
            ValueError: All product types were disabled
        """

        def product(url_pattern: str, prefix: str) -> dict:
            # Keys match the keyword arguments of `download_file()`
            return {
                "url": url_pattern.format(measurement_id),
                "path": download_path / f"{prefix}_{measurement_id}.zip",
            }

        # Determine URLs to download
        to_download = []
        if hirelpp:
            to_download.append(product(constants.download_hirelpp_pattern, "hirelpp"))
        if cloudmask:
            to_download.append(
                product(constants.download_cloudmask_pattern, "cloudmask")
            )
        if elpp:
            to_download.append(
                product(constants.download_preprocessed_pattern, "preprocessed")
            )
        if optical:
            to_download.append(product(constants.download_optical_pattern, "optical"))
        if elic:
            to_download.append(product(constants.download_elic_pattern, "elic"))

        if not to_download:
            raise ValueError("At least one product must be downloaded!")

        # Download each file
        for download in to_download:
            try:
                self.download_file(**download)
            except Exception:
                # Best effort: report the failure and carry on with the remaining products
                console.print("[error]Error while downloading file from SCC[/error]")
                console.print(f'[error]URL:[/error] {download["url"]}')
                console.print(f'[error]Path:[/error] {download["path"]}')
                console.print("[error]Exception:[/error]")
                console.print_exception()
            else:
                yield download["path"]

    def get_anchillary(self, file_id: str, file_type: str) -> Union[APIObject, None]:
        """
        Uses the SCC API to fetch information about ancillary files.

        Parameters:
            file_id: File ID to lookup
            file_type: What kind of file to lookup ('sounding', 'overlap' or 'lidarratio')

        Returns:
            The API response about the file, or None if the API returned no objects

        Raises:
            ValueError: `file_type` is not one of the supported kinds
            exceptions.UnexpectedResponse: The server answered with an error status
        """
        # Determine correct endpoint
        if file_type == "sounding":
            url = constants.api_sounding_search_pattern.format(file_id)
        elif file_type == "overlap":
            url = constants.api_overlap_search_pattern.format(file_id)
        elif file_type == "lidarratio":
            url = constants.api_lidarratio_search_pattern.format(file_id)
        else:
            raise ValueError(
                "File type should be one of: sounding, overlap, lidarratio"
            )

        # Make request
        response = self.session.get(url)
        if not response.ok:
            raise exceptions.UnexpectedResponse("Could not get anchillary file info")

        # Parse body
        # It should have an 'objects' list containing one entry, if it is found
        response_body = response.json()
        objects = response_body["objects"]
        if objects:
            return APIObject(objects[0])
        return None

    def upload_file(
        self,
        filename: Path,
        system_id: str,
        rs_filename: Union[Path, None] = None,
        ov_filename: Union[Path, None] = None,
        lr_filename: Union[Path, None] = None,
    ):
        """
        Uploads a file to SCC, together with the auxiliary files. There is no return value, but it will
        throw for potential errors.

        Auxiliary files which SCC already knows about (according to `get_anchillary()`) are
        not uploaded again; a warning is printed instead.

        Parameters:
            filename: Path to the SCC netCDF file
            system_id: SCC Lidar System ID for the system that made the measurement
            rs_filename: Path to the radiosonde netCDF file
            ov_filename: Path to the overlap netCDF file
            lr_filename: Path to the lidar ratio netCDF file

        Raises:
            exceptions.SCCError: SCC reported an error for the upload
            exceptions.UnexpectedResponse: The upload failed for an unknown reason
        """
        # (form field, local path, type for `get_anchillary()`, label for the warning)
        ancillary_files = (
            ("sounding_file", rs_filename, "sounding", "Radiosonde file"),
            ("overlap_file", ov_filename, "overlap", "Overlap file"),
            ("lidar_ratio_file", lr_filename, "lidarratio", "Lidar ratio file"),
        )

        # The ExitStack closes every file opened below once the request is done,
        # even if one of the lookups or the upload itself raises.
        with contextlib.ExitStack() as stack:
            # Check if the given ancillary files already exist before adding them to the request body
            files = {}
            for field, path, file_type, label in ancillary_files:
                if path is None:
                    continue

                info = self.get_anchillary(path.name, file_type)
                if info is not None and info.exists:
                    console.print(
                        f"[warn]{label}[/warn] {path.name} [warn]already exists on SCC.[/warn]"
                    )
                else:
                    files[field] = stack.enter_context(open(path, "rb"))

            files["data"] = stack.enter_context(open(filename, "rb"))

            # Get the form and submit it
            upload_page = self.session.get(constants.upload_url)
            body = {"system": system_id}
            headers = {
                "X-CSRFToken": upload_page.cookies["csrftoken"],
                "referer": constants.upload_url,
            }
            upload_submit = self.session.post(
                constants.upload_url, data=body, files=files, headers=headers
            )

        # Check response: first for alert boxes...
        response_body = BeautifulSoup(upload_submit.text, "html.parser")
        alerts = response_body.find_all("div", class_="alert-box")
        if alerts:
            errors = ", ".join([alert.p.text.strip() for alert in alerts])
            raise exceptions.SCCError(errors)

        # ...then for an error message next to the data file input
        data_input_field = response_body.find("input", id="id_data")
        if data_input_field is not None:
            data_text = (
                data_input_field.parent.find("p").text.strip().replace("\n", " ")
            )
            if "Error:" in data_text:
                raise exceptions.SCCError(data_text)

        # Ending up on the upload URL again is treated as a failure
        # (presumably a successful upload redirects elsewhere; verify against the backend)
        if (
            upload_submit.status_code != 200
            or upload_submit.url == constants.upload_url
        ):
            raise exceptions.UnexpectedResponse("Upload to SCC failed, unknown reason")

    def get_measurement(self, measurement_id: str) -> Union[Measurement, None]:
        """
        Fetches information about one measurement from SCC.

        Parameters:
            measurement_id: Which measurement to lookup

        Returns:
            The measurement if it exists, None otherwise

        Raises:
            exceptions.UnexpectedResponse: The server answered with an error status other
                                           than 404, or with an empty body
        """
        url = constants.api_measurement_pattern.format(measurement_id)
        response = self.session.get(url)

        if response.status_code == 404:
            return None
        if not response.ok:
            raise exceptions.UnexpectedResponse()

        response_body = response.json()
        if not response_body:
            raise exceptions.UnexpectedResponse()

        return Measurement.from_json(response_body)

    def delete_measurement(self, measurement_id: str):
        """
        Deletes a measurement from SCC

        Parameters:
            measurement_id: Which measurement to delete

        Raises:
            exceptions.MeasurementNotFound: The server answered with 404
            exceptions.UnexpectedResponse: The server answered with anything else than 200
        """
        # Submit form
        url = constants.delete_measurement_pattern.format(measurement_id)
        body = {
            "select_delete_related_measurements": "not_delete_related",
            "post": "yes",
        }
        headers = {
            "referer": url,
            "X-CSRFToken": self.session.cookies["csrftoken"],
        }
        response = self.session.post(url, data=body, headers=headers)

        # Only the status code is inspected to decide whether the deletion worked
        if response.status_code == 404:
            raise exceptions.MeasurementNotFound(measurement_id)
        if response.status_code != 200:
            raise exceptions.UnexpectedResponse("Response code is not 200")

    def rerun_processing(self, measurement_id: str):
        """
        Asks SCC to re-run processing routines for a given measurement ID

        Parameters:
            measurement_id: Which measurement to re-run

        Raises:
            exceptions.MeasurementNotFound: The server answered with 404
            exceptions.UnexpectedResponse: The server did not answer with a 302, or the
                                           `messages` cookie is missing or does not contain
                                           the restart confirmation
        """
        # Submit form
        url = constants.rerun_measurement_url
        body = {
            "_selected_action": measurement_id,
            "action": "rerun_all",
            "selected_across": "0",
            "index": 0,
        }
        headers = {
            "referer": url,
            "X-CSRFToken": self.session.cookies["csrftoken"],
        }

        # Redirects are not followed: the 302 response itself carries the `messages` cookie
        response = self.session.post(
            url, data=body, headers=headers, allow_redirects=False
        )

        if response.status_code == 404:
            raise exceptions.MeasurementNotFound(measurement_id)
        if response.status_code != 302:
            raise exceptions.UnexpectedResponse("Response code is not 302")

        # Check for message in cookie
        # `.get()` returns None for a missing cookie, whereas indexing would raise KeyError
        messages_cookie = response.cookies.get("messages")
        if messages_cookie is None:
            raise exceptions.UnexpectedResponse("`Messages` cookie not found")

        if "The processing chain was restarted" not in messages_cookie:
            raise exceptions.UnexpectedResponse(
                "Could not found restart message in cookie"
            )

    def get_lidar_consants(
        self, date_start: date, date_end: date, location: Union[Location, None], page=1
    ) -> Tuple[int, List[LidarConstant]]:
        """
        Fetches the Lidar constants from SCC

        Parameters:
            date_start: First day of results
            date_end: Last day of results
            location: Optionally, filter results by a location
            page: Which page to return (starts from 1, default value is 1)

        Returns:
            The number of pages and the list of lidar constants

        Raises:
            ValueError: `page` is smaller than 1
            exceptions.UnexpectedResponse: The server answered with an error status
        """
        if page < 1:
            raise ValueError("Page numbers start at 1!")

        params = {
            "profile_start_time__gte": date_start.strftime("%Y-%m-%d %H:%M:%S"),
            "profile_start_time__lt": date_end.strftime("%Y-%m-%d %H:%M:%S"),
            # The backend is queried with zero-based page numbers
            "p": page - 1,
        }
        if location is not None:
            params["station"] = location.scc_code

        results = self.session.get(constants.lidar_constants_url, params=params)
        if not results.ok:
            raise exceptions.UnexpectedResponse()

        # Parse body to find lidar constants and page count
        body = BeautifulSoup(results.text, "html.parser")
        pages = self._count_pages(body)
        lidar_constants = [
            LidarConstant.from_table_row(tr)
            for tr in body.find_all("tr", {"class": "grp-row"})
        ]

        return pages, lidar_constants
@contextlib.contextmanager
def scc_session(credentials: SCC_Credentials):
    """
    An SCC session as a context, to use with `with:`

    The session is logged in before the body runs and logged out when the body exits,
    whether it finishes normally or raises.

    Parameters:
        credentials: The credentials to create the session with

    Example::

        with scc_session(credentials) as scc:
            # Use scc
            # ...
    """
    # Create the session and log in *outside* the `try`: if either step fails there is
    # nothing to log out from, and the original error must reach the caller untouched
    # (instead of being replaced by a failure inside `finally`).
    scc = SCC(credentials)
    scc.login()

    try:
        yield scc
    finally:
        scc.logout()