deployer with new injection mechanism #1910

Merged
merged 23 commits into from
Jul 25, 2024

madhur-ob
Collaborator

@madhur-ob madhur-ob commented Jul 1, 2024

Usage as follows:

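import time
from metaflow import Deployer

# Deploy the flow to Argo Workflows under the given name.
ar = Deployer(flow_file="../../try.py").argo_workflows(name="madhur")
ar_obj = ar.create()
print(ar_obj.production_token)
# Trigger a run; flow parameters are passed as keyword arguments.
result = ar_obj.trigger(myparam=300)
print(result.status)
# result.run is None until the run is available, so poll with a short pause
# instead of spinning in a tight loop.
run = result.run
while run is None:
    print("trying again...")
    time.sleep(1)
    run = result.run
print(run)
print(result.status)
# Let the run make some progress, then terminate it.
time.sleep(120)
print(result.terminate())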
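
The polling loop in the usage above waits indefinitely if the run never shows up. A minimal sketch of a bounded wait follows, assuming only that the triggered-run object exposes a `run` attribute that stays None until the run exists. `wait_for_run`, its `timeout` and `interval` parameters, and `FakeResult` are hypothetical names for illustration and are not part of this PR.

```python
import time


def wait_for_run(result, timeout=300.0, interval=1.0):
    """Poll result.run until it is not None or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        run = result.run
        if run is not None:
            return run
        if time.monotonic() >= deadline:
            raise TimeoutError("run not available after %.1f seconds" % timeout)
        time.sleep(interval)


class FakeResult:
    """Hypothetical stand-in for a triggered run; `run` is set on the third read."""

    def __init__(self):
        self._reads = 0

    @property
    def run(self):
        self._reads += 1
        # "fake-run" is a placeholder value, not a real Metaflow Run object.
        return "fake-run" if self._reads >= 3 else None
```

With the real objects, the call would presumably look like `run = wait_for_run(result, timeout=120)`, with the caller choosing how to react to the TimeoutError.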
Contributor

@romain-intel romain-intel left a comment

I think the direction looks good and matches what we had discussed. A few nit comments and a few other general things too:

  • docstrings of course
  • we are going to have to find a good way to properly generate docstrings/stubs just like for the current object. It shouldn't be too hard but something to think about.
  • I haven't looked at the argo impl in detail but got the general idea and I think it looks nice.
@madhur-ob
Collaborator Author

Overall

Functions

  1. handle_timeout(tfp_runner_attribute, command_obj: CommandManager)

    • Output Type: str
    • Description: Reads content from a temporary file and handles timeout errors by logging the stdout and stderr of the command if a TimeoutError occurs. Raises a RuntimeError if the command times out.
  2. get_lower_level_group(api, top_level_kwargs: Dict, _type: Optional[str], deployer_kwargs: Dict)

    • Output Type: Any
    • Description: Retrieves a lower-level group from the API based on the provided type and arguments. Raises a ValueError if _type is None.
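
The timeout handling described for handle_timeout can be sketched roughly as below. This is an illustration under stated assumptions, not the PR's implementation: `FakeCommand` stands in for CommandManager and only carries captured logs, `read_with_timeout` is a hypothetical helper that polls a file path, and the logs are placed in the RuntimeError message rather than sent to a logger.

```python
import time


class FakeCommand:
    """Hypothetical stand-in for CommandManager; it only carries captured logs."""

    def __init__(self, stdout="", stderr=""):
        self.stdout = stdout
        self.stderr = stderr


def read_with_timeout(path, timeout=1.0, poll=0.05):
    """Poll `path` until it has content; raise TimeoutError after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        with open(path) as f:
            content = f.read()
        if content:
            return content
        if time.monotonic() >= deadline:
            raise TimeoutError("no content in %s" % path)
        time.sleep(poll)


def handle_timeout_sketch(path, command_obj, timeout=1.0):
    """Return the file content, or raise RuntimeError carrying the command's logs."""
    try:
        return read_with_timeout(path, timeout=timeout)
    except TimeoutError as e:
        # Surface stdout and stderr so the caller can see why nothing was written.
        raise RuntimeError(
            "Timed out waiting for command output.\nstdout:\n%s\nstderr:\n%s"
            % (command_obj.stdout, command_obj.stderr)
        ) from e
```

Chaining with `from e` keeps the original TimeoutError in the traceback while giving callers a single exception type to catch.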

Classes

  1. Deployer

    • Description: Manages the deployment of flows using different provider implementations. Dynamically adds methods for each deployment provider.

    Key Methods:

    • __init__(self, flow_file: str, show_output: bool = True, profile: Optional[str] = None, env: Optional[Dict] = None, cwd: Optional[str] = None, **kwargs)

      • Initializes the Deployer object with the given parameters and dynamically adds methods for each deployment provider.
    • make_function(self, deployer_class)

      • Creates a function for the given deployer class.
  2. TriggeredRun

    • Description: Represents a run triggered from a deployed flow.

    Key Methods:

    • __init__(self, deployer: "DeployerImpl", content: str)

      • Initializes the TriggeredRun object with the deployer implementation and JSON content.
    • _enrich_object(self, env)

      • Enriches the TriggeredRun object with additional properties and methods from the environment dictionary.
    • run(self)

      • Retrieves the Run object for the triggered run, or None if not available.
  3. DeployedFlow

    • Description: Represents a flow that has been deployed.

    Key Methods:

    • __init__(self, deployer: "DeployerImpl")

      • Initializes the DeployedFlow object with the deployer implementation.
    • _enrich_object(self, env)

      • Enriches the DeployedFlow object with additional properties and methods from the environment dictionary.
  4. DeployerImpl

    • Description: Base class for deployer implementations. Each implementation should define a TYPE class variable that matches the name of the CLI group.

    Key Methods:

    • __init__(self, flow_file: str, show_output: bool = True, profile: Optional[str] = None, env: Optional[Dict] = None, cwd: Optional[str] = None, **kwargs)

      • Initializes the DeployerImpl object with the given parameters and sets up the environment for deployment.
    • __enter__(self) -> "DeployerImpl"

      • Enters the context for the deployer implementation.
    • create(self, **kwargs) -> DeployedFlow

      • Creates a deployed flow using the deployer implementation. Raises an Exception if there is an error during deployment.
    • _enrich_deployed_flow(self, deployed_flow: DeployedFlow)

      • Enriches the DeployedFlow object with additional properties and methods. This method must be implemented by subclasses.
    • __exit__(self, exc_type, exc_value, traceback)

      • Cleans up resources on exit.
    • cleanup(self)

      • Cleans up resources.
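
The dynamic method injection described for Deployer can be sketched as follows. All class names ending in `Sketch`, the `PROVIDERS` list, and the hyphen-to-underscore naming rule are assumptions for illustration. The naming rule is only inferred from the usage `Deployer(...).argo_workflows(...)` and the TYPE value "argo-workflows", and how the PR actually discovers provider classes is not shown in this thread.

```python
from typing import ClassVar, Dict, Optional


class DeployerImplSketch:
    """Toy base class; TYPE names the CLI group the provider maps to."""

    TYPE: ClassVar[Optional[str]] = None

    def __init__(self, deployer_kwargs: Dict, flow_file: str, **kwargs):
        self.deployer_kwargs = deployer_kwargs
        self.flow_file = flow_file
        self.kwargs = kwargs


class ArgoSketch(DeployerImplSketch):
    TYPE = "argo-workflows"


class StepFunctionsSketch(DeployerImplSketch):
    TYPE = "step-functions"


# Hypothetical registry of provider implementations.
PROVIDERS = [ArgoSketch, StepFunctionsSketch]


class DeployerSketch:
    def __init__(self, flow_file: str, **kwargs):
        self.flow_file = flow_file
        self.top_level_kwargs = kwargs
        # Attach one factory method per provider, e.g. `argo_workflows`.
        for provider_class in PROVIDERS:
            method_name = provider_class.TYPE.replace("-", "_")
            setattr(self, method_name, self.make_function(provider_class))

    def make_function(self, deployer_class):
        # Passing deployer_class as an argument binds it per call, which avoids
        # the late-binding pitfall of defining the closure inside the loop.
        def f(**deployer_kwargs):
            return deployer_class(
                deployer_kwargs=deployer_kwargs,
                flow_file=self.flow_file,
                **self.top_level_kwargs
            )

        return f
```

In this sketch, `DeployerSketch("flow.py").argo_workflows(name="madhur")` returns an `ArgoSketch` whose `deployer_kwargs` is `{"name": "madhur"}`.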

For Argo-Workflows

Functions

  1. suspend(instance: TriggeredRun, **kwargs)

    • Output Type: bool
    • Description: Suspends a running workflow. Returns True if the command was successful, False otherwise.
  2. unsuspend(instance: TriggeredRun, **kwargs)

    • Output Type: bool
    • Description: Unsuspends a suspended workflow. Returns True if the command was successful, False otherwise.
  3. terminate(instance: TriggeredRun, **kwargs)

    • Output Type: bool
    • Description: Terminates a running workflow. Returns True if the command was successful, False otherwise.
  4. status(instance: TriggeredRun)

    • Output Type: Optional[str]
    • Description: Retrieves the status of a triggered run, taking the run object into account. Returns None if the status could not be retrieved.
  5. production_token(instance: DeployedFlow)

    • Output Type: str
    • Description: Retrieves the production token for a deployed flow.
  6. delete(instance: DeployedFlow, **kwargs)

    • Output Type: bool
    • Description: Deletes a deployed flow. Returns True if the command was successful, False otherwise.
  7. trigger(instance: DeployedFlow, **kwargs)

    • Output Type: TriggeredRun
    • Description: Triggers a new run for a deployed flow. Returns the triggered run instance. Raises an Exception if there is an error during the trigger process.
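
The functions above take the target object as their first argument (`instance`), and the usage at the top of the thread reads `result.status` as an attribute but calls `result.terminate()` as a method. One way `_enrich_object` could attach both kinds from an environment dictionary is sketched below. `TriggeredRunSketch` and the toy `status` and `terminate` functions are hypothetical, and the branching rule (property versus callable) is an assumption rather than something confirmed in this thread.

```python
from types import MethodType


class TriggeredRunSketch:
    def __init__(self, name):
        self.name = name
        self.terminated = False

    def _enrich_object(self, env):
        for key, value in env.items():
            if isinstance(value, property):
                # Properties only work when set on the class, so this
                # affects every instance of the class.
                setattr(type(self), key, value)
            elif callable(value):
                # Bind the plain function so `self` is passed as `instance`.
                setattr(self, key, MethodType(value, self))
            else:
                setattr(self, key, value)


def status(instance):
    # Toy status logic for the sketch only.
    return "Terminated" if instance.terminated else "Running"


def terminate(instance, **kwargs):
    instance.terminated = True
    return True


run = TriggeredRunSketch("toy-run")
run._enrich_object({"status": property(status), "terminate": terminate})
```

After enrichment, `run.status` is evaluated on each access and `run.terminate()` receives `run` as `instance`.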

Class

  1. ArgoWorkflowsDeployer

    • Description: Deployer implementation for Argo Workflows.

    Attributes:

    • TYPE: ClassVar[Optional[str]]
      • The type of the deployer, which is "argo-workflows".

    Key Methods:

    • __init__(self, deployer_kwargs, **kwargs)

      • Initializes the ArgoWorkflowsDeployer with deployer-specific keyword arguments and additional arguments for the superclass constructor.
    • _enrich_deployed_flow(self, deployed_flow: DeployedFlow)

      • Enriches the DeployedFlow object with additional properties and methods.
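
The subclass contract described for DeployerImpl (a TYPE class variable, a mandatory `_enrich_deployed_flow`, and cleanup on context exit) can be sketched with a toy provider. `DeployerImplSketch`, `DeployedFlowSketch`, `ToyDeployer`, the "toy-scheduler" TYPE, and the `cleaned_up` flag are all hypothetical. The real `create` runs a CLI command, which this sketch leaves out.

```python
from typing import ClassVar, Optional


class DeployedFlowSketch:
    def __init__(self, deployer):
        self.deployer = deployer


class DeployerImplSketch:
    TYPE: ClassVar[Optional[str]] = None

    def __init__(self, flow_file):
        self.flow_file = flow_file
        self.cleaned_up = False

    def __enter__(self):
        return self

    def create(self, **kwargs):
        # The real method deploys the flow first; here we only build the object
        # and hand it to the provider-specific enrichment hook.
        deployed_flow = DeployedFlowSketch(deployer=self)
        self._enrich_deployed_flow(deployed_flow)
        return deployed_flow

    def _enrich_deployed_flow(self, deployed_flow):
        raise NotImplementedError("subclasses must implement this hook")

    def __exit__(self, exc_type, exc_value, traceback):
        self.cleanup()

    def cleanup(self):
        self.cleaned_up = True


class ToyDeployer(DeployerImplSketch):
    TYPE = "toy-scheduler"

    def _enrich_deployed_flow(self, deployed_flow):
        # A real provider would attach production_token, trigger, delete, etc.
        deployed_flow.name = "toy-" + self.flow_file
```

Used as `with ToyDeployer("flow.py") as d: flow = d.create()`, the sketch runs `cleanup` when the block exits, including when the body raises.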

For Step Functions

Functions

  1. terminate(instance: TriggeredRun, **kwargs)

    • Output Type: bool
    • Description: Terminates a running workflow. Returns True if the command was successful, False otherwise.
  2. production_token(instance: DeployedFlow)

    • Output Type: str
    • Description: Retrieves the production token for a deployed flow.
  3. list_runs(instance: DeployedFlow, **kwargs)

    • Output Type: bool
    • Description: Lists runs of a deployed flow. Returns True if the command was successful, False otherwise.
  4. delete(instance: DeployedFlow, **kwargs)

    • Output Type: bool
    • Description: Deletes a deployed flow. Returns True if the command was successful, False otherwise.
  5. trigger(instance: DeployedFlow, **kwargs)

    • Output Type: TriggeredRun
    • Description: Triggers a new run for a deployed flow. Returns the triggered run instance. Raises an Exception if there is an error during the trigger process.

Class

  1. StepFunctionsDeployer

    • Description: Deployer implementation for AWS Step Functions.

    Attributes:

    • TYPE: ClassVar[Optional[str]]
      • The type of the deployer, which is "step-functions".

    Key Methods:

    • __init__(self, deployer_kwargs, **kwargs)

      • Initializes the StepFunctionsDeployer with deployer-specific keyword arguments and additional arguments for the superclass constructor.
    • _enrich_deployed_flow(self, deployed_flow: DeployedFlow)

      • Enriches the DeployedFlow object with additional properties and methods.
Contributor

@romain-intel romain-intel left a comment

Minor nits, but I think this looks good. I haven't closely reviewed the argo/sfn implementations, but the interface and methodology look good.

@savingoyal savingoyal merged commit cdd9e02 into Netflix:master Jul 25, 2024
26 checks passed
@madhur-ob madhur-ob deleted the new-deployer branch July 25, 2024 17:04