-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgha-whitelist-manager.py
117 lines (102 loc) · 4.15 KB
/
gha-whitelist-manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/env python3
import re
import sys
import yaml
import asfpy.messaging
import asfpy.pubsub
import argparse
import requests
import logging
import json
import smtplib
# GitHub organization whose Actions "selected-actions" permissions are managed.
ORG = "apache"
# Used both as the pubsub git topic / commit project name and as the name of
# the GitHub repository that holds the whitelist file.
PUBLIC_INTERFACE = "infrastructure-gha-whitelist-manager"
# Path of the whitelist file inside that repository.
APPROVED_PATTERNS_FILEPATH = "approved_patterns.yml"
# NOTE(review): not referenced anywhere in the visible code; presumably a
# wait time in seconds for GitHub API pacing -- confirm or remove.
github_timewait = 60
class Log:
    """Configure the module logger from the loaded configuration.

    Two keys are read from ``config``:

    * ``logfile`` -- the literal string ``"stdout"`` to log to standard
      output; any other value is passed to ``logging.basicConfig`` as the
      log file name. Defaults to ``"stdout"`` when absent.
    * ``verbosity`` -- an integer looked up in ``self.verbosity`` to pick a
      ``logging`` level. Missing or unknown values fall back to INFO.

    The configured logger is exposed as ``self.log``.
    """

    def __init__(self, config):
        self.config = config
        self.log = logging.getLogger(__name__)
        self.verbosity = {
            0: logging.INFO,
            1: logging.CRITICAL,
            2: logging.ERROR,
            3: logging.WARNING,
            4: logging.INFO,
            5: logging.DEBUG,
        }
        self.stdout_fmt = logging.Formatter(
            "{asctime} [{levelname}] {funcName}: {message}", style="{"
        )

        # Resolve the level once; an unknown verbosity must not crash start-up.
        level = self.verbosity.get(self.config.get("verbosity", 0), logging.INFO)
        self.log.setLevel(level)

        if self.config.get("logfile", "stdout") == "stdout":
            # logging.getLogger() returns the same object for the same name,
            # so reuse a stdout handler attached by an earlier Log instance
            # instead of stacking a second one (which duplicates every line).
            existing = next(
                (
                    handler
                    for handler in self.log.handlers
                    if isinstance(handler, logging.StreamHandler)
                    and getattr(handler, "stream", None) is sys.stdout
                ),
                None,
            )
            if existing is None:
                self.to_stdout = logging.StreamHandler(sys.stdout)
                self.log.addHandler(self.to_stdout)
            else:
                self.to_stdout = existing
            self.to_stdout.setLevel(level)
            self.to_stdout.setFormatter(self.stdout_fmt)
        else:
            # File logging is delegated to the root logger configuration.
            logging.basicConfig(
                format="%(asctime)s [%(levelname)s] %(funcName)s: %(message)s",
                filename=self.config["logfile"],
            )
class WhitelistUpdater:
    """Scan pubsub for changes to the whitelist file and talk to GitHub.

    ``config`` must provide ``gha_token`` (sent as a Bearer token on the
    shared ``requests`` session) plus whatever keys ``Log`` reads.
    """

    # Seconds to wait on any single HTTP request before giving up, so a
    # stalled connection cannot hang the listener indefinitely.
    HTTP_TIMEOUT = 30

    def __init__(self, config):
        """Build the HTTP session, fetch the committee mail map, set up logging."""
        self.config = config
        self.action_url = f"https://api.github.com/orgs/{ORG}/actions/permissions/selected-actions"
        # Raw file contents are served from raw.githubusercontent.com.
        self.raw_url = f"https://raw.githubusercontent.com/{ORG}/{PUBLIC_INTERFACE}/refs/heads/main/{APPROVED_PATTERNS_FILEPATH}"
        self.s = requests.Session()

        # Fetch the mail map. This happens BEFORE the GitHub headers are
        # installed so the token is not sent to whimsy.apache.org.
        raw_map = self.s.get(
            "https://whimsy.apache.org/public/committee-info.json",
            timeout=self.HTTP_TIMEOUT,
        ).json()["committees"]
        self.mail_map = {name: info["mail_list"] for name, info in raw_map.items()}

        # Add the GitHub Headers
        self.s.headers.update(
            {
                "Accept": "application/vnd.github+json",
                "Authorization": f"Bearer {self.config['gha_token']}",
                "X-GitHub-Api-Version": "2022-11-28",
            }
        )

        self.pubsub = f"https://pubsub.apache.org:2070/git/{PUBLIC_INTERFACE}"
        self.logger = Log(config)

    def scan(self):
        """Listen on the pubsub URL forever, passing each payload to ``handler``."""
        self.logger.log.info("Connecting to %s", self.pubsub)
        asfpy.pubsub.listen_forever(self.handler, self.pubsub, raw=True)

    def update(self, wlist):
        """Update the GitHub actions whitelist for the org.

        ``wlist`` is sent as ``patterns_allowed``; GitHub-owned actions stay
        allowed and verified-creator actions stay disallowed.

        Returns True when GitHub answers 204, False otherwise.
        """
        data = {
            "github_owned_allowed": True,
            "verified_allowed": False,
            "patterns_allowed": wlist,
        }
        results = self.s.put(
            self.action_url, data=json.dumps(data), timeout=self.HTTP_TIMEOUT
        )
        if results.status_code == 204:
            self.logger.log.info("Updated.")
            return True
        self.logger.log.error(
            "Whitelist update failed: HTTP %s %s", results.status_code, results.text
        )
        return False

    def handler(self, data):
        """Handle one pubsub payload.

        A commit to ``PUBLIC_INTERFACE`` that touches the whitelist file
        triggers a fetch of the current file contents. Every payload that is
        not such a commit is logged as a heartbeat.
        """
        if "commit" in data and data["commit"].get("project") == PUBLIC_INTERFACE:
            # Exact path comparison: only the whitelist file itself counts.
            changed = [
                path
                for path in data["commit"].get("files", [])
                if path == APPROVED_PATTERNS_FILEPATH
            ]
            self.logger.log.debug("Matching files in commit: %s", changed)
            if not changed:
                return

            self.logger.log.debug("Updated whitelist detected")
            try:
                response = self.s.get(self.raw_url, timeout=self.HTTP_TIMEOUT)
            except requests.RequestException:
                # A transient network failure must not kill the listener.
                self.logger.log.exception("Could not fetch %s", self.raw_url)
                return
            if response.status_code != 200:
                self.logger.log.error(
                    "Could not fetch %s: HTTP %s", self.raw_url, response.status_code
                )
                return

            try:
                # Assumes the file parses to the pattern list GitHub expects
                # -- TODO confirm against the repository contents.
                wlist = yaml.safe_load(response.text)
            except yaml.YAMLError:
                self.logger.log.exception("Whitelist file is not valid YAML")
                return

            # TODO trigger self.update with contents
            self.logger.log.debug("%s", wlist)
        else:
            self.logger.log.info("Heartbeat Signal Detected")
def get_args(argv=None):
    """Parse command-line arguments.

    Args:
        argv: Optional list of argument strings. When None (the default),
            argparse reads ``sys.argv[1:]``, exactly as before.

    Returns:
        The parsed namespace with ``config`` (path of the configuration
        file, default ``gha-whitelist-manager.yml``) and a fixed ``uri``
        attribute.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-c",
        "--config",
        help="Configuration file",
        default="gha-whitelist-manager.yml",
    )
    args = parser.parse_args(argv)
    # NOTE(review): this path names the "asf-transfer" organization and is
    # not read anywhere in the visible code -- confirm it is still needed.
    args.uri = "orgs/asf-transfer/actions/permissions/selected-actions"
    return args
if __name__ == "__main__":
    # Script entry point: load the YAML configuration named on the command
    # line, then listen for whitelist changes until interrupted.
    args = get_args()
    # Use a context manager so the configuration file handle is closed
    # promptly instead of being left to the garbage collector.
    with open(args.config, "r", encoding="utf-8") as config_file:
        config = yaml.safe_load(config_file)
    w = WhitelistUpdater(config)
    w.scan()