import collections
from itertools import cycle
from typing import List, Tuple, Union
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.figure import Figure
from scipy.stats._distn_infrastructure import rv_frozen
from .area import Area
from .assemblage import Assemblage
from .coverage import Coverage
from .team import Team
class Survey:
    """Unique index for a set of `Area`, `Assemblage`, `Coverage`, and `Team`

    Parameters
    ----------
    name : str
        Unique name for the survey
    area : Area, optional
        Area building block (default is None)
    assemblage : Assemblage, optional
        Assemblage building block (default is None)
    coverage : Coverage, optional
        Coverage building block (default is None)
    team : Team, optional
        Team building block (default is None)

    Attributes
    ----------
    name : str
        Name of the survey
    area : Area
        Attached `Area` building block, or None
    assemblage : Assemblage
        Attached `Assemblage` building block, or None
    coverage : Coverage
        Attached `Coverage` building block, or None
    team : Team
        Attached `Team` building block, or None
    raw
        Concatenated `raw` tables of all resolved runs; None until `run()`
        has been called
    discovery
        Concatenated `discovery` tables of all resolved runs; None until
        `run()` has been called
    time_surveyunit
        Concatenated per-survey-unit time tables of all resolved runs; None
        until `run()` has been called
    time_surveyor
        Concatenated per-surveyor time tables of all resolved runs; None
        until `run()` has been called
    total_time
        Sum of the `total_time` of all resolved runs (starts at 0)
    """

    def __init__(
        self,
        name: str,
        area: Area = None,
        assemblage: Assemblage = None,
        coverage: Coverage = None,
        team: Team = None,
    ):
        """Create `Survey` instance"""
        self.name = name
        self.area = area
        self.assemblage = assemblage
        self.coverage = coverage
        self.team = team

        # initialize empty outputs
        self.raw = None
        self.discovery = None
        self.time_surveyunit = None
        self.time_surveyor = None
        self.total_time = 0

    def add_bb(self, bb: List[Union[Area, Assemblage, Coverage, Team]]):
        """Attach building blocks to survey.

        Each block replaces whatever block of the same kind is currently
        attached. Objects that are not one of the four building block types
        are ignored.

        Parameters
        ----------
        bb : List[Union[Area, Assemblage, Coverage, Team]]
            List of building block objects. A single building block object
            is also accepted and treated as a one-element list.
        """
        # Be forgiving when a lone block is passed instead of a list.
        if isinstance(bb, (Area, Assemblage, Coverage, Team)):
            bb = [bb]

        for block in bb:
            if isinstance(block, Area):
                self.area = block
            elif isinstance(block, Assemblage):
                self.assemblage = block
            elif isinstance(block, Coverage):
                self.coverage = block
            elif isinstance(block, Team):
                self.team = block

    def run(
        self,
        n_runs: int,
        start_run_id: int = 0,
        discovery_threshold: float = 0.0,
        overwrite: bool = False,
    ):
        """Resolve the survey `n_runs` times and store the combined outputs.

        Run ids are `start_run_id`, `start_run_id + 1`, ...,
        `start_run_id + n_runs - 1`. The outputs of the new runs are appended
        to `raw`, `discovery`, `time_surveyunit` and `time_surveyor`, and
        their total times are added to `total_time`.

        Parameters
        ----------
        n_runs : int
            Number of runs to resolve; must be at least 1
        start_run_id : int, optional
            Id given to the first run (default is 0)
        discovery_threshold : float, optional
            Accepted for API compatibility; it is not used by this method
            (default is 0.0)
        overwrite : bool, optional
            If True, previously stored outputs are discarded and replaced by
            the outputs of the new runs; if False the new outputs are
            appended (default is False)

        Raises
        ------
        ValueError
            If `n_runs` is less than 1
        """
        # Validate before touching any state: concatenating zero runs would
        # otherwise fail inside pandas after the stored results were wiped.
        if n_runs < 1:
            raise ValueError(f"n_runs must be at least 1, got {n_runs}")

        stop_run_id = start_run_id + n_runs

        # Resolve everything first so that a failure in any run leaves the
        # previously stored outputs untouched, even when overwrite=True.
        resolved_runs = [
            _resolve(self, run_id=run_id) for run_id in range(start_run_id, stop_run_id)
        ]

        if overwrite:
            self.raw = None
            self.discovery = None
            self.time_surveyunit = None
            self.time_surveyor = None
            self.total_time = 0

        # first concat outputs, then concat those with class attributes
        # From pandas.concat() docs: Any None objects will be dropped silently
        # unless they are all None in which case a ValueError will be raised
        for output in ("raw", "discovery", "time_surveyunit", "time_surveyor"):
            new_rows = pd.concat(
                [getattr(run, output) for run in resolved_runs], ignore_index=True
            )
            combined = pd.concat([getattr(self, output), new_rows], ignore_index=True)
            setattr(self, output, combined)

        # Starting the sum at the current total keeps the additions in the
        # same left-to-right order as accumulating run by run.
        self.total_time = sum(
            (run.total_time for run in resolved_runs), self.total_time
        )

    def discovery_plot(
        self,
        title_size: int = 20,
        figsize: Tuple[float, float] = (8.0, 20.0),
        **kwargs,
    ) -> Figure:
        """Plot the `discovery_prob` column of `discovery` over the area outline.

        Parameters
        ----------
        title_size : int, optional
            Font size of the plot title (default is 20)
        figsize : Tuple[float, float], optional
            Size of the figure passed to `plt.subplots` (default is (8.0, 20.0))
        **kwargs
            Accepted but currently unused

        Returns
        -------
        matplotlib.figure.Figure
            Figure containing the plot

        Raises
        ------
        AttributeError
            If there are no discovery results yet (`run()` was never called)
        """
        # Fail before a figure is created. AttributeError is what the
        # attribute access on None used to raise, so the type is preserved.
        if self.discovery is None:
            raise AttributeError(
                "Survey has no discovery results to plot; call `run()` first."
            )

        fig, ax = plt.subplots(1, 1, figsize=figsize)

        # basemap of the area polygon outline
        outline_ax = self.area.df.plot(ax=ax, facecolor="white", edgecolor="black")

        self.discovery.plot(
            ax=outline_ax,
            column="discovery_prob",
            legend=False,
            legend_kwds={"loc": (1, 0)},
        )

        ax.set_title(f"{self.name} (Survey)", fontsize=title_size)
        ax.set_axis_off()

        plt.close(fig)  # close the plot so that Jupyter won't print it twice
        return fig
def _resolve(survey, run_id: int):
    """Determine input parameters, resolve discovery probabilities, and calculate search times

    Parameters
    ----------
    survey : Survey
        Object providing `assemblage.df`, `area.vis`, `coverage.df`,
        `team.df` and `team.assignment`
    run_id : int
        Identifier written to the `run` column of every output table

    Returns
    -------
    ResolvedRun
        Named tuple with the fields `raw` (the full joined table),
        `discovery` (per-feature discovery probabilities),
        `time_surveyunit` (time per survey unit), `time_surveyor` (time per
        surveyor) and `total_time` (sum of the survey unit times)

    Raises
    ------
    NotImplementedError
        If the team's assignment method did not assign surveyors to the
        survey units (only "naive" is implemented)
    AssertionError
        If the spatial join changes the number of feature rows
    """

    def _get_floats_or_distr_vals(item):
        """Duplicate value or randomly select value from distribution,
        depending on type. Anything that is neither a frozen distribution
        nor convertible to float becomes NaN.
        """
        if isinstance(item, rv_frozen):
            return item.rvs(size=1)[0]
        try:
            float(item)
        except (TypeError, ValueError):
            return np.nan
        return item

    def _extract_values(df, input_col):
        """Resolve every entry of `input_col` to a single value."""
        return df.loc[:, input_col].apply(_get_floats_or_distr_vals)

    ResolvedRun = collections.namedtuple(
        "ResolvedRun",
        "raw discovery time_surveyunit time_surveyor total_time",
    )

    # Create inputs df of features from assemblage
    assemblage_inputs = survey.assemblage.df.copy()

    # Extract obs_rate values
    assemblage_inputs.loc[:, "obs_rate"] = _extract_values(
        assemblage_inputs, "ideal_obs_rate"
    )

    # Extract feature time_penalty values
    assemblage_inputs.loc[:, "time_penalty_obs"] = _extract_values(
        assemblage_inputs, "time_penalty"
    )

    # Extract surface visibility values (one independent draw per feature)
    # TODO: if raster, extract value from raster
    assemblage_inputs.loc[:, "vis_obs"] = [
        _get_floats_or_distr_vals(survey.area.vis)
        for _ in range(assemblage_inputs.shape[0])
    ]

    # get survey units
    coverage_inputs = survey.coverage.df.copy()

    # extract min_time_per_unit
    coverage_inputs.loc[:, "min_time_per_unit_obs"] = _extract_values(
        coverage_inputs, "min_time_per_unit"
    )

    # calculate search_time from the resolved values, so that inputs given
    # as distributions work too; transects scale with their length
    min_time_obs = coverage_inputs.loc[:, "min_time_per_unit_obs"]
    coverage_inputs.loc[:, "base_search_time"] = np.where(
        coverage_inputs.loc[:, "surveyunit_type"] == "transect",
        min_time_obs * coverage_inputs.loc[:, "length"],
        min_time_obs,
    )

    # Allocate surveyors to survey units based on method
    assignment = survey.team.assignment
    if assignment == "naive":
        # hand out survey units to the surveyors in round-robin order
        people = cycle(survey.team.df.loc[:, "surveyor_name"])
        coverage_inputs["surveyor_name"] = [
            next(people) for _ in range(coverage_inputs.shape[0])
        ]
    elif assignment == "speed":
        # minimize total team time
        # TODO: figure out how to optimize assignment
        # Can calculate:
        # - total search time,
        # - individual surveyor's fraction of the total team time
        pass
    elif assignment == "random":
        pass

    # Without an assignment the merge below cannot work; fail with a clear
    # message instead of an opaque KeyError from pandas.
    if "surveyor_name" not in coverage_inputs.columns:
        raise NotImplementedError(
            f"Team assignment method {assignment!r} did not assign surveyors "
            "to survey units; only 'naive' is currently implemented."
        )

    # Map surveyors to inputs df based on survey units
    coverage_team = coverage_inputs.merge(
        survey.team.df, how="left", on="surveyor_name"
    )

    # Extract surveyor speed penalty values
    coverage_team.loc[:, "speed_penalty_obs"] = _extract_values(
        coverage_team, "speed_penalty"
    )

    # Find features that intersect coverage
    assem_cov_team = gpd.sjoin(assemblage_inputs, coverage_team, how="left")
    assert (
        assem_cov_team.shape[0] == assemblage_inputs.shape[0]
    ), "Problem with spatial join. Check for accidental spatial overlap in Coverage."

    # record which survey unit it intersects (or NaN)
    # if intersects, set proximity to 1.0
    # else set proximity to 0.0
    assem_cov_team.loc[:, "proximity_obs"] = np.where(
        assem_cov_team.loc[:, "surveyunit_name"].notna(), 1.0, 0.0
    )

    # Extract surveyor skill values
    assem_cov_team.loc[:, "skill_obs"] = _extract_values(assem_cov_team, "skill")

    # Calculate final probability of discovery
    assem_cov_team.loc[:, "discovery_prob"] = (
        assem_cov_team.loc[:, "obs_rate"]
        * assem_cov_team.loc[:, "vis_obs"]
        * assem_cov_team.loc[:, "proximity_obs"]
        * assem_cov_team.loc[:, "skill_obs"]
    )
    assem_cov_team.loc[:, "run"] = run_id

    discovery_df = assem_cov_team.loc[
        :,
        [
            "run",
            "feature_name",
            "shape",
            "obs_rate",
            "vis_obs",
            "proximity_obs",
            "skill_obs",
            "discovery_prob",
        ],
    ]

    # Calculate time stats
    # TODO: Duplicate calculations for threshold and no threshold
    # NOTE(review): the table is built from the feature-side left join, so
    # survey units without any intersecting feature appear to be absent from
    # the time totals - confirm whether that is intended.

    # groupby survey unit
    time_per_surveyunit = (
        assem_cov_team.groupby(["surveyunit_name", "surveyor_name", "base_search_time"])
        .agg({"time_penalty_obs": "sum", "speed_penalty_obs": "mean"})
        .reset_index()
        .rename(columns={"time_penalty_obs": "sum_time_penalty_obs"})
    )

    # base penalty = base search time + sum(artifact penalties)
    # surveyor penalty = base penalty * surveyor penalty factor
    # total time = base penalty + surveyor penalty
    base_pen = (
        time_per_surveyunit.loc[:, "base_search_time"]
        + time_per_surveyunit.loc[:, "sum_time_penalty_obs"]
    )
    surveyor_pen = base_pen * time_per_surveyunit.loc[:, "speed_penalty_obs"]
    time_per_surveyunit.loc[:, "total_time_per_surveyunit"] = base_pen + surveyor_pen
    time_per_surveyunit.loc[:, "run"] = run_id

    time_surveyunit = time_per_surveyunit.loc[
        :,
        [
            "run",
            "surveyunit_name",
            "surveyor_name",
            "base_search_time",
            "sum_time_penalty_obs",
            "speed_penalty_obs",
            "total_time_per_surveyunit",
        ],
    ]

    total_time = time_surveyunit.loc[:, "total_time_per_surveyunit"].sum()

    # per surveyor
    time_per_surveyor = (
        time_surveyunit.groupby("surveyor_name")
        .agg(
            {
                "base_search_time": "sum",
                "sum_time_penalty_obs": "sum",
                "speed_penalty_obs": "mean",
                "total_time_per_surveyunit": "sum",
            }
        )
        .reset_index()
        .rename(
            columns={
                "base_search_time": "sum_base_search_time",
                "total_time_per_surveyunit": "total_time_per_surveyor",
            }
        )
    )
    time_per_surveyor.loc[:, "run"] = run_id

    time_surveyor = time_per_surveyor.loc[
        :,
        [
            "run",
            "surveyor_name",
            "sum_base_search_time",
            "sum_time_penalty_obs",
            "speed_penalty_obs",
            "total_time_per_surveyor",
        ],
    ]

    return ResolvedRun(
        raw=assem_cov_team,
        discovery=discovery_df,
        time_surveyunit=time_surveyunit,
        time_surveyor=time_surveyor,
        total_time=total_time,
    )