Source code for are.simulation.scenarios.scenario_imported_from_json.scenario

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.


import datetime
import logging
from typing import Any

from fsspec import AbstractFileSystem

from are.simulation.apps import ALL_APPS
from are.simulation.scenarios.scenario import Scenario, ScenarioStatus
from are.simulation.types import CapabilityTag, ExecutionMetadata

logger = logging.getLogger(__name__)


def get_apps(
    apps_metadata,
    apps_to_skip: list[str] | None = None,
    apps_to_keep: list[str] | None = None,
    sandbox_dir: str | None = None,
):
    initial_apps = []
    app_class_mapping = {cls.__name__: cls for cls in ALL_APPS}

    for app_data in apps_metadata:
        app_name = app_data.name

        if (
            apps_to_skip
            and app_name in apps_to_skip
            or apps_to_keep
            and app_name not in apps_to_keep
        ):
            logger.debug(f"Skipping loading app {app_name}.")
            continue

        # There could be 2 apps of the same class but different names
        app_class_name = app_data.class_name
        app_state = app_data.app_state
        app_class = app_class_mapping.get(app_class_name or app_name)
        if not app_class:
            logger.error(f"No class found for the app name {app_name}.")
            continue
        if app_class_name and app_class_name != app_name:
            app_instance = (
                app_class(sandbox_dir=sandbox_dir, name=app_name)
                if issubclass(app_class, AbstractFileSystem)
                else app_class(name=app_name)
            )
        else:
            app_instance = (
                app_class(sandbox_dir=sandbox_dir)
                if issubclass(app_class, AbstractFileSystem)
                else app_class()
            )
        logger.debug(f"Created new instance of {app_class}.")
        try:
            app_instance.load_state(app_state)  # type: ignore
            initial_apps.append(app_instance)
        except Exception as e:
            logger.exception(
                f"An error occurred while loading state for {app_name}: {e}"
            )

    return initial_apps


class ScenarioImportedFromJson(Scenario):
    """
    This is a special class used for importing scenarios from JSON files.

    The serialized payloads stored on the instance (``serialized_apps`` and
    ``serialized_events``) are turned into live apps and events by
    ``init_and_populate_apps`` and ``build_events_flow`` respectively.
    """

    # ID will be overridden upon import.
    scenario_id: str = "scenario_imported_from_json"
    tags: tuple[CapabilityTag, ...] = ()
    # NOTE: this expression is evaluated once, when the class body is executed,
    # not each time a scenario is created.
    start_time: float | None = datetime.datetime.now().timestamp()
    duration: float | None = 1000  # Scenario duration in seconds

    # Serialized payloads consumed by the methods below.
    serialized_events: Any = None
    serialized_apps: Any = None

    # App name filters forwarded to get_apps().
    apps_to_skip: list[str] | None = None
    apps_to_keep: list[str] | None = None

    hints: Any = None
    status: ScenarioStatus = ScenarioStatus.Draft
    execution_metadata: ExecutionMetadata | None = None

    # New fields added to support ExportedTraceDefinitionMetadata
    config: str | None = None
    has_a2a_augmentation: bool = False
    has_tool_augmentation: bool = False
    has_env_events_augmentation: bool = False
    has_exception: bool = False
    exception_type: str | None = None
    exception_message: str | None = None

    def build_events_flow(self) -> None:
        """
        Reset ``self.events`` and process ``serialized_events``.

        Does nothing when ``serialized_events`` is empty or ``None``.
        """
        if not self.serialized_events:
            return

        self.events = []  # type: ignore
        self.process_events(self.serialized_events)

    def init_and_populate_apps(self, *args, **kwargs) -> None:
        """
        Initialize the apps that will be used in the Scenario.

        Only the optional ``sandbox_dir`` keyword argument is read; it is
        forwarded to ``get_apps`` together with the serialized apps and the
        skip/keep filters stored on the instance.
        """
        sandbox_dir = kwargs.get("sandbox_dir")
        self.apps = get_apps(
            apps_metadata=self.serialized_apps,
            apps_to_skip=self.apps_to_skip,
            apps_to_keep=self.apps_to_keep,
            sandbox_dir=sandbox_dir,
        )
if __name__ == "__main__":
    # Imported here so the CLI helper is only loaded when run as a script.
    from are.simulation.scenarios.utils.cli_utils import run_and_validate

    run_and_validate(ScenarioImportedFromJson())