Python源码示例:github3.exceptions.NotFoundError()
示例1
def retrieve_pr_template_text(self):
    '''Return the text of the repository's pull-request template, if any.

    The directories ``.github``, ``.`` and ``docs`` are searched in that
    order. The first entry whose name starts with ``pull_request_template``,
    optionally followed by a dot and a one-to-three character extension,
    is used. The name is matched case-insensitively so that spellings such
    as ``PULL_REQUEST_TEMPLATE.md`` are found as well.

    Returns:
        The template content decoded as UTF-8, or ``None`` if no template
        file exists in any of the searched directories.
    '''
    pattern = re.compile(r"(pull_request_template)(\..{1,3})?$", re.IGNORECASE)
    directories = ['.github', '.', 'docs']
    for directory in directories:
        try:
            for filename, content in self.repository.directory_contents(directory):
                if pattern.match(filename):
                    # Refresh before reading `decoded`; presumably directory
                    # listings do not carry the file body -- TODO confirm.
                    content.refresh()
                    return content.decoded.decode('utf-8')
        except github3.exceptions.NotFoundError:
            continue  # directory does not exist; try the next candidate
    return None
示例2
def retrieve_asset_contents(self, release_tag: str, asset_label: str):
    '''Download a release asset and return its content as text.

    The release is looked up by `release_tag`; the first asset whose label
    or name equals `asset_label` is downloaded into memory and decoded with
    the default codec of `bytes.decode`.

    Raises:
        NotFoundError: built around a synthetic 404 response when the
            release has no asset with the requested label or name.
    '''
    ci.util.not_none(release_tag)
    ci.util.not_none(asset_label)

    release = self.repository.release_from_tag(release_tag)
    matching_asset = next(
        (
            candidate
            for candidate in release.assets()
            if candidate.label == asset_label or candidate.name == asset_label
        ),
        None,
    )

    if matching_asset is None:
        # No HTTP call failed here, so fabricate a 404 response object to
        # hand to NotFoundError.
        not_found = requests.Response()
        not_found.status_code = 404
        not_found.json = lambda: {'message':'no asset with label {} found'.format(asset_label)}
        raise NotFoundError(resp=not_found)

    buffer = io.BytesIO()
    matching_asset.download(buffer)
    return buffer.getvalue().decode()
示例3
def revert(self):
    '''Delete the GitHub release for `self.release_version` and its tag ref.

    Does nothing when no release exists for the version. Returns silently
    when the release was deleted but the tag ref is missing.

    Raises:
        RuntimeError: if deleting the release or the tag reports failure.
    '''
    # NOTE(review): the tag clean-up is assumed to be nested under the
    # "release exists" branch -- confirm against the original layout.
    try:
        existing_release = self.github_helper.repository.release_from_tag(
            self.release_version
        )
    except NotFoundError:
        existing_release = None

    if not existing_release:
        return

    info(f"Deleting Release {self.release_version}")
    release_deleted = existing_release.delete()
    if not release_deleted:
        raise RuntimeError("Release could not be deleted")

    try:
        tag_ref = self.github_helper.repository.ref(f"tags/{self.release_version}")
    except NotFoundError:
        # The ref was never created, so there is nothing left to remove.
        return

    tag_deleted = tag_ref.delete()
    if not tag_deleted:
        raise RuntimeError("Tag could not be deleted")
示例4
def upload_fw(file, version, codename, today, variant):
    """
    Upload files to GitHub release

    The file is first copied with ``rclone`` to the OSDN storage path built
    from the folder returned by ``set_folder``, the version and the codename.
    It is then attached as an asset to the GitHub release tagged
    ``<variant>-<today>``; that release is created if it does not exist yet.

    Args:
        file: path of the firmware file; also used as the asset name. When a
            release has to be created, the fifth ``_``-separated field of
            this name is used in the release body.
        version: version component of the remote storage path.
        codename: device codename; only the part before the first ``-`` is used.
        today: date string forming the second half of the release tag.
        variant: variant string forming the first half of the release tag.
    """
    print("uploading: " + file)
    codename = codename.split('-')[0]
    folder = set_folder(file)
    subprocess.call(['rclone', 'copy', file, 'osdn:/storage/groups/x/xi/xiaomifirmwareupdater/'
                     + folder + '/' + version + '/' + codename + '/', '-v'])
    repository = GIT.repository('XiaomiFirmwareUpdater', f'firmware_xiaomi_{codename}')
    tag = f'{variant}-{today}'
    try:
        release = repository.release_from_tag(tag)  # release exist already
    except exceptions.NotFoundError:
        # create new release
        release = repository.create_release(
            tag, name=tag,
            body=f"Extracted Firmware from MIUI {file.split('_')[4]}",
            draft=False, prerelease=False)
    try:
        # Use a context manager so the file handle is closed even when the
        # upload fails; the original left it open.
        with open(file, 'rb') as firmware:
            asset = release.upload_asset(content_type='application/binary',
                                         name=file, asset=firmware)
        print(f'Uploaded {asset.name} Successfully to release {release.name}')
    except exceptions.UnprocessableEntity:
        print(f'{file} is already uploaded')
示例5
def get_protection(access_token, branch_name, owner, repo_name):
    """Return the protection settings of a branch.

    Args:
        access_token: token passed to ``login``.
        branch_name: name of the branch whose protection is requested.
        owner: owner of the repository.
        repo_name: name of the repository.

    Returns:
        Whatever ``branch.protection()`` returns for the requested branch.

    Raises:
        SystemExit: with exit code 1 when ``login`` returns ``None``.
        NotFoundError: re-raised after printing a hint when the repository
            lookup fails.
    """
    gh = login(token=access_token)
    if gh is None:
        print("Could not login. Have you provided credentials?")
        # Raise SystemExit directly: the `exit` helper is injected by the
        # `site` module for interactive use and is not guaranteed to exist.
        raise SystemExit(1)
    try:
        repo = gh.repository(owner, repo_name)
    except NotFoundError:
        print(f"Could not find repo https://github.com/{owner}/{repo_name}")
        raise
    branch = repo.branch(branch_name)
    return branch.protection()
示例6
def _migrate_project(self, project):
    """Create the target branch of `project` from freshly generated metafiles.

    The project is skipped (with a printed notice) when the source branch is
    missing or the target branch already exists. Otherwise the target ref is
    created at the commit returned by `_create_metafiles`, followed by a
    milestone and a migration issue listing the sorted modules.
    """
    print("Migrating project %s/%s" % (self.gh_org, project))
    gh_repo = self.github.repository(self.gh_org, project)

    # The source branch must exist, otherwise there is nothing to migrate.
    try:
        gh_repo.branch(self.gh_source_branch)
    except NotFoundError:
        print("Source branch non existing. Skipping...")
        return

    # The target branch must NOT exist yet.
    try:
        gh_repo.branch(self.gh_target_branch)
        target_exists = True
    except NotFoundError:
        target_exists = False
    if target_exists:
        print("Branch already exists. Skipping...")
        return

    root_listing = gh_repo.directory_contents(
        '', self.gh_source_branch, return_as=dict,
    )
    module_names = self._get_modules_list(gh_repo, root_listing)
    metafiles_commit = self._create_metafiles(gh_repo, root_listing)
    target_ref = 'refs/heads/%s' % self.gh_target_branch
    gh_repo.create_ref(target_ref, metafiles_commit.sha)
    # TODO: GitHub is returning 404
    # self._make_default_branch(repo)
    branch_milestone = self._create_branch_milestone(gh_repo)
    self._create_migration_issue(gh_repo, sorted(module_names), branch_milestone)
示例7
def _migrate_project(self, project):
    """Branch `project` from its source branch and prepare it for migration.

    The project is skipped (with a printed notice) when the source branch is
    missing or the target branch already exists. Otherwise the target ref is
    created at the head commit of the source branch, modules are marked
    uninstallable, manifests are renamed for target branch '10.0', the
    unported directory is removed and the metafiles are updated. Finally a
    milestone and a migration issue listing the sorted modules are created.
    """
    print("Migrating project %s/%s" % (self.gh_org, project))
    gh_repo = self.github.repository(self.gh_org, project)

    # The source branch supplies the commit the new branch starts from.
    try:
        origin_branch = gh_repo.branch(self.gh_source_branch)
    except NotFoundError:
        print("Source branch non existing. Skipping...")
        return

    # The target branch must NOT exist yet.
    try:
        gh_repo.branch(self.gh_target_branch)
        target_exists = True
    except NotFoundError:
        target_exists = False
    if target_exists:
        print("Branch already exists. Skipping...")
        return

    target_ref = 'refs/heads/%s' % self.gh_target_branch
    gh_repo.create_ref(target_ref, origin_branch.commit.sha)
    root_listing = gh_repo.directory_contents(
        '', self.gh_target_branch, return_as=dict,
    )
    module_names = self._mark_modules_uninstallable(gh_repo, root_listing)
    if self.gh_target_branch == '10.0':
        self._rename_manifests(gh_repo, root_listing)
    self._delete_unported_dir(gh_repo, root_listing)
    # TODO: Is this really needed?
    # self._delete_setup_dirs(repo, root_contents, modules)
    self._update_metafiles(gh_repo, root_listing)
    # TODO: GitHub is returning 404
    # self._make_default_branch(repo)
    branch_milestone = self._create_branch_milestone(gh_repo)
    self._create_migration_issue(gh_repo, sorted(module_names), branch_milestone)
示例8
def directory_contents(self, path, **kw):
    """Return the canned listing registered for `path`.

    Extra keyword arguments are accepted and ignored.

    Raises:
        NotFoundError: wrapping a 404 `DummyResponse` when `path` has no
            entry in `self._contents`.
    """
    try:
        return self._contents[path]
    except KeyError:
        # The KeyError is an internal lookup detail; suppress it so callers
        # see only the NotFoundError.
        raise NotFoundError(
            DummyResponse(f"Accessed unexpected directory: {path}", 404)
        ) from None
示例9
def latest_release(self):
    """Return the first stored release whose tag starts with "release/".

    Raises:
        NotFoundError: wrapping an empty 404 `DummyResponse` when no stored
            release has such a tag.
    """
    first_production = next(
        (
            candidate
            for candidate in self._releases
            if candidate.tag_name.startswith("release/")
        ),
        None,
    )
    if first_production is None:
        raise NotFoundError(DummyResponse("", 404))
    return first_production
示例10
def release_from_tag(self, tag_name):
    """Return the first stored release whose tag equals `tag_name`.

    Raises:
        NotFoundError: wrapping an empty 404 `DummyResponse` when no stored
            release carries that tag.
    """
    tagged = (entry for entry in self._releases if entry.tag_name == tag_name)
    found = next(tagged, None)
    if found is not None:
        return found
    raise NotFoundError(DummyResponse("", 404))
示例11
def get_latest_tag(self, beta=False):
    """ Query Github Releases to find the latest production or beta tag

    Args:
        beta: when truthy, return the latest tag carrying the beta prefix.

    Returns:
        For beta, the result of `_get_latest_tag_for_prefix` with the beta
        prefix. Otherwise the tag name of the repository's latest release,
        or the result of `_get_latest_tag_for_prefix` with the release
        prefix when that tag does not start with the release prefix.

    Raises:
        GithubException: when the repository has no latest release; the
            underlying NotFoundError is attached as the cause.
    """
    repo = self._get_repo()
    if beta:
        return self._get_latest_tag_for_prefix(repo, self.project__git__prefix_beta)

    try:
        release = repo.latest_release()
    except github3.exceptions.NotFoundError as err:
        raise GithubException(f"No release found for repo {self.repo_url}") from err

    prefix = self.project__git__prefix_release
    if not release.tag_name.startswith(prefix):
        # The latest release does not carry the production prefix, so search
        # for the newest tag that does.
        return self._get_latest_tag_for_prefix(repo, prefix)
    return release.tag_name
示例12
def get_ref_for_dependency(self, repo, dependency, include_beta=None):
    """Resolve the git ref to use for a dependency.

    Resolution order:
      1. an explicit "ref" entry in `dependency` is used as-is;
      2. a "tag" entry selects the release with that tag;
      3. otherwise `find_latest_release(repo, include_beta)` picks the release.
    When a release was selected, the ref is the sha reached through the
    release's tag ref and tag object. Without a release, the head commit of
    the repository's default branch is used.

    Returns:
        A tuple `(release, ref)`; `release` is None when an explicit ref was
        given or no release was found.

    Raises:
        DependencyResolutionError: when "tag" names a tag without a release;
            the underlying NotFoundError is attached as the cause.
    """
    if "ref" in dependency:
        return (None, dependency["ref"])

    if "tag" in dependency:
        try:
            # Find the github release corresponding to this tag.
            release = repo.release_from_tag(dependency["tag"])
        except NotFoundError as err:
            raise DependencyResolutionError(
                f"No release found for tag {dependency['tag']}"
            ) from err
    else:
        release = find_latest_release(repo, include_beta)

    if release:
        # Look up the tag ref, then the tag object it refers to, and use the
        # sha of that tag object's target.
        ref = repo.tag(
            repo.ref("tags/" + release.tag_name).object.sha
        ).object.sha
    else:
        self.logger.info(
            f"No release found; using the latest commit from the {repo.default_branch} branch."
        )
        ref = repo.branch(repo.default_branch).commit.sha
    return (release, ref)
示例13
def _create_repository(self, owner: str, name: str):
    """Look up the repository `owner`/`name` through `self.github`.

    Returns:
        The object returned by `self.github.repository`.

    Raises:
        RuntimeError: when the lookup raises NotFoundError. The original
            error is kept as the second exception argument and is also
            attached as the explicit cause.
    """
    try:
        return self.github.repository(
            owner=owner,
            repository=name
        )
    except NotFoundError as nfe:
        raise RuntimeError(
            'failed to retrieve repository {o}/{r}'.format(
                o=owner,
                r=name,
            ),
            nfe
        ) from nfe
示例14
def create_or_update_file(
    self,
    file_path: str,
    file_contents: str,
    commit_message: str,
    branch: str=None,
) -> str:
    '''Create `file_path` on `branch`, or update it when its content differs.

    `branch` falls back to `self.default_branch` when not given.

    Returns:
        The sha of the resulting commit. When the file already holds exactly
        `file_contents`, nothing is committed and the result of the
        `ci.util.info` call is returned instead.
    '''
    target_branch = self.default_branch if branch is None else branch

    try:
        existing = self.retrieve_file_contents(file_path=file_path, branch=target_branch)
    except NotFoundError:
        existing = None  # file did not yet exist

    if not existing:
        created = self.repository.create_file(
            path=file_path,
            message=commit_message,
            content=file_contents.encode('utf-8'),
            branch=target_branch,
        )
        return created['commit'].sha

    current_text = existing.decoded.decode('utf-8')
    if current_text == file_contents:
        # Nothing to do
        return ci.util.info(
            'Repository file contents are identical to passed file contents.'
        )

    updated = existing.update(
        message=commit_message,
        content=file_contents.encode('utf-8'),
        branch=target_branch,
    )
    return updated['commit'].sha
示例15
def tag_exists(
    self,
    tag_name: str,
):
    '''Return True when the repository has a ref `tags/<tag_name>`, else False.

    `tag_name` is first checked with `ci.util.not_empty`.
    '''
    ci.util.not_empty(tag_name)
    tag_ref_name = 'tags/' + tag_name
    try:
        self.repository.ref(tag_ref_name)
    except NotFoundError:
        return False
    else:
        return True
示例16
def _retrieve_existing_org_hook_or_none(
    self,
    organization: Organization,
    hook_name: str,
    webhook_url: str
):
    '''Return the organization hook that matches `hook_name` and `webhook_url`.

    A hook matches when its name equals `hook_name`, its configured URL has
    a non-empty value for the query key DEFAULT_ORG_HOOK_QUERY_KEY, and the
    URL path equals the path of `webhook_url`.

    Returns the single matching hook, or None when there is none.

    @raises RuntimeError when more than one hook matches
    @raises github3.exceptions.NotFoundError in case of missing privileges to enumerate webhooks
    '''
    def _webhook_set_by_us(org_hook: OrganizationHook, webhook_url: str):
        configured_url = urlparse(org_hook.config['url'])
        marker_values = parse_qs(configured_url.query).get(
            DEFAULT_ORG_HOOK_QUERY_KEY
        )
        if not marker_values:
            return False
        return configured_url.path == urlparse(webhook_url).path

    candidates = [
        org_hook
        for org_hook in organization.hooks()
        if org_hook.name == hook_name and _webhook_set_by_us(org_hook, webhook_url)
    ]
    if not candidates:
        return None
    if len(candidates) == 1:
        return candidates[0]
    raise RuntimeError('found two similar webhooks - what to do now?')
示例17
def validate(self):
    '''Check the preconditions for this step.

    `self.release_version` is passed to `version.parse_to_semver` and
    `self.repo_dir` to `existing_dir`. Then a release tagged with the
    version is looked up on GitHub.

    Raises:
        RuntimeError: when no release with that tag exists; the underlying
            NotFoundError is attached as the cause.
    '''
    version.parse_to_semver(self.release_version)
    existing_dir(self.repo_dir)
    # check whether a release with the given version exists
    try:
        self.github_helper.repository.release_from_tag(self.release_version)
    except NotFoundError as err:
        raise RuntimeError(
            f'No release with tag {self.release_version} found'
        ) from err
示例18
def _branch_cfg_or_none(
    self,
    repository,
):
    '''Load and lint `branch.cfg` from the ref `refs/meta/ci` of `repository`.

    Returns a BranchCfg built from the parsed YAML, or None when a
    NotFoundError is raised while fetching or linting the file.
    '''
    try:
        cfg_file = repository.file_contents(
            path='branch.cfg',
            ref='refs/meta/ci',
        )
        raw_cfg = cfg_file.decoded.decode('utf-8')
        info(f'Linting branch cfg for {repository}')
        lint_yaml(raw_cfg)
    except NotFoundError:
        return None  # no branch cfg present
    else:
        parsed_cfg = load_yaml(raw_cfg)
        return BranchCfg(raw_dict=parsed_cfg)
示例19
def starred(username, token, sort, repository, message):
    """GitHub starred
    creating your own Awesome List used GitHub stars!
    example:
    starred --username maguowei --sort > README.md
    """
    # The docstring text is left as-is: it presumably serves as the CLI help.
    #
    # Behaviour: writes a Markdown list of `username`'s starred repositories,
    # grouped by language ('Others' when a repository has none), to stdout.
    # When `repository` is given, the output is captured in memory instead
    # and committed as that repository's README (the repository is created
    # on NotFoundError); this requires `token`.
    if repository:
        if not token:
            click.secho('Error: create repository need set --token', fg='red')
            return
        file = BytesIO()
    else:
        file = None

    # Redirect stdout only for the duration of this call and always restore
    # it; the original left sys.stdout pointing at the buffer.
    saved_stdout = sys.stdout
    if file is not None:
        sys.stdout = file
    try:
        gh = GitHub(token=token)
        stars = gh.starred_by(username)
        click.echo(desc)

        # language -> list of [name, url, description]
        repo_dict = {}
        for s in stars:
            language = s.language or 'Others'
            description = html_escape(s.description).replace('\n', '') if s.description else ''
            repo_dict.setdefault(language, []).append(
                [s.name, s.html_url, description.strip()]
            )

        if sort:
            repo_dict = OrderedDict(sorted(repo_dict.items(), key=lambda item: item[0]))

        # Table of contents: one anchor link per language.
        for language in repo_dict:
            data = u' - [{}](#{})'.format(language, '-'.join(language.lower().split()))
            click.echo(data)
        click.echo('')

        # One section per language.
        for language in repo_dict:
            click.echo('## {} \n'.format(language.replace('#', '# #')))
            for repo in repo_dict[language]:
                data = u'- [{}]({}) - {}'.format(*repo)
                click.echo(data)
            click.echo('')

        click.echo(license_.format(username=username))

        if file is not None:
            try:
                rep = gh.repository(username, repository)
                readme = rep.readme()
                readme.update(message, file.getvalue())
            except NotFoundError:
                rep = gh.create_repository(repository, 'A curated list of my GitHub stars!')
                rep.create_file('README.md', 'starred initial commit', file.getvalue())
            click.launch(rep.html_url)
    finally:
        sys.stdout = saved_stdout
示例20
def create_or_update_org_hook(
    self,
    organization_name: str,
    webhook_url: str,
    hook_name: str=DEFAULT_ORG_HOOK_NAME,
    events: list=DEFAULT_ORG_HOOK_EVENTS,
    content_type: str=DEFAULT_ORG_HOOK_CONTENT_TYPE,
    active: bool=True,
    skip_ssl_validation=False
):
    '''Create the organization webhook, or edit the existing matching one.

    An existing hook is looked up with `_retrieve_existing_org_hook_or_none`.
    If one is found it is edited; otherwise a new hook named `hook_name` is
    created. In both cases the hook is configured with `webhook_url`,
    `content_type`, `events`, `active`, and `insecure_ssl` set to '1' when
    `skip_ssl_validation` is truthy, else '0'.

    Raises:
        RuntimeError: when the organization cannot be accessed, when the
            existing webhooks cannot be enumerated (the underlying
            NotFoundError is attached as the cause), or when the
            create/edit call returns a falsy result.
    '''
    organization = self.github.organization(
        username=organization_name,
    )
    if not organization:
        raise RuntimeError(
            f'failed to access "{organization_name}". Verify credentials and organization name'
        )

    try:
        hook = self._retrieve_existing_org_hook_or_none(
            organization=organization,
            hook_name=hook_name,
            webhook_url=webhook_url
        )
    except NotFoundError as err:
        raise RuntimeError(
            f'failed to retrieve webhooks for "{organization_name}". Verify credentials'
        ) from err

    if hook:
        create_or_update = hook.edit
        hook_kwargs = {}
    else:
        # Only creation takes the hook name.
        create_or_update = organization.create_hook
        hook_kwargs = {'name': hook_name}

    config = {
        'url': webhook_url,
        'content_type': content_type,
        'insecure_ssl': '1' if skip_ssl_validation else '0'
    }
    result = create_or_update(
        config=config,
        events=events,
        active=active,
        **hook_kwargs
    )
    if not result:
        raise RuntimeError(
            'failed to update or create org webhook for {o}'.format(
                o=organization,
            )
        )