Source code for sio3pack.packages.sinolpack.model

import json
import os
import re
import tempfile
from typing import Any, Type

import yaml

from sio3pack.exceptions import ImproperlyConfigured, ParsingFailedOn, ProcessPackageError, WorkflowParsingError
from sio3pack.files import File, LocalFile
from sio3pack.packages.package import Package
from sio3pack.packages.package.configuration import SIO3PackConfig
from sio3pack.packages.sinolpack import constants
from sio3pack.packages.sinolpack.enums import ModelSolutionKind
from sio3pack.packages.sinolpack.workflows import SinolpackWorkflowManager
from sio3pack.test import Test
from sio3pack.util import naturalsort_key
from sio3pack.utils.archive import Archive, UnrecognizedArchiveFormat
from sio3pack.workflow import Workflow, WorkflowManager, WorkflowOperation


[docs] class Sinolpack(Package): """ Represents a OIOIOI's standard problem package. :param str short_name: Short name of the problem. :param str full_name: Full name of the problem. :param dict[str, str] lang_titles: A dictionary of problem titles, where keys are language codes and values are titles. :param dict[str, File] lang_statements: A dictionary of problem statements, where keys are language codes and values are files. :param dict[str, Any] config: Configuration of the problem. :param list[dict[str, Any]] model_solutions: A list of model solutions, where each element is a dict containing a model solution kind and a file. :param list[File] additional_files: A list of additional files for the problem. :param dict[str, File] extra_files: A dictionary of extra files as specified in the config.yml file, where keys are file paths and values are :class:`sio3pack.files.File` objects. :param list[File] attachments: A list of attachments related to the problem. :param WorkflowManager workflow_manager: A workflow manager for the problem. :param File | None main_model_solution: The main model solution file. :param dict[str, File | None] special_files: A dictionary of special files, where keys are file names and values are booleans indicating whether the file exists or not. :param list[Test] tests: A list of tests, where each element is a :class:`sio3pack.Test` object. :param bool is_from_db: A flag indicating whether the package is loaded from the database or not. :param SinolpackWorkflowManager workflow_manager: A workflow manager for the problem. """ django_handler = "sio3pack.django.sinolpack.handler.SinolpackDjangoHandler" @classmethod def _find_main_dir(cls, archive: Archive) -> str | None: dirs = list(map(os.path.normcase, archive.dirnames())) dirs = list(map(os.path.normpath, dirs)) toplevel_dirs = list(set(f.split(os.sep)[0] for f in dirs)) problem_dirs = [] for dir in toplevel_dirs: for required_subdir in ("in", "out"): if all(f.split(os.sep)[:2] != [dir, required_subdir] for f in dirs): break else: problem_dirs.append(dir) if len(problem_dirs) == 1: return problem_dirs[0] return None
[docs] @classmethod def identify(cls, file: LocalFile) -> bool: """ Identifies whether file is a Sinolpack. :param file: File with package. :return: True when file is a Sinolpack, otherwise False. """ path = file.path try: archive = Archive(path) return cls._find_main_dir(archive) is not None except UnrecognizedArchiveFormat: return os.path.exists(os.path.join(path, "in")) and os.path.exists(os.path.join(path, "out"))
[docs] @classmethod def identify_db(cls, problem_id: int) -> bool: """ Identifies whether problem is a Sinolpack. :param problem_id: ID of the problem. :return: True when problem is a Sinolpack, otherwise False. """ from sio3pack.django.common.models import SIO3Package return SIO3Package.objects.filter(problem_id=problem_id).exists()
def __del__(self): if hasattr(self, "tmpdir"): self.tmpdir.cleanup() def __init__(self): super().__init__() def _from_file(self, file: LocalFile, configuration: SIO3PackConfig = None): super()._from_file(file, configuration) if self.is_archive: archive = Archive(file.path) self.short_name = self._find_main_dir(archive) self.tmpdir = tempfile.TemporaryDirectory() archive.extract(to_path=self.tmpdir.name) self.rootdir = os.path.join(self.tmpdir.name, self.short_name) else: self.short_name = os.path.basename(os.path.abspath(file.path)) self.rootdir = os.path.abspath(file.path) if os.path.exists(os.path.join(self.rootdir, "workflows.json")): try: with open(os.path.join(self.rootdir, "workflows.json"), "r") as f: workflows = json.load(f) self.workflow_manager = SinolpackWorkflowManager(self, workflows) except json.JSONDecodeError as e: raise WorkflowParsingError( f"Invalid JSON in workflows.json: {e}", ParsingFailedOn.JSON, full_message="Invalid JSON in workflows.json file. Please check the file for syntax errors.", ) else: self.workflow_manager = self._default_workflow_manager() self._process_package() def _from_db(self, problem_id: int, configuration: SIO3PackConfig = None): super()._from_db(problem_id, configuration) super()._setup_django_handler(problem_id) # TODO: Workflows probably should be fetched only if they are needed, since this can be slow super()._setup_workflows_from_db() if not self.django_enabled: raise ImproperlyConfigured( "sio3pack is not installed with Django support.", "from_db function was used, but sio3pack isn't installed with Django support. " "Read the documentation to learn more.", ) def _workflow_manager_class(self) -> Type[WorkflowManager]: return SinolpackWorkflowManager
[docs] def get_doc_dir(self) -> str: """ Returns the path to the directory containing the problem's documents. """ return os.path.join(self.rootdir, "doc")
[docs] def get_in_doc_dir(self, filename: str) -> File: """ Returns the path to the input file in the documents' directory. """ return LocalFile(os.path.join(self.get_doc_dir(), filename))
[docs] def get_in_root(self, filename: str) -> File: """ Returns the path to the input file in the root directory. """ return LocalFile(os.path.join(self.rootdir, filename))
[docs] def get_prog_dir(self) -> str: """ Returns the path to the directory containing the problem's program files. """ return os.path.join(self.rootdir, "prog")
[docs] def get_in_prog_dir(self, filename: str) -> LocalFile: """ Returns the path to the input file in the program directory. """ return LocalFile(os.path.join(self.get_prog_dir(), filename))
[docs] def get_attachments_dir(self) -> str: """ Returns the path to the directory containing the problem's attachments. """ return os.path.join(self.rootdir, "attachments")
def _process_package(self): self._process_config_yml() self._detect_full_name() self._detect_full_name_translations() self._process_prog_files() self._process_extra_files() self._process_statements() self._process_attachments() self._process_existing_tests() def _process_config_yml(self): """ Process the config.yml file. If it exists, it will be loaded into the config attribute. """ try: config = self.get_in_root("config.yml") self.config = yaml.safe_load(config.read()) # Support for local packages self.short_name = self.config.get("sinol_task_id", self.short_name) except FileNotFoundError: self.config = {}
[docs] def reload_config(self): """ Process the config.yml file again in case it was modified. """ self._process_config_yml()
def _detect_full_name(self): """ Sets the problem's full name from the ``config.yml`` (key ``title``) or from the ``title`` tag in the LaTeX source file (backwards compatibility). The ``config.yml`` file takes precedence over the LaTeX source. Example of how the ``title`` tag may look like: \title{A problem} """ if "title" in self.config: self.full_name = self.config["title"] return try: source = self.get_in_doc_dir(self.short_name + "zad.tex") text = source.read() r = re.search(r"^[^%]*\\title{(.+)}", text, re.MULTILINE) if r is not None: self.full_name = r.group(1) except FileNotFoundError: pass
[docs] def get_title(self, lang: str | None = None) -> str: """ Returns the problem title for a given language code. """ if lang is None: return self.full_name return self.lang_titles.get(lang, self.full_name)
def _detect_full_name_translations(self): """Creates problem's full name translations from the ``config.yml`` (keys matching the pattern ``title_[a-z]{2}``, where ``[a-z]{2}`` represents two-letter language code defined in ``settings.py``), if any such key is given. """ self.lang_titles = {} for lang_code, _ in self._get_from_django_settings("LANGUAGES", [("en", "English")]): key = f"title_{lang_code}" if key in self.config: self.lang_titles[lang_code] = self.config[key]
[docs] def get_submittable_extensions(self): """ Returns a list of extensions that are submittable. """ return self.config.get( "submittable_langs", self._get_from_django_settings("SUBMITTABLE_LANGUAGES", ["c", "cpp", "cc", "cxx", "py"]), )
[docs] def get_model_solution_regex(self): """ Returns the regex used to determine model solutions. """ extensions = self.get_submittable_extensions() return rf"^{self.short_name}[0-9]*([bs]?)[0-9]*(_.*)?\.({'|'.join(extensions)})"
[docs] def main_model_solution_regex(self): """ Returns the regex used to determine main model solution. """ extensions = self.get_submittable_extensions() return rf"^{self.short_name}\.({'|'.join(extensions)})"
def _get_model_solutions(self) -> list[dict[str, Any]]: """ Returns a list of model solutions, where each element is a dict of model solution kind and filename. """ if not os.path.exists(self.get_prog_dir()): return [] regex = self.get_model_solution_regex() model_solutions = [] main_solution: File | None = None main_regex = self.main_model_solution_regex() for file in os.listdir(self.get_prog_dir()): match = re.match(regex, file) if match and os.path.isfile(os.path.join(self.get_prog_dir(), file)): file = LocalFile(os.path.join(self.get_prog_dir(), file)) model_solutions.append({"file": file, "kind": ModelSolutionKind.from_regex(match.group(1))}) if re.match(main_regex, file.filename): main_solution = file self.main_model_solution = main_solution return model_solutions
[docs] def sort_model_solutions(self, model_solutions: list[dict[str, Any]]) -> list[dict[str, Any]]: """ Sorts model solutions by kind. """ def sort_key(model_solution): kind: ModelSolutionKind = model_solution["kind"] file: LocalFile = model_solution["file"] return kind.value, naturalsort_key(file.filename[: file.filename.index(".")]) return list(sorted(model_solutions, key=sort_key))
[docs] def special_file_types(self) -> list[str]: """ Returns the list of special file types. """ return ["ingen", "inwer", "soc", "chk"]
def _get_all_files_from_list(self, filenames: list[str]) -> list[LocalFile]: files = [] for filename in filenames: files.append(self.get_in_prog_dir(filename)) return files def _process_prog_files(self): """ Process all files in the problem's program directory that are used. Saves all models solution files. If the problem has a custom workflow file, takes the files that are used in the workflow. Otherwise, ingen, inwer and files in `extra_compilation_files` and `extra_execution_files` from config are saved. """ # Process model solutions. self.model_solutions = self.sort_model_solutions(self._get_model_solutions()) self.additional_files = [] extra_files = [] extra_files.extend(self.config.get("extra_compilation_files", [])) for lang_extra_files in self.config.get("extra_execution_files", {}).values(): extra_files.extend(lang_extra_files) for file in extra_files: try: lf = LocalFile(os.path.join(self.get_prog_dir(), file)) self.additional_files.append(lf) except FileNotFoundError: where = ( "extra_compilation_files" if file in self.config.get("extra_compilation_files", []) else "extra_execution_files" ) raise ProcessPackageError( f"Extra file '{file}' from {where} not found.", f"The extra file '{file}' specified in the config.yml file under {where} does not exist in the " f"prog/ directory of the problem package. " f"Please check the package structure and ensure that the file is present or remove it from the config.", ) extensions = self.get_submittable_extensions() + ["sh"] self.special_files: dict[str, File | None] = {} for file in self.special_file_types(): try: lf = LocalFile.get_file_matching_extension(self.get_prog_dir(), self.short_name + file, extensions) self.additional_files.append(lf) self.special_files[file] = lf except FileNotFoundError: self.special_files[file] = None def _process_extra_files(self): """ Process extra files from the config.yml file. The files are stored in the `extra_files` attribute. """ self.extra_files = {} conf_extra_files = self.config.get("extra_files", []) if isinstance(conf_extra_files, str): conf_extra_files = [conf_extra_files] for file in conf_extra_files: try: lf = LocalFile(os.path.join(self.rootdir, file)) self.extra_files[file] = lf except FileNotFoundError: raise ProcessPackageError( f"Extra file '{file}' not found.", f"The extra file '{file}' specified in the config.yml file does not exist in the package. " f"Path to this file should be relative to the root directory of the package. " f"Please check the package structure and ensure that the file is present or remove it from the config.", )
[docs] def get_extra_file(self, package_path: str) -> File | None: """ Returns the extra file with the given package path. :param package_path: The path to the extra file in the package. :return: The extra file if it exists, otherwise None. """ if self.is_from_db: return self.django.get_extra_file(package_path) else: return self.extra_files.get(package_path, None)
[docs] def get_statement(self, lang: str | None = None) -> File | None: """ Returns the problem statement for a given language code. """ return self.lang_statements.get(lang or "", None)
def _process_statements(self): """ Creates a problem statement from html or pdf source. TODO: we have to support this somehow, but we can't use makefiles. Probably a job for sio3workers. If `USE_SINOLPACK_MAKEFILES` is set to True in the OIOIOI settings, the pdf file will be compiled from a LaTeX source. """ self.lang_statements = {} docdir = self.get_doc_dir() if not os.path.exists(docdir): return lang_prefs = [""] + [ f"-{lang}" for lang, _ in self._get_from_django_settings("LANGUAGES", [("en", "English"), ("pl", "Polish")]) ] for lang in lang_prefs: try: htmlzipfile = self.get_in_doc_dir(f"{self.short_name}zad{lang}.html.zip") # TODO: what to do with html? # if self._html_disallowed(): # raise ProblemPackageError( # _( # "You cannot upload package with " # "problem statement in HTML. " # "Try again using PDF format." # ) # ) # # self._force_index_encoding(htmlzipfile) # statement = ProblemStatement(problem=self.problem, language=lang[1:]) # statement.content.save( # self.short_name + lang + '.html.zip', LocalFile(open(htmlzipfile, 'rb')) # ) except FileNotFoundError: pass try: pdffile = self.get_in_doc_dir(f"{self.short_name}zad{lang}.pdf") if lang == "": self.lang_statements[""] = pdffile else: self.lang_statements[lang[1:]] = pdffile except FileNotFoundError: pass def _process_attachments(self): """ """ attachments_dir = self.get_attachments_dir() if not os.path.isdir(attachments_dir): self.attachments = [] return self.attachments = [ LocalFile(os.path.join(attachments_dir, attachment)) for attachment in os.listdir(attachments_dir) if os.path.isfile(os.path.join(attachments_dir, attachment)) ] def _get_test_regex(self) -> str: return rf"^{self.short_name}(([0-9]+)([a-z]?[a-z0-9]*)).(in|out)$"
[docs] def match_test_regex(self, filename: str) -> re.Match | None: """ Returns match object if the filename matches the test regex. """ return re.match(self._get_test_regex(), filename)
[docs] def get_test_id_from_filename(self, filename: str) -> str: """ Returns the test ID from the filename. """ match = self.match_test_regex(filename) if match: return match.group(1) raise ValueError(f"Invalid filename format: {filename}")
[docs] def get_group_from_filename(self, filename: str) -> str: """ Returns the group from the filename. """ match = self.match_test_regex(filename) if match: return match.group(2) raise ValueError(f"Invalid filename format: {filename}")
def _process_existing_tests(self): """ Process pre-existing input and output tests. """ # TODO: Rewrite this test_ids = set() for ext in ("in", "out"): for file in os.listdir(os.path.join(self.rootdir, ext)): match = self.match_test_regex(os.path.basename(file)) if match: test_name = os.path.splitext(os.path.basename(file))[0] test_id = match.group(1) group = match.group(2) test_ids.add((test_id, group, test_name)) elif not self.configuration.allow_unrecognized_files: raise ProcessPackageError( f"Unrecognized test in {ext} directory: {file}", f"All files in the {ext} directory should match the pattern: " f"{self._get_test_regex()}.", ) # TODO: Sort this properly test_ids = sorted(test_ids) self.tests = [] for test_id, group, test_name in test_ids: if os.path.exists(os.path.join(self.rootdir, "in", self.short_name + test_id + ".in")): in_file = LocalFile(os.path.join(self.rootdir, "in", self.short_name + test_id + ".in")) else: in_file = None if os.path.exists(os.path.join(self.rootdir, "out", self.short_name + test_id + ".out")): out_file = LocalFile(os.path.join(self.rootdir, "out", self.short_name + test_id + ".out")) else: out_file = None self.tests.append(Test(test_name, test_id, in_file, out_file, group))
[docs] def reload_tests(self): """ Updates `self.tests` variable with existing tests. """ self._process_existing_tests()
[docs] def get_input_tests(self) -> list[Test]: """ Returns the list of tests with input files. """ return [test for test in self.tests if test.in_file is not None]
[docs] def get_test(self, test_id: str) -> Test: """ Returns the test with the given ID. """ for test in self.tests: if test.test_id == test_id: return test raise ValueError(f"Test with ID {test_id} not found.")
[docs] def get_tests_with_inputs(self, tests: list[Test] = None) -> list[Test]: """ Returns the list of input tests. """ return [test for test in tests or self.tests if test.in_file is not None]
[docs] def get_corresponding_out_filename(self, in_test: str) -> str: """ Returns the corresponding output test for the given input test. """ # TODO: Better return in_test.replace(".in", ".out")
[docs] def get_outgen_path(self) -> str | None: return self.main_model_solution.path
def _get_special_file_path(self, file_type: str) -> str | None: """ Returns the path to the special file in the program directory. """ # TODO: This should be faster if self.special_files[file_type]: return self.special_files[file_type].path return None
[docs] def get_ingen_path(self) -> str | None: return self._get_special_file_path("ingen")
[docs] def get_inwer_path(self) -> str | None: return self._get_special_file_path("inwer")
[docs] def get_checker_file(self) -> File | None: """ Returns the checker file. """ return self.special_files["chk"]
[docs] def get_checker_path(self) -> str | None: return self._get_special_file_path("chk")
[docs] def get_unpack_operation(self, return_func: callable = None) -> WorkflowOperation | None: has_ingen = self.special_files["ingen"] is not None has_outgen = self.main_model_solution is not None has_inwer = self.special_files["inwer"] is not None return self.workflow_manager.get_unpack_operation(has_ingen, has_outgen, has_inwer, return_func)
def _unpack_return_data(self, data: dict): """ Adds data received from the unpack operation to the package. """ # TODO: implement. The unpack will probably return tests, so we need to process them. # After parsing new tests, verify them self._verify_tests() self._verify_limits() def _verify_tests(self): """ Verifies the tests in the package. This function should be called after unpacking new tests to ensure they are valid and conform to the expected structure. """ for test in self.tests: if not test.in_file: raise ProcessPackageError( f"Input test is missing for test {test.test_id}.", "All tests must have input and output files. The input file is missing for test " f"{test.test_id}. Please check the package structure and ingen.", ) if not test.out_file: raise ProcessPackageError( f"Output test is missing for test {test.test_id}.", "All tests must have input and output files. The output file is missing for test " f"{test.test_id}. Please check the package structure and outgen.", ) def _verify_limits(self): """ Verifies that sum of time limits for all tests does not exceed the maximum allowed time limit for the problem. """ limit = self._get_from_django_settings("MAX_TEST_TIME_LIMIT_PER_PROBLEM") if limit is None: return tl_sum = 0 for test in self.tests: tl_sum += self.get_time_limit_for_test(test, "cpp") # Assuming C++ as the default language if tl_sum > limit: tl_sum /= 1000 # Convert to seconds limit /= 1000 # Convert to seconds raise ProcessPackageError( "Sum of time limits for all tests exceeds the maximum allowed limit.", f"The sum of time limits for all tests ({tl_sum} seconds) exceeds the maximum allowed limit ({limit} seconds). " f"Please adjust the time limits in the config.yml file or reduce the number of tests.", )
[docs] def save_to_db(self, problem_id: int): """ Save the package to the database. If sio3pack isn't installed with Django support, it should raise an ImproperlyConfigured exception. """ self._setup_django_handler(problem_id) if not self.django_enabled: raise ImproperlyConfigured( "sio3pack is not installed with Django support.", "save_to_db function was used, but sio3pack isn't installed with Django support. " "Read the documentation to learn more.", ) self.django.save_to_db()
def _get_compiler_flags(self, lang: str) -> list[str]: """ Extends the compiler flags with the ones from the config.yml file. """ flags = super()._get_compiler_flags(lang) if "extra_compilation_args" in self.config and lang in self.config["extra_compilation_args"]: config_flags = self.config["extra_compilation_args"][lang] if isinstance(config_flags, str): config_flags = [config_flags] flags.extend(config_flags) return flags
[docs] def get_extra_execution_files(self) -> list[File]: """ Returns the list of extra execution files specified in the config.yml file. If no such files are specified, an empty list is returned. :return: List of extra execution files. """ if self.is_from_db: return self.django.extra_execution_files else: return [ LocalFile(os.path.join(self.rootdir, "prog", f)) for f in self.config.get("extra_execution_files", []) if os.path.isfile(os.path.join(self.rootdir, "prog", f)) ]
[docs] def get_extra_compilation_files(self) -> list[File]: """ Returns the list of extra compilation files specified in the config.yml file. If no such files are specified, an empty list is returned. :return: List of extra compilation files. """ if self.is_from_db: return self.django.extra_compilation_files else: return [ LocalFile(os.path.join(self.rootdir, "prog", f)) for f in self.config.get("extra_compilation_files", []) if os.path.isfile(os.path.join(self.rootdir, "prog", f)) ]
def _get_limit(self, test: Test, language: str, type: str) -> int: """ Helper function to get a time/memory limit for a test from the config. :param test: The test to get the limit for. :param language: The language of the program. :param type: The type of limit to get (time or memory). :return: The limit for the test in seconds or bytes. """ if type not in ("time", "memory"): raise ValueError("Type must be either 'time' or 'memory'") def get(conf) -> int: if f"{type}_limits" in conf: if test.test_id in conf[f"{type}_limits"]: return conf[f"{type}_limits"][test.test_id] if int(test.group) in conf[f"{type}_limits"]: return conf[f"{type}_limits"][int(test.group)] if f"{type}_limit" in conf: return conf[f"{type}_limit"] return None if "override_limits" in self.config and language in self.config["override_limits"]: limit = get(self.config["override_limits"][language]) if limit is not None: return limit limit = get(self.config) if limit is not None: return limit if type == "memory": return constants.DEFAULT_MEMORY_LIMIT else: return constants.DEFAULT_TIME_LIMIT
[docs] def get_time_limit_for_test(self, test: Test, language: str) -> int: """ Returns the time limit for the given test. Read the Sinolpack specification for more details. :param test: The test to get the time limit for. :param language: The language of the program. :return: The time limit for the test in seconds. """ return self._get_limit(test, language, "time")
[docs] def get_memory_limit_for_test(self, test: Test, language: str) -> int: """ Returns the memory limit for the given test. Read the Sinolpack specification for more details. :param test: The test to get the memory limit for. :param language: The language of the program. :return: The memory limit for the test in bytes. """ return self._get_limit(test, language, "memory")