sio3pack.workflow.execution

Package Contents

class sio3pack.workflow.execution.Channel(buffer_size, source_pipe, target_pipe, file_buffer_size=None, limit=None)[source]

A configuration of a channel. A channel is a connection between two pipes.

Parameters:
  • buffer_size (int) – The maximum amount of data stored in the channel that has been written by the writer, but not yet read by the reader. This value must be positive.

  • source_pipe (int) – The pipe this channel will be reading from.

  • target_pipe (int) – The pipe this channel will be writing to.

  • file_buffer_size (int) – Controls whether this channel is backed by a file on disk. If it is, a larger buffer may be allocated on disk.

  • limit (int) – Limits the maximum amount of data sent through the channel.

classmethod from_json(data)[source]

Create a new channel from a dictionary.

Parameters:

data (dict) – The dictionary to create the channel from.

Return type:

Channel

to_json()[source]

Convert the channel to a dictionary.

Return type:

dict
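The round trip between from_json() and to_json() can be sketched with a stand-in dataclass, so that the example runs without sio3pack installed. The name ChannelSketch and the dictionary keys are assumptions made for illustration; this reference does not state the actual serialized key names.

```python
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass
class ChannelSketch:
    """Hypothetical stand-in mirroring the documented Channel parameters."""

    buffer_size: int
    source_pipe: int
    target_pipe: int
    file_buffer_size: Optional[int] = None
    limit: Optional[int] = None

    def __post_init__(self):
        # The reference states that buffer_size must be positive.
        if self.buffer_size <= 0:
            raise ValueError("buffer_size must be positive")

    @classmethod
    def from_json(cls, data: dict) -> "ChannelSketch":
        # Assumed key names: identical to the constructor parameters.
        return cls(**data)

    def to_json(self) -> dict:
        return asdict(self)


channel = ChannelSketch(buffer_size=4096, source_pipe=0, target_pipe=1)
restored = ChannelSketch.from_json(channel.to_json())
```

Under these assumptions, restored compares equal to channel, and a non-positive buffer_size is rejected at construction time.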

class sio3pack.workflow.execution.DescriptorManager(objects_manager, filesystem_manager)[source]

A class to manage file descriptors and their associated streams.

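Parameters:
  • objects_manager (ObjectsManager) – The objects manager.

  • filesystem_manager (FilesystemManager) – The filesystem manager.
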
add(fd, stream)[source]

Add a stream to the descriptor manager.

Parameters:
  • fd (int) – The file descriptor.

  • stream (Stream) – The stream to add.

from_json(data)[source]

Load the descriptor manager from a JSON-serializable dictionary.

Parameters:

data (dict) – The JSON-serializable dictionary to load from.

to_json()[source]

Convert the descriptor manager to a JSON-serializable dictionary.

Return type:

dict

items()[source]

Get the items in the descriptor manager.

Returns:

A view of the descriptor manager’s items.

Return type:

ItemsView[int, sio3pack.workflow.execution.stream.Stream]

all()[source]

Get all the streams in the descriptor manager.

Returns:

A dictionary of all the streams in the descriptor manager.

Return type:

dict[int, sio3pack.workflow.execution.stream.Stream]

get(fd)[source]

Get a stream by its file descriptor.

Parameters:

fd (int) – The file descriptor.

Returns:

The stream associated with the file descriptor.

Return type:

sio3pack.workflow.execution.stream.Stream

size()[source]

Get the number of streams in the descriptor manager.

Returns:

The number of streams in the descriptor manager.

Return type:

int
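The accessors above behave like a mapping from file descriptor to stream. A minimal sketch, assuming a plain dictionary as the backing store; DescriptorTableSketch is a hypothetical stand-in, and plain strings take the place of Stream instances.

```python
class DescriptorTableSketch:
    """Hypothetical stand-in for a file-descriptor-to-stream table."""

    def __init__(self):
        self._streams = {}

    def add(self, fd, stream):
        self._streams[fd] = stream

    def get(self, fd):
        # Raises KeyError for an unknown descriptor; how the real class
        # handles a missing descriptor is not documented here.
        return self._streams[fd]

    def items(self):
        return self._streams.items()

    def all(self):
        # Return a copy so callers cannot mutate the internal table.
        return dict(self._streams)

    def size(self):
        return len(self._streams)


table = DescriptorTableSketch()
table.add(0, "stdin placeholder")
table.add(1, "stdout placeholder")
```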

class sio3pack.workflow.execution.EmptyFilesystem(id=None)[source]

Bases: Filesystem

A class to represent an empty filesystem.

Parameters:

id (int) – The id of the filesystem in the task.

classmethod from_json(data, id, workflow)[source]

Create a new empty filesystem from a dictionary.

Parameters:
  • data (dict) – The dictionary to create the empty filesystem from.

  • id (int) – The id of the empty filesystem.

  • workflow (Workflow) – The workflow the empty filesystem belongs to.

to_json()[source]

Convert the empty filesystem to a dictionary.

Return type:

dict

class sio3pack.workflow.execution.Filesystem(id=None)[source]

A base class to represent a filesystem.

Parameters:

id (int) – The id of the filesystem in the task.

classmethod from_json(data, id, workflow)[source]

Create a new filesystem from a dictionary.

Parameters:
  • data (dict) – The dictionary to create the filesystem from.

  • id (int) – The id of the filesystem.

  • workflow (Workflow) – The workflow the filesystem belongs to.

to_json()[source]

Convert the filesystem to a dictionary.

Returns:

The dictionary representation of the filesystem.

Return type:

dict

replace_templates(replacements)[source]

Replace strings in the filesystem with the given replacements.

Parameters:

replacements (dict[str, str]) – The replacements to make.

class sio3pack.workflow.execution.FilesystemManager(task)[source]

A class to manage filesystems.

Parameters:
  • task (Task) – The task the filesystem manager belongs to.

  • filesystems (list[Filesystem]) – The list of filesystems.

from_json(data, workflow)[source]

Create filesystems from a list of dictionaries.

Parameters:
  • data (list[dict]) – The list of dictionaries to create the filesystems from.

  • workflow (Workflow) – The workflow the filesystems belong to.

to_json()[source]

Convert the filesystems to a list of dictionaries.

Return type:

list[dict]

get_by_id(id)[source]

Get a filesystem by id.

Parameters:

id (int) – The id of the filesystem.

Return type:

Filesystem

add(filesystem)[source]

Add a filesystem to the manager.

Parameters:

filesystem (Filesystem) – The filesystem to add.

replace_templates(replacements)[source]

Replace strings in the managed filesystems with the given replacements.

Parameters:

replacements (dict[str, str]) – The replacements to make.

len()[source]

Get the number of filesystems.

Return type:

int

has_by_id(id)[source]

Check if a filesystem with the given id exists.

Parameters:

id (int) – The id of the filesystem to check.

Returns:

True if the filesystem exists, False otherwise.

Return type:

bool

class sio3pack.workflow.execution.ImageFilesystem(image, path=None, id=None)[source]

Bases: Filesystem

A class to represent an image filesystem. An image can be, for example, a g++ compiler or a Python interpreter.

Parameters:
  • id (int) – The id of the image filesystem.

  • image (str) – The image to use.

  • path (str) – The path to the image. If None, an empty string is used.

classmethod from_json(data, id, workflow)[source]

Create a new image filesystem from a dictionary.

Parameters:
  • data (dict) – The dictionary to create the image filesystem from.

  • id (int) – The id of the image filesystem.

  • workflow (Workflow) – The workflow the image filesystem belongs to.

Return type:

ImageFilesystem

to_json()[source]

Convert the image filesystem to a dictionary.

Returns:

The dictionary representation of the image filesystem.

Return type:

dict

class sio3pack.workflow.execution.ObjectFilesystem(object, id=None)[source]

Bases: Filesystem

A class to represent a filesystem backed by an object.

Parameters:
  • id (int) – The id of the filesystem in the task.

  • object (sio3pack.workflow.object.Object)

classmethod from_json(data, id, workflow)[source]

Create a new object filesystem from a dictionary.

Parameters:
  • data (dict) – The dictionary to create the object filesystem from.

  • id (int) – The id of the object filesystem.

  • workflow (Workflow) – The workflow the object filesystem belongs to.

to_json()[source]

Convert the object filesystem to a dictionary.

Return type:

dict

replace_templates(replacements)[source]

Replace strings in the object filesystem with the given replacements.

Parameters:

replacements (dict[str, str]) – The replacements to make.

class sio3pack.workflow.execution.MountNamespace(mountpoints=None, root=0, id=None)[source]

A class to represent a mount namespace. It can mount Mountpoint instances in the target filesystem.

Parameters:
  • id (int) – The id of the mount namespace.

  • mountpoints (list[Mountpoint]) – The mountpoints in the mount namespace.

  • root (int)

classmethod from_json(data, id, filesystem_manager)[source]

Create a new mount namespace from a dictionary.

Parameters:
  • data (dict) – The dictionary to create the mount namespace from.

  • id (int) – The id of the mount namespace.

  • filesystem_manager (sio3pack.workflow.execution.filesystems.FilesystemManager) – The filesystem manager to use.

to_json()[source]

Convert the mount namespace to a dictionary.

Return type:

dict

add_mountpoint(mountpoint)[source]

Add a mountpoint to the mount namespace.

Parameters:

mountpoint (Mountpoint) – The mountpoint to add.

class sio3pack.workflow.execution.MountNamespaceManager(task, filesystem_manager)[source]

A class to manage mount namespaces.

Parameters:
  • task (Task)

  • filesystem_manager (sio3pack.workflow.execution.filesystems.FilesystemManager)

from_json(data)[source]

Create a new mount namespace manager from a list of dictionaries.

Parameters:

data (list[dict]) – The list of dictionaries to create the mount namespace manager from.

add(mount_namespace)[source]

Add a mount namespace to the manager.

Parameters:

mount_namespace (MountNamespace) – The mount namespace to add.

get_by_id(id)[source]

Get a mount namespace by its id.

Parameters:

id (int) – The id of the mount namespace.

Return type:

MountNamespace

to_json()[source]

Convert the mount namespace manager to a list of dictionaries.

Return type:

list[dict]

len()[source]

Get the number of mount namespaces.

Return type:

int

class sio3pack.workflow.execution.Mountpoint(source, target, writable=False, capacity=None)[source]

A class to represent a mountpoint.

Parameters:
  • source (Filesystem) – The source filesystem.

  • target (str) – The target path in the filesystem.

  • writable (bool) – Whether the mountpoint is writable or not.

  • capacity (int) – The capacity of the mountpoint. If None, the capacity is unlimited.

classmethod from_json(data, filesystem_manager)[source]

Create a new mountpoint from a dictionary.

Parameters:
  • data (dict) – The dictionary to create the mountpoint from.

  • filesystem_manager (FilesystemManager) – The filesystem manager to use.

Return type:

Mountpoint

to_json()[source]

Convert the mountpoint to a dictionary.

Return type:

dict
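Because Mountpoint.from_json() receives a filesystem manager, a serialized mountpoint plausibly refers to its source filesystem by id. The sketch below illustrates that idea only; the stand-in classes, the dictionary keys, and the id-based lookup are all assumptions, not the library's confirmed format.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FilesystemSketch:
    """Hypothetical stand-in for a Filesystem, identified by its id."""

    id: int


@dataclass
class MountpointSketch:
    """Hypothetical stand-in mirroring the documented Mountpoint parameters."""

    source: FilesystemSketch
    target: str
    writable: bool = False
    capacity: Optional[int] = None  # None means unlimited, as documented.

    def to_json(self) -> dict:
        # Assumed format: the source filesystem is stored by id.
        return {
            "source": self.source.id,
            "target": self.target,
            "writable": self.writable,
            "capacity": self.capacity,
        }

    @classmethod
    def from_json(cls, data: dict, filesystems_by_id: dict) -> "MountpointSketch":
        # A plain dict stands in for the filesystem manager's id lookup.
        return cls(
            source=filesystems_by_id[data["source"]],
            target=data["target"],
            writable=data["writable"],
            capacity=data["capacity"],
        )


filesystems = {0: FilesystemSketch(id=0)}
mount = MountpointSketch(source=filesystems[0], target="/exe", writable=False)
restored_mount = MountpointSketch.from_json(mount.to_json(), filesystems)
```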

class sio3pack.workflow.execution.Process(workflow, task, arguments=None, environment=None, image='', mount_namespace=None, resource_group=None, pid_namespace=0, working_directory='/', start_after=None)[source]

A class to represent a process in a workflow.

Parameters:
  • workflow (Workflow) – The workflow the process belongs to.

  • task (ExecutionTask) – The task which the process belongs to.

  • arguments (list[str]) – Executable arguments for the process.

  • environment (dict) – Environment variables for the process.

  • image (str) – The image of the process, which can be a Docker image or similar.

  • mount_namespace (MountNamespace) – The mount namespace to use for the process.

  • resource_group (ResourceGroup) – The resource group of the process.

  • working_directory (str) – The working directory of the process.

  • pid_namespace (int) – The PID namespace of the process.

  • start_after (list[int]) – The processes that must finish before this process starts.
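The start_after constraint describes a dependency graph between processes. As a rough illustration, assuming the entries are indices of processes within the same task, one valid start order can be computed with the standard library's graphlib module (Python 3.9 or later). This is not a description of how the executor schedules processes; independent processes may well run concurrently.

```python
from graphlib import TopologicalSorter

# Hypothetical task with three processes: each key is a process index and
# each value is that process's start_after list.
start_after = {
    0: [],       # may start immediately
    1: [0],      # waits for process 0
    2: [0, 1],   # waits for processes 0 and 1
}

# TopologicalSorter takes a mapping from node to its predecessors, which
# matches the shape of start_after directly.
order = list(TopologicalSorter(start_after).static_order())
```

For this particular graph only one order satisfies every constraint, so order is [0, 1, 2]. A cyclic start_after relation would make static_order() raise graphlib.CycleError.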

to_json()[source]

Convert the process to a JSON-serializable dictionary.

Return type:

dict

classmethod from_json(data, workflow, task)[source]

Create a new process from a dictionary.

Parameters:
  • data (dict) – The dictionary to create the process from.

  • workflow (Workflow) – The workflow the process belongs to.

  • task (Task) – The task the process belongs to.

replace_templates(replacements)[source]

Replace strings in the process with the given replacements.

Parameters:
  • replacements (dict[str, str]) – The replacements to make.
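One way to picture template replacement is as plain substring substitution over a process's string fields, such as its arguments. The helper below is a hypothetical sketch: the placeholder syntax (`<SOURCE>`, `<OUTPUT>`) and the choice of substring replacement are assumptions, since this reference does not specify either.

```python
def replace_templates_sketch(strings, replacements):
    """Return a copy of strings with every replacement key substituted."""
    result = []
    for text in strings:
        for key, value in replacements.items():
            text = text.replace(key, value)
        result.append(text)
    return result


# Hypothetical compile command with two placeholders.
arguments = ["g++", "<SOURCE>", "-o", "<OUTPUT>"]
resolved = replace_templates_sketch(
    arguments, {"<SOURCE>": "main.cpp", "<OUTPUT>": "main"}
)
```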

class sio3pack.workflow.execution.ResourceGroup(cpu_usage_limit=100.0, instruction_limit=1000000000.0, memory_limit=2147483648, oom_terminate_all_tasks=False, pid_limit=2, swap_limit=0, time_limit=1000000000.0, id=None)[source]

A resource group is a set of limits that can be applied to a task. It can limit CPU usage, instruction usage, memory usage, and more.

Parameters:
  • id (int) – The id of the resource group.

  • cpu_usage_limit (int) – The CPU usage limit.

  • instruction_limit (int) – The instruction usage limit.

  • memory_limit (int) – The memory limit.

  • oom_terminate_all_tasks (bool) – Whether to terminate all tasks on OOM.

  • pid_limit (int) – The PID limit.

  • swap_limit (int) – The swap limit.

  • time_limit (int) – The time limit.

set_limits(cpu_usage_limit, instruction_limit, memory_limit, time_limit)[source]

Set the limits of the resource group.

Parameters:
  • cpu_usage_limit (int) – The CPU usage limit.

  • instruction_limit (int) – The instruction usage limit.

  • memory_limit (int) – The memory limit.

  • time_limit (int) – The time limit.

classmethod from_json(data, id)[source]

Create a new resource group from a dictionary.

Parameters:
  • data (dict) – The dictionary to create the resource group from.

  • id (int) – The id of the resource group.

to_json()[source]

Convert the resource group to a dictionary.

Return type:

dict
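The default values in the ResourceGroup signature are easier to read with a little arithmetic. The sketch below copies the defaults from the signature and assumes that memory_limit is expressed in bytes, which this reference does not state. The override at the end is hypothetical and only mirrors the four limits that set_limits() accepts.

```python
# Default values copied from the ResourceGroup signature.
defaults = {
    "cpu_usage_limit": 100.0,
    "instruction_limit": 1000000000.0,
    "memory_limit": 2147483648,
    "oom_terminate_all_tasks": False,
    "pid_limit": 2,
    "swap_limit": 0,
    "time_limit": 1000000000.0,
}

# Assumption: memory_limit is in bytes, so the default is 2 GiB.
memory_limit_gib = defaults["memory_limit"] / 1024**3

# Hypothetical tightened limits; the other settings keep their defaults.
tightened = {
    **defaults,
    "cpu_usage_limit": 50.0,
    "instruction_limit": 1e8,
    "memory_limit": 256 * 1024**2,
    "time_limit": 1e8,
}
```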

class sio3pack.workflow.execution.ResourceGroupManager(task)[source]

A class to manage resource groups in a workflow. Allows creation, retrieval, and management of resource groups.

Parameters:

task (Task)

add(resource_group)[source]

Add a resource group to the resource group manager.

Parameters:

resource_group (ResourceGroup) – The resource group to add.

get_by_id(id)[source]

Get a resource group by its id.

Parameters:

id (int) – The id of the resource group to get.

Return type:

ResourceGroup

to_json()[source]

Convert the resource group manager to a list of dictionaries.

Return type:

list[dict]

from_json(data)[source]

Create a new resource group manager from a list of dictionaries.

Parameters:

data (list[dict]) – The list of dictionaries to create the resource group manager from.

all()[source]

Get all resource groups.

Return type:

list[ResourceGroup]

class sio3pack.workflow.execution.FileMode(*args, **kwds)[source]

Bases: enum.Enum

Enum representing the mode of a file stream.

class sio3pack.workflow.execution.FileStream(filesystem, path, mode)[source]

Bases: Stream

Class representing a file stream. A file will be opened and passed to the process as a file descriptor.

Parameters:
  • filesystem (Filesystem) – The filesystem to use.

  • path (str) – The path to the file.

  • mode (FileMode) – The mode to open the file in.

classmethod from_json(filesystem_manager, data)[source]

Create a file stream from a JSON-serializable dictionary.

Parameters:
  • filesystem_manager (FilesystemManager) – The filesystem manager.

  • data (dict) – The JSON-serializable dictionary to create the file stream from.

Return type:

FileStream

to_json()[source]

Convert the file stream to a dictionary.

Returns:

The dictionary representation of the file stream.

Return type:

dict

class sio3pack.workflow.execution.NullStream[source]

Bases: Stream

Class representing a null stream.

classmethod from_json(data)[source]

Create a null stream from a JSON-serializable dictionary.

Parameters:

data (dict) – The JSON-serializable dictionary to create the null stream from.

Return type:

NullStream

to_json()[source]

Convert the null stream to a dictionary.

Returns:

The dictionary representation of the null stream.

Return type:

dict

class sio3pack.workflow.execution.ObjectReadStream(object)[source]

Bases: ObjectStream

Class representing an object read stream. An object read stream is a stream that reads from an object via a file descriptor.

Parameters:

object (Object) – The object to read from.

class sio3pack.workflow.execution.ObjectStream(type, object)[source]

Bases: Stream

A base class for object streams. An object stream is a stream that reads from or writes to an object via a file descriptor.

Parameters:
  • type (StreamType) – The type of the stream.

  • object (Object) – The object to use.

classmethod from_json(data, objects_manager)[source]

Create an object stream from a JSON-serializable dictionary.

Parameters:
  • data (dict) – The JSON-serializable dictionary to create the object stream from.

  • objects_manager (sio3pack.workflow.object.ObjectsManager)

Return type:

ObjectStream

to_json()[source]

Convert the object stream to a dictionary.

Return type:

dict

replace_templates(replacements)[source]

Replace strings in the object stream with the given replacements.

Parameters:

replacements (dict[str, str]) – The replacements to make.

class sio3pack.workflow.execution.ObjectWriteStream(object)[source]

Bases: ObjectStream

Class representing an object write stream. An object write stream is a stream that writes to an object via a file descriptor.

Parameters:

object (Object) – The object to write to.

class sio3pack.workflow.execution.PipeReadStream(pipe_index)[source]

Bases: PipeStream

Class representing a pipe read stream. A pipe read stream is a stream that reads from a pipe via a file descriptor.

Parameters:

pipe_index (int) – The index of the pipe.

class sio3pack.workflow.execution.PipeStream(type, pipe_index)[source]

Bases: Stream

A base class for pipe streams. A pipe stream is a stream that reads from or writes to a pipe via a file descriptor.

Parameters:
  • type (StreamType) – The type of the stream.

  • pipe_index (int) – The index of the pipe.

classmethod from_json(data)[source]

Create a pipe stream from a JSON-serializable dictionary.

Parameters:

data (dict) – The JSON-serializable dictionary to create the pipe stream from.

Return type:

PipeStream

to_json()[source]

Convert the pipe stream to a dictionary.

Return type:

dict

class sio3pack.workflow.execution.PipeWriteStream(pipe_index)[source]

Bases: PipeStream

Class representing a pipe write stream. A pipe write stream is a stream that writes to a pipe via a file descriptor.

Parameters:

pipe_index (int) – The index of the pipe.

class sio3pack.workflow.execution.Stream(type)[source]

Base class for all streams.

Parameters:

type (StreamType) – The type of the stream.

classmethod from_json(data, objects_manager, filesystem_manager)[source]

Create a stream from a JSON-serializable dictionary.

Parameters:
  • data (dict) – The JSON-serializable dictionary to create the stream from.

  • objects_manager (ObjectsManager) – The objects manager.

  • filesystem_manager (FilesystemManager) – The filesystem manager.

Return type:

Stream
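A base-class from_json() of this kind typically dispatches on a type tag in the dictionary and builds the matching subclass. The sketch below shows that pattern with stand-in classes. The tag values ("null", "pipe_read"), the "pipe" key, and all class names are hypothetical; the real serialized format is not given in this reference.

```python
from abc import ABC, abstractmethod


class StreamSketch(ABC):
    """Hypothetical stand-in for the Stream base class."""

    @abstractmethod
    def to_json(self) -> dict:
        ...


class NullStreamSketch(StreamSketch):
    def to_json(self) -> dict:
        return {"type": "null"}


class PipeReadStreamSketch(StreamSketch):
    def __init__(self, pipe_index: int):
        self.pipe_index = pipe_index

    def to_json(self) -> dict:
        return {"type": "pipe_read", "pipe": self.pipe_index}


def stream_from_json_sketch(data: dict) -> StreamSketch:
    """Build the stand-in stream that matches the assumed type tag."""
    kind = data["type"]
    if kind == "null":
        return NullStreamSketch()
    if kind == "pipe_read":
        return PipeReadStreamSketch(data["pipe"])
    raise ValueError(f"unknown stream type: {kind!r}")


pipe_stream = stream_from_json_sketch({"type": "pipe_read", "pipe": 3})
null_stream = stream_from_json_sketch({"type": "null"})
```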

abstractmethod to_json()[source]

Convert the stream to a JSON-serializable dictionary.

Return type:

dict

replace_templates(replacements)[source]

Replace strings in the stream with the given replacements. This method is a no-op for streams that do not support template replacement.

Parameters:

replacements (dict[str, str]) – The replacements to make.

class sio3pack.workflow.execution.StreamType(*args, **kwds)[source]

Bases: enum.Enum

Enum representing the type of stream.