Source code for forecast_processing.ForecastClassifier

"""
Simple Algorithm to classify forecasts based on number of opposing winds.  Similar to method used in RadioWinds.

For now, we are assuming looking at 6 hour increments for a 24 hour window  (4 time instances)

For each time instance,  calculate opposing winds with **n_sectors** (default is 8)

Levels are based on the netcdf file and pressure range.  (Synth will have more levels than ERA5)
"""


import numpy as np
from env.forecast_processing.forecast import Forecast, Forecast_Subset
import windrose
import pandas as pd
from env.forecast_processing.forecast_visualizer import ForecastVisualizer
import matplotlib.pyplot as plt
from utils.initialize_forecast import initialize_forecasts


[docs] class ForecastClassifier: """ Classifies forecasts based on opposing wind patterns. Methods: - determine_opposing_winds: Identify opposing wind levels and directions. - determine_OW_Rate: Calculate the opposing wind rate for a forecast subset. """ def __init__(self): """ Initialize the ForecastClassifier. """ pass
[docs] def determine_opposing_winds(self, wd, levels, n_sectors): """ Identify opposing wind levels and directions from wind data. Args: wd (numpy.ndarray): Wind direction array (degrees). levels (numpy.ndarray): Pressure or altitude levels. n_sectors (int): Number of angular sectors. Returns: tuple: - opposing_wind_directions (numpy.ndarray): Indices of opposing wind directions. - opposing_wind_levels (numpy.ndarray): Levels with opposing winds. """ dir_edges, var_bins, table = windrose.windrose.histogram(wd, levels, bins=levels, nsector=n_sectors) #Determine the sectors (directions) that contain non zero values (altitude levels that have wind) df = pd.DataFrame(table) altitude_lookup_idxs = df.apply(np.flatnonzero, axis=0) # altitude can be pressure or height, depending on by_pressure variable opposing_wind_levels = np.array([]) opposing_wind_directions = np.array([]) # Determine the sectors that have opposing winds by checking the current index and the complimentary index at n_sectors/2. # Also determine the altitudes contains in the opposing wind pairs for calculating probabilities later. for i in range (0,int(n_sectors/2)): # check if opposing sectors in the histogram tables have values greater than 0 # (therefore, there are winds in that sectors) if np.sum(table, axis=0)[i] != 0 and np.sum(table, axis=0)[i+int(n_sectors/2)] != 0: for idx in altitude_lookup_idxs[i]: opposing_wind_levels = np.append(opposing_wind_levels, var_bins[idx]) opposing_wind_directions = np.append(opposing_wind_directions, i) for idx in altitude_lookup_idxs[i+int(n_sectors/2)]: #print(var_bins[idx]) opposing_wind_levels = np.append(opposing_wind_levels, var_bins[idx]) opposing_wind_directions = np.append(opposing_wind_directions, i+int(n_sectors/2)) # sort the opposing wind altitudes and direction idxs (format later) in ascending order and remove duplicates opposing_wind_levels = np.sort(np.unique(opposing_wind_levels)) opposing_wind_directions = np.sort(np.unique(opposing_wind_directions)) return opposing_wind_directions, opposing_wind_levels
[docs] def determine_OW_Rate(self, forecast_subset): """ Calculate the opposing wind rate for a forecast subset over a 24-hour window. Args: forecast_subset (Forecast_Subset): Subset of the forecast. Returns: tuple: - scores (list): Number of opposing wind levels at each time interval. - score (float): Normalized opposing wind rate. """ #Assuming 24 hour subset window right now start_time = forecast_subset.start_time timestamp = start_time scores = [] intervals = 4 # Default Value (how many time incrememnts to look at given a 24 hour window) n_sectors = 8 # Default Value (how many angular bins to classify angular bins into) for i in range (0,intervals): z, u, v = forecast_subset.np_lookup(forecast_subset.lat_central, forecast_subset.lon_central, timestamp) bearing, speed = forecast_subset.windVectorToBearing(u, v) bearing = bearing % (2 * np.pi) bearing = np.degrees(bearing) levels = forecast_subset.ds.level.values opposing_wind_directions, opposing_wind_levels = self.determine_opposing_winds(bearing, levels, n_sectors) scores.append(len(opposing_wind_levels)) timestamp = timestamp + np.timedelta64(6, "h") max_score = (forecast_subset.level_dim*intervals) score = np.sum(scores)/max_score return scores, score
if __name__ == '__main__': # Import Forecasts FORECAST_SYNTH, FORECAST_ERA5, forecast_subset_era5, forecast_subset_synth = initialize_forecasts() # Initialize ForecastClassifier ForecastClassifier = ForecastClassifier() #randomize coord, ERA5 or Synth forecast_subset = forecast_subset_synth #choose _era5 or _synth forecast_subset.randomize_coord() print("random_coord", forecast_subset.lat_central, forecast_subset.lon_central, forecast_subset.start_time) forecast_subset.subset_forecast() #Determine Forecast Score scores, score = ForecastClassifier.determine_OW_Rate(forecast_subset) print(scores,score) # Visualize the forecast at the first timestamp Forecast_visualizer = ForecastVisualizer(forecast_subset) Forecast_visualizer.generate_flow_array(timestamp=forecast_subset.start_time) # Initialize Figure fig = plt.figure(figsize=(15, 10)) ax1 = fig.add_subplot(111, projection='custom3dquiver') fig.add_axes(ax1) Forecast_visualizer.visualize_3d_planar_flow(ax1, quiver_skip= 2) plt.show()