Coverage for .nox/test-3-9/lib/python3.9/site-packages/nskit/vcs/repo.py: 95%

183 statements  

« prev     ^ index     » next       coverage.py v7.4.2, created at 2024-02-25 17:38 +0000

1"""Repo management class.""" 

2from __future__ import annotations 

3 

4from pathlib import Path 

5import shutil 

6import subprocess # nosec B404 

7import sys 

8import tempfile 

9from typing import Any, Optional, TYPE_CHECKING, Union 

10import warnings 

11 

12if sys.version_info.major <= 3 and sys.version_info.minor <= 8: 

13 from typing_extensions import Annotated 

14else: 

15 from typing import Annotated 

16 

17import git 

18from pydantic import Field, field_validator, model_validator, ValidationInfo 

19 

20from nskit._logging import logger_factory 

21from nskit.common.configuration import BaseConfiguration 

22from nskit.common.contextmanagers import ChDir 

23from nskit.common.io import yaml 

24from nskit.vcs.installer import InstallersEnum 

25from nskit.vcs.namespace_validator import ( 

26 NamespaceOptionsType, 

27 NamespaceValidator, 

28 ValidationEnum, 

29) 

30from nskit.vcs.providers import RepoClient 

31 

32if TYPE_CHECKING: 

33 from nskit.vcs.codebase import Codebase 

34 

35 

# Module-level logger, obtained from the project's logger factory for this module.
logger = logger_factory.get_logger(__name__)

# Remote name used by pull/push/fetch when the caller does not pass one.
DEFAULT_REMOTE = 'origin'
# We should set the default dir in some form of env var/other logic in the cli

41 

42_NAMESPACE_README = """ 

43# {name} 

44 

45Contains the approved namespaces as a `YAML` file - `namespaces.yaml` 

46 

47This is structured as a YAML object, with the options as a list of dictionary lists, with the approved roots at L0, approved L1s etc. 

48 

49e.g. 

50``` 

51options: 

52 - x: 

53 - y: 

54 - a 

55 - b 

56 - z: 

57 - w 

58 - q 

59delimiters: 

60 - - 

61 - . 

62 - , 

63repo_separator: - 

64``` 

65 

66Means we can have repos structured as: 

67``` 

68- x-y-a-<*> 

69- x-y-b-<*> 

70- x-z-w-<*> 

71- x-q-<*> 

72``` 

73but no others. 

74 

75The separator is set by the repo_separator parameter, in this example "-". 

76""" # noqa: E800 

77 

78 

class _Repo(BaseConfiguration):
    # Base repo model.
    #
    # Remote-side operations (create/delete/exists/urls) are delegated to
    # ``provider_client``; local operations go through ``_git_repo_cls``
    # (``git.Repo`` by default) and, for add/commit, the ``git`` command line.

    name: str
    local_dir: Path = Field(default_factory=Path.cwd)
    default_branch: str = 'main'
    provider_client: Annotated[RepoClient, Field(validate_default=True)] = None

    # Cached repo handle for ``local_dir``; created on demand by ``_git_repo``.
    __git_repo = None
    # Class used to init/clone/open the local repository.
    _git_repo_cls = git.Repo

    @field_validator('provider_client', mode='before')
    @classmethod
    def _validate_client(cls, value):
        """Fall back to the repo client from ``CodebaseSettings`` when none is given."""
        if value is None:
            # Imported here rather than at module level (kept as in the original code).
            from nskit.vcs.settings import CodebaseSettings
            value = CodebaseSettings().provider_settings.repo_client
        return value

    @property
    def url(self):
        """Get the remote url."""
        return self.provider_client.get_remote_url(self.name)

    @property
    def clone_url(self):
        """Get the clone url."""
        return self.provider_client.get_clone_url(self.name)

    def create(self):
        """Create the repo.

        If the repo does not exist on the remote it is created there and cloned.
        If there is still no local git checkout afterwards, ``local_dir`` is created
        and a new repository is initialised in it.
        """
        if not self.exists:
            self.provider_client.create(self.name)
            self.clone()
        if not self.exists_locally:
            self.local_dir.mkdir(exist_ok=True, parents=True)
            with ChDir(self.local_dir):
                self._git_repo_cls.init()

    def delete(self, remote=True):
        """Delete the repo.

        remote=True (default), deletes the repo on the remote (no confirmation).

        The local checkout (if any) is removed as well; paths that could not be
        removed are collected and reported in a single warning instead of raising.
        """
        if remote and self.exists:
            self.provider_client.delete(self.name)
        if self.exists_locally:
            errors = []

            def on_exc(function, path, exc_info):  # noqa: U100
                """Log errors when deleting."""
                errors.append((path, exc_info))

            # ``onexc`` only exists from Python 3.12 (where ``onerror`` was deprecated).
            # Comparing the version tuple also stays correct for any future major version.
            if sys.version_info < (3, 12):
                shutil.rmtree(self.local_dir, onerror=on_exc)
            else:
                shutil.rmtree(self.local_dir, onexc=on_exc)
            if errors:
                error_info = "\n".join(str(u) for u in errors)
                warnings.warn(f'Unable to delete some paths due to errors:\n{error_info}', stacklevel=2)

    def clone(self):
        """Clone the repo into ``local_dir`` (no-op if a local checkout already exists)."""
        if not self.exists_locally:
            if not self.local_dir.parent.exists():
                self.local_dir.parent.mkdir(exist_ok=True, parents=True)
            if self.clone_url:
                self._git_repo_cls.clone_from(url=self.clone_url, to_path=str(self.local_dir))

    def pull(self, remote=DEFAULT_REMOTE):
        """Pull the repo from the remote (defaults to origin).

        Warns instead of pulling when the local repo has no remotes.
        """
        # This will use python git library
        if self._git_repo.remotes:
            getattr(self._git_repo.remotes, remote).pull()
        else:
            warnings.warn('No Remotes found', stacklevel=2)

    def commit(self, message: str, paths: list[str | Path] | str | Path = '*', hooks=True):
        """Commit the paths with given message.

        ``paths`` may be a single path or a list/tuple of paths (``str`` or ``Path``).

        hooks=False disables running pre-commit hooks.

        Raises ``subprocess.CalledProcessError`` if ``git add`` or ``git commit`` fails.
        """
        # Because GitPython add cannot be set to use the .gitignore, we use subprocess here
        if not isinstance(paths, (list, tuple)):
            paths = [paths]
        # Build a real list of strings: a tuple cannot be concatenated to the
        # argument list below, and this keeps Path entries uniform with str ones.
        add_args = [str(path) for path in paths]
        hook_args = [] if hooks else ['--no-verify']
        with ChDir(self._git_repo.working_tree_dir):
            subprocess.check_call(['git', 'add'] + add_args)  # nosec B603, B607
            subprocess.check_call(['git', 'commit'] + hook_args + ['-m', message])  # nosec B603, B607

    def push(self, remote=DEFAULT_REMOTE):
        """Push the repo to the remote (defaults to origin).

        Warns instead of pushing when the local repo has no remotes.
        """
        if self._git_repo.remotes:
            getattr(self._git_repo.remotes, remote).push()
        else:
            warnings.warn('No Remotes found', stacklevel=2)

    def tag(self, tag, message: str | None = None, force: bool = False):
        """Tag the repo (with a given name, and optional message)."""
        self._git_repo.create_tag(tag, message=message, force=force)

    def checkout(self, branch, create=True):
        """Checkout the branch (create if true).

        Fetches from the default remote first. If the branch is not among the local
        heads it is created when ``create`` is true; otherwise the ``AttributeError``
        from the failed lookup is re-raised.
        """
        self.fetch()
        try:
            getattr(self._git_repo.heads, branch).checkout()
        except AttributeError as e:
            if create:
                self._git_repo.create_head(branch).checkout()
            else:
                raise e from None

    def fetch(self, remote=DEFAULT_REMOTE):
        """Fetch the repo from the remote (defaults to origin)."""
        getattr(self._git_repo.remotes, remote).fetch()

    @property
    def _git_repo(self):
        """Repo handle for ``local_dir``, re-created if ``local_dir`` no longer matches the cached one."""
        if self.__git_repo is None or Path(self.__git_repo.working_tree_dir) != self.local_dir:
            self.__git_repo = self._git_repo_cls(self.local_dir)
        return self.__git_repo

    @property
    def exists_locally(self):
        """Check if the repo exists locally (and .git initialised)."""
        return self.local_dir.exists() and (self.local_dir/'.git').exists()

    @property
    def exists(self):
        """Check if the repo exists on the remote."""
        return self.provider_client.check_exists(self.name)

    def install(self, codebase: Codebase | None = None, deps: bool = True):
        """Install the repo into a codebase.

        To make it easy to extend to new languages/installation methods, this uses an entrypoint to handle it.

        The default installer is for python (uses a virtualenv), but can be disabled using NSKIT_PYTHON_INSTALLER_ENABLED=False if you
        want to provide a custom Python Installer (e.g. using poetry or hatch).

        Other installers can be added through the nskit.vcs.installers entry_point.

        ``codebase`` and ``deps`` are passed through unchanged to each matching installer.
        """
        # Loop through available installers and check - if they check True, then install them
        for installer in InstallersEnum:
            logger.info(f'Trying {installer.value}, all matching languages will be installed.')
            if installer.extension:
                language_installer = installer.extension()
                if language_installer.check(self.local_dir):
                    logger.info(f'Matched {installer.value}, installing.')
                    language_installer.install(path=self.local_dir, codebase=codebase, deps=deps)

230 

231 

class NamespaceValidationRepo(_Repo):
    """Repo for managing namespace validation."""
    # # This is not ideal behaviour, but due to the issue highlighted in
    # # https://github.com/pydantic/pydantic-settings/issues/245 and the
    # # non-semver compliant versioning in pydantic-settings, we need to add this behaviour
    # # this now changes the API behaviour for these objects as they will
    # # also ignore additional inputs in the python initialisation
    # # We will pin to version < 2.1.0 instead of allowing 2.2.0+ as it requires the code below:
    # model_config = ConfigDict(extra='ignore')  noqa: E800
    name: str = '.namespaces'
    namespaces_filename: Union[str, Path] = 'namespaces.yaml'
    # Defaults to <system temp dir>/<name>; filled in by ``_validate_local_dir``.
    local_dir: Annotated[Path, Field(validate_default=True)] = None

    # Cached validator, loaded on first access of ``validator``.
    _validator: NamespaceValidator = None

    @property
    def validator(self):
        """Get the namespace validator (loaded from the repo on first use, then cached)."""
        if self._validator is None:
            self._validator = self._load_namespace_validator()
        return self._validator

    def validate_name(self, proposed_name: str):
        """Validate the proposed name using the namespace validator."""
        return self.validator.validate_name(proposed_name)

    # Validate default for local_dir
    @field_validator('local_dir', mode='before')
    @classmethod
    def _validate_local_dir(cls, value: Any, info: ValidationInfo):
        """Default ``local_dir`` to a directory named after the repo inside the temp dir."""
        if value is None:
            value = Path(tempfile.gettempdir())/info.data['name']
        return value

    def _download_namespaces(self):
        """Clone the namespaces repo and check out its default branch."""
        # Into a .namespaces "hidden" directory that we check and pull if necessary
        self.clone()
        self.checkout(self.default_branch)

    def _load_namespace_validator(self):
        """Make sure the repo is available locally and up to date, then parse the namespaces file."""
        if not self.exists_locally:
            self._download_namespaces()
        self.pull()
        with (self.local_dir/self.namespaces_filename).open() as f:
            namespace_validator = NamespaceValidator(**yaml.load(f))
        return namespace_validator

    def create(
            self,
            *,
            namespace_options: NamespaceOptionsType | NamespaceValidator,
            delimiters: list[str] | None = None,
            repo_separator: str | None = None):
        """Create and populate the validator repo.

        ``namespace_options`` is either the raw options or an existing
        ``NamespaceValidator``; ``delimiters`` and ``repo_separator``, when given,
        are applied on top of it. The namespaces file and a README are written to
        the repo, committed and pushed.
        """
        # Provide either namespace_validator or namespaceOptions
        kwargs = {}
        if delimiters:
            kwargs['delimiters'] = delimiters
        if repo_separator:
            kwargs['repo_separator'] = repo_separator
        if isinstance(namespace_options, NamespaceValidator):
            # namespace_options is a NamespaceValidator
            namespace_validator = namespace_options.model_copy(update=kwargs)
        else:
            namespace_validator = NamespaceValidator(
                options=namespace_options,
                **kwargs
            )
        # Create the repo
        super().create()
        with ChDir(self.local_dir):
            # Write the Config
            with open(self.namespaces_filename, 'w') as f:
                f.write(namespace_validator.model_dump_yaml())
            with open('README.md', 'w') as f:
                # The template carries a ``{name}`` placeholder for its heading;
                # fill it in so the README is not written with a literal "{name}".
                f.write(_NAMESPACE_README.format(name=self.name))
        # Commit it
        self.commit('Initial Namespaces Commit', [self.namespaces_filename, 'README.md'])
        # Push it
        self.push()

312 

313 

314class Repo(_Repo): 

315 """Repo with namespace validator.""" 

316 

317 namespace_validation_repo: Optional[NamespaceValidationRepo] = None 

318 validation_level: ValidationEnum = ValidationEnum.none 

319 name: str 

320 

321 @model_validator(mode='after') 

322 def _validate_name(self): 

323 value = self.name 

324 if self.namespace_validation_repo and self.validation_level in [ValidationEnum.strict, ValidationEnum.warn]: 

325 namespace_validator = self.namespace_validation_repo.validator 

326 result, message = namespace_validator.validate_name(value) 

327 if not result: 

328 message = (f'{value} {message.format(key="<root>")}') 

329 value = namespace_validator.to_repo_name(value) 

330 if self.validation_level == ValidationEnum.strict and not result: 

331 raise ValueError(message) 

332 elif not result: 

333 warnings.warn(message, stacklevel=2) 

334 self.name = value