# -*- coding: utf-8 -*-
# pylint: disable=bad-continuation, bad-whitespace
""" 'docs' tasks.
"""
# Copyright ⓒ 2015 Jürgen Hermann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# The full LICENSE file and source are available at
# https://github.com/jhermann/rituals
from __future__ import absolute_import, unicode_literals, print_function
import io
import os
import re
import sys
import time
import shutil
import tempfile
import textwrap
import webbrowser
from contextlib import contextmanager
try:
from configparser import ConfigParser, Error as ConfigError
except ImportError:
from ConfigParser import RawConfigParser as ConfigParser, Error as ConfigError # pylint: disable=import-error
import requests
from . import Collection, task
from .. import config
from ..util import notify
from ..util.filesys import pushd
from ..util.shell import capture
[docs]def get_pypi_auth(configfile='~/.pypirc'):
"""Read auth from pip config."""
pypi_cfg = ConfigParser()
if pypi_cfg.read(os.path.expanduser(configfile)):
try:
user = pypi_cfg.get('pypi', 'username')
pwd = pypi_cfg.get('pypi', 'password')
return user, pwd
except ConfigError:
notify.warning("No PyPI credentials in '{}',"
" will fall back to '~/.netrc'...".format(configfile))
return None
[docs]def watchdogctl(ctx, kill=False, verbose=True):
"""Control / check a running Sphinx autobuild process."""
tries = 40 if kill else 0
cmd = 'lsof -i TCP:{} -s TCP:LISTEN -S -Fp 2>/dev/null'.format(ctx.rituals.docs.watchdog.port)
pidno = 0
pidinfo = capture(cmd, ignore_failures=True)
while pidinfo:
pidline = next(filter(None, [re.match(r'^p(\d+)$', x) for x in pidinfo.splitlines()]))
if not pidline:
raise ValueError("Standard lsof output expected (got {!r})".format(pidinfo))
pidno = int(pidline.group(1), 10)
if verbose:
ctx.run("ps uw {}".format(pidno), echo=False)
verbose = False
tries -= 1
if tries <= 0:
break
else:
try:
os.kill(pidno, 0)
#except ProcessLookupError: # XXX Python3 only
# break
except OSError as exc: # Python2 has no ProcessLookupError
if exc.errno == 3:
break
raise
else:
notify.info("Killing PID {}".format(pidno))
ctx.run("kill {}".format(pidno), echo=False)
time.sleep(.25)
pid = capture(cmd, ignore_failures=True)
return pidno
@task(default=True, help={
'browse': "Open index page in browser tab",
'clean': "Start with a clean build area",
'watchdog': "Start autobuild watchdog?",
'kill': "Stop autobuild watchdog (and do nothing else)",
'status': "Show autobuild watchdog process state",
'opts': "Extra flags for Sphinx builder",
})
def sphinx(ctx, browse=False, clean=False, watchdog=False, kill=False, status=False, opts=''):
"""Build Sphinx docs."""
cfg = config.load()
if kill or status:
if not watchdogctl(ctx, kill=kill):
notify.info("No process bound to port {}".format(ctx.rituals.docs.watchdog.port))
return
if clean:
ctx.run("invoke clean --docs")
# Convert markdown files, if applicable
for basename in ('README', 'CONTRIBUTING'):
markdown = cfg.rootjoin(basename + '.md')
if os.path.exists(markdown):
try:
import pypandoc
except ImportError as exc:
notify.warning("Can't import 'pandoc' ({})".format(exc))
break
else:
pypandoc.convert(markdown, 'rst', outputfile=os.path.join(ctx.rituals.docs.sources, basename + '.rst'))
# LICENSE file
if os.path.exists('LICENSE'):
with io.open('LICENSE', 'r') as inp:
license_text = inp.read()
try:
_, copyright_text = cfg.project['long_description'].split('Copyright', 1)
except (KeyError, ValueError):
copyright_text = cfg.project.get('license', 'N/A')
with io.open(os.path.join(ctx.rituals.docs.sources, 'LICENSE.rst'), 'w') as out:
out.write(
'Software License\n'
'================\n'
'\n'
' Copyright {}\n'
'\n'
'Full License Text\n'
'-----------------\n'
'\n'
'::\n'
'\n'
.format(copyright_text)
)
license_text = textwrap.dedent(license_text)
license_text = '\n '.join(license_text.splitlines())
out.write(' {}\n'.format(license_text))
# Build API docs
if cfg.project.get('packages'):
cmd = ['sphinx-apidoc', '-o', 'api', '-f', '-M']
for package in cfg.project.packages:
if '.' not in package:
cmd.append(cfg.srcjoin(package))
with pushd(ctx.rituals.docs.sources):
ctx.run(' '.join(cmd))
# Auto build?
cmd = ['sphinx-build', '-b', 'html']
if opts:
cmd.append(opts)
cmd.extend(['.', ctx.rituals.docs.build])
index_url = index_file = os.path.join(ctx.rituals.docs.sources, ctx.rituals.docs.build, 'index.html')
if watchdog:
watchdogctl(ctx, kill=True)
cmd[0:1] = ['nohup', 'sphinx-autobuild']
cmd.extend([
'-H', ctx.rituals.docs.watchdog.host,
'-p', '{}'.format(ctx.rituals.docs.watchdog.port),
"-i'{}'".format('*~'),
"-i'{}'".format('.*'),
"-i'{}'".format('*.log'),
">watchdog.log", "2>&1", "&",
])
index_url = "http://{}:{}/".format(ctx.rituals.docs.watchdog.host, ctx.rituals.docs.watchdog.port)
# Build docs
notify.info("Starting Sphinx {}build...".format('auto' if watchdog else ''))
with pushd(ctx.rituals.docs.sources):
ctx.run(' '.join(cmd), pty=not watchdog)
# Wait for watchdog to bind to listening port
if watchdog:
def activity(what=None, i=None):
"Helper"
if i is None:
sys.stdout.write(what + '\n')
else:
sys.stdout.write(' {} Waiting for {}\r'.format(r'\|/-'[i % 4], what or 'something'))
sys.stdout.flush()
for i in range(60):
activity('server start', i)
if watchdogctl(ctx):
activity('OK')
break
time.sleep(1)
else:
activity('ERR')
# trigger first build
os.utime(os.path.join(ctx.rituals.docs.sources, 'index.rst'), None)
for i in range(60):
activity('HTML index file', i)
if os.path.exists(index_file):
activity('OK')
break
time.sleep(1)
else:
activity('ERR')
# Open in browser?
if browse:
time.sleep(1)
webbrowser.open_new_tab(index_url)
@task(help={
'no-publish': "Do not publish to Confluence, just build",
'clean': "Start with a clean build area",
'opts': "Extra flags for Sphinx builder",
})
def confluence(ctx, no_publish=False, clean=False, opts=''):
"""Build Sphinx docs and publish to Confluence."""
cfg = config.load()
if clean:
ctx.run("invoke clean --docs")
cmd = ['sphinx-build', '-b', 'confluence']
cmd.extend(['-E', '-a']) # force a full rebuild
if opts:
cmd.append(opts)
cmd.extend(['.', ctx.rituals.docs.build + '_cf'])
if no_publish:
cmd.extend(['-Dconfluence_publish=False'])
# Build docs
notify.info("Starting Sphinx build...")
with pushd(ctx.rituals.docs.sources):
ctx.run(' '.join(cmd), pty=True)
try:
import sphinxcontrib.confluencebuilder
except ImportError:
del confluence
[docs]class DocsUploader(object):
"""Helper to perform an upload of pre-built docs."""
def __init__(self, ctx, cfg, target):
self.ctx = ctx
self.cfg = cfg
self.target = target or ctx.rituals.docs.upload.method
self.params = getattr(ctx.rituals.docs.upload.targets, self.target, None)
if self.params is None:
notify.failure("Unknown upload target '{}'!".format(self.target))
if not self.params.get('url'):
notify.failure("You must provide an upload URL for target '{}', e.g. via the environment:\n"
" export INVOKE_RITUALS_DOCS_UPLOAD_TARGETS_{}_URL='http://.../{{name}}-{{version}}.zip'"
.format(self.target, self.target.upper()))
@contextmanager
def _zipped(self, docs_base):
""" Provide a zipped stream of the docs tree."""
with pushd(docs_base):
with tempfile.NamedTemporaryFile(prefix='pythonhosted-', delete=False) as ziphandle:
pass
zip_name = shutil.make_archive(ziphandle.name, 'zip')
notify.info("Uploading {:.1f} MiB from '{}' to '{}'..."
.format(os.path.getsize(zip_name) / 1024.0, zip_name, self.target))
with io.open(zip_name, 'rb') as zipread:
try:
yield zipread
finally:
os.remove(ziphandle.name)
os.remove(ziphandle.name + '.zip')
def _to_pypi(self, docs_base, release):
"""Upload to PyPI."""
url = None
with self._zipped(docs_base) as handle:
reply = requests.post(self.params['url'], auth=get_pypi_auth(), allow_redirects=False,
files=dict(content=(self.cfg.project.name + '.zip', handle, 'application/zip')),
data={':action': 'doc_upload', 'name': self.cfg.project.name})
if reply.status_code in range(200, 300):
notify.info("{status_code} {reason}".format(**vars(reply)))
elif reply.status_code == 301:
url = reply.headers['location']
else:
data = self.cfg.copy()
data.update(self.params)
data.update(vars(reply))
notify.error("{status_code} {reason} for POST to {url}".format(**data))
return url
def _to_webdav(self, docs_base, release):
"""Upload to WebDAV store."""
url = None
with self._zipped(docs_base) as handle:
reply = requests.put(self.params['url'].format(name=self.cfg.project.name, version=release),
data=handle.read(), headers={'Accept': 'application/json'})
if reply.status_code in range(200, 300):
notify.info("{status_code} {reason}".format(**vars(reply)))
try:
data = reply.json()
except ValueError as exc:
notify.warning("Didn't get a JSON response! ({})".format(exc))
else:
if 'downloadUri' in data: # Artifactory
url = data['downloadUri'] + '!/index.html'
elif reply.status_code == 301:
url = reply.headers['location']
else:
data = self.cfg.copy()
data.update(self.params)
data.update(vars(reply))
notify.error("{status_code} {reason} for PUT to {url}".format(**data))
if not url:
notify.warning("Couldn't get URL from upload response!")
return url
[docs] def upload(self, docs_base, release):
"""Upload docs in ``docs_base`` to the target of this uploader."""
return getattr(self, '_to_' + self.target)(docs_base, release)
@task(help={
'browse': "Open index page on successful upload",
'target': "Upload target name (default: pypi)",
'release': "Version for upload path (default: latest)",
})
def upload(ctx, browse=False, target=None, release='latest'):
"""Upload a ZIP of built docs (by default to PyPI, else a WebDAV URL)."""
cfg = config.load()
uploader = DocsUploader(ctx, cfg, target)
html_dir = os.path.join(ctx.rituals.docs.sources, ctx.rituals.docs.build)
if not os.path.isdir(html_dir):
notify.failure("No HTML docs dir found at '{}'!".format(html_dir))
url = uploader.upload(html_dir, release)
notify.info("Uploaded docs to '{url}'!".format(url=url or 'N/A'))
if url and browse: # Open in browser?
webbrowser.open_new_tab(url)
namespace = Collection.from_module(sys.modules[__name__], name='docs', config={'rituals': dict(
docs = dict(
sources = 'docs',
build = '_build',
watchdog = dict(
host = '127.0.0.1',
port = 8840,
),
upload = dict(
method = 'pypi',
targets = dict(
pypi = dict(url='https://pypi.python.org/pypi'),
webdav = dict(url=None), # must be set in the environment
),
),
),
)})