oc-multi_migrate_to_gitea/migrate_to_gitea.py

206 lines
7.6 KiB
Python

import requests
import json
import click
# Per-provider REST API settings consumed by listRepos():
#   host          - base URL of the provider API, without a trailing slash
#                   (the URL templates add the separator themselves)
#   page_url      - %-template for the first page, filled with (host, team_org)
#   next_page_url - %-template for later pages, filled with
#                   (host, team_org, page number); None for bitbucket, whose
#                   responses carry the next page URL in their 'next' field
#   repo_name     - key holding the repository name in each returned item
#   repo_url      - key holding the repository URL in each returned item;
#                   None for bitbucket, where listRepos() reads it from
#                   links.clone[0].href instead
__provider_git = {
    'bitbucket': {
        'host': 'https://api.bitbucket.org',
        'page_url': '%s/2.0/repositories/%s?pagelen=100&fields=next,values.slug,values.links',
        'next_page_url': None,
        'repo_name': 'slug',
        'repo_url': None,
    },
    'github': {
        'host': 'https://api.github.com',
        'page_url': '%s/orgs/%s/repos?&per_page=100&page=1',
        'next_page_url': '%s/orgs/%s/repos?&per_page=100&page=%s',
        'repo_name': 'name',
        'repo_url': 'clone_url',
    },
    'github_user': {
        'host': 'https://api.github.com',
        'page_url': '%s/users/%s/repos?&per_page=100&page=1',
        'next_page_url': '%s/users/%s/repos?&per_page=100&page=%s',
        'repo_name': 'name',
        'repo_url': 'clone_url',
    },
    'gitlab': {
        # No trailing slash: the templates below start with '%s/%s', so a
        # trailing slash here would yield '.../groups//<team>/projects'.
        'host': 'https://gitlab.com/api/v4/groups',
        'page_url': '%s/%s/projects?per_page=100&page=1',
        'next_page_url': '%s/%s/projects?per_page=100&page=%s',
        'repo_name': 'name',
        # NOTE(review): 'web_url' is the project's web address; confirm it is
        # the URL intended for cloning/mirroring.
        'repo_url': 'web_url',
    },
}
# Root click command group; the sub-commands are attached to it at the bottom
# of the file. The callback itself has nothing to do.
# (A comment is used instead of a docstring so the --help text is unchanged.)
@click.group()
def cli():
    return None
def listRepos(provider_git, team_org, verbose = False):
    """Collect the repositories of a team/organization/user from a provider.

    Pages through the provider's REST API as described by the matching
    entry of ``__provider_git`` and gathers every repository found.

    Args:
        provider_git: Key of ``__provider_git`` selecting the provider.
        team_org: Team, organization, group or user whose repositories
            are listed; inserted into the provider's URL templates.
        verbose: When true, print each repository name as it is found.

    Returns:
        A dict mapping repository name to repository URL. It is empty (or
        partial) when the API answers with an error payload, because
        paging simply stops there.

    Raises:
        KeyError: If ``provider_git`` is not a known provider.
    """
    settings = __provider_git[provider_git]
    # Tolerate a trailing slash on the configured host so the templates
    # ('%s/...') never produce a double slash in the request URL.
    host = settings['host'].rstrip('/')
    name_key = settings['repo_name']
    page_url = settings['page_url'] % (host, team_org)
    repos = {}

    if provider_git == 'bitbucket':
        # Bitbucket returns the URL of the following page in 'next'; the
        # last page has no such field, which ends the loop.
        while page_url is not None:
            page_json = requests.get(page_url).json()
            if "error" in page_json:
                break
            for repo in page_json.get('values', []):
                if verbose:
                    print(repo[name_key])
                repos[repo[name_key]] = repo['links']['clone'][0]['href']
            page_url = page_json.get('next')
        return repos

    url_key = settings['repo_url']
    next_template = settings['next_page_url']
    page = 1
    page_json = requests.get(page_url).json()
    # A successful page is a non-empty JSON list. An empty list means we ran
    # past the last page; anything else (e.g. an error object) stops paging.
    # The number of items on a page must not be compared with the page
    # number, otherwise a short final page would be skipped.
    while isinstance(page_json, list) and page_json:
        for repo in page_json:
            if isinstance(repo, dict):
                if verbose:
                    print(repo[name_key])
                repos[repo[name_key]] = repo[url_key]
        page += 1
        page_json = requests.get(next_template % (host, team_org, page)).json()
    return repos
class Gitea():
    """Namespace of helpers that talk to a Gitea REST API.

    The helpers take no instance state and are called on the class itself
    (``Gitea.listRepo(...)``), so they are declared as static methods.
    Network-level failures raised by ``requests`` are reported by printing
    ``Fail``; the helper then returns ``None``.
    """

    @staticmethod
    def listORGS(host, headers = None):
        """Request ``<host>/orgs``.

        Args:
            host: Base URL of the API.
            headers: Optional HTTP headers sent with the request.

        Returns:
            The raw response body when the status is not 200, otherwise the
            string ``"Done: <body>"``; ``None`` if the request failed.
        """
        endpoint_orgs = "%s%s" % (host, "/orgs")
        try:
            r = requests.get(url=endpoint_orgs, headers=headers)
        except requests.exceptions.RequestException:
            print("Fail")
            return None
        if r.status_code != 200:
            return r.content
        return "Done: %s" % (r.content)

    @staticmethod
    def listRepo(host, team, headers, verbose = False):
        """Collect the repositories of an organization, 50 per page.

        Args:
            host: Base URL of the API.
            team: Organization name inserted into ``/orgs/<team>/repos``.
            headers: HTTP headers sent with every request.
            verbose: When true, print each repository name as it is found.

        Returns:
            A dict mapping repository name to its ``clone_url``. Empty when
            the first response is not a non-empty JSON list.
        """
        repos = {}
        page = 1
        while True:
            page_url = '%s/orgs/%s/repos?limit=50&page=%s' % (host, team, page)
            page_json = requests.get(page_url, headers = headers).json()
            # Stop on an empty page (past the end) or on a non-list payload
            # such as an error object. The page length must not be compared
            # with the page number, or a short final page would be dropped.
            if not isinstance(page_json, list) or not page_json:
                break
            for repo in page_json:
                if isinstance(repo, dict):
                    if verbose:
                        print(repo['name'])
                    repos[repo['name']] = repo['clone_url']
            page += 1
        return repos

    @staticmethod
    def foundBranch(url, branch, headers = None, repo = None):
        """Check whether ``<url>/<repo>/branches/<branch>`` answers 200.

        Args:
            url: URL prefix placed before the repository name.
            branch: Branch name to look for.
            headers: Optional HTTP headers sent with the request.
            repo: Repository name.

        Returns:
            ``repo`` when the response status is 200, otherwise ``None``
            (also when the request itself failed).
        """
        url_branch = "%s/%s/branches/%s" % (url, repo, branch)
        try:
            r = requests.get(url=url_branch, headers=headers)
        except requests.exceptions.RequestException:
            print("Fail")
            return None
        if r.status_code == 200:
            return repo
        return None

    @staticmethod
    def createMirror(team, clone_addr, repo_name, uid, private, url, headers = None):
        """POST a mirror-migration request for one repository.

        Args:
            team: Sent as ``repo_owner``.
            clone_addr: Sent as ``clone_addr`` (URL of the source repository).
            repo_name: Sent as ``repo_name``.
            uid: Sent as ``UID``.
            private: Sent as ``private``.
            url: Endpoint the JSON payload is posted to.
            headers: Optional HTTP headers sent with the request.

        Returns:
            ``"Done: <status>"`` when the status is 200 or 201, otherwise
            ``"Non-OK Response: <body>"``; ``None`` if the request failed.
        """
        migrate_data = {
            "mirror": True,
            "UID": uid,
            "repo_name": repo_name,
            "clone_addr": clone_addr,
            "private": private,
            "repo_owner": team,
        }
        try:
            r = requests.post(url=url, data=json.dumps(migrate_data), headers=headers)
        except requests.exceptions.RequestException:
            print("Fail")
            return None
        # Success is 200 OR 201, so failure is "neither of them".
        if r.status_code not in (200, 201):
            return "Non-OK Response: %s" % (r.content)
        return "Done: %s" % (r.status_code)
# CLI command: walk the repositories of --team_org on the chosen provider.
# The mapping returned by listRepos() is discarded here, so output only
# appears when --verbose is given a true value (it is a value option, not a
# flag). A comment is used instead of a docstring so --help is unchanged.
@click.command()
@click.option(
    '--provider_git',
    type=click.Choice(list(__provider_git), case_sensitive=False),
    required=True,
)
@click.option('--team_org', required=True)
@click.option('--verbose', default=False)
def list_repo(provider_git, team_org, verbose):
    listRepos(provider_git=provider_git, team_org=team_org, verbose=verbose)
@click.command()
@click.option('--domain', required=True)
@click.option('--team_org', required=True)
@click.option('--access_token', default=None)
@click.option('--verbose', default=False)
def list_repo_token(domain, team_org,
                    access_token, verbose):
    """Fetch the repositories of TEAM_ORG from the Gitea instance at DOMAIN.

    Repository names are printed only when --verbose is given a true value.
    """
    HOST = "https://%s/api/v1" % (domain)
    HEADERS = { "accept": "application/json", "content-type": "application/json" }
    # --access_token is optional: without one, send no Authorization header
    # at all rather than the literal string "token None".
    if access_token:
        HEADERS["Authorization"] = "token %s" % (access_token)
    Gitea.listRepo(HOST, team_org, HEADERS, verbose)
@click.command()
@click.option('--domain', required=True)
@click.option('--provider_git',
              type=click.Choice([*__provider_git],
                                case_sensitive=False),
              required=True)
@click.option('--team_org_source', required=True)
@click.option('--team_org_dest', required=True)
@click.option('--access_token', default=None)
@click.option('--private', default=True)
@click.option('--verbose', default=False)
def mirror_repo(domain, provider_git, team_org_source,
                team_org_dest, access_token, private, verbose):
    """Mirror every repository of TEAM_ORG_SOURCE into TEAM_ORG_DEST on Gitea.

    One migration request is sent per repository and its result is printed.
    """
    repos = listRepos(provider_git, team_org_source, verbose)
    HOST = "https://%s/api/v1" % (domain)
    ENDPOINT_MIGRATE = "%s%s" % (HOST, "/repos/migrate")
    HEADERS = { "accept": "application/json", "content-type": "application/json" }
    # Without a token, send no Authorization header at all rather than the
    # literal string "token None".
    if access_token:
        HEADERS["Authorization"] = "token %s" % (access_token)
    for repo_name, clone_addr in repos.items():
        # NOTE(review): the uid argument is hard-coded to 1 - confirm this is
        # the intended value for every destination.
        print(Gitea.createMirror(team_org_dest, clone_addr,
                                 repo_name, 1, private, ENDPOINT_MIGRATE, headers = HEADERS))
@click.command()
@click.option('--domain', required=True)
@click.option('--team_org', required=True)
@click.option('--access_token', default=None)
@click.option('--private', default=True)
@click.option('--branch', required=True)
def get_branch_repo_gitea(domain, team_org, access_token,
                          private, branch):
    """Print the repositories of TEAM_ORG on Gitea that have branch BRANCH."""
    # `private` is accepted for command-line compatibility but is not used.
    HOST = "https://%s/api/v1" % (domain)
    ENDPOINT_BRANCH = "%s%s" % (HOST, "/repos/%s" % (team_org))
    HEADERS = { "accept": "application/json", "content-type": "application/json" }
    # Without a token, send no Authorization header at all rather than the
    # literal string "token None".
    if access_token:
        HEADERS["Authorization"] = "token %s" % (access_token)
    repos = Gitea.listRepo(HOST, team_org, HEADERS)
    for repo in repos:
        # foundBranch returns the repository name on HTTP 200, else None.
        if Gitea.foundBranch(ENDPOINT_BRANCH, branch, HEADERS, repo):
            print(repo)
# Attach the sub-commands to the root group, in their original order.
for _subcommand in (list_repo, list_repo_token, mirror_repo, get_branch_repo_gitea):
    cli.add_command(_subcommand)
del _subcommand  # leave no extra module-level name behind

# Run the command group only when executed as a script.
if __name__ == "__main__":
    cli()