# Source code for rituals.acts.documentation

# -*- coding: utf-8 -*-
# pylint: disable=bad-continuation, bad-whitespace
""" 'docs' tasks.
# Copyright ⓒ  2015 Jürgen Hermann
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# The full LICENSE file and source are available at
#    https://github.com/jhermann/rituals
from __future__ import absolute_import, unicode_literals, print_function

import io
import os
import re
import sys
import time
import shutil
import tempfile
import textwrap
import webbrowser
from contextlib import contextmanager

try:  # Python 3
    from configparser import ConfigParser, Error as ConfigError
except ImportError:  # Python 2
    from ConfigParser import RawConfigParser as ConfigParser, Error as ConfigError  # pylint: disable=import-error

import requests

from . import Collection, task
from .. import config
from ..util import notify
from ..util.filesys import pushd
from ..util.shell import capture

def get_pypi_auth(configfile='~/.pypirc'):
    """Read auth from pip config.

    Returns a ``(username, password)`` tuple taken from the ``[pypi]``
    section of the given INI file, or ``None`` when the file cannot be
    read or holds no complete set of credentials.
    """
    pypi_cfg = ConfigParser()
    # ConfigParser.read() returns the list of successfully parsed files,
    # i.e. an empty (falsy) list when the config file is missing
    if pypi_cfg.read(os.path.expanduser(configfile)):
        try:
            user = pypi_cfg.get('pypi', 'username')
            pwd = pypi_cfg.get('pypi', 'password')
            return user, pwd
        except ConfigError:
            notify.warning("No PyPI credentials in '{}',"
                           " will fall back to '~/.netrc'...".format(configfile))
    return None
def watchdogctl(ctx, kill=False, verbose=True):
    """Control / check a running Sphinx autobuild process.

    Scans for a process listening on the configured watchdog port via
    ``lsof`` and returns its PID (0 when none is found). With ``kill=True``,
    the process is signalled repeatedly (up to 40 tries) until it is gone.
    """
    tries = 40 if kill else 0
    # NOTE(review): config attribute path reconstructed from the namespace
    # defaults at the end of this module — confirm against a working checkout
    cmd = 'lsof -i TCP:{} -s TCP:LISTEN -S -Fp 2>/dev/null'.format(ctx.rituals.docs.watchdog.port)

    pidno = 0
    pidinfo = capture(cmd, ignore_failures=True)
    while pidinfo:
        # 'lsof -Fp' reports the PID as a line of the form 'p<digits>'
        pidline = next(filter(None, [re.match(r'^p(\d+)$', x) for x in pidinfo.splitlines()]), None)
        if not pidline:
            raise ValueError("Standard lsof output expected (got {!r})".format(pidinfo))
        pidno = int(pidline.group(1), 10)
        if verbose:
            ctx.run("ps uw {}".format(pidno), echo=False)
            verbose = False  # only list the process once

        tries -= 1
        if tries <= 0:
            break
        else:
            try:
                os.kill(pidno, 0)  # signal 0 only probes whether the PID is alive
            #except ProcessLookupError: # XXX Python3 only
            #    break
            except OSError as exc:  # Python2 has no ProcessLookupError
                if exc.errno == 3:  # ESRCH: no such process
                    break
                raise
            else:
                notify.info("Killing PID {}".format(pidno))
                ctx.run("kill {}".format(pidno), echo=False)
                time.sleep(.25)

        # re-scan; loop ends once the port is free
        pidinfo = capture(cmd, ignore_failures=True)

    return pidno
@task(default=True, help={
    'browse': "Open index page in browser tab",
    'clean': "Start with a clean build area",
    'watchdog': "Start autobuild watchdog?",
    'kill': "Stop autobuild watchdog (and do nothing else)",
    'status': "Show autobuild watchdog process state",
    'opts': "Extra flags for Sphinx builder",
})
def sphinx(ctx, browse=False, clean=False, watchdog=False, kill=False, status=False, opts=''):
    """Build Sphinx docs."""
    cfg = config.load()

    # NOTE(review): the 'ctx.rituals.docs.*' attribute paths below were lost in
    # extraction and are reconstructed from the namespace defaults at module
    # end — confirm against a working checkout
    if kill or status:
        if not watchdogctl(ctx, kill=kill):
            notify.info("No process bound to port {}".format(ctx.rituals.docs.watchdog.port))
        return

    if clean:
        ctx.run("invoke clean --docs")

    # Convert markdown files, if applicable
    for basename in ('README', 'CONTRIBUTING'):
        markdown = cfg.rootjoin(basename + '.md')
        if os.path.exists(markdown):
            try:
                import pypandoc
            except ImportError as exc:
                notify.warning("Can't import 'pandoc' ({})".format(exc))
                break
            else:
                pypandoc.convert(markdown, 'rst',
                                 outputfile=os.path.join(ctx.rituals.docs.sources, basename + '.rst'))

    # LICENSE file — render it into the docs tree as a reST page
    if os.path.exists('LICENSE'):
        with io.open('LICENSE', 'r') as inp:
            license_text = inp.read()
        try:
            _, copyright_text = cfg.project['long_description'].split('Copyright', 1)
        except (KeyError, ValueError):
            copyright_text = cfg.project.get('license', 'N/A')
        with io.open(os.path.join(ctx.rituals.docs.sources, 'LICENSE.rst'), 'w') as out:
            out.write(
                'Software License\n'
                '================\n'
                '\n'
                '    Copyright {}\n'
                '\n'
                'Full License Text\n'
                '-----------------\n'
                '\n'
                '::\n'
                '\n'
                .format(copyright_text)
            )
            # indent the raw license text so it forms a reST literal block
            license_text = textwrap.dedent(license_text)
            license_text = '\n    '.join(license_text.splitlines())
            out.write('    {}\n'.format(license_text))

    # Build API docs
    if cfg.project.get('packages'):
        cmd = ['sphinx-apidoc', '-o', 'api', '-f', '-M']
        for package in cfg.project.packages:
            if '.' not in package:  # only scan top-level packages
                cmd.append(cfg.srcjoin(package))
        with pushd(ctx.rituals.docs.sources):
            ctx.run(' '.join(cmd))

    # Auto build?
    cmd = ['sphinx-build', '-b', 'html']
    if opts:
        cmd.append(opts)
    cmd.extend(['.', ctx.rituals.docs.build])
    index_url = index_file = os.path.join(ctx.rituals.docs.sources, ctx.rituals.docs.build, 'index.html')
    if watchdog:
        watchdogctl(ctx, kill=True)  # make sure no stale watchdog is bound to the port
        cmd[0:1] = ['nohup', 'sphinx-autobuild']
        cmd.extend([
            '-H', ctx.rituals.docs.watchdog.host,
            '-p', '{}'.format(ctx.rituals.docs.watchdog.port),
            "-i'{}'".format('*~'),
            "-i'{}'".format('.*'),
            "-i'{}'".format('*.log'),
            ">watchdog.log", "2>&1", "&",
        ])
        index_url = "http://{}:{}/".format(ctx.rituals.docs.watchdog.host, ctx.rituals.docs.watchdog.port)

    # Build docs
    notify.info("Starting Sphinx {}build...".format('auto' if watchdog else ''))
    with pushd(ctx.rituals.docs.sources):
        ctx.run(' '.join(cmd), pty=not watchdog)

    # Wait for watchdog to bind to listening port
    if watchdog:
        def activity(what=None, i=None):
            "Helper"
            if i is None:
                sys.stdout.write(what + '\n')
            else:
                # simple 4-phase spinner while polling
                sys.stdout.write('  {}  Waiting for {}\r'.format(r'\|/-'[i % 4], what or 'something'))
            sys.stdout.flush()

        for i in range(60):
            activity('server start', i)
            if watchdogctl(ctx):
                activity('OK')
                break
            time.sleep(1)
        else:
            activity('ERR')

        # trigger first build by touching the master document
        os.utime(os.path.join(ctx.rituals.docs.sources, 'index.rst'), None)
        for i in range(60):
            activity('HTML index file', i)
            if os.path.exists(index_file):
                activity('OK')
                break
            time.sleep(1)
        else:
            activity('ERR')

    # Open in browser?
    if browse:
        time.sleep(1)
        webbrowser.open_new_tab(index_url)


@task(help={
    'no-publish': "Do not publish to Confluence, just build",
    'clean': "Start with a clean build area",
    'opts': "Extra flags for Sphinx builder",
})
def confluence(ctx, no_publish=False, clean=False, opts=''):
    """Build Sphinx docs and publish to Confluence."""
    cfg = config.load()

    if clean:
        ctx.run("invoke clean --docs")

    cmd = ['sphinx-build', '-b', 'confluence']
    cmd.extend(['-E', '-a'])  # force a full rebuild
    if opts:
        cmd.append(opts)
    cmd.extend(['.', ctx.rituals.docs.build + '_cf'])
    if no_publish:
        cmd.extend(['-Dconfluence_publish=False'])

    # Build docs
    notify.info("Starting Sphinx build...")
    with pushd(ctx.rituals.docs.sources):
        ctx.run(' '.join(cmd), pty=True)

# Only expose the Confluence task when the builder extension is installed
try:
    import sphinxcontrib.confluencebuilder
except ImportError:
    del confluence
class DocsUploader(object):
    """Helper to perform an upload of pre-built docs."""

    def __init__(self, ctx, cfg, target):
        self.ctx = ctx
        self.cfg = cfg
        # NOTE(review): config attribute paths reconstructed from the namespace
        # defaults at module end — confirm against a working checkout
        self.target = target or self.ctx.rituals.docs.upload.method
        self.params = getattr(self.ctx.rituals.docs.upload.targets, self.target, None)
        if self.params is None:
            notify.failure("Unknown upload target '{}'!".format(self.target))
        if not self.params.get('url'):
            notify.failure("You must provide an upload URL for target '{}', e.g. via the environment:\n"
                           "    export INVOKE_RITUALS_DOCS_UPLOAD_TARGETS_{}_URL='http://.../{{name}}-{{version}}.zip'"
                           .format(self.target, self.target.upper()))

    @contextmanager
    def _zipped(self, docs_base):
        """ Provide a zipped stream of the docs tree."""
        with pushd(docs_base):
            # Create a unique temp file name, then let make_archive() write
            # '<name>.zip' next to it; both are removed in the finally below.
            with tempfile.NamedTemporaryFile(prefix='pythonhosted-', delete=False) as ziphandle:
                pass
            zip_name = shutil.make_archive(ziphandle.name, 'zip')

        # 1024.0 ** 2 so the reported figure really is MiB, as the message says
        notify.info("Uploading {:.1f} MiB from '{}' to '{}'..."
                    .format(os.path.getsize(zip_name) / 1024.0 ** 2, zip_name, self.params['url']))
        with io.open(zip_name, 'rb') as zipread:
            try:
                yield zipread
            finally:
                os.remove(ziphandle.name)
                os.remove(ziphandle.name + '.zip')

    def _to_pypi(self, docs_base, release):  # pylint: disable=unused-argument
        """Upload to PyPI.

        Returns the URL of the uploaded docs (from the redirect location),
        or ``None`` when the response carried no location.
        """
        url = None
        with self._zipped(docs_base) as handle:
            reply = requests.post(self.params['url'], auth=get_pypi_auth(), allow_redirects=False,
                                  files=dict(content=(self.cfg.project.name + '.zip', handle, 'application/zip')),
                                  data={':action': 'doc_upload', 'name': self.cfg.project.name})
            if reply.status_code in range(200, 300):
                notify.info("{status_code} {reason}".format(**vars(reply)))
            elif reply.status_code == 301:
                url = reply.headers['location']
            else:
                data = self.cfg.copy()
                data.update(self.params)
                data.update(vars(reply))
                notify.error("{status_code} {reason} for POST to {url}".format(**data))
        return url

    def _to_webdav(self, docs_base, release):
        """Upload to WebDAV store.

        Returns the URL of the uploaded docs when the server reported one
        (Artifactory-style JSON reply or a redirect), else ``None``.
        """
        url = None
        with self._zipped(docs_base) as handle:
            reply = requests.put(self.params['url'].format(name=self.cfg.project.name, version=release),
                                 data=handle.read(), headers={'Accept': 'application/json'})
            if reply.status_code in range(200, 300):
                notify.info("{status_code} {reason}".format(**vars(reply)))
                try:
                    data = reply.json()
                except ValueError as exc:
                    notify.warning("Didn't get a JSON response! ({})".format(exc))
                else:
                    if 'downloadUri' in data:  # Artifactory
                        url = data['downloadUri'] + '!/index.html'
            elif reply.status_code == 301:
                url = reply.headers['location']
            else:
                data = self.cfg.copy()
                data.update(self.params)
                data.update(vars(reply))
                notify.error("{status_code} {reason} for PUT to {url}".format(**data))
        if not url:
            notify.warning("Couldn't get URL from upload response!")
        return url

    def upload(self, docs_base, release):
        """Upload docs in ``docs_base`` to the target of this uploader."""
        # dispatch to _to_pypi() / _to_webdav() based on the configured target
        return getattr(self, '_to_' + self.target)(docs_base, release)
@task(help={
    'browse': "Open index page on successful upload",
    'target': "Upload target name (default: pypi)",
    'release': "Version for upload path (default: latest)",
})
def upload(ctx, browse=False, target=None, release='latest'):
    """Upload a ZIP of built docs (by default to PyPI, else a WebDAV URL)."""
    cfg = config.load()
    uploader = DocsUploader(ctx, cfg, target)

    # NOTE(review): attribute paths reconstructed from the namespace defaults
    # below — confirm against a working checkout
    html_dir = os.path.join(ctx.rituals.docs.sources, ctx.rituals.docs.build)
    if not os.path.isdir(html_dir):
        notify.failure("No HTML docs dir found at '{}'!".format(html_dir))
    url = uploader.upload(html_dir, release)
    notify.info("Uploaded docs to '{url}'!".format(url=url or 'N/A'))

    if url and browse:  # Open in browser?
        webbrowser.open_new_tab(url)


namespace = Collection.from_module(sys.modules[__name__], name='docs', config={'rituals': dict(
    docs = dict(
        sources = 'docs',
        build = '_build',
        watchdog = dict(
            host = '127.0.0.1',  # NOTE(review): host value lost in extraction; localhost assumed — confirm
            port = 8840,
        ),
        upload = dict(
            method = 'pypi',
            targets = dict(
                pypi = dict(url='https://pypi.python.org/pypi'),  # NOTE(review): URL lost in extraction — confirm
                webdav = dict(url=None),  # must be set in the environment
            ),
        ),
    ),
)})