
Lab 4: Bootstrap with ScriptFlow

Introduction

We want to use ScriptFlow together with our previous estimator to run a bootstrap on PSID data and evaluate a confidence interval on theta (using the code you implemented before).

To do so, we are going to pass arguments to our script, the same way it is done in pt_uroot.py.

Understanding argparse

Argparse is Python's standard library module for parsing command-line arguments. It allows you to:

  • Define what arguments your program accepts
  • Parse sys.argv to extract argument values
  • Generate help messages automatically
  • Handle argument validation and type conversion

Basic argparse usage involves:

  1. Creating an ArgumentParser object
  2. Adding arguments using add_argument()
  3. Parsing arguments with parse_args()
  4. Accessing parsed values as attributes

import argparse

parser = argparse.ArgumentParser(description='My script')
parser.add_argument('--param', type=float, default=0.5, help='Parameter value')
args = parser.parse_args()
print(f"Parameter: {args.param}")

How pt_uroot.py Uses argparse

Looking at the pt_uroot.py script, we can see that it does not yet implement argparse, but based on how it is called in sflow.py, it is expected to handle command-line arguments. The script is called with:

cmd = [f"{prefix}python scripts/pt_uroot.py -o res_{i}.json -i {i}"]

This suggests that pt_uroot.py should accept:

  • -o argument: specifies where results should be saved (output file path)
  • -i argument: an identifier or iteration number for the current run

The argparse implementation would read where the results should be saved and which replication is being run, following this pattern:

import argparse

parser = argparse.ArgumentParser(description='PT Unit Root Estimation')
parser.add_argument('-o', '--output', type=str, required=True,
                   help='Output file path for results')
parser.add_argument('-i', '--id', type=int, required=True,
                   help='Iteration ID for this run')
args = parser.parse_args()
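
With these arguments defined, pt_uroot.py supports exactly the call that sflow.py issues, for example:

python scripts/pt_uroot.py -o res_0.json -i 0

Inside the script, the parsed values are then available as args.output and args.id.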

Understanding Pydantic

Pydantic is a data validation library that provides:

  • Runtime type checking and validation
  • Automatic data conversion
  • Better error messages for invalid data
  • JSON schema generation
  • Integration with other frameworks

Basic pydantic usage:

from pydantic import BaseModel

class Config(BaseModel):
    name: str
    value: float
    enabled: bool = True

# Automatic validation and conversion
config = Config(name="test", value="3.14")  # value converted to float
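
When a value cannot be converted, pydantic raises a ValidationError with a readable message; a minimal demonstration, reusing the Config class defined above:

from pydantic import ValidationError

try:
    Config(name="test", value="not a number")
except ValidationError as e:
    print(e)  # reports that 'value' is not a valid float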

How pt_uroot.py Uses Pydantic

Currently, pt_uroot.py uses dataclasses rather than pydantic, but the principle is similar. The PTSimulatorConfig dataclass provides structure and type hints:

from dataclasses import dataclass
from typing import Optional

import torch


@dataclass
class PTSimulatorConfig:
    N: int
    T: int
    rho: float
    sigma_e2: float
    sigma_u2: float
    sigma_p1_2: float
    sigma_alpha2: float = 0.0
    device: str = "cpu"
    dtype: torch.dtype = torch.float64
    seed: Optional[int] = None

To use pydantic instead, this would become:

from typing import Optional

from pydantic import BaseModel, validator

class PTConfig(BaseModel):
    N: int
    T: int
    rho: float
    sigma_e2: float
    sigma_u2: float
    sigma_p1_2: float
    sigma_alpha2: float = 0.0
    device: str = "cpu"
    seed: Optional[int] = None

    @validator('rho')
    def rho_must_be_valid(cls, v):
        if not -1 < v < 1:
            raise ValueError('rho must be between -1 and 1')
        return v
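
A pydantic config can then be loaded and validated straight from a file. A minimal sketch, assuming the settings live in a JSON file named config.json (a hypothetical name):

import json

with open("config.json") as f:
    config = PTConfig(**json.load(f))  # raises ValidationError if a field is invalid or missing

print(config.rho)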

ScriptFlow and the flow_mc Function

Looking at the sflow.py file, the flow_mc function demonstrates how ScriptFlow orchestrates parallel execution:

import json

import scriptflow as sf

# `prefix` is defined earlier in sflow.py (e.g. an environment activation prefix)
async def flow_mc():
    tasks = [
        sf.Task(
            cmd=[f"{prefix}python scripts/pt_uroot.py -o res_{i}.json -i {i}"],
            shell=True,
            outputs=f"res_{i}.json",
            name=f"solve-{i}")
        for i in range(5)
    ]

    # Execute all tasks in parallel
    await sf.bag(*tasks)

    # Combine the per-task results into a single list
    combined = []
    for t in tasks:
        with open(t.get_outputs()[0]) as f:
            combined.extend(json.load(f))

    # Write the merged results
    with open("res.json", "w") as f:
        json.dump(combined, f, indent=2)

This function runs the estimator in parallel and collects the output of each process: it splits the Monte Carlo replications across separate processes and merges their results at the end.

Specifically, it:

  1. Creates multiple tasks, each running the same script with different parameters
  2. Executes all tasks simultaneously using sf.bag(*tasks)
  3. After all tasks complete, reads each output JSON file
  4. Combines all results into a single JSON file

Your Task: Transform Monte Carlo to Bootstrap

Now you need to transform the Monte Carlo simulation into a bootstrap analysis:

Step 1: Get the Data

  • Get the data from the shared folder
  • Ensure it's in the correct format for your analysis

Step 2: Adapt Your MA Code

Adapt your Moving Average (MA) code to match the structure of pt_uroot.py (a skeleton follows the list):

  • Add argparse functionality to handle command-line arguments
  • Use pydantic or dataclasses for configuration validation
  • Structure your script to be called by ScriptFlow
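
A possible skeleton for the adapted script; the MA-specific parts are left as comments since they come from your own code:

import argparse

parser = argparse.ArgumentParser(description='MA bootstrap replication')
parser.add_argument('-o', '--output', type=str, required=True,
                    help='Output file path for results')
parser.add_argument('-i', '--id', type=int, required=True,
                    help='Bootstrap replication ID')
args = parser.parse_args()

# 1. load the PSID data (Step 1)
# 2. draw a bootstrap sample, seeded by args.id (Step 3)
# 3. run your MA estimation and save the results to args.output (Step 4)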

Step 3: Implement Bootstrap Resampling

Modify your script to:

  • Load the data
  • Resample from it to create a bootstrap replication
  • Each replication should sample with replacement from the original dataset (see the sketch after this list)
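
A minimal resampling sketch, assuming the data sits in a pandas DataFrame df. For panel data such as the PSID you may want to resample whole individuals rather than single rows, but the row-level version shows the idea:

import numpy as np
import pandas as pd

def bootstrap_sample(df: pd.DataFrame, seed: int) -> pd.DataFrame:
    """Draw len(df) rows from df with replacement, reproducibly."""
    rng = np.random.default_rng(seed)
    idx = rng.integers(0, len(df), size=len(df))  # indices drawn with replacement
    return df.iloc[idx].reset_index(drop=True)

# Use the replication ID from argparse as the seed, so each run
# draws a different but reproducible sample:
# sample = bootstrap_sample(df, seed=args.id)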

Step 4: Run Estimation and Store Results

For each bootstrap replication:

  • Run the estimation procedure
  • Store the results to JSON format
  • Make sure the output can easily be combined with the other replications (a sketch follows this list)
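
A sketch of the save step, assuming your estimator returns an estimate theta_hat (a placeholder name). Writing a list rather than a bare dict matters, because flow_mc merges files with combined.extend(json.load(f)):

import json

result = {"id": args.id, "theta": float(theta_hat)}

with open(args.output, "w") as f:
    json.dump([result], f)  # a one-element list, so extend() concatenates replications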

Final Implementation

Your completed bootstrap script should:

  • Accept command-line arguments for output file and replication ID
  • Load the original PSID data
  • Generate bootstrap samples by resampling
  • Estimate parameters for each bootstrap sample
  • Save results in JSON format for combination by ScriptFlow
  • Construct a confidence interval on theta, using a notebook or a script (see the sketch below)
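
Once ScriptFlow has merged all replications into res.json, the confidence interval can be read off the bootstrap distribution. A percentile sketch, assuming each entry stores its estimate under the key "theta" as above:

import json

import numpy as np

with open("res.json") as f:
    results = json.load(f)

thetas = np.array([r["theta"] for r in results])
lo, hi = np.percentile(thetas, [2.5, 97.5])  # 95% percentile bootstrap CI
print(f"95% CI for theta: [{lo:.4f}, {hi:.4f}]")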