pvv-nixos-config/hosts/bekkalokk/services/gitea/web-secret-provider/gitea-web-secret-provider.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

108 lines
4.1 KiB
Python
Raw Normal View History

#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages(ps: with ps; [ requests ])" openssh
import argparse
import hashlib
import os
import requests
import subprocess
from pathlib import Path
def parse_args():
parser = argparse.ArgumentParser(description="Generate SSH keys for Gitea repositories and add them as secrets")
parser.add_argument("--org", required=True, help="The organization to generate keys for")
parser.add_argument("--token-path", metavar='PATH', required=True, help="Path to a file containing the Gitea API token")
parser.add_argument("--api-url", metavar='URL', help="The URL of the Gitea API", default="https://git.pvv.ntnu.no/api/v1")
parser.add_argument("--key-dir", metavar='PATH', help="The directory to store the generated keys in", default="/run/gitea-web-secret-provider")
parser.add_argument("--authorized-keys-path", metavar='PATH', help="The path to the resulting authorized_keys file", default="/etc/ssh/authorized_keys.d/gitea-web-secret-provider")
parser.add_argument("--rrsync-path", metavar='PATH', help="The path to the rrsync binary", default="/run/current-system/sw/bin/rrsync")
parser.add_argument("--web-dir", metavar='PATH', help="The directory to sync the repositories to", default="/var/www")
parser.add_argument("--force", action="store_true", help="Overwrite existing keys")
return parser.parse_args()
def add_secret(args, token, repo, name, secret):
result = requests.put(
f"{args.api_url}/repos/{args.org}/{repo}/actions/secrets/{name}",
json = { 'data': secret },
headers = { 'Authorization': 'token ' + token },
)
if result.status_code not in (201, 204):
raise Exception(f"Failed to add secret: {result.json()}")
def get_org_repo_list(args, token):
result = requests.get(
f"{args.api_url}/orgs/{args.org}/repos",
headers = { 'Authorization': 'token ' + token },
)
return [repo["name"] for repo in result.json()]
def generate_ssh_key(args, repository: str):
keyname = hashlib.sha256(args.org.encode() + repository.encode()).hexdigest()
if not os.path.exists(os.path.join(args.key_dir, keyname)) or args.force:
subprocess.run(
[
"ssh-keygen",
*("-t", "ed25519"),
*("-b", "4096"),
*("-f", os.path.join(args.key_dir, keyname)),
*("-N", ""),
*("-C", f"{args.org}/{repository}"),
],
check=True,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
print(f"Generated SSH key for `{args.org}/{repository}`")
with open(os.path.join(args.key_dir, keyname), "r") as f:
private_key = f.read()
with open(os.path.join(args.key_dir, keyname + ".pub"), "r") as f:
public_key = f.read()
return private_key, public_key
def generate_authorized_keys(args, repo_public_keys: list[tuple[str, str]]):
result = ""
for repo, public_key in repo_public_keys:
result += f"""
command="{args.rrsync_path} -wo {args.web_dir}/{args.org}/{repo}",restrict,no-agent-forwarding,no-port-forwarding,no-pty,no-X11-forwarding {public_key}
""".strip() + "\n"
with open(args.authorized_keys_path, "w") as f:
f.write(result)
def main():
args = parse_args()
with open(args.token_path, "r") as f:
token = f.read().strip()
os.makedirs(args.key_dir, 0o700, exist_ok=True)
os.makedirs(Path(args.authorized_keys_path).parent, 0o700, exist_ok=True)
repos = get_org_repo_list(args, token)
print(f'Found {len(repos)} repositories in `{args.org}`')
repo_public_keys = []
for repo in repos:
print(f"Locating key for `{args.org}/{repo}`")
private_key, public_key = generate_ssh_key(args, repo)
add_secret(args, token, repo, "WEB_SYNC_SSH_KEY", private_key)
repo_public_keys.append((repo, public_key))
generate_authorized_keys(args, repo_public_keys)
print(f"Wrote authorized_keys file to `{args.authorized_keys_path}`")
if __name__ == "__main__":
main()