Function

The Function class wraps user functions decorated with @hog.function() to enable remote execution on HPC clusters. This class is NOT intended to be instantiated directly by users; the decorator is the preferred way to wrap functions.

Function Class

Function(func, endpoint=None, **user_endpoint_config)

Wrapper that enables a Python function to be executed remotely on Globus Compute.

Decorated functions can be called in four ways:

  1. Direct call: func(*args) - executes locally (regular Python call)
  2. Remote call: func.remote(*args) - executes remotely and blocks until complete
  3. Async submit: func.submit(*args) - executes remotely and returns a GroundhogFuture
  4. Local subprocess: func.local(*args) - executes locally in a separate process
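
As a rough illustration of how the first three call styles relate, the sketch below uses a hypothetical stand-in class (SketchFunction is not part of groundhog_hpc) in which a thread pool plays the role of the remote endpoint. It assumes only what this page states: a direct call runs the wrapped function in-process, submit() returns a future, and remote() is submit() followed by a blocking wait.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


class SketchFunction:
    """Hypothetical stand-in for Function; a thread pool plays the endpoint."""

    _pool = ThreadPoolExecutor(max_workers=2)

    def __init__(self, func: Callable[..., Any]) -> None:
        self._wrapped_function = func

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Direct call: plain execution in the current process.
        return self._wrapped_function(*args, **kwargs)

    def submit(self, *args: Any, **kwargs: Any) -> Future:
        # Async submit: hand the work off and return a future immediately.
        return self._pool.submit(self._wrapped_function, *args, **kwargs)

    def remote(self, *args: Any, **kwargs: Any) -> Any:
        # Blocking call: submit, then wait for the result.
        return self.submit(*args, **kwargs).result()


def add(a: int, b: int) -> int:
    return a + b


wrapped = SketchFunction(add)
direct = wrapped(1, 2)           # runs in-process
blocking = wrapped.remote(1, 2)  # waits for the "remote" result
future = wrapped.submit(1, 2)    # returns without waiting
```

With the real decorator, the same three spellings would be func(1, 2), func.remote(1, 2), and func.submit(1, 2).result().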

Attributes:

  endpoint (str | None): Default Globus Compute endpoint UUID or named endpoint from [tool.hog.<name>] PEP 723 metadata, or None to use resolved config
  default_user_endpoint_config (dict[str, Any]): Default endpoint configuration (e.g., worker_init, walltime)

Initialize a Function wrapper.

Parameters:

  func (FunctionType, required): The Python function to wrap
  endpoint (str | None, default None): Globus Compute endpoint UUID or named endpoint from [tool.hog.<name>] PEP 723 metadata
  **user_endpoint_config (Any, default {}): Additional endpoint configuration to pass to Globus Compute Executor (e.g., worker_init commands, walltime)

Source code in src/groundhog_hpc/function.py
def __init__(
    self,
    func: FunctionType,
    endpoint: str | None = None,
    **user_endpoint_config: Any,
) -> None:
    """Initialize a Function wrapper.

    Args:
        func: The Python function to wrap
        endpoint: Globus Compute endpoint UUID or named endpoint from `[tool.hog.<name>]` PEP 723
        **user_endpoint_config: Additional endpoint configuration to pass to
            Globus Compute Executor (e.g., worker_init commands, walltime)
    """
    self._script_path: str | None = None
    self.endpoint: str | None = endpoint
    self.default_user_endpoint_config: dict[str, Any] = user_endpoint_config

    # ShellFunction walltime - always None here to prevent conflicts with a
    # 'walltime' endpoint config, but the attribute exists as an escape
    # hatch if users need to set it after the function's been created.
    # NOTE: walltime must be set before the first .submit() or .local() call;
    # changing it afterwards has no effect because shell_function is cached.
    self.walltime: int | float | None = None

    self._wrapped_function: FunctionType = func
    self._config_resolver: ConfigResolver | None = None

    # Cached parameterized shell command and ShellFunction (built once, reused per instance)
    self._shell_command: str | None = None
    self._shell_function: ShellFunction | None = None
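
The note about walltime in the constructor source is easier to see with a small caching example. The sketch below is a hypothetical stand-in (CachedCommand and its command string are invented for illustration) that builds a value once on first access, which is the pattern the comment describes for the cached shell function.

```python
from typing import Optional


class CachedCommand:
    """Hypothetical stand-in showing build-once caching."""

    def __init__(self) -> None:
        self.walltime: Optional[float] = None
        self._shell_command: Optional[str] = None

    @property
    def shell_command(self) -> str:
        # Build on first access only; later accesses reuse the cached string.
        if self._shell_command is None:
            self._shell_command = f"run --walltime={self.walltime}"
        return self._shell_command


early = CachedCommand()
early.walltime = 60              # set before first use: takes effect
early_cmd = early.shell_command

late = CachedCommand()
_ = late.shell_command           # first use caches walltime=None
late.walltime = 60               # cache already built, so this is ignored
late_cmd = late.shell_command
```

The practical consequence is the one stated in the source comment: assign walltime before the first .submit() or .local() call.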

__call__(*args, **kwargs)

Execute the function locally (not remotely).

Parameters:

  *args (Any, default ()): Positional arguments to pass to the function
  **kwargs (Any, default {}): Keyword arguments to pass to the function

Returns:

  Any: The result of the local function execution

Source code in src/groundhog_hpc/function.py
def __call__(self, *args: Any, **kwargs: Any) -> Any:
    """Execute the function locally (not remotely).

    Args:
        *args: Positional arguments to pass to the function
        **kwargs: Keyword arguments to pass to the function

    Returns:
        The result of the local function execution
    """
    return self._wrapped_function(*args, **kwargs)

remote(*args, endpoint=None, user_endpoint_config=None, executor_kwargs=None, **kwargs)

Execute the function remotely and block until completion.

This is a convenience method that calls submit() and immediately waits for the result. While waiting, it displays live status updates showing the task ID, elapsed time, and status.

Parameters:

  *args (Any, default ()): Positional arguments to pass to the function
  endpoint (str | None, default None): Globus Compute endpoint UUID (or named endpoint from [tool.hog.<name>] PEP 723 metadata). Replaces decorator default.
  user_endpoint_config (dict[str, Any] | None, default None): Endpoint configuration dict (merged with decorator default)
  executor_kwargs (dict[str, Any] | None, default None): Keyword arguments forwarded to Globus Compute Executor
  **kwargs (Any, default {}): Keyword arguments to pass to the function

Returns:

  Any: The deserialized result of the remote function execution

Raises:

  ModuleImportError: If called during module import
  ValueError: If source file cannot be located
  PayloadTooLargeError: If serialized arguments exceed 10 MB
  RemoteExecutionError: If remote execution fails (non-zero exit code)

Source code in src/groundhog_hpc/function.py
def remote(
    self,
    *args: Any,
    endpoint: str | None = None,
    user_endpoint_config: dict[str, Any] | None = None,
    executor_kwargs: dict[str, Any] | None = None,
    **kwargs: Any,
) -> Any:
    """Execute the function remotely and block until completion.

    This is a convenience method that calls submit() and immediately waits for the result.
    While waiting, displays live status updates with task ID, elapsed time, and status.

    Args:
        *args: Positional arguments to pass to the function
        endpoint: Globus Compute endpoint UUID (or named endpoint from
            `[tool.hog.<name>]` PEP 723 metadata). Replaces decorator default.
        user_endpoint_config: Endpoint configuration dict (merged with decorator default)
        executor_kwargs: Keyword arguments forwarded to Globus Compute Executor
        **kwargs: Keyword arguments to pass to the function

    Returns:
        The deserialized result of the remote function execution

    Raises:
        ModuleImportError: If called during module import
        ValueError: If source file cannot be located
        PayloadTooLargeError: If serialized arguments exceed 10MB
        RemoteExecutionError: If remote execution fails (non-zero exit code)
    """
    logger.debug(f"Calling remote execution for '{self.name}'")
    future = self.submit(
        *args,
        endpoint=endpoint,
        user_endpoint_config=user_endpoint_config,
        executor_kwargs=executor_kwargs,
        **kwargs,
    )
    display_task_status(future)
    result = future.result()
    logger.debug(f"Remote execution of '{self.name}' completed successfully")
    return result
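
The "called during module import" error comes from a guard at the top of submit(), shown in the source further down this page: it looks up the function's module in sys.modules and checks a __groundhog_imported__ flag. A minimal sketch of that check follows; SketchImportError and the demo_script module are hypothetical, and how groundhog sets the flag is not covered on this page.

```python
import sys
import types


class SketchImportError(Exception):
    """Hypothetical stand-in for the library's ModuleImportError."""


def check_import_safe(func, method_name: str) -> None:
    # Same lookup as the source: find the defining module, then read the flag.
    module = sys.modules.get(func.__module__)
    if not getattr(module, "__groundhog_imported__", False):
        raise SketchImportError(
            f"{func.__name__}.{method_name}() called before module "
            f"'{func.__module__}' was marked as imported"
        )


# Register a fake module so the lookup has something to find.
demo = types.ModuleType("demo_script")
sys.modules["demo_script"] = demo


def work() -> int:
    return 1


work.__module__ = "demo_script"

try:
    check_import_safe(work, "submit")
    blocked = False
except SketchImportError:
    blocked = True  # flag not set yet, so the guard refuses

demo.__groundhog_imported__ = True
allowed = check_import_safe(work, "submit")  # passes once the flag is set
```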

submit(*args, endpoint=None, user_endpoint_config=None, executor_kwargs=None, **kwargs)

Submit the function for asynchronous remote execution.

Parameters:

  *args (Any, default ()): Positional arguments to pass to the function
  endpoint (str | None, default None): Globus Compute endpoint UUID (or named endpoint from [tool.hog.<name>] PEP 723 metadata). Replaces decorator default.
  user_endpoint_config (dict[str, Any] | None, default None): Endpoint configuration dict (merged with decorator default)
  executor_kwargs (dict[str, Any] | None, default None): Keyword arguments forwarded to Globus Compute Executor
  **kwargs (Any, default {}): Keyword arguments to pass to the function

Returns:

Type Description
GroundhogFuture

A GroundhogFuture that will contain the deserialized result

Raises:

  ModuleImportError: If called during module import
  ValueError: If endpoint is not specified and cannot be resolved from config
  PayloadTooLargeError: If serialized arguments exceed 10 MB

Source code in src/groundhog_hpc/function.py
def submit(
    self,
    *args: Any,
    endpoint: str | None = None,
    user_endpoint_config: dict[str, Any] | None = None,
    executor_kwargs: dict[str, Any] | None = None,
    **kwargs: Any,
) -> GroundhogFuture:
    """Submit the function for asynchronous remote execution.

    Args:
        *args: Positional arguments to pass to the function
        endpoint: Globus Compute endpoint UUID (or named endpoint from
            `[tool.hog.<name>]` PEP 723 metadata). Replaces decorator default.
        user_endpoint_config: Endpoint configuration dict (merged with decorator default)
        executor_kwargs: Keyword arguments forwarded to Globus Compute Executor
        **kwargs: Keyword arguments to pass to the function

    Returns:
        A GroundhogFuture that will contain the deserialized result

    Raises:
        ModuleImportError: If called during module import
        ValueError: If endpoint is not specified and cannot be resolved from config
        PayloadTooLargeError: If serialized arguments exceed 10MB
    """
    # Check if module has been marked as safe for .submit() calls (and thus .remote())
    module = sys.modules.get(self._wrapped_function.__module__)
    if not getattr(module, "__groundhog_imported__", False):
        logger.error(
            f"Import safety check failed for module '{self._wrapped_function.__module__}'"
        )
        raise ModuleImportError(
            self._wrapped_function.__name__,
            "submit",
            self._wrapped_function.__module__,
        )

    logger.debug(f"Preparing to submit function '{self.name}'")
    endpoint = endpoint or self.endpoint

    decorator_config = self.default_user_endpoint_config.copy()
    call_time_config = user_endpoint_config.copy() if user_endpoint_config else {}

    # merge all config sources
    config = self.config_resolver.resolve(
        endpoint_name=endpoint or "",  # will validate below
        decorator_config=decorator_config,
        call_time_config=call_time_config,
    )

    # get endpoint UUID from config if specified (maps friendly names to UUIDs)
    if "endpoint" in config:
        endpoint = config.pop("endpoint")

    # Validate that we have an endpoint at this point
    if not endpoint:
        # Try to provide helpful error message by listing available endpoints in config
        available_endpoints = self._get_available_endpoints_from_pep723()
        if available_endpoints:
            endpoints_str = ", ".join(f"'{e}'" for e in available_endpoints)
            logger.error(f"No endpoint specified. Available: {endpoints_str}")
            raise ValueError(
                f"No endpoint specified. Available endpoints found in config: {endpoints_str}. "
                f"Call with endpoint=<name>, or specify a function default endpoint in decorator."
            )
        else:
            logger.error("No endpoint specified and none found in config")
            raise ValueError("No endpoint specified")

    logger.debug(
        f"Serializing {len(args)} args and {len(kwargs)} kwargs for '{self.name}'"
    )
    payload = serialize((args, kwargs), use_proxy=False, proxy_threshold_mb=None)

    future: GroundhogFuture = submit_to_executor(
        UUID(endpoint),
        user_endpoint_config=config,
        shell_function=self.shell_function,
        payload=payload,
        executor_kwargs=executor_kwargs,
    )
    future.endpoint = endpoint
    future.user_endpoint_config = config
    future.function_name = self.name
    return future
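
The endpoint selection in submit() can be traced step by step: a call-time endpoint replaces the decorator default, the resolved config may then supply the actual UUID for a named endpoint, and a missing endpoint raises ValueError. The sketch below mirrors that flow with stand-ins. NAMED, resolve, and pick_endpoint are hypothetical, and the merge order inside resolve (call-time over decorator over named table) is an assumption, since ConfigResolver is not shown here.

```python
from typing import Any, Dict, Optional, Tuple
from uuid import UUID

# Hypothetical named endpoints, standing in for [tool.hog.<name>] tables.
NAMED: Dict[str, Dict[str, Any]] = {
    "cluster": {"endpoint": "12345678-1234-5678-1234-567812345678", "walltime": 60},
}


def resolve(endpoint_name: str, decorator_config: dict, call_time_config: dict) -> dict:
    """Stand-in for ConfigResolver.resolve; the precedence here is assumed."""
    return {**NAMED.get(endpoint_name, {}), **decorator_config, **call_time_config}


def pick_endpoint(
    call_endpoint: Optional[str],
    default_endpoint: Optional[str],
    decorator_config: dict,
    call_time_config: dict,
) -> Tuple[UUID, dict]:
    # Call-time endpoint wins over the decorator default.
    endpoint = call_endpoint or default_endpoint
    config = resolve(endpoint or "", decorator_config, call_time_config)
    # A named endpoint's config carries the real UUID under "endpoint".
    if "endpoint" in config:
        endpoint = config.pop("endpoint")
    if not endpoint:
        raise ValueError("No endpoint specified")
    return UUID(endpoint), config


endpoint_uuid, config = pick_endpoint(
    "cluster", None, {"walltime": 120}, {"walltime": 300}
)
```

Passing a raw UUID string instead of "cluster" skips the name lookup and goes straight to UUID().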

local(*args, **kwargs)

Execute the function locally in an isolated subprocess.

Parameters:

  *args (Any, default ()): Positional arguments to pass to the function
  **kwargs (Any, default {}): Keyword arguments to pass to the function

Returns:

  Any: The deserialized result of the local function execution

Raises:

  ModuleImportError: If called during module import
  ValueError: If source file cannot be located
  LocalExecutionError: If local execution fails (non-zero exit code)

Source code in src/groundhog_hpc/function.py
def local(self, *args: Any, **kwargs: Any) -> Any:
    """Execute the function locally in an isolated subprocess.

    Args:
        *args: Positional arguments to pass to the function
        **kwargs: Keyword arguments to pass to the function

    Returns:
        The deserialized result of the local function execution

    Raises:
        ModuleImportError: If called during module import
        ValueError: If source file cannot be located
        LocalExecutionError: If local execution fails (non-zero exit code)
    """
    # Check if module has been marked as safe for .local() calls
    module = sys.modules.get(self._wrapped_function.__module__)
    if not getattr(module, "__groundhog_imported__", False):
        logger.error(
            f"Import safety check failed for module '{self._wrapped_function.__module__}'"
        )
        raise ModuleImportError(
            self._wrapped_function.__name__,
            "local",
            self._wrapped_function.__module__,
        )

    logger.debug(f"Executing function '{self.name}' in local subprocess")
    with prefix_output(prefix="[local]", prefix_color="blue"):
        payload = serialize((args, kwargs), proxy_threshold_mb=1.0)

        with tempfile.TemporaryDirectory() as tmpdir:
            result = _run_shell_locally(self.shell_function.cmd, payload, tmpdir)

            if result.returncode != 0:
                logger.error(
                    f"Local subprocess failed with exit code {result.returncode}"
                )
                if result.stderr:
                    print(result.stderr, file=sys.stderr)
                if result.stdout:
                    print(result.stdout, file=sys.stdout)
                msg = "Local subprocess failed"
                if result.exception_name:
                    msg += f": {result.exception_name}"
                raise LocalExecutionError(msg)

            try:
                user_stdout, deserialized_result = deserialize_stdout(result.stdout)
            except DeserializationError as e:
                logger.error(f"Failed to deserialize local result: {e}")
                if result.stderr:
                    print(result.stderr, file=sys.stderr)
                if e.user_output:
                    print(e.user_output)
                raise
            else:
                logger.debug(
                    f"Local execution of '{self.name}' completed successfully"
                )
                if result.stderr:
                    print(result.stderr, file=sys.stderr)
                if user_stdout:
                    print(user_stdout, file=sys.stdout)
                return deserialized_result
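
To make the idea of an isolated subprocess concrete, here is a minimal sketch using only the standard library. It is not how groundhog builds its shell command: JSON stands in for the library's serializer, CHILD is a hypothetical child script, and run_isolated is a made-up helper. What it shares with the source above is the shape: serialize (args, kwargs), run in a temporary directory, fail on a non-zero exit code, and deserialize stdout.

```python
import json
import subprocess
import sys
import tempfile

# Hypothetical child script: reads the payload from stdin, prints a JSON result.
CHILD = (
    "import json, sys\n"
    "args, kwargs = json.load(sys.stdin)\n"
    "print(json.dumps(sum(args) * kwargs.get('scale', 1)))\n"
)


def run_isolated(*args, **kwargs):
    # Serialize the call arguments into a single payload string.
    payload = json.dumps((args, kwargs))
    # Run the child in a fresh interpreter with a throwaway working directory.
    with tempfile.TemporaryDirectory() as tmpdir:
        result = subprocess.run(
            [sys.executable, "-c", CHILD],
            input=payload,
            capture_output=True,
            text=True,
            cwd=tmpdir,
        )
    # A non-zero exit code is treated as failure, as in the source above.
    if result.returncode != 0:
        raise RuntimeError(f"Local subprocess failed: {result.stderr.strip()}")
    return json.loads(result.stdout)


value = run_isolated(1, 2, 3, scale=2)
```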

batch_submit(args=None, kwargs=None, endpoint=None, user_endpoint_config=None)

Submit the function for asynchronous remote execution as a batch.

Submits all tasks as a single Globus Compute batch request, avoiding per-task API calls that can hit rate limits.

Parameters:

  args (list[tuple] | None, default None): List of positional-argument tuples, one per task
  kwargs (list[dict] | None, default None): List of keyword-argument dicts, one per task
  endpoint (str | None, default None): Globus Compute endpoint UUID or named endpoint
  user_endpoint_config (dict[str, Any] | None, default None): Endpoint configuration dict (merged with decorator default)

Returns:

  list[GroundhogFuture]: A list of GroundhogFutures in the same order as the input tasks

Raises:

  ModuleImportError: If called during module import
  ValueError: If both args and kwargs are empty

Source code in src/groundhog_hpc/function.py
def batch_submit(
    self,
    args: list[tuple] | None = None,
    kwargs: list[dict] | None = None,
    endpoint: str | None = None,
    user_endpoint_config: dict[str, Any] | None = None,
) -> list[GroundhogFuture]:
    """Submit the function for asynchronous remote execution as a batch.

    Submits all tasks as a single Globus Compute batch request, avoiding
    per-task API calls that can hit rate limits.

    Args:
        args: List of positional-argument tuples, one per task
        kwargs: List of keyword-argument dicts, one per task
        endpoint: Globus Compute endpoint UUID or named endpoint
        user_endpoint_config: Endpoint configuration dict (merged with decorator default)

    Returns:
        A list of GroundhogFutures in the same order as the input tasks

    Raises:
        ModuleImportError: If called during module import
        ValueError: If both args and kwargs are empty
    """
    args = args or []
    kwargs = kwargs or []
    module = sys.modules.get(self._wrapped_function.__module__)
    if not getattr(module, "__groundhog_imported__", False):
        raise ModuleImportError(
            self._wrapped_function.__name__,
            "batch_submit",
            self._wrapped_function.__module__,
        )

    if max(len(args), len(kwargs)) == 0:
        raise ValueError(
            "batch_submit requires at least one task: args and kwargs are both empty"
        )

    endpoint = endpoint or self.endpoint
    decorator_config = self.default_user_endpoint_config.copy()
    call_time_config = user_endpoint_config.copy() if user_endpoint_config else {}
    config = self.config_resolver.resolve(
        endpoint_name=endpoint or "",
        decorator_config=decorator_config,
        call_time_config=call_time_config,
    )
    if "endpoint" in config:
        endpoint = config.pop("endpoint")
    if not endpoint:
        available_endpoints = self._get_available_endpoints_from_pep723()
        if available_endpoints:
            endpoints_str = ", ".join(f"'{e}'" for e in available_endpoints)
            raise ValueError(
                f"No endpoint specified. Available endpoints found in config: {endpoints_str}."
            )
        raise ValueError("No endpoint specified")

    payloads = []
    for a, kw in itertools.zip_longest(args, kwargs, fillvalue=None):
        a = a if a is not None else ()
        kw = kw if kw is not None else {}
        payloads.append(
            serialize((a, kw), use_proxy=False, proxy_threshold_mb=None)
        )

    futures = submit_batch(UUID(endpoint), config, self.shell_function, payloads)
    for future in futures:
        future.function_name = self.name
        future.user_endpoint_config = config
    return futures
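
The pairing of args and kwargs follows from the itertools.zip_longest loop in the source above: the two lists are matched by position, and whichever list is shorter is padded with an empty tuple or an empty dict. A small sketch of just that pairing (pair_tasks is a hypothetical helper name):

```python
import itertools


def pair_tasks(args=None, kwargs=None):
    args, kwargs = args or [], kwargs or []
    if max(len(args), len(kwargs)) == 0:
        raise ValueError("at least one task is required")
    tasks = []
    # Match by position; pad the shorter list with None, then substitute
    # an empty tuple or dict so every task has both parts.
    for a, kw in itertools.zip_longest(args, kwargs, fillvalue=None):
        tasks.append((a if a is not None else (), kw if kw is not None else {}))
    return tasks


tasks = pair_tasks(args=[(1,), (2,), (3,)], kwargs=[{"verbose": True}])
```

Here three tasks are produced: only the first receives verbose=True, and the other two are called with positional arguments alone.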

batch_local(args=None, kwargs=None, executor_kwargs=None)

Execute the function locally in parallel subprocesses for each task.

Submits all tasks to a ThreadPoolExecutor immediately and returns futures without waiting for completion. Each task runs in its own subprocess with an isolated temporary directory.

Parameters:

  args (list[tuple] | None, default None): List of positional-argument tuples, one per task
  kwargs (list[dict] | None, default None): List of keyword-argument dicts, one per task
  executor_kwargs (dict[str, Any] | None, default None): Keyword arguments forwarded to ThreadPoolExecutor

Returns:

  list[GroundhogFuture]: A list of GroundhogFutures in the same order as the input tasks

Raises:

  ModuleImportError: If called during module import
  ValueError: If both args and kwargs are empty

Source code in src/groundhog_hpc/function.py
def batch_local(
    self,
    args: list[tuple] | None = None,
    kwargs: list[dict] | None = None,
    executor_kwargs: dict[str, Any] | None = None,
) -> list[GroundhogFuture]:
    """Execute the function locally in parallel subprocesses for each task.

    Submits all tasks to a ThreadPoolExecutor immediately and returns futures
    without waiting for completion. Each task runs in its own subprocess with
    an isolated temporary directory.

    Args:
        args: List of positional-argument tuples, one per task
        kwargs: List of keyword-argument dicts, one per task
        executor_kwargs: Keyword arguments forwarded to ThreadPoolExecutor

    Returns:
        A list of GroundhogFutures in the same order as the input tasks

    Raises:
        ModuleImportError: If called during module import
        ValueError: If both args and kwargs are empty
    """
    args, kwargs = args or [], kwargs or []
    module = sys.modules.get(self._wrapped_function.__module__)
    if not getattr(module, "__groundhog_imported__", False):
        raise ModuleImportError(
            self._wrapped_function.__name__,
            "batch_local",
            self._wrapped_function.__module__,
        )

    if max(len(args), len(kwargs)) == 0:
        raise ValueError(
            "batch_local requires at least one task: args and kwargs are both empty"
        )

    payloads = []
    for a, kw in itertools.zip_longest(args, kwargs, fillvalue=None):
        a = a if a is not None else ()
        kw = kw if kw is not None else {}
        payloads.append(serialize((a, kw), proxy_threshold_mb=1.0))

    cmd_template = self.shell_function.cmd

    def _worker(payload: str) -> ShellResult:
        with tempfile.TemporaryDirectory() as tmpdir:
            return _run_shell_locally(cmd_template, payload, tmpdir)

    executor = ThreadPoolExecutor(**(executor_kwargs or {}))
    return [GroundhogFuture(executor.submit(_worker, p)) for p in payloads]
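
Because the returned list is built in submission order, results can be read back in input order even when tasks finish out of order. The sketch below shows this with a plain ThreadPoolExecutor; slow_square is a hypothetical stand-in for the per-task subprocess, and max_workers is a standard ThreadPoolExecutor argument of the kind executor_kwargs would carry.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x: int) -> int:
    # Larger inputs sleep less, so they complete before smaller ones.
    time.sleep(0.05 * (3 - x))
    return x * x


executor_kwargs = {"max_workers": 3}
executor = ThreadPoolExecutor(**executor_kwargs)

# Submitting returns immediately; the list order matches the input order.
futures = [executor.submit(slow_square, x) for x in (0, 1, 2)]

# Reading results by list position preserves input order.
results = [f.result() for f in futures]
executor.shutdown()
```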

Method Class

The Method class is similar to Function but is designed for functions defined inside a class body and decorated with @hog.method().

Method(func, endpoint=None, **user_endpoint_config)

Bases: Function

Minimal descriptor variant of Function for use as class methods.

Provides staticmethod-like semantics (no self/cls) with remote execution.
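
One way to get staticmethod-like behaviour from a wrapper object is a descriptor whose __get__ returns the wrapper itself, so neither self nor cls is ever injected. The sketch below assumes that approach; SketchMethod and Tools are hypothetical, and the actual Method implementation is not shown on this page beyond its constructor.

```python
class SketchMethod:
    """Hypothetical descriptor with staticmethod-like binding."""

    def __init__(self, func):
        self._wrapped_function = func

    def __get__(self, instance, owner=None):
        # Return the wrapper itself rather than a bound method,
        # so no self/cls argument is prepended to the call.
        return self

    def __call__(self, *args, **kwargs):
        return self._wrapped_function(*args, **kwargs)


class Tools:
    @SketchMethod
    def double(x):
        return 2 * x


via_class = Tools.double(4)       # accessed on the class
via_instance = Tools().double(4)  # accessed on an instance, still no self
```

Since attribute access yields the wrapper object in both cases, any extra methods defined on the wrapper remain reachable as Tools.double.<method>.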

Source code in src/groundhog_hpc/function.py
def __init__(
    self,
    func: FunctionType,
    endpoint: str | None = None,
    **user_endpoint_config: Any,
) -> None:
    """Initialize a Function wrapper.

    Args:
        func: The Python function to wrap
        endpoint: Globus Compute endpoint UUID or named endpoint from `[tool.hog.<name>]` PEP 723
        **user_endpoint_config: Additional endpoint configuration to pass to
            Globus Compute Executor (e.g., worker_init commands, walltime)
    """
    self._script_path: str | None = None
    self.endpoint: str | None = endpoint
    self.default_user_endpoint_config: dict[str, Any] = user_endpoint_config

    # ShellFunction walltime - always None here to prevent conflicts with a
    # 'walltime' endpoint config, but the attribute exists as an escape
    # hatch if users need to set it after the function's been created.
    # NOTE: walltime must be set before the first .submit() or .local() call;
    # changing it afterwards has no effect because shell_function is cached.
    self.walltime: int | float | None = None

    self._wrapped_function: FunctionType = func
    self._config_resolver: ConfigResolver | None = None

    # Cached parameterized shell command and ShellFunction (built once, reused per instance)
    self._shell_command: str | None = None
    self._shell_function: ShellFunction | None = None