# Source code for are.simulation.scenarios.utils.scenario_expander

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.


import logging
import random
from dataclasses import dataclass, field
from typing import cast

import numpy as np

from are.simulation.apps.agent_user_interface import AgentUserInterface
from are.simulation.apps.apartment_listing import ApartmentListingApp
from are.simulation.apps.email_client import EmailClientApp
from are.simulation.apps.messaging import MessagingApp
from are.simulation.apps.messaging_v2 import MessagingAppV2
from are.simulation.apps.shopping import ShoppingApp
from are.simulation.types import (
    AbstractEnvironment,
    ConditionCheckEvent,
    EventRegisterer,
    EventType,
)

logger = logging.getLogger(__name__)

# Prefix of the ids given to every event added by EnvEventsExpander
# (ids are "<tag>_<key>"); it is also used inside "{{...}}" placeholders that
# reference previously added shopping products / items.
ENV_EVENT_EXPANSION_TAG = "env_expansion"
# ENV events are scheduled using a Poisson process with finite horizon.
# Fallback horizon, in seconds, used when the scenario's duration is missing
# or zero.
ENV_EVENT_DEFAULT_HORIZON = 360


def start_simulation_condition(env: AbstractEnvironment) -> bool:
    """Tell whether the user has already messaged the agent.

    Scans the environment's event log for a USER event emitted by the
    ``AgentUserInterface`` app through ``send_message_to_agent``.

    :param env: environment whose event log is inspected
    :return: True as soon as one such event is found, False otherwise
    """
    for logged_event in env.event_log.list_view():
        # only events triggered by the user are relevant
        if logged_event.event_type != EventType.USER:
            continue
        is_agent_ui = logged_event.app_class_name() == AgentUserInterface.__name__
        if is_agent_ui and logged_event.function_name() == "send_message_to_agent":
            return True
    return False


def default_weight_per_app_class() -> dict[str, float]:
    """Build the default per-app weights used to split ENV events.

    Every supported app class gets the same weight (1.0).  A fresh dict is
    returned on each call so it is safe to use as a dataclass default factory.
    """
    supported_app_classes = (
        "EmailClientApp",
        "ApartmentListingApp",
        "MessagingApp",
        "MessagingAppV2",
        "ShoppingApp",
    )
    return dict.fromkeys(supported_app_classes, 1.0)


@dataclass
class EnvEventsConfig:
    """
    Configuration class for controlling environmental event generation and
    scheduling in simulations.

    This class defines parameters that control how background events (messages,
    emails, shopping updates, apartment listings) are automatically generated
    and scheduled during scenario execution. Events are scheduled using a
    Poisson process (exponential inter-arrival times) to simulate realistic
    timing patterns.

    These environmental events add noise to scenarios by generating synthetic
    background activity that is unrelated to the main scenario task. This
    simulates a more realistic environment where agents receive distracting
    notifications and updates while trying to complete their primary objective.

    Attributes:
        num_env_events_per_minute: Rate of environmental events to generate per
            minute of simulation time. The total event budget is
            ``int(num_env_events_per_minute * duration / 60)`` (duration in
            seconds), split across apps according to ``weight_per_app_class``.
            Higher values create more noise and distractions.
        env_events_seed: Random seed for reproducible event sampling and Poisson
            scheduling. Different seeds generate different patterns of noise
            events while keeping the same overall distribution and timing
            characteristics.
        n_message_events_per_conversation: Maximum number of message events to
            generate per conversation. Higher values create longer conversation
            threads in messaging apps, adding more textual noise. Note that
            messaging apps currently replay every conversation regardless of
            the per-app event budget.
        n_item_events_per_product: Maximum number of item (variant) events to
            generate per shopping product. The shopping app's event budget is
            divided by this value to get the number of products to add, so it
            must be positive for shopping apps to receive events.
        weight_per_app_class: Relative weights for distributing events across
            app types, keyed by canonical app class name. Apps without an entry
            receive no events. Higher email weights create more inbox clutter,
            higher messaging weights more chat notifications, etc.
    """

    num_env_events_per_minute: int = 10
    # random seed for sampling ENV events from the scenario universe data and
    # Poisson scheduling
    env_events_seed: int = 0
    # scheduling parameters
    n_message_events_per_conversation: int = 4
    n_item_events_per_product: int = 2
    weight_per_app_class: dict[str, float] = field(
        default_factory=default_weight_per_app_class
    )
class EnvEventsExpander:
    """Add background ("noise") ENV events to a scenario.

    Event contents are taken from ``scenario.augmentation_data["apps"]``, a
    list of ``{"name": ..., "app_state": {...}}`` entries, and scheduled
    relative to a start event with exponentially distributed inter-arrival
    times (a Poisson process over the scenario duration).  Only the messaging,
    email, shopping and apartment app names listed in
    :meth:`add_env_events_to_scenario` receive events; other apps are ignored.
    """

    def __init__(
        self,
        env_events_config: EnvEventsConfig,
    ) -> None:
        """
        :param env_events_config: event rate, seed and per-app weights
        """
        self.config = env_events_config
        # app name -> canonical APP_ALIAS name; filled by
        # get_num_env_events_per_app() and read while scheduling events.
        self.resolved_app_names: dict[str, str] = {}

    def get_num_env_events_per_app(
        self, num_env_events: int, app_names: list[str]
    ) -> dict[str, int]:
        """Split ``num_env_events`` across apps proportionally to their weight.

        Weights are looked up in ``config.weight_per_app_class`` by canonical
        app name (falling back to the raw name when the app has no alias
        entry).  Also stores the alias resolution in
        ``self.resolved_app_names``.

        :param num_env_events: total number of ENV events to distribute
        :param app_names: app names as they appear in the augmentation data
        :return: app name -> number of events (truncated to int); every app
            gets 0 when the total weight is not positive
        """
        # Resolve app names to their canonical form
        self.resolved_app_names = self._resolve_app_names(app_names)
        weights = [
            self.config.weight_per_app_class.get(
                self.resolved_app_names.get(app, app), 0
            )
            for app in app_names
        ]
        total_weight = sum(weights)
        if total_weight <= 0:
            # No app has a usable weight: avoid dividing by zero.
            return {app: 0 for app in app_names}
        return {
            app: int((weight / total_weight) * num_env_events)
            for app, weight in zip(app_names, weights)
        }

    def _resolve_app_names(self, app_names: list[str]) -> dict[str, str]:
        """Map each app name to its canonical name from ``APP_ALIAS``.

        Names matching neither a canonical name nor one of its aliases are
        left out of the returned mapping.
        """
        # Import here to avoid circular import
        from are.simulation.validation.constants import APP_ALIAS

        resolved_names: dict[str, str] = {}
        for app in app_names:
            for canonical_name, aliases in APP_ALIAS.items():
                alias_list = aliases if isinstance(aliases, list) else [aliases]
                if app == canonical_name or app in alias_list:
                    resolved_names[app] = canonical_name
                    break
        return resolved_names

    @staticmethod
    def _sample_inter_arrival_times(
        np_rng: np.random.Generator, n_events: int, horizon: float
    ) -> np.ndarray:
        """Draw ``n_events`` exponential gaps with mean rate ``n_events / horizon``.

        Callers must pass ``n_events > 0`` and ``horizon > 0``.
        """
        average_rate = n_events / horizon
        return np_rng.exponential(scale=1 / average_rate, size=n_events)

    def add_env_events_to_scenario(self, scenario) -> None:  # type: ignore
        """
        Expands the scenario in-place with ENV events.

        Every generated event depends, directly or through a chain, on a start
        event: the scenario's first event when it already has events,
        otherwise a new ``ConditionCheckEvent`` that waits until the user has
        sent a message to the agent.  Generated events are appended to
        ``scenario.events`` with ids ``f"{ENV_EVENT_EXPANSION_TAG}_{key}"``.
        Sampling is seeded by ``config.env_events_seed``; the augmentation
        data itself is not modified.

        :param scenario: scenario to expand
        :raises AssertionError: if the chosen start event has dependencies
        """
        augmentation_data = scenario.augmentation_data or {}
        apps_augmentation_data = (
            augmentation_data["apps"] if "apps" in augmentation_data else []
        )
        logger.warning(
            f"Adding {self.config.num_env_events_per_minute} env events per minute, total duration: {scenario.duration} seconds"
        )

        d_events: dict = {}
        add_start_event_flag = False
        if scenario.events:
            # first processed event without parent
            d_events["start_event"] = scenario.events[0]
        else:
            # otherwise, wait for first message from the user
            d_events["start_event"] = ConditionCheckEvent.from_condition(
                start_simulation_condition, every_tick=3
            )
            add_start_event_flag = True
        assert d_events["start_event"].dependencies == [], (
            "start event should have no dependencies"
        )

        # Scheduling horizon, in seconds.
        duration = scenario.duration if scenario.duration else ENV_EVENT_DEFAULT_HORIZON
        # ``rng`` samples/shuffles the replayed items, ``np_rng`` draws the
        # arrival times; both are shared across apps, so results depend on the
        # order of the apps in the augmentation data.
        np_rng = np.random.default_rng(self.config.env_events_seed)
        rng = random.Random(self.config.env_events_seed)

        app_names = [d["name"] for d in apps_augmentation_data]
        num_env_events = int(self.config.num_env_events_per_minute * duration / 60)
        num_env_events_per_app = self.get_num_env_events_per_app(
            num_env_events, app_names
        )

        messaging_apps = ["MessagingAppV2", "Chats", "Messages"]
        email_apps = ["EmailClientV2", "EmailClientApp", "Mail"]
        shopping_apps = ["Shopping", "ShoppingApp"]
        apartment_apps = ["RentAFlat", "ApartmentListingApp"]

        with EventRegisterer.capture_mode():
            for app_data in apps_augmentation_data:
                app_name = app_data["name"]
                if app_name in messaging_apps:
                    self._add_messaging_events(
                        scenario, app_data, duration, d_events, rng, np_rng
                    )
                elif app_name in email_apps:
                    self._add_email_events(
                        scenario,
                        app_data,
                        num_env_events_per_app[app_name],
                        duration,
                        d_events,
                        rng,
                        np_rng,
                    )
                elif app_name in shopping_apps:
                    self._add_shopping_events(
                        scenario,
                        app_data,
                        num_env_events_per_app[app_name],
                        duration,
                        d_events,
                        rng,
                        np_rng,
                    )
                elif app_name in apartment_apps:
                    self._add_apartment_events(
                        scenario,
                        app_data,
                        num_env_events_per_app[app_name],
                        duration,
                        d_events,
                        rng,
                        np_rng,
                    )

        if not add_start_event_flag:
            del d_events["start_event"]  # already added to scenario.events
        scenario.events += [
            e.with_id(f"{ENV_EVENT_EXPANSION_TAG}_{key}") for key, e in d_events.items()
        ]
        logger.warning(
            f"Added {len(d_events)} env events to the scenario, total {len(scenario.events)} events"
        )

    def _add_messaging_events(
        self,
        scenario,  # type: ignore
        app_data: dict,
        duration: float,
        d_events: dict,
        rng: random.Random,
        np_rng: np.random.Generator,
    ) -> None:
        """Replay conversation messages as chained message events.

        Each conversation starts at a Poisson arrival time after the start
        event (arrivals past ``duration`` are dropped); its first
        ``n_message_events_per_conversation`` messages then follow each other
        with exponential gaps over the remaining horizon.
        """
        name = app_data["name"]
        is_v2 = "V2" in self.resolved_app_names.get(name, name)
        app = (
            cast(MessagingAppV2, scenario.get_app(name))
            if is_v2
            else cast(MessagingApp, scenario.get_app(name))
        )
        conversations = list(app_data["app_state"]["conversations"].values())
        # NOTE(review): every conversation is replayed, i.e. the per-app event
        # budget is not applied to messaging apps (unlike the other app types).
        # Kept for reproducibility of seeded scenarios; confirm whether a cap of
        # budget // n_message_events_per_conversation was intended.
        n_conversation_events = len(conversations)
        if n_conversation_events == 0:
            return
        # Random order of the conversations (consumes ``rng``).
        conversations = rng.sample(conversations, k=n_conversation_events)
        ticks = np.cumsum(
            self._sample_inter_arrival_times(np_rng, n_conversation_events, duration)
        )
        for tick, conversation in zip(ticks, conversations):
            if tick >= duration:
                # ticks are increasing: all later conversations are out too
                break
            n_message_events = min(
                len(conversation["messages"]),
                self.config.n_message_events_per_conversation,
            )
            if n_message_events <= 0:
                continue
            message_gaps = self._sample_inter_arrival_times(
                np_rng, n_message_events, duration - tick
            )
            key_prefix = f"{name}_{conversation['conversation_id']}"
            for idx, message in enumerate(conversation["messages"]):
                if idx >= n_message_events:
                    break
                key = f"{key_prefix}_{idx}"
                if is_v2:
                    d_events[key] = app.create_and_add_message(
                        conversation_id=conversation["conversation_id"],
                        sender_id=message["sender_id"],
                        content=message["content"],
                    )
                else:
                    d_events[key] = app.create_and_add_message(
                        conversation_id=conversation["conversation_id"],
                        sender=message["sender"],
                        content=message["content"],
                    )
                if idx == 0:
                    d_events[key].depends_on(
                        d_events["start_event"], delay_seconds=tick
                    )
                else:
                    d_events[key].depends_on(
                        d_events[f"{key_prefix}_{idx - 1}"],
                        delay_seconds=message_gaps[idx - 1],
                    )

    def _add_email_events(
        self,
        scenario,  # type: ignore
        app_data: dict,
        n_app_events: int,
        duration: float,
        d_events: dict,
        rng: random.Random,
        np_rng: np.random.Generator,
    ) -> None:
        """Add up to ``n_app_events`` INBOX emails at Poisson arrival times."""
        email_client = cast(EmailClientApp, scenario.get_app(app_data["name"]))
        # Shuffle a copy so the scenario's augmentation data is not reordered.
        emails = list(app_data["app_state"]["folders"]["INBOX"]["emails"])
        rng.shuffle(emails)
        n_events = min(len(emails), n_app_events)
        if n_events <= 0:
            return
        ticks = np.cumsum(self._sample_inter_arrival_times(np_rng, n_events, duration))
        for tick, email in zip(ticks, emails):
            d_events[f"email_{email['email_id']}"] = (
                email_client.create_and_add_email(
                    sender=email["sender"],
                    recipients=email["recipients"],
                    subject=email["subject"],
                    content=email["content"],
                    folder_name="INBOX",
                ).depends_on(d_events["start_event"], delay_seconds=tick)
            )

    def _add_shopping_events(
        self,
        scenario,  # type: ignore
        app_data: dict,
        n_app_events: int,
        duration: float,
        d_events: dict,
        rng: random.Random,
        np_rng: np.random.Generator,
    ) -> None:
        """Add products, their item variants and matching discount codes.

        Products arrive at Poisson times within ``duration``; each product's
        first ``n_item_events_per_product`` variants follow it, and discount
        codes are attached to items that were added.
        """
        shopping = cast(ShoppingApp, scenario.get_app(app_data["name"]))
        product_list = list(app_data["app_state"]["products"].values())
        rng.shuffle(product_list)
        if not product_list:
            return
        n_events = min(
            len(product_list),
            n_app_events // self.config.n_item_events_per_product,
        )
        if n_events <= 0:
            return
        ticks = np.cumsum(self._sample_inter_arrival_times(np_rng, n_events, duration))
        for tick, product in zip(ticks, product_list):
            if tick >= duration:
                continue
            product_key = f"shopping_product_{product['product_id']}"
            d_events[product_key] = (
                shopping.add_product(
                    name=product["name"],
                )
            ).depends_on(d_events["start_event"], delay_seconds=tick)
            variants = product["variants"]
            if len(variants) == 0:
                continue
            n_item_events = min(len(variants), self.config.n_item_events_per_product)
            item_ticks = np.cumsum(
                self._sample_inter_arrival_times(
                    np_rng, n_item_events, duration - tick
                )
            )
            # Placeholder resolved to the id of the product-creation event.
            product_ref = "{{" + f"{ENV_EVENT_EXPANSION_TAG}_{product_key}" + "}}"
            for item_tick, item in zip(item_ticks, variants.values()):
                d_events[f"shopping_item_{item['item_id']}"] = (
                    shopping.add_item_to_product(
                        product_id=product_ref,
                        price=item["price"],
                        available=item["available"],
                        options=item["options"],
                    ).depends_on(d_events[product_key], delay_seconds=item_tick)
                )

        discount_codes_per_item = app_data["app_state"].get("discount_codes", {})
        for item_id, raw_codes in discount_codes_per_item.items():
            discount_codes = {
                str(k): float(v)
                for k, v in cast(dict[str, float], raw_codes).items()
            }
            # Drawn for every entry, even unmatched ones, so the random stream
            # (and thus the generated scenario) stays the same.
            delay_tick = np_rng.exponential(scale=duration // 2, size=1)[0]
            item_key = f"shopping_item_{item_id}"
            if item_key not in d_events:
                continue
            item_ref = "{{" + f"{ENV_EVENT_EXPANSION_TAG}_{item_key}" + "}}"
            for code, value in discount_codes.items():
                d_events[f"shopping_discount_code_{item_id}_{code}"] = (
                    shopping.add_discount_code(
                        item_id=item_ref,
                        discount_code={code: value},
                    )
                ).depends_on(d_events[item_key], delay_seconds=delay_tick)

    def _add_apartment_events(
        self,
        scenario,  # type: ignore
        app_data: dict,
        n_app_events: int,
        duration: float,
        d_events: dict,
        rng: random.Random,
        np_rng: np.random.Generator,
    ) -> None:
        """Add up to ``n_app_events`` apartment listings at Poisson arrival times."""
        rent_a_flat = cast(ApartmentListingApp, scenario.get_app(app_data["name"]))
        apartment_list = list(app_data["app_state"]["apartments"].values())
        rng.shuffle(apartment_list)
        n_events = min(len(apartment_list), n_app_events)
        if n_events <= 0:
            return
        ticks = np.cumsum(self._sample_inter_arrival_times(np_rng, n_events, duration))
        for i, (tick, apartment) in enumerate(zip(ticks, apartment_list)):
            # Rename keys on a copy: mutating the augmentation data in place
            # made any later expansion of the same data fail on "bedrooms".
            listing = dict(apartment)
            listing["number_of_bedrooms"] = listing.pop("bedrooms")
            listing["number_of_bathrooms"] = listing.pop("bathrooms")
            listing.pop("apartment_id", None)
            listing["price"] = float(listing["price"])
            d_events[f"apartment_{i}"] = (
                rent_a_flat.add_new_apartment(**listing)
            ).depends_on(d_events["start_event"], delay_seconds=tick)