lin
2025-07-30 fcd736bf35fd93b563e9bbf594f2aa7b62028cc9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more informations
## Copyright (C) Philippe Biondi <phil@secdev.org>
## This program is published under a GPLv2 license
 
"""
VoIP (Voice over IP) related functions
"""
 
from __future__ import absolute_import
import os
###################
##  Listen VoIP  ##
###################
 
from scapy.sendrecv import sniff
from scapy.layers.inet import IP,UDP
from scapy.layers.rtp import RTP
from scapy.consts import WINDOWS
from scapy.config import conf
from scapy.modules.six.moves import range
 
 
sox_base = "sox -t .ul %s - -t ossdsp /dev/dsp"
 
if WINDOWS:
    if conf.prog.sox is None:
        raise OSError("Sox must be installed to play VoIP packets")
    sox_base = "\"" + conf.prog.sox + "\" -t .ul %s - -t waveaudio"
 
def _merge_sound_bytes(x,y,sample_size=2):
    # TODO: find a better way to merge sound bytes
    # This will only add them one next to each other:
    # \xff + \xff ==> \xff\xff
    m = ""
    ss=sample_size
    min_ = 0
    if len(x) >= len(y):
        min_ = y
    elif len(x) < len(y):
        min_ = x
    r_ = len(min_)
    for i in range(r_/ss):
        m += x[ss*i:ss*(i+1)]+y[ss*i:ss*(i+1)]
    return x[r_:], y[r_:], m
 
 
def voip_play(s1, lst=None, **kargs):
    """Play VoIP packets with RAW data that
    are either sniffed either from an IP, or
    specified as a list.
 
    It will play only the incoming packets !
    
    :param s1: The IP of the src of all VoIP packets.
    :param lst: (optional) A list of packets to load
    :type s1: string
    :type lst: list
 
    :Example:
 
    >>> voip_play("64.2.142.189")
    while calling '411@ideasip.com'
 
    >>> voip_play("64.2.142.189", lst)
    with list a list of packets with VoIP data
    in their RAW layer
 
    .. seealso:: voip_play2
    to play both the outcoming and incoming packets
    at the same time.
 
    .. seealso:: voip_play3
    to read RTP VoIP packets
    """
    
    dsp, rd = os.popen2(sox_base % "")
    def play(pkt):
        if not pkt:
            return 
        if not pkt.haslayer(UDP) or not pkt.haslayer(IP):
            return 
        ip=pkt.getlayer(IP)
        if s1 == ip.src:
            dsp.write(pkt.getlayer(conf.raw_layer).load[12:])
    try:
        if lst is None:
            sniff(store=0, prn=play, **kargs)
        else:
            for p in lst:
                play(p)
    finally:
        dsp.close()
        rd.close()
 
def voip_play1(s1, lst=None, **kargs):
    """Same than voip_play, backward compatibility
    """
    return voip_play(s1, lst, **kargs)
 
def voip_play2(s1,**kargs):
    """
    Same than voip_play, but will play
    both incoming and outcoming packets.
    The sound will surely suffer distortion.
 
    Only supports sniffing.
 
    .. seealso:: voip_play
    to play only incoming packets.
    """
    dsp,rd = os.popen2(sox_base % "-c 2")
    global x1, x2
    x1 = ""
    x2 = ""
    def play(pkt):
        global x1, x2
        if not pkt:
            return 
        if not pkt.haslayer(UDP) or not pkt.haslayer(IP):
            return 
        ip=pkt.getlayer(IP)
        if s1 in [ip.src, ip.dst]:
            if ip.dst == s1:
                x1 += pkt.getlayer(conf.raw_layer).load[12:]
            else:
                x2 += pkt.getlayer(conf.raw_layer).load[12:]
            x1, x2, r = _merge_sound_bytes(x1, x2)
            dsp.write(r)
            
    sniff(store=0, prn=play, **kargs)
 
def voip_play3(lst=None,**kargs):
    """Same than voip_play, but made to
    read and play VoIP RTP packets, without
    checking IP.
    
    .. seealso:: voip_play
    for basic VoIP packets
    """
    dsp,rd = os.popen2(sox_base % "")
    def play(pkt, dsp=dsp):
        if pkt and pkt.haslayer(UDP) and pkt.haslayer(RTP):
            dsp.write(pkt.getlayer(RTP).load)
    try:
        if lst is None:
            sniff(store=0, prn=play, **kargs)
        else:
            for p in lst:
                play(p)
    finally:
        try:
            dsp.close()
            rd.close()
        except:
            pass