huangcm
2025-02-24 69ed55dec4b2116a19e4cca4393cbc014fce5fb2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/python -u
import string, sys, time
try:
    from _thread import get_ident
except:
    from thread import get_ident
from threading import Thread, Lock
 
import libxml2
 
THREADS_COUNT = 15
 
failed = 0
 
class ErrorHandler:
 
    def __init__(self):
        self.errors = []
        self.lock = Lock()
 
    def handler(self,ctx,str):
        self.lock.acquire()
        self.errors.append(str)
        self.lock.release()
 
def getLineNumbersDefault():
    old = libxml2.lineNumbersDefault(0)
    libxml2.lineNumbersDefault(old)
    return old
 
def test(expectedLineNumbersDefault):
    time.sleep(1)
    global failed
    # check a per thread-global
    if expectedLineNumbersDefault != getLineNumbersDefault():
        failed = 1
        print("FAILED to obtain correct value for " \
              "lineNumbersDefault in thread %d" % get_ident())
    # check ther global error handler 
    # (which is NOT per-thread in the python bindings)
    try:
        doc = libxml2.parseFile("bad.xml")
    except:
        pass
    else:
        assert "failed"
 
# global error handler
eh = ErrorHandler()
libxml2.registerErrorHandler(eh.handler,"")
 
# set on the main thread only
libxml2.lineNumbersDefault(1) 
test(1)
ec = len(eh.errors)
if ec == 0:
    print("FAILED: should have obtained errors")
    sys.exit(1)
 
ts = []
for i in range(THREADS_COUNT):
    # expect 0 for lineNumbersDefault because
    # the new value has been set on the main thread only
    ts.append(Thread(target=test,args=(0,)))
for t in ts:
    t.start()
for t in ts:
    t.join()
 
if len(eh.errors) != ec+THREADS_COUNT*ec:
    print("FAILED: did not obtain the correct number of errors")
    sys.exit(1)
 
# set lineNumbersDefault for future new threads
libxml2.thrDefLineNumbersDefaultValue(1)
ts = []
for i in range(THREADS_COUNT):
    # expect 1 for lineNumbersDefault
    ts.append(Thread(target=test,args=(1,)))
for t in ts:
    t.start()
for t in ts:
    t.join()
 
if len(eh.errors) != ec+THREADS_COUNT*ec*2:
    print("FAILED: did not obtain the correct number of errors")
    sys.exit(1)
 
if failed:
    print("FAILED")
    sys.exit(1)
 
# Memory debug specific
libxml2.cleanupParser()
if libxml2.debugMemory(1) == 0:
    print("OK")
else:
    print("Memory leak %d bytes" % (libxml2.debugMemory(1)))
    libxml2.dumpMemory()