User:RussBot/dabmaintbot.py
#!/usr/bin/python
"""
dabmaintbot - Bot to update link counts on
[[en:Wikipedia:Disambiguation pages maintenance]]
"""
import datetime
import locale
import re, sys, traceback
import simplejson
import urllib
import wikipedia, pagegenerators
locale.setlocale(locale.LC_ALL, '')
#Constants:
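# ACTIVE_CUTOFF: listings with fewer backlinks than this are moved into the
# hidden "inactive" part of each section; HISTORY_LEN: how many past link
# counts are kept in each entry's <!-- history --> comment.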
ACTIVE_CUTOFF = 100
HISTORY_LEN = 6
started_at = datetime.datetime.now()
# cache page objects to reduce server load
pagecache = {}
def getPage(title):
    global pagecache
    if '#' in title:
        sf_title = title[:title.index('#')]
    else:
        sf_title = title
    return pagecache.setdefault(sf_title, wikipedia.Page(site, title))
def cacheput(page):
    global pagecache
    title = page.sectionFreeTitle()
    pagecache[title] = page
def prefetch(page):
    while True:
        try:
            page.get(get_redirect=True)
            return
        except wikipedia.BadTitle:
            wikipedia.output("Got BadTitle exception on %s; retrying."
                             % page.title())
            continue
        except wikipedia.Error:
            return
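# refcount(page): count mainspace pages linking to 'page' using the API
# backlinks generator (batches of 500, following "query-continue"); pages
# that link through a redirect are counted by recursing on the redirect.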
def refcount(page):
    if hasattr(page, "refcount"):
        return page.refcount
    data = {'action': 'query',
            'generator': 'backlinks',
            'gbltitle': page.sectionFreeTitle(),
            'gblnamespace': '0',
            'gbllimit': '500',
            'redirects': 'redirects',
            'format': 'json',
            }
    count = 0
    while True:
        wikipedia.get_throttle()
        try:
            # wikipedia.output("Getting references to %s" % page.aslink())
            reflist = site.getUrl(site.api_address(), data=data)
        except:
            traceback.print_exc(file=sys.stderr)
            continue
        try:
            result = simplejson.loads(reflist)
        except ValueError:
            continue
        if type(result) is not dict or 'query' not in result:
            return 0
        if 'redirects' in result['query']:
            for redirect in result['query']['redirects']:
                if redirect['to'] == page.sectionFreeTitle():
                    count += refcount(wikipedia.Page(site, redirect['from']))
        if 'pages' in result['query']:
            for ref_id in result['query']['pages']:
                refpage = result['query']['pages'][ref_id]
                if refpage['title'] != page.sectionFreeTitle():
                    count += 1
        if "query-continue" in result:
            data.update(result['query-continue']['backlinks'])
        else:
            return count
def increasing(seq):
    '''Return True if seq is uniformly increasing (from last to first),
    False otherwise'''
    for index in range(len(seq) - 1):
        if seq[index] <= seq[index+1]:
            return False
    return True
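# Histories are stored newest-first, so e.g. increasing([30, 20, 10]) is True
# (the count has risen every run) and increasing([10, 20, 30]) is False.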
def fmt(num):
    return locale.format("%i", num, grouping=True)
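# e.g. fmt(12345) -> '12,345' under an en_US-style locale; grouping depends
# on the locale configured at the top of the script.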
try:
    site = wikipedia.getSite()
    #input pages
    maint_page = wikipedia.Page(site,
        "Wikipedia:Disambiguation pages with links/Current list")
    dump_page = wikipedia.Page(site,
        "User:RussBot/DPL")
    problem_page = wikipedia.Page(site,
        "Wikipedia:Disambiguation pages with links/problems")
    #output pages
    result_page = wikipedia.Page(site,
        "Wikipedia:Disambiguation pages with links/Current list")
    problem_result = wikipedia.Page(site,
        "Wikipedia:Disambiguation pages with links/problems")
    for arg in sys.argv[1:]:
        arg = wikipedia.argHandler(arg, 'dabmaintbot')
        if arg:
            print "Unrecognized command line argument: %s" % arg
            # show help text and exit
            wikipedia.argHandler("-help", "dabmaintbot")
    mylang = site.language()
    fixed_pages = 0
    fixed_links = 0
    problems = []
    m_text = maint_page.get()
    active_r = re.compile(
        r"^# (?:'''• )?\[\[(.+)\]\] *\(([0-9]*) *" +
        r"\[\[Special:Whatlinkshere/(?:.+)\|links\]\]\) *" +
        r"(?:\((?:(?:new)|(?:[-+][0-9]+))\))? *" +
        r"(?:<!-- history (.*?)-->)? *(.*?) *(?:''')? *$", re.M)
    # the groups matched by this regex are:
    # 1. the title of a disambiguation page
    # 2. the number of links found last time the bot ran (may be empty)
    # 3. the history of the page's link count (may be empty), consisting of a
    #    space-separated string of numbers
    # 4. any notes added by users at the end of the line
    inactive_r = re.compile(
        r'^# \[\[(.+)\]\] \(([0-9]+)\) history ([0-9 ]*):(.*) *$', re.M)
    # the groups matched by this regex are the same as for active_r
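    # For reference, an active listing written by this bot looks like
    #   # [[Some title]] (257 [[Special:Whatlinkshere/Some title|links]]) (+12)<!-- history 257 245 230--> user note
    # and an inactive (commented-out) listing looks like
    #   # [[Some title]] (42) history 42 40 39: user note
    # ("Some title" and the numbers are purely illustrative.)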
    # lists are demarcated by HTML comments
    # Step 1: Collect all links and histories from the last scan
    start_mark = u"<!-- section title="
    end_mark = u"<!-- end section -->"
    marker = 0
    new_text = []
    disambiglinks = {}
    total_count = [0, 0, 0, 0]
    sections = []
    diffs = []
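    # Bookkeeping: disambiglinks maps a section-free title to its listing data;
    # total_count holds [active pages, active links, inactive pages, inactive
    # links]; diffs collects (change, title) pairs for the Top-10 lists.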
    while True:
        section_start = m_text.find(start_mark, marker)
        if section_start == -1:
            break
        title_mark = section_start + len(start_mark)
        section_title = m_text[title_mark:
                               m_text.find(u" -->\n", title_mark)]
        section_marker = title_mark + len(section_title) + len(" -->\n")
        if section_marker >= len(m_text):
            wikipedia.output(
                u"ERROR: cannot locate section title in %s" % section_title)
            raise RuntimeError
        section_end = m_text.find(end_mark, section_marker)
        if section_end == -1:
            wikipedia.output(
                u"ERROR: cannot locate end of section %s" % section_title)
            raise RuntimeError
        marker = section_end
        sections.append((section_title, section_marker, section_end))
        sectionnumber = len(sections) - 1
        for item in active_r.finditer(m_text, section_marker, section_end):
            link_page_title = item.group(1)
            link_page = getPage(link_page_title)
            try:
                prefetch(link_page)
                while link_page.isRedirectPage():
                    link_page = link_page.getRedirectTarget()
                    prefetch(link_page)
                if not link_page.isDisambig():
                    continue
            except wikipedia.NoPage:
                continue
            link_page_title = link_page.sectionFreeTitle()
            if link_page_title in disambiglinks.keys():
                continue
            count = refcount(link_page)
            wikipedia.output(u"%s [%i]" % (link_page.title(), count))
            if item.group(3):
                history = item.group(3)
            else:
                history = u''
            disambiglinks[link_page_title] = {
                'section': sectionnumber,
                'title': link_page_title,
                'count': count,
                'history_text': history,
                'trailing_text': item.group(4).strip()
            }
        # search for inactive listings, which should always follow active ones
        for item in inactive_r.finditer(m_text, section_marker, section_end):
            link_page_title = item.group(1)
            link_page = getPage(link_page_title)
            try:
                prefetch(link_page)
                while link_page.isRedirectPage():
                    link_page = link_page.getRedirectTarget()
                    prefetch(link_page)
                if not link_page.isDisambig():
                    continue
            except wikipedia.NoPage:
                continue
            link_page_title = link_page.title()
            if link_page_title in disambiglinks.keys():
                continue
            count = refcount(link_page)
            wikipedia.output(u"%s [%i]" % (link_page.title(), count))
            if item.group(3):
                history = item.group(3)
            else:
                history = u''
            disambiglinks[link_page_title] = {
                'section': sectionnumber,
                'title': link_page_title,
                'count': count,
                'history_text': history,
                'trailing_text': item.group(4).strip()
            }
    # Step 2. Collect links from data dump output page and add any that
    # aren't already in the collection
    for link_page in dump_page.linkedPages():
        try:
            prefetch(link_page)
            while link_page.isRedirectPage():
                link_page = link_page.getRedirectTarget()
                prefetch(link_page)
            if not link_page.isDisambig():
                continue
        except wikipedia.NoPage:
            continue
        link_page_title = link_page.sectionFreeTitle()
        if link_page_title in disambiglinks.keys():
            continue
        count = refcount(link_page)
        wikipedia.output(u"%s [%i]" % (link_page.title(), count))
        history = u''
        disambiglinks[link_page_title] = {
            'section': 0,  # All new articles go into 'general' until classified
            'title': link_page_title,
            'count': count,
            'history_text': history,
            'trailing_text': u''
        }
    # Step 3. Sort links by section and count, and output page
    marker = 0
    for (number, (section_name, section_marker, section_end)
         ) in enumerate(sections):
        section_links = [link for link in disambiglinks.values()
                         if link['section'] == number]
        section_links.sort(key=lambda i: i['count'], reverse=True)
        section_count = [0, 0]
        new_text.append(m_text[marker:section_marker])
        active = True
        for link in section_links:
            if link['count'] < ACTIVE_CUTOFF and active:
                active = False
                new_text.append(u"<!-- Inactive articles:\n")
            if link['history_text']:
                history = [int(n) for n in link['history_text'].split(" ")]
            else:
                history = []
            history = [link['count']] + history
            while len(history) > HISTORY_LEN:
                del history[-1]
            if len(history) == 1:
                link['diff'] = 'new'
            else:
                link['diff'] = "%+i" % (history[0] - history[1])
                diffs.append((history[0] - history[1], link['title']))
                if history[0] < history[1]:
                    fixed_pages += 1
                    fixed_links += (history[1] - history[0])
            link['history_text'] = " ".join(str(x) for x in history)
            ## print link[1]+":", history
            if max(history) < ACTIVE_CUTOFF / 4:
                # discard items that have no significant history
                continue
            if active:
                section_count[0] += 1
                section_count[1] += link['count']
                item = (
                    u"[[%(title)s]] (%(count)i [[Special:Whatlinkshere/%(title)s|links]]) " +
                    u"(%(diff)s)<!-- history %(history_text)s--> %(trailing_text)s") % link
                # bullet items that have shown unusual or persistent increases
                if (len(history) > 1 and
                    history[0] - history[1] > ACTIVE_CUTOFF / 2
                    ) or (
                    len(history) == HISTORY_LEN and
                    increasing(history) and
                    history[0] - history[-1] > ACTIVE_CUTOFF
                    ):
                    prefix = "'''• "
                    suffix = "'''"
                    # str.rstrip returns a new string, so assign the result back
                    item = item.rstrip("'")
                    problems.append(
                        u"* [[%(title)s]] (%(count)i [[Special:Whatlinkshere/%(title)s|links]]) (%(diff)s)\n"
                        % link)
                else:
                    prefix = suffix = ""
                new_text.append("# %s%s%s\n" % (prefix, item, suffix))
            else:
                total_count[2] += 1
                total_count[3] += link['count']
                new_text.append(
                    u"# [[%(title)s]] (%(count)i) history %(history_text)s: %(trailing_text)s\n"
                    % link)
        if not active:
            new_text.append("-->\n")
        marker = section_end
        new_text.append(
            u"\n Section '%s' contains %i links to %i active articles.\n" %
            (section_name, section_count[1], section_count[0]))
        total_count[0] += section_count[0]
        total_count[1] += section_count[1]
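    # Update the "fixed since last week" banner and the Top-10 increase/decrease
    # lists, then append a new dated row to the statistics wikitable (rows are
    # separated by "|-"; the table is closed by the "|}" located below).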
    diffs.sort()
    statistics_point = m_text.find(u"|}")
    if statistics_point >= 0:
        text = m_text[marker:statistics_point]
        text = re.sub(r"(?s)<!--banner-->.*?<!--/banner-->",
                      """<!--banner-->
'''''Since last week, at least %s links to %s pages have been fixed!'''''
<!--/banner-->"""
                      % (fmt(fixed_links), fmt(fixed_pages)), text)
        top10 = ["\n===Top 10 increases==="]
        for item in reversed(diffs[-10:]):
            top10.append("# [[%s]] (%i)" % (item[1], item[0]))
        top10.append("===Top 10 decreases===")
        for item in diffs[:10]:
            top10.append("# [[%s]] (%i)" % (item[1], item[0]))
        top10.append("<!--/banner-->")
        text = text.replace("<!--/banner-->", "\n".join(top10))
        new_text.append(text)
        marker = statistics_point
new_text.append(u"|-\n")
today = datetime.date.today()
new_text.append(u"| %4i-%02i-%02i || %s || %s || %s || %s\n"
% (today.year, today.month, today.day,
fmt(total_count[0]+total_count[2]),
fmt(total_count[0]),
fmt(total_count[1]+total_count[3]),
fmt(total_count[1])))
new_text.append(m_text[marker:])
wikipedia.setAction(u"Disambiguation page maintenance script")
result_page.put(u"".join(new_text))
prob_text = problem_page.get()
header_start = prob_text.index("<noinclude>")
header_end = prob_text.index("</noinclude>") + len("</noinclude>")
problem_result.put(prob_text[header_start:header_end] + "\n" +
u"".join(problems))
finally:
    elapsed = datetime.datetime.now() - started_at
    print "elapsed time = " + str(elapsed)
    wikipedia.stopme()