# Source code for are.simulation.scenarios.scenario_imported_from_json.scenario
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import datetime
import logging
from typing import Any

from fsspec import AbstractFileSystem

from are.simulation.apps import ALL_APPS
from are.simulation.scenarios.scenario import Scenario, ScenarioStatus
from are.simulation.types import CapabilityTag, ExecutionMetadata

logger = logging.getLogger(__name__)


def get_apps(
    apps_metadata,
    apps_to_skip: list[str] | None = None,
    apps_to_keep: list[str] | None = None,
    sandbox_dir: str | None = None,
) -> list[Any]:
    """Instantiate apps from serialized metadata and restore their state.

    Each metadata entry is read through its ``name``, ``class_name`` and
    ``app_state`` attributes. The app class is looked up by ``class_name``
    (falling back to ``name``) among the classes listed in ``ALL_APPS``.

    Args:
        apps_metadata: Iterable of app metadata entries, or ``None``. ``None``
            is treated as "no apps" and yields an empty list.
        apps_to_skip: App names that must not be loaded. Ignored when empty
            or ``None``.
        apps_to_keep: When non-empty, only apps whose name appears here are
            loaded.
        sandbox_dir: Forwarded as ``sandbox_dir`` to app classes that
            subclass ``AbstractFileSystem``.

    Returns:
        The app instances whose state was loaded successfully, in metadata
        order. Entries with no matching class, and entries whose
        ``load_state`` call raises, are logged and left out.
    """
    # The importing scenario declares `serialized_apps` with a default of
    # None; treat that as an empty app list instead of failing on iteration.
    if apps_metadata is None:
        return []

    initial_apps = []
    app_class_mapping = {cls.__name__: cls for cls in ALL_APPS}

    for app_data in apps_metadata:
        app_name = app_data.name

        # `and` binds tighter than `or`; the parentheses only make that explicit.
        skipped = bool(apps_to_skip) and app_name in apps_to_skip
        not_kept = bool(apps_to_keep) and app_name not in apps_to_keep
        if skipped or not_kept:
            logger.debug(f"Skipping loading app {app_name}.")
            continue

        # There could be 2 apps of the same class but different names
        app_class_name = app_data.class_name
        app_state = app_data.app_state

        app_class = app_class_mapping.get(app_class_name or app_name)
        if not app_class:
            logger.error(f"No class found for the app name {app_name}.")
            continue

        init_kwargs: dict[str, Any] = {}
        if issubclass(app_class, AbstractFileSystem):
            init_kwargs["sandbox_dir"] = sandbox_dir
        # Only pass an explicit name when it differs from the class name.
        if app_class_name and app_class_name != app_name:
            init_kwargs["name"] = app_name
        app_instance = app_class(**init_kwargs)
        logger.debug(f"Created new instance of {app_class}.")

        try:
            app_instance.load_state(app_state)  # type: ignore
        except Exception as e:
            # Best effort: one app failing to load must not abort the others.
            logger.exception(
                f"An error occurred while loading state for {app_name}: {e}"
            )
        else:
            initial_apps.append(app_instance)

    return initial_apps
class ScenarioImportedFromJson(Scenario):
    """Scenario whose apps and events are supplied in serialized form.

    Used when a scenario is imported from a JSON file rather than defined in
    code.
    """

    # ID will be overridden upon import.
    scenario_id: str = "scenario_imported_from_json"
    tags: tuple[CapabilityTag, ...] = ()
    # NOTE(review): this expression runs once, when the class body is
    # executed, so the default is the definition-time timestamp rather than
    # a per-instance one - confirm this is intended.
    start_time: float | None = datetime.datetime.now().timestamp()
    duration: float | None = 1000  # Scenario duration in seconds
    serialized_events: Any = None
    serialized_apps: Any = None
    apps_to_skip: list[str] | None = None
    apps_to_keep: list[str] | None = None
    hints: Any = None
    status: ScenarioStatus = ScenarioStatus.Draft
    execution_metadata: ExecutionMetadata | None = None

    # New fields added to support ExportedTraceDefinitionMetadata
    config: str | None = None
    has_a2a_augmentation: bool = False
    has_tool_augmentation: bool = False
    has_env_events_augmentation: bool = False
    has_exception: bool = False
    exception_type: str | None = None
    exception_message: str | None = None

    def init_and_populate_apps(self, *args, **kwargs) -> None:
        """Build ``self.apps`` from ``self.serialized_apps``.

        The skip/keep filters stored on the scenario are applied. An optional
        ``sandbox_dir`` keyword argument is forwarded to ``get_apps``; all
        other arguments are ignored.
        """
        sandbox_dir = kwargs.get("sandbox_dir")
        self.apps = get_apps(
            apps_metadata=self.serialized_apps,
            apps_to_skip=self.apps_to_skip,
            apps_to_keep=self.apps_to_keep,
            sandbox_dir=sandbox_dir,
        )