Coverage for .nox/test-3-9/lib/python3.9/site-packages/nskit/vcs/repo.py: 95%
183 statements
« prev ^ index » next coverage.py v7.4.2, created at 2024-02-25 17:38 +0000
« prev ^ index » next coverage.py v7.4.2, created at 2024-02-25 17:38 +0000
1"""Repo management class."""
2from __future__ import annotations
4from pathlib import Path
5import shutil
6import subprocess # nosec B404
7import sys
8import tempfile
9from typing import Any, Optional, TYPE_CHECKING, Union
10import warnings
12if sys.version_info.major <= 3 and sys.version_info.minor <= 8:
13 from typing_extensions import Annotated
14else:
15 from typing import Annotated
17import git
18from pydantic import Field, field_validator, model_validator, ValidationInfo
20from nskit._logging import logger_factory
21from nskit.common.configuration import BaseConfiguration
22from nskit.common.contextmanagers import ChDir
23from nskit.common.io import yaml
24from nskit.vcs.installer import InstallersEnum
25from nskit.vcs.namespace_validator import (
26 NamespaceOptionsType,
27 NamespaceValidator,
28 ValidationEnum,
29)
30from nskit.vcs.providers import RepoClient
32if TYPE_CHECKING:
33 from nskit.vcs.codebase import Codebase
# Module-level logger, obtained from nskit's logger factory and keyed on this module's name.
logger = logger_factory.get_logger(__name__)


# Name of the git remote used when pull/push/fetch are called without an explicit remote.
DEFAULT_REMOTE = 'origin'
# We should set the default dir in some form of env var/other logic in the cli
42_NAMESPACE_README = """
43# {name}
45Contains the approved namespaces as a `YAML` file - `namespaces.yaml`
47This is structured as a YAML object, with the options as a list of dictionary lists, with the approved roots at L0, approved L1s etc.
49e.g.
50```
51options:
52 - x:
53 - y:
54 - a
55 - b
56 - z:
57 - w
58 - q
59delimiters:
60 - -
61 - .
62 - ,
63repo_separator: -
64```
66Means we can have repos structured as:
67```
68- x-y-a-<*>
69- x-y-b-<*>
70- x-z-w-<*>
71- x-q-<*>
72```
73but no others.
75The separator is set by the repo_separator parameter, in this example "-".
76""" # noqa: E800
class _Repo(BaseConfiguration):
    """Base model for a repository: a remote (via ``provider_client``) plus a local checkout.

    Remote operations (create/delete/exists/urls) are delegated to ``provider_client``;
    local operations use GitPython through ``_git_repo_cls``, except ``commit`` which
    shells out to the ``git`` executable.
    """

    # Repo name, passed to every provider_client call.
    name: str
    # Directory of the local checkout; defaults to the current working directory.
    local_dir: Path = Field(default_factory=Path.cwd)
    # Branch name; read by NamespaceValidationRepo when checking out after a clone.
    default_branch: str = 'main'
    # validate_default=True makes the ``None`` default go through _validate_client.
    provider_client: Annotated[RepoClient, Field(validate_default=True)] = None

    # Cached GitPython repo object (see the _git_repo property).
    __git_repo = None
    # Class used to open/init/clone git repos.
    _git_repo_cls = git.Repo

    @field_validator('provider_client', mode='before')
    @classmethod
    def _validate_client(cls, value):
        """Fall back to the repo client from ``CodebaseSettings`` when none is given."""
        if value is None:
            # Imported at call time rather than module import time
            # (presumably to avoid a circular import - confirm).
            from nskit.vcs.settings import CodebaseSettings
            value = CodebaseSettings().provider_settings.repo_client
        return value

    @property
    def url(self):
        """Get the remote url."""
        return self.provider_client.get_remote_url(self.name)

    @property
    def clone_url(self):
        """Get the clone url."""
        return self.provider_client.get_clone_url(self.name)

    def create(self) -> None:
        """Create the repo.

        If the repo does not exist on the remote it is created there and cloned.
        If there is still no local git checkout afterwards, ``local_dir`` is created
        and an empty git repo is initialised in it.
        """
        if not self.exists:
            self.provider_client.create(self.name)
            self.clone()
        if not self.exists_locally:
            self.local_dir.mkdir(exist_ok=True, parents=True)
            # init() is called without a path, so run it from inside local_dir.
            with ChDir(self.local_dir):
                self._git_repo_cls.init()

    def delete(self, remote=True) -> None:
        """Delete the repo.

        remote=True (default), deletes the repo on the remote (no confirmation).

        The local checkout (if any) is always removed; paths that cannot be removed
        are collected and reported in a single warning rather than raising.
        """
        if remote and self.exists:
            self.provider_client.delete(self.name)
        if self.exists_locally:
            errors = []

            def on_exc(function, path, exc_info):  # noqa: U100
                """Log errors when deleting."""
                errors.append((path, exc_info))

            # shutil.rmtree gained ``onexc`` in Python 3.12, where ``onerror`` is deprecated.
            if sys.version_info < (3, 12):
                shutil.rmtree(self.local_dir, onerror=on_exc)
            else:
                shutil.rmtree(self.local_dir, onexc=on_exc)
            if errors:
                error_info = "\n".join(str(u) for u in errors)
                warnings.warn(f'Unable to delete some paths due to errors:\n{error_info}', stacklevel=2)

    def clone(self) -> None:
        """Clone the repo into ``local_dir``.

        Does nothing when a local checkout already exists or when the provider
        returns a falsy clone url.
        """
        if not self.exists_locally:
            # Make sure the directory that will contain the checkout exists.
            self.local_dir.parent.mkdir(exist_ok=True, parents=True)
            if self.clone_url:
                self._git_repo_cls.clone_from(url=self.clone_url, to_path=str(self.local_dir))

    def pull(self, remote=DEFAULT_REMOTE) -> None:
        """Pull the repo from the remote (defaults to origin).

        Warns instead of pulling when the local repo has no remotes.
        """
        # This will use python git library
        if self._git_repo.remotes:
            getattr(self._git_repo.remotes, remote).pull()
        else:
            warnings.warn('No Remotes found', stacklevel=2)

    def commit(self, message: str, paths: list[str | Path] | str | Path = '*', hooks=True) -> None:
        """Commit the paths with given message.

        ``paths`` may be a single path/pattern or a list/tuple of them.

        hooks=False disables running pre-commit hooks.

        Raises ``subprocess.CalledProcessError`` if ``git add`` or ``git commit`` fails.
        """
        # Because GitPython add cannot be set to use the .gitignore, we use subprocess here
        if not isinstance(paths, (list, tuple)):
            paths = [paths]
        with ChDir(self._git_repo.working_tree_dir):
            # Unpack rather than concatenate so that a tuple of paths works as well as a list.
            subprocess.check_call(['git', 'add', *paths])  # nosec B603, B607
            hook_args = [] if hooks else ['--no-verify']
            subprocess.check_call(['git', 'commit', *hook_args, '-m', message])  # nosec B603, B607

    def push(self, remote=DEFAULT_REMOTE) -> None:
        """Push the repo to the remote (defaults to origin).

        Warns instead of pushing when the local repo has no remotes.
        """
        if self._git_repo.remotes:
            getattr(self._git_repo.remotes, remote).push()
        else:
            warnings.warn('No Remotes found', stacklevel=2)

    def tag(self, tag, message: str | None = None, force: bool = False) -> None:
        """Tag the repo (with a given name, and optional message)."""
        self._git_repo.create_tag(tag, message=message, force=force)

    def checkout(self, branch, create=True) -> None:
        """Checkout the branch (create if true).

        Fetches from the default remote first. If the branch is not among the local
        heads it is created when ``create`` is true, otherwise the ``AttributeError``
        from the failed lookup is re-raised.
        """
        self.fetch()
        try:
            getattr(self._git_repo.heads, branch).checkout()
        except AttributeError as e:
            if create:
                self._git_repo.create_head(branch).checkout()
            else:
                raise e from None

    def fetch(self, remote=DEFAULT_REMOTE) -> None:
        """Fetch the repo from the remote (defaults to origin).

        Unlike ``pull``/``push`` there is no check for an empty remotes list here;
        the named remote is looked up unconditionally.
        """
        getattr(self._git_repo.remotes, remote).fetch()

    @property
    def _git_repo(self):
        """Return the GitPython repo for ``local_dir``, re-opening it if ``local_dir`` changed."""
        if self.__git_repo is None or Path(self.__git_repo.working_tree_dir) != self.local_dir:
            self.__git_repo = self._git_repo_cls(self.local_dir)
        return self.__git_repo

    @property
    def exists_locally(self) -> bool:
        """Check if the repo exists locally (and .git initialised)."""
        return self.local_dir.exists() and (self.local_dir/'.git').exists()

    @property
    def exists(self):
        """Check if the repo exists on the remote."""
        return self.provider_client.check_exists(self.name)

    def install(self, codebase: Codebase | None = None, deps: bool = True) -> None:
        """Install the repo into a codebase.

        To make it easy to extend to new languages/installation methods, this uses an entrypoint to handle it.

        The default installer is for python (uses a virtualenv), but can be disabled using NSKIT_PYTHON_INSTALLER_ENABLED=False if you
        want to provide a custom Python Installer (e.g. using poetry or hatch).

        Other installers can be added through the nskit.vcs.installers entry_point.
        """
        # Loop through available installers and check - if they check True, then install them
        for installer in InstallersEnum:
            logger.info(f'Trying {installer.value}, all matching languages will be installed.')
            if installer.extension:
                language_installer = installer.extension()
                if language_installer.check(self.local_dir):
                    logger.info(f'Matched {installer.value}, installing.')
                    language_installer.install(path=self.local_dir, codebase=codebase, deps=deps)
class NamespaceValidationRepo(_Repo):
    """Repo for managing namespace validation.

    Holds a YAML file (``namespaces_filename``) from which a ``NamespaceValidator``
    is loaded on first use and then cached.
    """

    # # This is not ideal behaviour, but due to the issue highlighted in
    # # https://github.com/pydantic/pydantic-settings/issues/245 and the
    # # non-semver compliant versioning in pydantic-settings, we need to add this behaviour
    # # this now changes the API behaviour for these objects as they will
    # # also ignore additional inputs in the python initialisation
    # # We will pin to version < 2.1.0 instead of allowing 2.2.0+ as it requires the code below:
    # model_config = ConfigDict(extra='ignore')  noqa: E800
    name: str = '.namespaces'
    namespaces_filename: Union[str, Path] = 'namespaces.yaml'
    # validate_default=True makes the ``None`` default go through _validate_local_dir.
    local_dir: Annotated[Path, Field(validate_default=True)] = None

    # Cached validator, populated on first access of ``validator``.
    _validator: NamespaceValidator = None

    @property
    def validator(self):
        """Get the namespace validator."""
        if self._validator is None:
            self._validator = self._load_namespace_validator()
        return self._validator

    def validate_name(self, proposed_name: str):
        """Validate the proposed name."""
        return self.validator.validate_name(proposed_name)

    # Validate default for local_dir
    @field_validator('local_dir', mode='before')
    @classmethod
    def _validate_local_dir(cls, value: Any, info: ValidationInfo):
        """Default ``local_dir`` to ``<system temp dir>/<name>`` when not supplied."""
        if value is None:
            value = Path(tempfile.gettempdir())/info.data['name']
        return value

    def _download_namespaces(self):
        """Clone the namespaces repo and check out its default branch."""
        # Into a .namespaces "hidden" directory that we check and pull if necessary
        self.clone()
        self.checkout(self.default_branch)

    def _load_namespace_validator(self):
        """Fetch the latest namespaces file and build a ``NamespaceValidator`` from it."""
        if not self.exists_locally:
            self._download_namespaces()
        self.pull()
        with (self.local_dir/self.namespaces_filename).open() as f:
            # NOTE(review): ``yaml`` is nskit.common.io.yaml; confirm its load() is a safe loader.
            namespace_validator = NamespaceValidator(**yaml.load(f))
        return namespace_validator

    def create(
        self,
        *,
        namespace_options: NamespaceOptionsType | NamespaceValidator,
        delimiters: list[str] | None = None,
        repo_separator: str | None = None):
        """Create and populate the validator repo.

        ``namespace_options`` is either the raw options structure or an existing
        ``NamespaceValidator``; ``delimiters`` and ``repo_separator`` override the
        validator's values only when they are truthy.

        The repo is created, the namespaces file and a README are written into it,
        and the result is committed and pushed.
        """
        # Provide either namespace_validator or namespaceOptions
        kwargs = {}
        if delimiters:
            kwargs['delimiters'] = delimiters
        if repo_separator:
            kwargs['repo_separator'] = repo_separator
        if isinstance(namespace_options, NamespaceValidator):
            # namespace_options is a NamespaceValidator
            namespace_validator = namespace_options.model_copy(update=kwargs)
        else:
            namespace_validator = NamespaceValidator(
                options=namespace_options,
                **kwargs
            )
        # Create the repo
        super().create()
        with ChDir(self.local_dir):
            # Write the Config
            with open(self.namespaces_filename, 'w') as f:
                f.write(namespace_validator.model_dump_yaml())
            with open('README.md', 'w') as f:
                # The README template carries a ``{name}`` placeholder for its heading; fill it in.
                f.write(_NAMESPACE_README.format(name=self.name))
        # Commit it
        self.commit('Initial Namespaces Commit', [self.namespaces_filename, 'README.md'])
        # Push it
        self.push()
class Repo(_Repo):
    """Repo with namespace validator."""

    # Optional repo providing the NamespaceValidator used to check ``name``.
    namespace_validation_repo: Optional[NamespaceValidationRepo] = None
    # Validation only runs for the ``strict`` and ``warn`` levels.
    validation_level: ValidationEnum = ValidationEnum.none
    name: str

    @model_validator(mode='after')
    def _validate_name(self):
        """Validate ``name`` against the namespace validator and normalise it.

        Only runs when a ``namespace_validation_repo`` is set and ``validation_level``
        is ``strict`` or ``warn``. The name is converted with ``to_repo_name`` in both
        cases; an invalid name raises ``ValueError`` under ``strict`` and emits a
        warning under ``warn``.

        Returns:
            The validated instance, as required of an ``after`` model validator.
        """
        value = self.name
        if self.namespace_validation_repo and self.validation_level in [ValidationEnum.strict, ValidationEnum.warn]:
            namespace_validator = self.namespace_validation_repo.validator
            result, message = namespace_validator.validate_name(value)
            if not result:
                message = (f'{value} {message.format(key="<root>")}')
            value = namespace_validator.to_repo_name(value)
            if self.validation_level == ValidationEnum.strict and not result:
                raise ValueError(message)
            elif not result:
                warnings.warn(message, stacklevel=2)
        self.name = value
        # An ``after`` validator must hand the instance back; without this, validation
        # paths that use the return value would receive None.
        return self