"""
Orion, a class that provides an interface to work with the Barentswatch API
and a nice functions to help do stuff with the data
Documentation of available API's and how to use them
https://wiki.barentswatch.net/display/BO/API-Documentation
Example:
"""
import json
import logging
import math
import os
import urllib.parse
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union
import dotenv
import geopandas
import pandas as pd
import pytz
import requests
from requests.auth import HTTPBasicAuth
from shapely.geometry import LineString, Point
from orion.mmsi import MmsiMixin
from orion.types.ais import Ais
from orion.urls import URLS
from orion.vessel_codes import VesselCodeMixin
project_dir = os.path.join(os.path.dirname(__file__), os.pardir)
dotenv_path = os.path.join(project_dir, ".env")
dotenv.load_dotenv(dotenv_path)
_log_fmt = "%(asctime)s - %(module)s - %(levelname)s - %(message)s"
logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"), format=_log_fmt)
logger = logging.getLogger(__name__)
CLIENT_ID = os.getenv("CLIENT_ID", None)
CLIENT_SECRET = os.getenv("CLIENT_SECRET", None)
[docs]class Orion(MmsiMixin, VesselCodeMixin):
"""Interface to Barentswatch API
The CLIENT_ID and CLIENT_SECRET should be exposed as environment variables called
`CLIENT_ID` and `CLIENT_SECRET` or passed as parameters
when creating an instance of the class.
orion = Orion(client_id="myclientid", client_secret="myclientsecret")
Args:
client_id (Optional[str]): id for your user at Barentswatch.
Use if not set in .env file.
client_secret (Optional[str]): secret for your user at Barentswatch.
Use if not set in .env file.
skip_auth (Optional[bool]): skip authentication. Useful for testing.
"""
def __init__(
self,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
skip_auth: Optional[bool] = False,
) -> None:
if skip_auth:
return
self.client_id = client_id or CLIENT_ID
self.client_secret = client_secret or CLIENT_SECRET
if (not self.client_id) | (not self.client_secret): # pragma: no cover # noqa
raise ValueError(
"""
Please either set CLIENT_ID and CLIENT_SECRET in .env file
or provide them when creating an instance of the class
"""
)
if type(self.client_id) == str: # pragma: no cover
self.client_id = urllib.parse.quote_plus(self.client_id)
if type(self.client_secret) == str: # pragma: no cover
self.client_secret = urllib.parse.quote_plus(self.client_secret)
self.authenticate_session = requests.Session() # Session for tokens
self.authenticate()
self.access = ""
self.session = requests.Session()
self.session.auth = self.auth # type: ignore
self.session.hooks["response"].append(self.reauth)
def reauth( # type: ignore
self, response: requests.models.Response, *args, **kwargs
) -> requests.models.Response:
if response.status_code != requests.codes.unauthorized:
return response
logger.info("Fetching new token as the previous token expired")
if response.request.headers.get("REATTEMPT"): # pragma: no cover
response.raise_for_status()
self.authenticate()
request = response.request
request.headers["REATTEMPT"] = "1"
if self.session.auth:
authenticated_request = self.auth(request)
response = self.session.send(authenticated_request) # type: ignore
return response
raise ValueError( # pragma: no cover
"No session object found. Please authenticate first."
)
[docs] def auth(
self,
request: Union[requests.models.Request, requests.models.PreparedRequest],
) -> requests.models.Request:
"""
Set the authentication token on every request
"""
request.headers["Authorization"] = f"Bearer {self.access}"
return request # type: ignore
[docs] def authenticate(self) -> None:
"""
Authenticate with Barentswatch
"""
if not hasattr(self, "client_id") or not hasattr(self, "client_secret"):
raise ValueError("Please provide a client id and client secret")
if not self.client_id or not self.client_secret: # pragma: no cover
raise ValueError("Please provide a client id and client secret")
Headers = {"Content-Type": "application/x-www-form-urlencoded"}
body = f"grant_type=client_credentials&client_id={self.client_id}&client_secret={self.client_secret}" # noqa: E501
try:
response = self.authenticate_session.post(
URLS["TOKEN"],
data=body,
auth=HTTPBasicAuth(self.client_id, self.client_secret),
headers=Headers,
)
response.raise_for_status()
data = response.json()
self.access = data["access_token"]
except requests.exceptions.HTTPError as err: # pragma: no cover
raise err
[docs] def get_ais_last_24H(self, mmsi: int) -> List[Ais]:
"""
Get AIS for a ship last 24 hour
Args:
mmsi (int): Maritime Mobile Service Identity (MMSI) is used as
an uinique identifer for a ship
Returns:
json: json of ais track
"""
if not self.mmsi.is_valid_ship_mmsi(mmsi):
raise ValueError("Please provide a valid ship mmsi")
try:
response = self.session.get(
f"{URLS['HISTORIC_AIS']}/historic/trackslast24hours/{mmsi}"
)
return self.decorate_ais_response(response)
except requests.exceptions.HTTPError as err: # pragma: no cover
raise err
[docs] def get_ais(self, mmsi: int, fromDate: str, toDate: str) -> List[Ais]:
"""
Get AIS for a ship in a give timeframe
Args:
mmsi (int): Maritime Mobile Service Identity (MMSI) is used as an uinique
identifer for a ship
fromDate (str): start of timeframe example: 2021-07-21T00:00:00Z
toDate (str): end of timeframe example: 2021-07-23T18:00:00Z
Returns:
json: json of ais track
"""
if not self.mmsi.is_valid_ship_mmsi(mmsi):
raise ValueError("Please provide a valid ship mmsi")
try:
response = self.session.get(
f"{URLS['HISTORIC_AIS']}/historic/tracks/{mmsi}/{fromDate}/{toDate}"
)
return self.decorate_ais_response(response)
except requests.exceptions.HTTPError as err: # pragma: no cover
raise err
def decorate_ais_response(self, response: requests.models.Response) -> List[Ais]:
response.raise_for_status()
ais = response.json()
ais = self.add_jurisdiction_and_ship_type(ais)
return ais
[docs] def get_multiple_ais(
self,
mmsis: List[int],
fromDate: Optional[str] = None,
toDate: Optional[str] = None,
) -> List[Ais]:
"""
Get AIS data from multiple ships in a give timeframe
Args:
mmsis (Array(int)): An array of MMSIs
fromDate (datetime, optional): The start of the timeframe,
if not given the function will get last 24H. Defaults to None.
toDate (_type_, optional): The end of the timeframe, if not given the
function will get last 24H. Defaults to None.
Returns:
Array: Json of combined AIS tracks
"""
ais = []
for mmsi in mmsis:
if not self.mmsi.is_valid_ship_mmsi(mmsi):
raise ValueError("Please provide a valid ship mmsi")
if (not fromDate) or (not toDate):
ais.extend(self.get_ais_last_24H(mmsi))
else:
ais.extend(self.get_ais(mmsi, fromDate, toDate))
return ais
[docs] def get_mmsis_in_area(
self,
geometry: Dict[str, object],
from_date: Optional[str] = None,
to_date: Optional[str] = None,
) -> List[Dict[str, object]]:
"""
Get AIS for ships in given area inside the timeframe
Args:
geometry (Dict[str, object]): GeoJSON geometry
from_date (datetime, optional): The start of the timeframe, if not given
the function will get last 24H. Defaults to None.
to_date (datetime, optional): The end of the timeframe, if not given
the function will get last 24H. Defaults to None.
"""
# Check if features is present, if so pick the first one
if "features" in geometry and len(geometry["features"]) > 0: # type: ignore
geometry = geometry["features"][0]["geometry"] # type: ignore
if "coordinates" not in geometry:
raise ValueError("Geometry does not contain coordinates")
if from_date is None or to_date is None:
_from_date = datetime.now() - timedelta(days=1)
_to_date = datetime.now()
from_date = _from_date.strftime("%Y-%m-%dT%H:%M:%SZ")
to_date = _to_date.strftime("%Y-%m-%dT%H:%M:%SZ")
body = {
"msgtimefrom": f"{from_date}",
"msgtimeto": f"{to_date}",
"polygon": geometry,
}
try:
self.session.headers["Content-Type"] = "application/json"
response = self.session.post(
f"{URLS['HISTORIC_AIS']}/historic/mmsiinarea/",
json=body,
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as err: # pragma: no cover
raise err
[docs] def buffer_around_point( # type: ignore[no-any-unimported]
self, lat: float, lon: float, distance: int
) -> geopandas.GeoDataFrame:
"""
Create a buffer around a point
Args:
lat (float): latitude
lon (float): longitude
distance (int): distance in meters
Returns:
str: GeoJSON geometry
"""
point = Point(lon, lat)
geo_point = geopandas.GeoSeries(point)
geo_point = geo_point.set_crs(epsg=4326)
geo_point_buffered = (
geo_point.to_crs(epsg=23032).buffer(distance).to_crs(epsg=4326)
)
return json.loads(geo_point_buffered.to_json())["features"][0]["geometry"]
[docs] def buffer_around_gdf( # type: ignore[no-any-unimported]
self, gpd: geopandas.GeoDataFrame, distance: int, column: Optional[str] = None
) -> geopandas.GeoDataFrame:
"""
Create a buffer around a geopandas dataframe
Args:
gpd (geopandas): geopandas dataframe
distance (int): distance in meters
Returns:
geopandas: geopands dataframe with buffer
"""
geo_column = column or "geometry"
gpd[geo_column] = (
gpd[geo_column].to_crs(epsg=23032).buffer(distance).to_crs(epsg=4326)
)
return gpd
[docs] def get_mmsis_in_area_around_point(
self,
lat: float,
lon: float,
distance: int,
from_date: Optional[str] = None,
to_date: Optional[str] = None,
) -> List[Dict[str, object]]:
"""
Get AIS data from ships inside the geometry in a give timeframe
Args:
lat (float): latitude
lon (float): longitude
distance (int): distance in meters
from_date (str, optional): The start of the timeframe,
if not given the function will get last 24H. Defaults to None.
to_date (str, optional): The end of the timeframe,
if not given the function will get last 24H. Defaults to None.
Returns:
Array: Json of combined AIS tracks
"""
geometry = self.buffer_around_point(lat, lon, distance)
return self.get_mmsis_in_area(geometry, from_date, to_date)
[docs] def json_to_gdf( # type: ignore[no-any-unimported]
self, _json: Dict[str, object]
) -> geopandas.GeoDataFrame:
"""
Transforms a json response from NAIS to a GeoDataFrame
Args:
_json ([dict]): array of dicts, json response from NAIS
Returns:
GeoDataFrame: a GeoDataFrame with geometry column and crs
"""
_gdf = geopandas.GeoDataFrame(_json)
_gdf.msgtime = pd.to_datetime(_gdf.msgtime)
_timezone = pytz.timezone("Europe/Berlin")
_gdf.msgtime = _gdf.msgtime.dt.tz_convert(_timezone)
_gdf.geometry = geopandas.points_from_xy(_gdf.longitude, _gdf.latitude)
_gdf.set_crs(epsg=4326, inplace=True)
_gdf = _gdf.sort_values(by="msgtime")
return _gdf
[docs] def explore( # type: ignore[no-any-unimported]
self, _gdf: geopandas.GeoDataFrame
) -> str: # pragma: no cover
"""helper function to geopandas explore
Args:
_gdf (GeoDataFrame): the geodataframe to explore
Returns:
html: an interactive map of our data
"""
# easy fix to SettingWithCopyWarning in Pandas
_g = _gdf.copy()
_g.msgtime = _g.msgtime.astype(str)
return _g.explore(column="speedOverGround", cmap="plasma")
[docs] def merge_points_to_line( # type: ignore[no-any-unimported]
self, _gdf: geopandas.GeoDataFrame, simplify: Optional[int] = None
) -> geopandas.GeoDataFrame:
"""Creates a line for all the points in the geodataframe
Args:
_gdf (GeoDataFrame): the gdf with all the points
Returns:
GeoSeries: dataframe with one line
"""
lineStringObj = LineString([[a.x, a.y] for a in _gdf.geometry.values])
line_df = pd.DataFrame()
line_df["Attrib"] = [
1,
]
gdf = geopandas.GeoDataFrame(
line_df,
geometry=[
lineStringObj,
],
)
gdf.set_crs(epsg=4326, inplace=True)
gs = gdf["geometry"]
if simplify is not None:
gs = gs.to_crs(epsg=23032).simplify(simplify).to_crs(epsg=4326)
return gs.to_json()
[docs] def ais_to_line( # type: ignore[no-any-unimported]
self, ais: Dict[str, object], simplify: Optional[int] = None
) -> geopandas.GeoDataFrame:
"""Creates a line for all the points in the ais json
Args:
ais (Dict[str, object]): ais json
simplify (int, optional): simplify the line, threshold given in meter.
Defaults to None.
Returns:
GeoDataFrame: dataframe with one line
"""
_gdf = self.json_to_gdf(ais)
return self.merge_points_to_line(_gdf, simplify=simplify)
[docs] def calculate_radius_in_meters_from_km2(self, area: float) -> float:
"""
Calculate the radius of a circle with a given area
Args:
area (float): area of the circle in km^2
Returns:
float: radius in meters
"""
area_in_meters = area * 1000000
return math.sqrt(area_in_meters / math.pi)
def max_api_radius(self) -> float:
return self.calculate_radius_in_meters_from_km2(500)
[docs] def add_jurisdiction_and_ship_type(
self,
ais: List[Ais],
) -> List[Ais]:
"""
Adds the jurisdiction and ship type to the dataframe
Args:
ais (List[Ais]): the ais track from NAIS
"""
for a in ais:
if "shipType" in a:
a["shipTypeTxt"] = self.ais_vessel_codes.get_vessel_type_name(
a["shipType"]
)
a["jurisdiction"] = self.mmsi.get_jurisdiction_name(a["mmsi"])
return ais