run_test = False
if run_test:
    from sklearn.datasets import make_regression
    from sklearn.preprocessing import MinMaxScaler
    from ddopai.dataloaders.tabular import XYDataLoader
    def run_test_loop(env):
        """Roll out random actions on ``env`` until a step reports truncation.

        For every step, the environment index, the reward, the info object,
        each entry of the returned observation mapping and the truncation flag
        are printed. The ``terminated`` flag returned by ``env.step`` is not
        used to stop the loop.
        """
        while True:
            sampled_action = env.action_space.sample()
            observation, reward, _terminated, is_truncated, info = env.step(sampled_action)

            print("##### STEP: ", env.index, "#####")
            print("reward:", reward)
            print("info:", info)
            print("next observation:")
            for name, content in observation.items():
                print("     ", name, ":")
                print(content)
            print("truncated:", is_truncated)

            # Stop only after the truncated step itself has been printed.
            if is_truncated:
                break
    # Build a small synthetic dataset and scale it into the range [0, 1].
    # All of the data is scaled here and the scaled targets are simply treated as demand.
    # With real data, the scaler should be fitted on the training data only.
    X, Y = make_regression(n_samples=8, n_features=2, n_targets=1, noise=0.1, random_state=42)
    # Ensure Y is a 2-D column array before it is passed to the scaler and the dataloader.
    if len(Y.shape) == 1:
        Y = Y.reshape(-1, 1)
    scaler = MinMaxScaler()
    X = scaler.fit_transform(X)
    # NOTE: the same scaler object is re-fitted on Y here, so afterwards it only holds the fit for Y.
    Y = scaler.fit_transform(Y)
    # 8 samples in total; per the argument names, validation starts at index 4 and test at index 6.
    dataloader = XYDataLoader(X, Y, val_index_start = 4, test_index_start = 6)
    # Environment configuration shared by the test environment below.
    env_kwargs = dict(
        q_bound_low = 0, # lower bound of the order quantity
        q_bound_high= 1, # upper bound of the order quantity
        underage_cost=0.5, # underage cost per unit
        overage_cost=0.5, # overage cost per unit (zero in most cases, non-zero in this test)
        fixed_ordering_cost=[2], # fixed ordering cost
        variable_ordering_cost=[0.5], # variable ordering cost per unit
        # parameters for the inventory pipeline (lead-time settings)
        inventory_pipeline_params = dict(
                                            lead_time_mean=[2], 
                                            lead_time_stochasticity="normal_relative",
                                            lead_time_variance=[0.2],
                                            max_lead_time=[3],
                                            min_lead_time=[1],
                                            ),
    )
    # NOTE(review): MultiPeriodEnv is not imported in this snippet; it is presumably
    # defined or imported elsewhere in this file - confirm before running.
    # horizon_train="use_all_data" lets the horizon be inferred from the dataloader.
    test_env = MultiPeriodEnv(
                            dataloader=dataloader,
                            horizon_train="use_all_data",
                            **env_kwargs
    )
    # Reset once at the first index; the returned value is stored but not used further below.
    obs = test_env.reset(start_index=0)
    print("#################### RESET ####################")
    # Roll out random actions in each mode in turn: train, val, test, then train again.
    print("#################### RUN IN TRAIN MODE ####################")
    run_test_loop(test_env)
    print("#################### RUN IN VAL MODE ####################")
    test_env.val()
    run_test_loop(test_env)
    print("#################### RUN IN TEST MODE ####################")
    test_env.test()
    run_test_loop(test_env)
    print("#################### RUN IN TRAIN MODE AGAIN ####################")
    test_env.train()
    run_test_loop(test_env)

Multi-Period Inventory Management
    Dynamic inventory management problem with inventory carry-over. Can be used to model the Lost Sales problem (when fixed costs are set to 0), and the Multi-Period Fixed Cost problem (when fixed costs are larger than 0).
  
MultiPeriodEnv
MultiPeriodEnv (underage_cost:numpy.ndarray|ddopai.utils.Parameter|int|float=1, overage_cost:numpy.ndarray|ddopai.utils.Parameter|int|float=0, fixed_ordering_cost:numpy.ndarray|ddopai.utils.Parameter|int|float=0, variable_ordering_cost:numpy.ndarray|ddopai.utils.Parameter|int|float=0, holding_cost:numpy.ndarray|ddopai.utils.Parameter|int|float=1, start_inventory:numpy.ndarray|ddopai.utils.Parameter|int|float=0, max_inventory:numpy.ndarray|ddopai.utils.Parameter|int|float=inf, inventory_pipeline_params:dict|None=None, q_bound_low:numpy.ndarray|ddopai.utils.Parameter|int|float=0, q_bound_high:numpy.ndarray|ddopai.utils.Parameter|int|float=inf, dataloader:ddopai.dataloaders.base.BaseDataLoader=None, num_SKUs:int|None=None, gamma:float=1, horizon_train:int|str=100, postprocessors:list[object]|None=None, mode:str='train', return_truncation:bool=True, step_info_verbosity=0)
XXX
| | Type | Default | Details |
|---|---|---|---|
| underage_cost | numpy.ndarray \| ddopai.utils.Parameter \| int \| float | 1 | underage cost per unit |
| overage_cost | numpy.ndarray \| ddopai.utils.Parameter \| int \| float | 0 | overage cost per unit (zero in most cases) |
| fixed_ordering_cost | numpy.ndarray \| ddopai.utils.Parameter \| int \| float | 0 | fixed ordering cost (applies per SKU, not jointly) |
| variable_ordering_cost | numpy.ndarray \| ddopai.utils.Parameter \| int \| float | 0 | variable ordering cost per unit |
| holding_cost | numpy.ndarray \| ddopai.utils.Parameter \| int \| float | 1 | holding cost per unit |
| start_inventory | numpy.ndarray \| ddopai.utils.Parameter \| int \| float | 0 | initial inventory |
| max_inventory | numpy.ndarray \| ddopai.utils.Parameter \| int \| float | inf | maximum inventory |
| inventory_pipeline_params | dict \| None | None | parameters for the inventory pipeline, only lead_time_mean must be given. |
| q_bound_low | numpy.ndarray \| ddopai.utils.Parameter \| int \| float | 0 | lower bound of the order quantity |
| q_bound_high | numpy.ndarray \| ddopai.utils.Parameter \| int \| float | inf | upper bound of the order quantity |
| dataloader | BaseDataLoader | None | dataloader |
| num_SKUs | int \| None | None | if None, it will be inferred from the DataLoader |
| gamma | float | 1 | discount factor |
| horizon_train | int \| str | 100 | if “use_all_data”, then horizon is inferred from the DataLoader |
| postprocessors | list[object] \| None | None | default is an empty list |
| mode | str | train | Initial mode (train, val, test) of the environment |
| return_truncation | bool | True | whether to return a truncated condition in step function |
| step_info_verbosity | int | 0 | 0: no info, 1: some info, 2: all info |
| Returns | None | | |
MultiPeriodEnv.step_
MultiPeriodEnv.step_ (action:numpy.ndarray)
XXX.
| | Type | Details |
|---|---|---|
| action | ndarray | order quantity |
| Returns | Tuple | |
Example usage of `MultiPeriodEnv` with a distributional dataloader:
# from ddopai.dataloaders.distribution import NormalDistributionDataLoader
# def run_test_loop(env):
#     truncated = False
#     while not truncated:
#         action = env.action_space.sample()
#         obs, reward, terminated, truncated, info = env.step(action)
#         print("##### STEP: ", env.index, "#####")
#         print("reward:", reward)
#         print("info:", info)
#         print("next observation:", obs)
#         print("truncated:", truncated)
# dataloader = NormalDistributionDataLoader(mean=[4, 3], std=[1, 2], num_units=2)
# test_env = MultiPeriodEnv(underage_cost=1, overage_cost=2, dataloader=dataloader, horizon_train=3)
# obs = test_env.reset(start_index=0)
# print("##### RESET #####")
# run_test_loop(test_env)

Example usage of [`NewsvendorEnv`](https://opimwue.github.io/ddopai/20_environments/21_envs_inventory/single_period_envs.html#newsvendorenv) using a fixed dataset:
# from sklearn.datasets import make_regression
# from sklearn.preprocessing import MinMaxScaler
# from ddopai.dataloaders.tabular import XYDataLoader
# # create a simple dataset bounded between 0 and 1.
# # We just scale all the data, pretending that it is the demand.
# # When using real data, one should only fit the scaler on the training data
# X, Y = make_regression(n_samples=8, n_features=2, n_targets=2, noise=0.1, random_state=42)
# scaler = MinMaxScaler()
# X = scaler.fit_transform(X)
# Y = scaler.fit_transform(Y)
# dataloader = XYDataLoader(X, Y, val_index_start = 4, test_index_start = 6)
# test_env = NewsvendorEnv(underage_cost=Parameter(np.array([1,1]), shape = (2,)), overage_cost=Parameter(np.array([0.5,0.5]), shape = (2,)), dataloader=dataloader, horizon_train="use_all_data")
# obs = test_env.reset(start_index=0)
# print("#################### RESET ####################")
# print("#################### RUN IN TRAIN MODE ####################")
# run_test_loop(test_env)
# print("#################### RUN IN VAL MODE ####################")
# test_env.val()
# run_test_loop(test_env)
# print("#################### RUN IN TEST MODE ####################")
# test_env.test()
# run_test_loop(test_env)
# print("#################### RUN IN TRAIN MODE AGAIN ####################")
# test_env.train()
# run_test_loop(test_env)