Coverage for .nox/test-3-12/lib/python3.12/site-packages/nskit/vcs/repo.py: 96%
180 statements
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-19 17:42 +0000
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-19 17:42 +0000
1"""Repo management class."""
2from __future__ import annotations
4from pathlib import Path
5import shutil
6import subprocess # nosec B404
7import sys
8import tempfile
9from typing import Any, Optional, TYPE_CHECKING, Union
10import warnings
12if sys.version_info.major <= 3 and sys.version_info.minor <= 8:
13 from typing_extensions import Annotated
14else:
15 from typing import Annotated
17import git
18from pydantic import Field, field_validator, model_validator, ValidationInfo
20from nskit._logging import logger_factory
21from nskit.common.configuration import BaseConfiguration
22from nskit.common.contextmanagers import ChDir
23from nskit.common.io import yaml
24from nskit.vcs.installer import InstallersEnum
25from nskit.vcs.namespace_validator import (
26 NamespaceOptionsType,
27 NamespaceValidator,
28 ValidationEnum,
29)
30from nskit.vcs.providers import RepoClient
32if TYPE_CHECKING:
33 from nskit.vcs.codebase import Codebase
36logger = logger_factory.get_logger(__name__)
39DEFAULT_REMOTE = 'origin'
40# We should set the default dir in some form of env var/other logic in the cli
42_NAMESPACE_README = """
43# {name}
45Contains the approved namespaces as a `YAML` file - `namspaces.yaml`
47This is structured as a YAML object, with the options as a list of dictionary lists, with the approved roots at L0, approved L1s etc.
49e.g.
50```
51options:
52 - x:
53 - y:
54 - a
55 - b
56 - z:
57 - w
58 - q
59delimiters:
60 - -
61 - .
62 - ,
63repo_separator: -
64```
66Means we can have repos structured as:
67```
68- x-y-a-<*>
69- x-y-b-<*>
70- x-z-w-<*>
71- x-q-<*>
72```
73but no others.
75The separator is set by the repo_separator parameter, in this example "-".
76""" # noqa: E800
79class _Repo(BaseConfiguration):
81 name: str
82 local_dir: Path = Field(default_factory=Path.cwd)
83 default_branch: str = 'main'
84 provider_client: Annotated[RepoClient, Field(validate_default=True)] = None
86 __git_repo = None
87 _git_repo_cls = git.Repo
89 @field_validator('provider_client', mode='before')
90 @classmethod
91 def _validate_client(cls, value):
92 if value is None:
93 from nskit.vcs.settings import CodebaseSettings
94 value = CodebaseSettings().provider_settings.repo_client
95 return value
97 @property
98 def url(self):
99 """Get the remote url."""
100 return self.provider_client.get_remote_url(self.name)
102 def create(self):
103 """Create the repo."""
104 if not self.exists:
105 self.provider_client.create(self.name)
106 self.clone()
107 if not self.exists_locally:
108 self.local_dir.mkdir(exist_ok=True, parents=True)
109 with ChDir(self.local_dir):
110 self._git_repo_cls.init()
112 def delete(self, remote=True):
113 """Delete the repo.
115 remote=True (default), deletes the repo on the remote (no confirmation).
116 """
117 if remote and self.exists:
118 self.provider_client.delete(self.name)
119 if self.exists_locally:
120 errors = []
122 def on_exc(function, path, exc_info): # noqa: U100
123 """Log errors when deleting."""
124 errors.append((path, exc_info))
125 if sys.version_info.major <= 3 and sys.version_info.minor < 12:
126 shutil.rmtree(self.local_dir, onerror=on_exc)
127 else:
128 shutil.rmtree(self.local_dir, onexc=on_exc)
129 if errors:
130 error_info = "\n".join([str(u) for u in errors])
131 warnings.warn(f'Unable to delete some paths due to errors:\n{error_info}', stacklevel=2)
133 def clone(self):
134 """Clone the repo."""
135 if not self.exists_locally:
136 if not self.local_dir.parent.exists():
137 self.local_dir.parent.mkdir(exist_ok=True, parents=True)
138 if self.url:
139 self._git_repo_cls.clone_from(url=self.url, to_path=str(self.local_dir))
141 def pull(self, remote=DEFAULT_REMOTE):
142 """Pull the repo from the remote (defaults to origin)."""
143 # This will use python git library
144 if self._git_repo.remotes:
145 getattr(self._git_repo.remotes, remote).pull()
146 else:
147 warnings.warn('No Remotes found', stacklevel=2)
149 def commit(self, message: str, paths: list[str | Path] | str | Path = '*', hooks=True):
150 """Commit the paths with given message.
152 hooks=False disables running pre-commit hooks.
153 """
154 # Because GitPython add cannot be set to use the .gitignore, we use subprocess here
155 if not isinstance(paths, (list, tuple)):
156 paths = [paths,]
157 with ChDir(self._git_repo.working_tree_dir):
158 subprocess.check_call(['git', 'add']+paths) # nosec B603, B607
159 if hooks:
160 hook_args = []
161 else:
162 hook_args = ['--no-verify']
163 subprocess.check_call(['git', 'commit'] + hook_args + ['-m', message]) # nosec B603, B607
165 def push(self, remote=DEFAULT_REMOTE):
166 """Push the repo to the remote (defaults to origin)."""
167 if self._git_repo.remotes:
168 getattr(self._git_repo.remotes, remote).push()
169 else:
170 warnings.warn('No Remotes found', stacklevel=2)
172 def tag(self, tag, message: str | None = None, force: bool = False):
173 """Tag the repo (with a given name, and optional message."""
174 self._git_repo.create_tag(tag, message=message, force=force)
176 def checkout(self, branch, create=True):
177 """Checkout the branch (create if true)."""
178 self.fetch()
179 try:
180 getattr(self._git_repo.heads, branch).checkout()
181 except AttributeError as e:
182 if create:
183 self._git_repo.create_head(branch).checkout()
184 else:
185 raise e from None
187 def fetch(self, remote=DEFAULT_REMOTE):
188 """Fetch the repo from the remote (defaults to origin)."""
189 getattr(self._git_repo.remotes, remote).fetch()
191 @property
192 def _git_repo(self):
193 if self.__git_repo is None or Path(self.__git_repo.working_tree_dir) != self.local_dir:
194 self.__git_repo = self._git_repo_cls(self.local_dir)
195 return self.__git_repo
197 @property
198 def exists_locally(self):
199 """Check if the repo exists locally (and .git initialised)."""
200 return self.local_dir.exists() and (self.local_dir/'.git').exists()
202 @property
203 def exists(self):
204 """Check if the repo exists on the remote."""
205 return self.provider_client.check_exists(self.name)
207 def install(self, codebase: Codebase | None = None, deps: bool = True):
208 """Install the repo into a codebase.
210 To make it easy to extend to new languages/installation methods, this uses an entrypoint to handle it.
212 The default installer is for python (uses a virtualenv), but can be disabled using NSKIT_PYTHON_INSTALLER_ENABLED=False if you
213 want to provide a custom Python Installer (e.g. using poetry or hatch).
215 Other installers can be added through the nskit.vcs.installers entry_point.
216 """
217 # Loop through available installers and check - if they check True, then install them
218 for installer in InstallersEnum:
219 logger.info(f'Trying {installer.value}, all matching languages will be installed.')
220 if installer.extension:
221 language_installer = installer.extension()
222 if language_installer.check(self.local_dir):
223 logger.info(f'Matched {installer.value}, installing.')
224 language_installer.install(path=self.local_dir, codebase=codebase, deps=deps)
227class NamespaceValidationRepo(_Repo):
228 """Repo for managing namespace validation."""
229 name: str = '.namespaces'
230 namespaces_filename: Union[str, Path] = 'namespaces.yaml'
231 local_dir: Annotated[Path, Field(validate_default=True)] = None
233 _validator: NamespaceValidator = None
235 @property
236 def validator(self):
237 """Get the namespace validator."""
238 if self._validator is None:
239 self._validator = self._load_namespace_validator()
240 return self._validator
242 def validate_name(self, proposed_name: str):
243 """Validate the proposed name."""
244 return self.validator.validate_name(proposed_name)
246 # Validate default for local_dir
247 @field_validator('local_dir', mode='before')
248 @classmethod
249 def _validate_local_dir(cls, value: Any, info: ValidationInfo):
250 if value is None:
251 value = Path(tempfile.tempdir)/info.data['name']
252 return value
254 def _download_namespaces(self):
255 # Into a .namespaces "hidden" directory that we check and pull if necessary
256 self.clone()
257 self.checkout(self.default_branch)
259 def _load_namespace_validator(self):
260 if not self.exists_locally:
261 self._download_namespaces()
262 self.pull()
263 with (self.local_dir/self.namespaces_filename).open() as f:
264 namespace_validator = NamespaceValidator(**yaml.load(f))
265 return namespace_validator
267 def create(
268 self,
269 *,
270 namespace_options: NamespaceOptionsType | NamespaceValidator,
271 delimiters: list[str] | None = None,
272 repo_separator: str | None = None):
273 """Create and populate the validator repo."""
274 # Provide either namespace_validator or namespaceOptions
275 kwargs = {}
276 if delimiters:
277 kwargs['delimiters'] = delimiters
278 if repo_separator:
279 kwargs['repo_separator'] = repo_separator
280 if not isinstance(namespace_options, NamespaceValidator):
281 namespace_validator = NamespaceValidator(
282 options=namespace_options,
283 **kwargs
284 )
285 else:
286 # namespace_options is a NamespaceValidator
287 namespace_validator = namespace_options.model_copy(update=kwargs)
288 # Create the repo
289 super().create()
290 with ChDir(self.local_dir):
291 # Write the Config
292 with open(self.namespaces_filename, 'w') as f:
293 f.write(namespace_validator.model_dump_yaml())
294 with open('README.md', 'w') as f:
295 f.write(_NAMESPACE_README)
296 # Commit it
297 self.commit('Initial Namespaces Commit', [self.namespaces_filename, 'README.md'])
298 # Push it
299 self.push()
302class Repo(_Repo):
303 """Repo with namespace validator."""
305 namespace_validation_repo: Optional[NamespaceValidationRepo] = None
306 validation_level: ValidationEnum = ValidationEnum.none
307 name: str
309 @model_validator(mode='after')
310 def _validate_name(self):
311 value = self.name
312 if self.namespace_validation_repo and self.validation_level in [ValidationEnum.strict, ValidationEnum.warn]:
313 namespace_validator = self.namespace_validation_repo.validator
314 result, message = namespace_validator.validate_name(value)
315 if not result:
316 message = (f'{value} {message.format(key="<root>")}')
317 value = namespace_validator.to_repo_name(value)
318 if self.validation_level == ValidationEnum.strict and not result:
319 raise ValueError(message)
320 elif not result:
321 warnings.warn(message, stacklevel=2)
322 self.name = value