#!/usr/bin/env nix-shell #!nix-shell -i python3 -p "python3.withPackages(ps: with ps; [ requests ])" openssh import argparse import hashlib import os import requests import subprocess def parse_args(): parser = argparse.ArgumentParser(description="Generate SSH keys for Gitea repositories and add them as secrets") parser.add_argument("--org", required=True, help="The organization to generate keys for") parser.add_argument("--token-path", metavar='PATH', required=True, help="Path to a file containing the Gitea API token") parser.add_argument("--api-url", metavar='URL', help="The URL of the Gitea API", default="https://git.pvv.ntnu.no/api/v1") parser.add_argument("--key-dir", metavar='PATH', help="The directory to store the generated keys in", default="/run/gitea-web-secret-provider") parser.add_argument("--authorized-keys-path", metavar='PATH', help="The path to the resulting authorized_keys file", default="/etc/ssh/authorized_keys.d/gitea-web-secret-provider") parser.add_argument("--rrsync-path", metavar='PATH', help="The path to the rrsync binary", default="/run/current-system/sw/bin/rrsync") parser.add_argument("--web-dir", metavar='PATH', help="The directory to sync the repositories to", default="/var/www") parser.add_argument("--force", action="store_true", help="Overwrite existing keys") return parser.parse_args() def add_secret(args, token, repo, name, secret): result = requests.put( f"{args.api_url}/repos/{args.org}/{repo}/actions/secrets/{name}", json = { 'data': secret }, headers = { 'Authorization': 'token ' + token }, ) if result.status_code not in (201, 204): raise Exception(f"Failed to add secret: {result.json()}") def get_org_repo_list(args, token): result = requests.get( f"{args.api_url}/orgs/{args.org}/repos", headers = { 'Authorization': 'token ' + token }, ) return [repo["name"] for repo in result.json()] def generate_ssh_key(args, repository: str): keyname = hashlib.sha256(args.org.encode() + repository.encode()).hexdigest() if not os.path.exists(os.path.join(args.key_dir, keyname)) or args.force: subprocess.run( [ "ssh-keygen", *("-t", "ed25519"), *("-b", "4096"), *("-f", os.path.join(args.key_dir, keyname)), *("-N", ""), *("-C", f"{args.org}/{repository}"), ], check=True, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) print(f"Generated SSH key for `{args.org}/{repository}`") with open(os.path.join(args.key_dir, keyname), "r") as f: private_key = f.read() with open(os.path.join(args.key_dir, keyname + ".pub"), "r") as f: public_key = f.read() return private_key, public_key def generate_authorized_keys(args, repo_public_keys: list[tuple[str, str]]): result = "" for repo, public_key in repo_public_keys: result += f""" command="{args.rrsync_path} -wo {args.web_dir}/{args.org}/{repo}",restrict,no-agent-forwarding,no-port-forwarding,no-pty,no-X11-forwarding {public_key} """.strip() + "\n" with open(args.authorized_keys_path, "w") as f: f.write(result) def main(): args = parse_args() with open(args.token_path, "r") as f: token = f.read().strip() os.makedirs(args.key_dir, 0o700, exist_ok=True) repos = get_org_repo_list(args, token) print(f'Found {len(repos)} repositories in `{args.org}`') repo_public_keys = [] for repo in repos: print(f"Locating key for `{args.org}/{repo}`") private_key, public_key = generate_ssh_key(args, repo) add_secret(args, token, repo, "WEB_SYNC_SSH_KEY", private_key) repo_public_keys.append((repo, public_key)) generate_authorized_keys(args, repo_public_keys) print(f"Wrote authorized_keys file to `{args.authorized_keys_path}`") if __name__ == "__main__": main()