Child: [45cae1] (diff)

Download this file

test-branches-against-tickets.py    140 lines (111 with data), 5.1 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from ConfigParser import ConfigParser, NoOptionError
import json
import oauth2 as oauth
import os
import re
import shlex
import subprocess
import sys
import urlparse
import webbrowser
CP = ConfigParser()
re_ticket_branch = re.compile('^\s*origin/.*/(\d+)$')
def main():
git('remote prune origin')
branches_for_tickets = dict() # maps ticket numbers to the actual branch e.g., int(42) -> 'origin/rc/42'
ticket_nums = dict() # maps ticket numbers to 'merged' or 'unmerged' according to the matching branch
merged_branches = [ branch[2:] for branch in git('branch -r --merged dev') if re_ticket_branch.match(branch) ]
unmerged_branches = [ branch[2:] for branch in git('branch -r --no-merged dev') if re_ticket_branch.match(branch) ]
for branch in merged_branches:
tn = int(re_ticket_branch.match(branch).group(1))
branches_for_tickets[tn] = branch
ticket_nums[tn] = 'merged'
for branch in unmerged_branches:
# we'll consider it merged if `git cherry` thinks it is
commits = ''.join(git('cherry', 'dev', branch, strip_eol=False))
tn = int(re_ticket_branch.match(branch).group(1))
branches_for_tickets[tn] = branch
ticket_nums[tn] = 'merged' if commits.find('+')==-1 else 'unmerged'
failure = False
CP.read(os.path.join(os.environ['HOME'], '.forgepushrc'))
oauth_client = make_oauth_client()
for tn in ticket_nums:
resp = oauth_client.request('http://sourceforge.net/rest/p/allura/tickets/%s/' % tn)
#assert resp[0]['status'] == '200', (resp, tn)
if resp[0]['status'] != '200':
continue
ticket = json.loads(resp[1])['ticket']
if ticket is None:
continue
is_closed = ticket['status'] in ('closed', 'validation')
is_merged = ticket_nums[tn] == 'merged'
if is_closed != is_merged:
print('<http://sourceforge.net/p/allura/tickets/%s/> is status:"%s", but the branch "%s" is %s' % (tn, ticket['status'], branches_for_tickets[tn], ticket_nums[tn]))
failure = True
if failure:
sys.exit(1)
def make_oauth_client():
"""
Build an oauth.Client with which callers can query Allura.
See format_changes for an example use.
Uses global CP, a ConfigParser
Re-usable - copy & pasted between Allura, sfpy, and sfx push scripts
"""
# https://sourceforge.net/p/forge/documentation/API%20-%20Beta/
REQUEST_TOKEN_URL = 'http://sourceforge.net/rest/oauth/request_token'
AUTHORIZE_URL = 'https://sourceforge.net/rest/oauth/authorize'
ACCESS_TOKEN_URL = 'http://sourceforge.net/rest/oauth/access_token'
oauth_key = option('re', 'oauth_key', 'Forge API OAuth Key (https://sourceforge.net/auth/oauth/): ')
oauth_secret = option('re', 'oauth_secret', 'Forge API Oauth Secret: ')
consumer = oauth.Consumer(oauth_key, oauth_secret)
try:
oauth_token = CP.get('re', 'oauth_token')
oauth_token_secret = CP.get('re', 'oauth_token_secret')
except NoOptionError:
client = oauth.Client(consumer)
resp, content = client.request(REQUEST_TOKEN_URL, 'GET')
assert resp['status'] == '200', resp
request_token = dict(urlparse.parse_qsl(content))
pin_url = "%s?oauth_token=%s" % (AUTHORIZE_URL, request_token['oauth_token'])
if getattr(webbrowser.get(), 'name', '') == 'links':
# sandboxes
print("Go to %s" % pin_url)
else:
webbrowser.open(pin_url)
oauth_verifier = raw_input('What is the PIN? ')
token = oauth.Token(request_token['oauth_token'], request_token['oauth_token_secret'])
token.set_verifier(oauth_verifier)
client = oauth.Client(consumer, token)
resp, content = client.request(ACCESS_TOKEN_URL, "GET")
access_token = dict(urlparse.parse_qsl(content))
oauth_token = access_token['oauth_token']
oauth_token_secret = access_token['oauth_token_secret']
CP.set('re', 'oauth_token', oauth_token)
CP.set('re', 'oauth_token_secret', oauth_token_secret)
access_token = oauth.Token(oauth_token, oauth_token_secret)
return oauth.Client(consumer, access_token)
def git(*args, **kw):
if len(args)==1 and isinstance(args[0], basestring):
argv = shlex.split(args[0])
else:
argv = list(args)
if argv[0] != 'git':
argv.insert(0, 'git')
p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
p.wait()
output = p.stdout.readlines()
if kw.get('strip_eol', True):
output = [ line.rstrip('\n') for line in output ]
return output
def option(section, key, prompt=None):
""" shared (copy/paste) between Allura & sfpy """
if not CP.has_section(section):
CP.add_section(section)
if CP.has_option(section, key):
value = CP.get(section, key)
else:
value = raw_input(prompt or ('%s: ' % key))
CP.set(section, key, value)
return value
if __name__ == '__main__':
main()