Coverage for .nox/test-3-12/lib/python3.12/site-packages/nskit/vcs/repo.py: 96%

180 statements  

« prev     ^ index     » next       coverage.py v7.3.3, created at 2023-12-19 17:42 +0000

1"""Repo management class.""" 

2from __future__ import annotations 

3 

4from pathlib import Path 

5import shutil 

6import subprocess # nosec B404 

7import sys 

8import tempfile 

9from typing import Any, Optional, TYPE_CHECKING, Union 

10import warnings 

11 

12if sys.version_info.major <= 3 and sys.version_info.minor <= 8: 

13 from typing_extensions import Annotated 

14else: 

15 from typing import Annotated 

16 

17import git 

18from pydantic import Field, field_validator, model_validator, ValidationInfo 

19 

20from nskit._logging import logger_factory 

21from nskit.common.configuration import BaseConfiguration 

22from nskit.common.contextmanagers import ChDir 

23from nskit.common.io import yaml 

24from nskit.vcs.installer import InstallersEnum 

25from nskit.vcs.namespace_validator import ( 

26 NamespaceOptionsType, 

27 NamespaceValidator, 

28 ValidationEnum, 

29) 

30from nskit.vcs.providers import RepoClient 

31 

32if TYPE_CHECKING: 

33 from nskit.vcs.codebase import Codebase 

34 

35 

# Module-level logger obtained from the project's logger factory.
logger = logger_factory.get_logger(__name__)

# Remote name used by pull/push/fetch when the caller does not supply one.
DEFAULT_REMOTE = 'origin'
# We should set the default dir in some form of env var/other logic in the cli

# README template written into the namespace validation repo.
# ``{name}`` is the only placeholder; keep any other literal braces out of this
# text so it stays safe to pass through ``str.format``.
_NAMESPACE_README = """
# {name}

Contains the approved namespaces as a `YAML` file - `namespaces.yaml`

This is structured as a YAML object, with the options as a list of dictionary lists, with the approved roots at L0, approved L1s etc.

e.g.
```
options:
  - x:
    - y:
      - a
      - b
    - z:
      - w
    - q
delimiters:
  - -
  - .
  - ,
repo_separator: -
```

Means we can have repos structured as:
```
- x-y-a-<*>
- x-y-b-<*>
- x-z-w-<*>
- x-q-<*>
```
but no others.

The separator is set by the repo_separator parameter, in this example "-".
"""  # noqa: E800

77 

78 

class _Repo(BaseConfiguration):
    """Base repo model pairing a local git working tree with a remote provider.

    Remote operations (create/delete/existence check/url lookup) are delegated
    to ``provider_client``; local operations use GitPython (``_git_repo_cls``)
    and, for add/commit, the ``git`` command line.
    """

    # Repository name as passed to the provider client.
    name: str
    # Local working-tree directory; defaults to the current working directory.
    local_dir: Path = Field(default_factory=Path.cwd)
    default_branch: str = 'main'
    # ``None`` is replaced by the configured client in ``_validate_client``.
    provider_client: Annotated[RepoClient, Field(validate_default=True)] = None

    # Cached GitPython repo object, (re)built lazily by the ``_git_repo`` property.
    __git_repo = None
    # Class used to build/clone/init git repos.
    _git_repo_cls = git.Repo

    @field_validator('provider_client', mode='before')
    @classmethod
    def _validate_client(cls, value):
        """Fall back to the repo client from ``CodebaseSettings`` when none is given."""
        if value is None:
            # NOTE(review): imported lazily, presumably to avoid a circular import - confirm before moving.
            from nskit.vcs.settings import CodebaseSettings
            value = CodebaseSettings().provider_settings.repo_client
        return value

    @property
    def url(self):
        """Get the remote url."""
        return self.provider_client.get_remote_url(self.name)

    def create(self):
        """Create the repo.

        Creates it on the remote (and clones it) when it does not exist there,
        then initialises an empty local git repo if there is still no local copy.
        """
        if not self.exists:
            self.provider_client.create(self.name)
            self.clone()
        if not self.exists_locally:
            self.local_dir.mkdir(exist_ok=True, parents=True)
            with ChDir(self.local_dir):
                self._git_repo_cls.init()

    def delete(self, remote=True):
        """Delete the repo.

        remote=True (default), deletes the repo on the remote (no confirmation).

        The local directory is always removed when it exists; paths that could
        not be removed are reported through a single warning rather than raised.
        """
        if remote and self.exists:
            self.provider_client.delete(self.name)
        if self.exists_locally:
            errors = []

            def on_exc(function, path, exc_info):  # noqa: U100
                """Log errors when deleting."""
                errors.append((path, exc_info))

            # ``onexc`` was added in Python 3.12, where ``onerror`` is deprecated.
            if sys.version_info < (3, 12):
                shutil.rmtree(self.local_dir, onerror=on_exc)
            else:
                shutil.rmtree(self.local_dir, onexc=on_exc)
            if errors:
                error_info = "\n".join([str(u) for u in errors])
                warnings.warn(f'Unable to delete some paths due to errors:\n{error_info}', stacklevel=2)

    def clone(self):
        """Clone the repo into ``local_dir`` (no-op when it already exists locally or has no url)."""
        if not self.exists_locally:
            if not self.local_dir.parent.exists():
                self.local_dir.parent.mkdir(exist_ok=True, parents=True)
            if self.url:
                self._git_repo_cls.clone_from(url=self.url, to_path=str(self.local_dir))

    def pull(self, remote=DEFAULT_REMOTE):
        """Pull the repo from the remote (defaults to origin)."""
        # This will use python git library
        if self._git_repo.remotes:
            getattr(self._git_repo.remotes, remote).pull()
        else:
            warnings.warn('No Remotes found', stacklevel=2)

    def commit(self, message: str, paths: list[str | Path] | str | Path = '*', hooks=True):
        """Commit the paths with given message.

        ``paths`` may be a single path or a list/tuple of paths.

        hooks=False disables running pre-commit hooks.

        Raises ``subprocess.CalledProcessError`` if ``git add`` or ``git commit`` fails.
        """
        # Because GitPython add cannot be set to use the .gitignore, we use subprocess here
        if isinstance(paths, (list, tuple)):
            # Build a fresh list: a tuple cannot be concatenated onto the argv list.
            path_args = [str(p) for p in paths]
        else:
            path_args = [str(paths)]
        hook_args = [] if hooks else ['--no-verify']
        with ChDir(self._git_repo.working_tree_dir):
            subprocess.check_call(['git', 'add'] + path_args)  # nosec B603, B607
            subprocess.check_call(['git', 'commit'] + hook_args + ['-m', message])  # nosec B603, B607

    def push(self, remote=DEFAULT_REMOTE):
        """Push the repo to the remote (defaults to origin)."""
        if self._git_repo.remotes:
            getattr(self._git_repo.remotes, remote).push()
        else:
            warnings.warn('No Remotes found', stacklevel=2)

    def tag(self, tag, message: str | None = None, force: bool = False):
        """Tag the repo (with a given name, and optional message)."""
        self._git_repo.create_tag(tag, message=message, force=force)

    def checkout(self, branch, create=True):
        """Checkout the branch (create if true).

        Fetches first; raises ``AttributeError`` when the branch is unknown and
        ``create`` is false.
        """
        self.fetch()
        try:
            head = getattr(self._git_repo.heads, branch)
        except AttributeError as e:
            if not create:
                raise e from None
            head = self._git_repo.create_head(branch)
        head.checkout()

    def fetch(self, remote=DEFAULT_REMOTE):
        """Fetch the repo from the remote (defaults to origin).

        Warns instead of failing when the repo has no remotes, matching
        ``pull`` and ``push``.
        """
        if self._git_repo.remotes:
            getattr(self._git_repo.remotes, remote).fetch()
        else:
            warnings.warn('No Remotes found', stacklevel=2)

    @property
    def _git_repo(self):
        # Rebuild the cached object when it is missing or ``local_dir`` has changed.
        if self.__git_repo is None or Path(self.__git_repo.working_tree_dir) != self.local_dir:
            self.__git_repo = self._git_repo_cls(self.local_dir)
        return self.__git_repo

    @property
    def exists_locally(self):
        """Check if the repo exists locally (and .git initialised)."""
        return self.local_dir.exists() and (self.local_dir/'.git').exists()

    @property
    def exists(self):
        """Check if the repo exists on the remote."""
        return self.provider_client.check_exists(self.name)

    def install(self, codebase: Codebase | None = None, deps: bool = True):
        """Install the repo into a codebase.

        To make it easy to extend to new languages/installation methods, this uses an entrypoint to handle it.

        The default installer is for python (uses a virtualenv), but can be disabled using NSKIT_PYTHON_INSTALLER_ENABLED=False if you
        want to provide a custom Python Installer (e.g. using poetry or hatch).

        Other installers can be added through the nskit.vcs.installers entry_point.
        """
        # Loop through available installers and check - if they check True, then install them
        for installer in InstallersEnum:
            logger.info(f'Trying {installer.value}, all matching languages will be installed.')
            if installer.extension:
                language_installer = installer.extension()
                if language_installer.check(self.local_dir):
                    logger.info(f'Matched {installer.value}, installing.')
                    language_installer.install(path=self.local_dir, codebase=codebase, deps=deps)

225 

226 

class NamespaceValidationRepo(_Repo):
    """Repo for managing namespace validation."""

    name: str = '.namespaces'
    # File inside the repo that holds the serialised namespace validator.
    namespaces_filename: Union[str, Path] = 'namespaces.yaml'
    # ``None`` is replaced by ``<system temp dir>/<name>`` in ``_validate_local_dir``.
    local_dir: Annotated[Path, Field(validate_default=True)] = None

    # Lazily loaded validator, populated on first access of ``validator``.
    _validator: NamespaceValidator = None

    @property
    def validator(self):
        """Get the namespace validator."""
        if self._validator is None:
            self._validator = self._load_namespace_validator()
        return self._validator

    def validate_name(self, proposed_name: str):
        """Validate the proposed name."""
        return self.validator.validate_name(proposed_name)

    # Validate default for local_dir
    @field_validator('local_dir', mode='before')
    @classmethod
    def _validate_local_dir(cls, value: Any, info: ValidationInfo):
        """Default ``local_dir`` to a directory named after the repo in the system temp dir."""
        if value is None:
            # ``tempfile.tempdir`` is None until ``gettempdir()`` has been called,
            # so always go through ``gettempdir()``.
            value = Path(tempfile.gettempdir())/info.data['name']
        return value

    def _download_namespaces(self):
        # Into a .namespaces "hidden" directory that we check and pull if necessary
        self.clone()
        self.checkout(self.default_branch)

    def _load_namespace_validator(self):
        """Fetch the namespaces repo if needed, update it and build the validator from its YAML file."""
        if not self.exists_locally:
            self._download_namespaces()
        self.pull()
        with (self.local_dir/self.namespaces_filename).open() as f:
            namespace_validator = NamespaceValidator(**yaml.load(f))
        return namespace_validator

    def create(
            self,
            *,
            namespace_options: NamespaceOptionsType | NamespaceValidator,
            delimiters: list[str] | None = None,
            repo_separator: str | None = None):
        """Create and populate the validator repo.

        ``namespace_options`` is either the raw options or an existing
        ``NamespaceValidator``; ``delimiters`` and ``repo_separator``, when
        given, override the corresponding validator settings.

        The namespaces file and a README are written, committed and pushed.
        """
        # Provide either namespace_validator or namespaceOptions
        kwargs = {}
        if delimiters:
            kwargs['delimiters'] = delimiters
        if repo_separator:
            kwargs['repo_separator'] = repo_separator
        if not isinstance(namespace_options, NamespaceValidator):
            namespace_validator = NamespaceValidator(
                options=namespace_options,
                **kwargs
            )
        else:
            # namespace_options is a NamespaceValidator
            namespace_validator = namespace_options.model_copy(update=kwargs)
        # Create the repo
        super().create()
        with ChDir(self.local_dir):
            # Write the Config
            with open(self.namespaces_filename, 'w') as f:
                f.write(namespace_validator.model_dump_yaml())
            with open('README.md', 'w') as f:
                # Fill in the ``{name}`` placeholder so the heading shows the repo name.
                f.write(_NAMESPACE_README.format(name=self.name))
        # Commit it
        self.commit('Initial Namespaces Commit', [self.namespaces_filename, 'README.md'])
        # Push it
        self.push()

300 

301 

class Repo(_Repo):
    """Repo with namespace validator."""

    # Repo providing the namespace validator; no validation happens when unset.
    namespace_validation_repo: Optional[NamespaceValidationRepo] = None
    # ``strict`` raises on an invalid name, ``warn`` only warns; other levels skip validation.
    validation_level: ValidationEnum = ValidationEnum.none
    name: str

    @model_validator(mode='after')
    def _validate_name(self):
        """Validate ``name`` against the namespace validator and normalise it to a repo name.

        Raises ``ValueError`` for an invalid name when ``validation_level`` is
        strict; emits a warning instead when it is warn.
        """
        value = self.name
        if self.namespace_validation_repo and self.validation_level in [ValidationEnum.strict, ValidationEnum.warn]:
            namespace_validator = self.namespace_validation_repo.validator
            result, message = namespace_validator.validate_name(value)
            if not result:
                message = (f'{value} {message.format(key="<root>")}')
            value = namespace_validator.to_repo_name(value)
            if self.validation_level == ValidationEnum.strict and not result:
                raise ValueError(message)
            elif not result:
                warnings.warn(message, stacklevel=2)
        self.name = value
        # An ``after`` model validator must hand the instance back, otherwise
        # ``model_validate`` returns None.
        return self

322 self.name = value