Source code for RLHAB_gym_DUAL

from env.RLHAB_gym_BASE import FlowFieldEnv3dBase
from env.config.env_config import env_params
from env.rendering.renderertriple import MatplotlibRendererTriple
from env.forecast_processing.forecast import Forecast_Subset
from env.forecast_processing.forecast_visualizer import ForecastVisualizer
from env.balloon import BalloonState, SimulatorState
from env.balloon import AltitudeControlCommand as command
from utils.initialize_forecast import initialize_forecasts
import random
import numpy as np
import time
from gymnasium import spaces


class FlowFieldEnv3d_DUAL(FlowFieldEnv3dBase):
    """Flow-field environment driven by a paired ERA5 and synthetic forecast.

    An ERA5 forecast subset is positioned with ``randomize_coord`` and a
    synthetic forecast subset is then assigned the same central latitude,
    central longitude and start time.  ``get_flow_forecast`` exposes the ERA5
    subset and ``get_motion_forecast`` exposes the synthetic subset.

    NOTE(review): ``np_rng``, ``days``, ``render_mode``, ``radius``, ``goal``,
    ``ForecastClassifier``, ``move_agent``, ``calculate_relative_wind_column``,
    ``_build_obs_space``, ``_get_obs`` and ``_get_info`` are not defined in
    this class; they are presumably provided by ``FlowFieldEnv3dBase``.
    """

    def __init__(self, FORECAST_ERA5, FORECAST_SYNTH, render_style="direction", **kwargs):
        """Create the paired forecast subsets, optional renderer and obs space.

        Args:
            FORECAST_ERA5: Forecast object wrapped in the ERA5 ``Forecast_Subset``.
            FORECAST_SYNTH: Forecast object wrapped in the synthetic
                ``Forecast_Subset``.
            render_style: Stored on the instance as ``render_style``; it is not
                read anywhere else in this class.
            **kwargs: Forwarded unchanged to ``FlowFieldEnv3dBase.__init__``.
        """
        super().__init__(**kwargs)

        self.FORECAST_ERA5 = FORECAST_ERA5
        self.FORECAST_SYNTH = FORECAST_SYNTH
        self.render_style = render_style

        self.forecast_subset_era5 = Forecast_Subset(FORECAST_ERA5)
        self.forecast_subset_synth = Forecast_Subset(FORECAST_SYNTH)
        self._resample_forecast_subsets()

        if self.render_mode == "human":
            self.vis_era5 = ForecastVisualizer(self.forecast_subset_era5)
            self.vis_synth = ForecastVisualizer(self.forecast_subset_synth)
            self._refresh_flow_arrays()

            self.renderer = MatplotlibRendererTriple(
                Forecast_visualizer_ERA5=self.vis_era5,
                Forecast_visualizer_SYNTH=self.vis_synth,
                render_mode=self.render_mode,
                radius=self.radius,
                coordinate_system="geographic",
            )

        # The observation space is sized from the ERA5 subset's pressure levels.
        num_levels = len(self.forecast_subset_era5.pressure_levels)
        self.observation_space = self._build_obs_space(num_levels)

    def _resample_forecast_subsets(self):
        """Re-position the ERA5 subset and align the synthetic subset to it."""
        era5 = self.forecast_subset_era5
        synth = self.forecast_subset_synth

        era5.randomize_coord(self.np_rng)
        era5.subset_forecast(days=self.days)

        # The synthetic winds reuse the ERA5 subset's centre point and start time.
        synth.assign_coord(
            lat=era5.lat_central,
            lon=era5.lon_central,
            timestamp=era5.start_time,
        )
        synth.subset_forecast(days=self.days)

    def _refresh_flow_arrays(self):
        """Regenerate both visualizers' flow arrays at their subset start times."""
        self.vis_era5.generate_flow_array(self.forecast_subset_era5.start_time)
        self.vis_synth.generate_flow_array(self.forecast_subset_synth.start_time)

    def reset(self, **kwargs):
        """Start a new episode on a freshly sampled pair of forecast subsets.

        Args:
            **kwargs: Accepted but not used by this method.
                NOTE(review): a ``seed`` passed here is therefore not applied.

        Returns:
            The tuple ``(self._get_obs(), self._get_info())``.
        """
        # Dummy scores that force at least one pass through the sampling loop.
        self.forecast_scores = [5] * 4
        self.forecast_score = -1

        # For not including bad forecasts (score of 0): keep resampling until
        # the ERA5 subset's score reaches the configured threshold.
        # NOTE(review): there is no retry cap, so this loops for as long as no
        # sampled subset meets the threshold.
        threshold = env_params['forecast_score_threshold']
        while self.forecast_score < threshold:
            self._resample_forecast_subsets()
            self.forecast_scores, self.forecast_score = (
                self.ForecastClassifier.determine_OW_Rate(self.forecast_subset_era5)
            )

        era5 = self.forecast_subset_era5

        # Reset custom metrics.
        self.total_steps = 0
        self.within_target = False
        self.rogue_status = False
        self.rogue_count = 0
        self.rogue_step_trigger = None
        self.timestamp = era5.start_time

        # Reset the balloon to the forecast subset's central point.
        # NOTE(review): altitude comes from the global ``random`` module, not
        # ``self.np_rng``, so it is not tied to the generator used for the
        # coordinates.
        self.Balloon = BalloonState(
            lat=era5.lat_central,
            lon=era5.lon_central,
            x=0,
            y=0,
            altitude=int(random.uniform(env_params['alt_min'], env_params['alt_max'])),
        )

        # Reset the simulator state to the forecast subset's start time.
        self.SimulatorState = SimulatorState(self.Balloon, era5.start_time)

        # Do an artificial move to get initial velocity, distance and bearing
        # values, then put the balloon back on its starting coordinates.
        self.move_agent(1)
        self.total_steps = 0  # discard the step counted by the artificial move

        # Synth and ERA5 don't matter here because the centre point is shared.
        self.Balloon.update(
            lat=era5.lat_central,
            lon=era5.lon_central,
            x=0,
            y=0,
            distance=0,
        )

        # The relative wind column is still computed from ERA5.
        self.Balloon.rel_wind_column = self.calculate_relative_wind_column(era5)

        if self.render_mode == "human":
            self.renderer.reset(self.goal, self.Balloon, self.SimulatorState)
            # Refresh both panels, as the constructor does, so neither keeps
            # the flow array generated for the previous subset.
            self._refresh_flow_arrays()

        # Reset custom metrics again: the artificial move above ran in between.
        self.twr = self.twr_inner = self.twr_outer = 0
        self.within_target = False

        return self._get_obs(), self._get_info()

    def get_motion_forecast(self):
        """Return the synthetic forecast subset."""
        return self.forecast_subset_synth

    def get_flow_forecast(self):
        """Return the ERA5 forecast subset."""
        return self.forecast_subset_era5
def main():
    """Run the DUAL environment in an endless loop with random actions.

    Each pass of the outer loop resets the environment, steps it for at most
    ``env_params["episode_length"]`` steps with an action drawn from
    ``random.randint(0, 2)``, then prints the accumulated reward, the last
    ``info`` value and the elapsed time.  The loop never exits on its own.
    """
    # Import forecasts; the two ready-made subsets returned alongside are not
    # needed because the environment builds its own.
    FORECAST_SYNTH, FORECAST_ERA5, _, _ = initialize_forecasts()

    env = FlowFieldEnv3d_DUAL(
        FORECAST_ERA5=FORECAST_ERA5,
        FORECAST_SYNTH=FORECAST_SYNTH,
        render_mode=env_params['render_mode'],
    )

    while True:
        # perf_counter is monotonic, so the duration cannot be skewed by
        # system clock adjustments.
        start_time = time.perf_counter()

        obs, info = env.reset()
        total_reward = 0

        for step in range(env_params["episode_length"]):
            # For random actions
            obs, reward, done, truncated, info = env.step(random.randint(0, 2))
            total_reward += reward

            if env_params['render_mode'] == "human":
                env.render()

            # Stop on either end-of-episode flag; the environment should not
            # be stepped again until it has been reset.
            if done or truncated:
                break

        print("Total reward:", total_reward, info)

        execution_time = time.perf_counter() - start_time
        print("Execution time:", execution_time)
# Run the random-action demo only when this file is executed as a script.
if __name__ == '__main__': main()