#!/usr/bin/env python
"""
Simple example script for running and testing notebooks.

Usage: `ipnbdoctest.py foo.ipynb [bar.ipynb [...]]`

Each cell is submitted to the kernel, and the outputs are compared
with those stored in the notebook.
"""

from __future__ import print_function

import os,sys,time
import base64
import re
from difflib import unified_diff as diff

from collections import defaultdict
try:
    from queue import Empty
except ImportError:
    print('Python 3.x is needed to run this script.')
    sys.exit(77)

import importlib.util

# find_spec() signals a missing top-level package by returning None rather
# than by raising, so its result has to be checked explicitly.
try:
    _ipython_spec = importlib.util.find_spec('IPython')
except (ImportError, ValueError):
    # ValueError: the module is already imported but has no usable __spec__.
    _ipython_spec = None
if _ipython_spec is None:
    print('IPython is needed to run this script.')
    # Exit status 77 is what this script uses to mean "skip the test".
    sys.exit(77)

# Try each known location of KernelManager in turn; whichever import
# succeeds first provides the name used by the rest of the script.
try:
    from jupyter_client import KernelManager
except ImportError:
    try:
        from IPython.kernel import KernelManager
    except ImportError:
        try:
            from IPython.zmq.blockingkernelmanager \
              import BlockingKernelManager as KernelManager
        except Exception:
            # Last resort failed.  Catch Exception (not a bare except) so
            # that KeyboardInterrupt and SystemExit are not swallowed.
            print('IPython is needed to run this script.')
            sys.exit(77)

# Until Debian Stable ships IPython >3.0, we stick to the v3 format.
# The standalone nbformat package is preferred; the copy bundled with
# IPython is the fallback.
try:
    from nbformat import v3 as nbformat
except ImportError:
    from IPython.nbformat import v3 as nbformat

def compare_png(a64, b64):
    """Compare two base64-encoded PNGs (incomplete).

    Both arguments are decoded, so malformed base64 input raises
    ``binascii.Error``, but no image comparison is implemented yet:
    any pair of decodable payloads is reported as equal.

    a64, b64 -- base64 payloads, as ASCII ``str`` or bytes-like objects.

    Returns True.
    """
    # base64.decodestring() was removed in Python 3.9 (and never accepted
    # str); b64decode() handles both str and bytes input.
    base64.b64decode(a64)
    base64.b64decode(b64)
    return True

def sanitize(s):
    """Sanitize a string for comparison.

    Fix universal newlines, strip trailing newlines, and normalize values
    that vary between runs or tool versions: memory addresses, UUIDs,
    graphviz/SVG layout details, SpinS compiler chatter, Pandas CSS and
    the CalledProcessError message format.

    Values that are not ``str`` are returned unchanged.
    """
    if not isinstance(s, str):
        return s
    # normalize newline:
    s = s.replace('\r\n', '\n')

    # ignore trailing newlines (but not space)
    s = s.rstrip('\n')

    # remove hex addresses:
    s = re.sub(r'at 0x[a-f0-9]+', 'object', s)

    # normalize UUIDs:
    s = re.sub(r'[a-f0-9]{8}(\-[a-f0-9]{4}){3}\-[a-f0-9]{12}', 'U-U-I-D', s)

    # normalize graphviz version
    s = re.sub(r'Generated by graphviz version.*', 'VERSION', s)

    # remove Spins verbose output version
    s = re.sub(r'SpinS Promela Compiler.*Compiled C .* to .*pml.spins',
               'SpinS output', s, flags=re.DOTALL)

    # SVG generated by graphviz may put note at different positions
    # depending on the graphviz build.  Let's just strip anything that
    # look like a position.
    s = re.sub(r'<path[^/]* d="[^"]*"', '<path', s)
    s = re.sub(r'points="[^"]*"', 'points=""', s)
    s = re.sub(r'x="[0-9.-]+"', 'x=""', s)
    s = re.sub(r'y="[0-9.-]+"', 'y=""', s)
    s = re.sub(r'width="[0-9.]+pt"', 'width=""', s)
    s = re.sub(r'height="[0-9.]+pt"', 'height=""', s)
    s = re.sub(r'viewBox="[0-9 .-]*"', 'viewbox=""', s)
    s = re.sub(r'transform="[^"]*"', 'transform=""', s)
    s = re.sub(r'id="edge[^"]*"', 'id="edge"', s)
    s = re.sub(r'text-anchor="[^"]*"', 'text-anchor=""', s)

    # The following patterns from graphviz 2.40 are rewritten as they used to
    # be in 2.38.
    s = re.sub(r'"#000000"', '"black"', s)
    s = re.sub(r'"#ffffff"', '"white"', s)
    s = re.sub(r'"#00ff00"', '"green"', s)
    s = re.sub(r'"#ff0000"', '"red"', s)
    s = re.sub(r'"#c0c0c0"', '"grey"', s)
    s = re.sub(r'"#ffa500"', '"orange"', s)
    s = re.sub(r' fill="black"', '', s)
    s = re.sub(r' stroke="transparent"', ' stroke="none"', s)
    s = re.sub(r'><title>', '>\n<title>', s)

    # Different Pandas versions produce different CSS styles.
    s = re.sub(r'<style[ a-z]*>.*</style>',
               '<style>...</style>', s, flags=re.DOTALL)

    # CalledProcessError message has a final dot in Python 3.6
    s = re.sub(r"(' returned non-zero exit status \d+)\.", r'\1', s)
    return s


def consolidate_outputs(outputs):
    """Consolidate outputs into a summary dict (incomplete).

    outputs -- iterable of output nodes supporting attribute access and,
               for display outputs, ``in`` / item lookup.

    Returns a ``defaultdict(list)`` in which:
      * stream text is concatenated under the stream's name
        ('stdout' and 'stderr' always exist and start out empty),
      * the last error is stored under 'pyerr' as a dict with the
        keys 'ename' and 'evalue',
      * every other output contributes its 'png', 'svg', 'latex', 'html',
        'javascript', 'text' and 'jpeg' payloads to per-key lists.
    """
    data = defaultdict(list)
    data['stdout'] = ''
    data['stderr'] = ''

    for out in outputs:
        # Outputs built by run_cell() carry their kind in ``output_type``.
        # ``type`` is the attribute this function used to read, and is
        # kept as a fallback.
        kind = getattr(out, 'output_type', None)
        if kind is None:
            kind = getattr(out, 'type', None)

        if kind == 'stream':
            data[out.stream] += out.text
        elif kind == 'pyerr':
            data['pyerr'] = dict(ename=out.ename, evalue=out.evalue)
        else:
            for key in ('png', 'svg', 'latex', 'html',
                        'javascript', 'text', 'jpeg',):
                if key in out:
                    data[key].append(out[key])
    return data


def _diff_lines(value):
    """Return ``value`` as a list of newline-terminated lines for difflib."""
    # sanitize() passes non-string values (e.g. metadata dicts) through
    # untouched; render them as text so that they can be diffed too.
    text = value if isinstance(value, str) else repr(value)
    if not text.endswith('\n'):
        text += '\n'
    return text.splitlines(True)


def compare_outputs(test, ref, skip_cmp=('png', 'traceback',
                                         'latex', 'prompt_number')):
    """Compare one freshly produced output against its stored reference.

    test     -- output obtained by running the cell.
    ref      -- output stored in the notebook.
    skip_cmp -- keys whose values are not compared; they must still be
                present in ``test``.

    Every key of ``ref`` has to exist in ``test`` and, unless listed in
    ``skip_cmp``, the two values must be equal after sanitize().  Keys that
    only exist in ``test`` are ignored.

    Returns True when the outputs match.  On the first problem a diagnostic
    (missing key, or a unified diff of the values) is printed and False is
    returned.
    """
    for key in ref:
        if key not in test:
            print("missing key: %s != %s" % (test.keys(), ref.keys()))
            return False
        if key in skip_cmp:
            continue
        exp = sanitize(ref[key])
        eff = sanitize(test[key])
        if exp != eff:
            print("mismatch %s:" % key)
            print(''.join(diff(_diff_lines(exp), _diff_lines(eff),
                               fromfile='expected', tofile='effective')))
            return False
    return True

def _wait_for_ready_backport(kc):
    """Backport BlockingKernelClient.wait_for_ready from IPython 3.

    kc -- kernel client whose channels have already been started.

    Sends a kernel_info request, reads shell messages (waiting up to 30 s
    for each) until the 'kernel_info_reply' arrives, then discards pending
    IOPub messages until none shows up within 0.2 s.
    """
    # Wait for kernel info reply on shell channel
    kc.kernel_info()
    while True:
        msg = kc.get_shell_msg(block=True, timeout=30)
        if msg['msg_type'] == 'kernel_info_reply':
            break
    # Flush IOPub channel
    while True:
        try:
            kc.get_iopub_msg(block=True, timeout=0.2)
        except Empty:
            break

def run_cell(kc, cell):
    """Execute one notebook cell and collect the outputs it produces.

    kc   -- kernel client with started channels.
    cell -- notebook cell; its ``input`` attribute holds the source code.

    Returns the list of ``nbformat.NotebookNode`` outputs built from the
    IOPub messages, using the v3 type names ('pyout', 'pyerr', 'stream',
    'display_data').  A 'clear_output' message discards everything
    collected so far.

    Calls ``sys.exit(77)`` when the cell raised ``SystemExit`` with value
    '77', which is how a notebook asks for the test to be skipped.
    """
    kc.execute(cell.input)
    # wait for finish, maximum 20s
    kc.get_shell_msg(timeout=20)
    outs = []

    while True:
        try:
            msg = kc.get_iopub_msg(timeout=0.2)
        except Empty:
            # Nothing arrived within the timeout: consider the cell done.
            break
        msg_type = msg['msg_type']
        if msg_type in ('status', 'pyin', 'execute_input'):
            continue
        elif msg_type == 'clear_output':
            outs = []
            continue

        content = msg['content']
        # Map the newer message names onto the v3 notebook format names.
        if msg_type == 'execute_result':
            msg_type = 'pyout'
        elif msg_type == 'error':
            msg_type = 'pyerr'
        out = nbformat.NotebookNode(output_type=msg_type)

        if msg_type == 'stream':
            out.stream = content['name']
            # The payload is stored under 'text' or, failing that, 'data'.
            if 'text' in content:
                out.text = content['text']
            else:
                out.text = content['data']
        elif msg_type in ('display_data', 'pyout'):
            out['metadata'] = content['metadata']
            for mime, data in content['data'].items():
                attr = mime.split('/')[-1].lower()
                # this gets most right, but fix svg+html, plain
                attr = attr.replace('+xml', '').replace('plain', 'text')
                setattr(out, attr, data)
            if 'execution_count' in content:
                out.prompt_number = content['execution_count']
        elif msg_type == 'pyerr':
            out.ename = content['ename']
            out.evalue = content['evalue']
            out.traceback = content['traceback']

            # sys.exit(77) is used to Skip the test.
            if out.ename == 'SystemExit' and out.evalue == '77':
                sys.exit(77)
        else:
            print("unhandled iopub msg:", msg_type)

        outs.append(out)
    return outs

def test_notebook(nb):
    """Run the code cells of ``nb`` in a fresh kernel and check their outputs.

    nb -- notebook in v3 format (cells are reached through
          ``nb.worksheets``).

    Code cells are executed in order, except those starting with
    '%timeit'.  A cell counts as an error when running it raises, as a
    failure when the number of outputs or any output differs from what is
    stored in the notebook, and as a success otherwise.  A per-cell verdict
    and a final summary are printed.

    The kernel is shut down before returning, including when an exception
    or ``sys.exit`` interrupts the run.  Calls ``sys.exit(1)`` if any cell
    failed or errored.
    """
    km = KernelManager()
    # Do not save the history to disk, as it can yield spurious lock errors.
    # See https://github.com/ipython/ipython/issues/2845
    km.start_kernel(extra_arguments=['--HistoryManager.hist_file=:memory:'])

    kc = None
    successes = 0
    failures = 0
    errors = 0
    try:
        kc = km.client()
        kc.start_channels()

        try:
            kc.wait_for_ready()
        except AttributeError:
            # Clients without wait_for_ready() get the local backport.
            _wait_for_ready_backport(kc)

        for ws in nb.worksheets:
            for i, cell in enumerate(ws.cells):
                if cell.cell_type != 'code' or cell.input.startswith('%timeit'):
                    continue
                try:
                    outs = run_cell(kc, cell)
                except Exception as e:
                    print("failed to run cell:", repr(e))
                    print(cell.input)
                    errors += 1
                    continue

                failed = False
                if len(outs) != len(cell.outputs):
                    print("output length mismatch (expected {}, got {})".format(
                          len(cell.outputs), len(outs)))
                    failed = True
                # Compare every pair, so that all mismatches get reported.
                for out, ref in zip(outs, cell.outputs):
                    if not compare_outputs(out, ref):
                        failed = True
                print("cell %d: " % i, end="")
                if failed:
                    print("FAIL")
                    failures += 1
                else:
                    print("OK")
                    successes += 1

        print()
        print("tested notebook %s" % nb.metadata.name)
        print("    %3i cells successfully replicated" % successes)
        if failures:
            print("    %3i cells mismatched output" % failures)
        if errors:
            print("    %3i cells failed to complete" % errors)
    finally:
        # Always release the kernel, even when run_cell() called
        # sys.exit(77) or an unexpected exception escaped.
        if kc is not None:
            kc.stop_channels()
        km.shutdown_kernel()
    if failures or errors:
        sys.exit(1)

if __name__ == '__main__':
    # Every command-line argument is the path of a notebook to test.
    for ipynb in sys.argv[1:]:
        print("testing %s" % ipynb)
        # Read the notebook JSON as UTF-8 rather than relying on the
        # locale's default encoding.
        with open(ipynb, encoding='utf-8') as f:
            nb = nbformat.reads_json(f.read())
        test_notebook(nb)