lin
2025-08-14 dae8bad597b6607a449b32bf76c523423f7720ed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A BPF compiler for the Minijail policy file."""
 
from __future__ import print_function
 
import enum
 
import bpf
import parser  # pylint: disable=wrong-import-order
 
 
class OptimizationStrategy(enum.Enum):
    """The available optimization strategies."""
 
    # Generate a linear chain of syscall number checks. Works best for policies
    # with very few syscalls.
    LINEAR = 'linear'
 
    # Generate a binary search tree for the syscalls. Works best for policies
    # with a lot of syscalls, where no one syscall dominates.
    BST = 'bst'
 
    def __str__(self):
        return self.value
 
 
class SyscallPolicyEntry:
    """The parsed version of a seccomp policy line."""
 
    def __init__(self, name, number, frequency):
        self.name = name
        self.number = number
        self.frequency = frequency
        self.accumulated = 0
        self.filter = None
 
    def __repr__(self):
        return ('SyscallPolicyEntry<name: %s, number: %d, '
                'frequency: %d, filter: %r>') % (
                    self.name, self.number, self.frequency,
                    self.filter.instructions if self.filter else None)
 
    def simulate(self, arch, syscall_number, *args):
        """Simulate the policy with the given arguments."""
        if not self.filter:
            return (0, 'ALLOW')
        return bpf.simulate(self.filter.instructions, arch, syscall_number,
                            *args)
 
 
class SyscallPolicyRange:
    """A contiguous range of SyscallPolicyEntries that have the same action."""
 
    def __init__(self, *entries):
        self.numbers = (entries[0].number, entries[-1].number + 1)
        self.frequency = sum(e.frequency for e in entries)
        self.accumulated = 0
        self.filter = entries[0].filter
 
    def __repr__(self):
        return 'SyscallPolicyRange<numbers: %r, frequency: %d, filter: %r>' % (
            self.numbers, self.frequency,
            self.filter.instructions if self.filter else None)
 
    def simulate(self, arch, syscall_number, *args):
        """Simulate the policy with the given arguments."""
        if not self.filter:
            return (0, 'ALLOW')
        return self.filter.simulate(arch, syscall_number, *args)
 
 
def _convert_to_ranges(entries):
    entries = list(sorted(entries, key=lambda r: r.number))
    lower = 0
    while lower < len(entries):
        upper = lower + 1
        while upper < len(entries):
            if entries[upper - 1].filter != entries[upper].filter:
                break
            if entries[upper - 1].number + 1 != entries[upper].number:
                break
            upper += 1
        yield SyscallPolicyRange(*entries[lower:upper])
        lower = upper
 
 
def _compile_single_range(entry,
                          accept_action,
                          reject_action,
                          lower_bound=0,
                          upper_bound=1e99):
    action = accept_action
    if entry.filter:
        action = entry.filter
    if entry.numbers[1] - entry.numbers[0] == 1:
        # Single syscall.
        # Accept if |X == nr|.
        return (1,
                bpf.SyscallEntry(
                    entry.numbers[0], action, reject_action, op=bpf.BPF_JEQ))
    elif entry.numbers[0] == lower_bound:
        # Syscall range aligned with the lower bound.
        # Accept if |X < nr[1]|.
        return (1,
                bpf.SyscallEntry(
                    entry.numbers[1], reject_action, action, op=bpf.BPF_JGE))
    elif entry.numbers[1] == upper_bound:
        # Syscall range aligned with the upper bound.
        # Accept if |X >= nr[0]|.
        return (1,
                bpf.SyscallEntry(
                    entry.numbers[0], action, reject_action, op=bpf.BPF_JGE))
    # Syscall range in the middle.
    # Accept if |nr[0] <= X < nr[1]|.
    upper_entry = bpf.SyscallEntry(
        entry.numbers[1], reject_action, action, op=bpf.BPF_JGE)
    return (2,
            bpf.SyscallEntry(
                entry.numbers[0], upper_entry, reject_action, op=bpf.BPF_JGE))
 
 
def _compile_ranges_linear(ranges, accept_action, reject_action):
    # Compiles the list of ranges into a simple linear list of comparisons. In
    # order to make the generated code a bit more efficient, we sort the
    # ranges by frequency, so that the most frequently-called syscalls appear
    # earlier in the chain.
    cost = 0
    accumulated_frequencies = 0
    next_action = reject_action
    for entry in sorted(ranges, key=lambda r: r.frequency):
        current_cost, next_action = _compile_single_range(
            entry, accept_action, next_action)
        accumulated_frequencies += entry.frequency
        cost += accumulated_frequencies * current_cost
    return (cost, next_action)
 
 
def _compile_entries_linear(entries, accept_action, reject_action):
    return _compile_ranges_linear(
        _convert_to_ranges(entries), accept_action, reject_action)[1]
 
 
def _compile_entries_bst(entries, accept_action, reject_action):
    # Instead of generating a linear list of comparisons, this method generates
    # a binary search tree, where some of the leaves can be linear chains of
    # comparisons.
    #
    # Even though we are going to perform a binary search over the syscall
    # number, we would still like to rotate some of the internal nodes of the
    # binary search tree so that more frequently-used syscalls can be accessed
    # more cheaply (i.e. fewer internal nodes need to be traversed to reach
    # them).
    #
    # This uses Dynamic Programming to generate all possible BSTs efficiently
    # (in O(n^3)) so that we can get the absolute minimum-cost tree that matches
    # all syscall entries. It does so by considering all of the O(n^2) possible
    # sub-intervals, and for each one of those try all of the O(n) partitions of
    # that sub-interval. At each step, it considers putting the remaining
    # entries in a linear comparison chain as well as another BST, and chooses
    # the option that minimizes the total overall cost.
    #
    # Between every pair of non-contiguous allowed syscalls, there are two
    # locally optimal options as to where to set the partition for the
    # subsequent ranges: aligned to the end of the left subrange or to the
    # beginning of the right subrange. The fact that these two options have
    # slightly different costs, combined with the possibility of a subtree to
    # use the linear chain strategy (which has a completely different cost
    # model), causes the target cost function that we are trying to optimize to
    # not be unimodal / convex. This unfortunately means that more clever
    # techniques like using ternary search (which would reduce the overall
    # complexity to O(n^2 log n)) do not work in all cases.
    ranges = list(_convert_to_ranges(entries))
 
    accumulated = 0
    for entry in ranges:
        accumulated += entry.frequency
        entry.accumulated = accumulated
 
    # Memoization cache to build the DP table top-down, which is easier to
    # understand.
    memoized_costs = {}
 
    def _generate_syscall_bst(ranges, indices, bounds=(0, 2**64 - 1)):
        assert bounds[0] <= ranges[indices[0]].numbers[0], (indices, bounds)
        assert ranges[indices[1] - 1].numbers[1] <= bounds[1], (indices,
                                                                bounds)
 
        if bounds in memoized_costs:
            return memoized_costs[bounds]
        if indices[1] - indices[0] == 1:
            if bounds == ranges[indices[0]].numbers:
                # If bounds are tight around the syscall, it costs nothing.
                memoized_costs[bounds] = (0, ranges[indices[0]].filter
                                          or accept_action)
                return memoized_costs[bounds]
            result = _compile_single_range(ranges[indices[0]], accept_action,
                                           reject_action)
            memoized_costs[bounds] = (result[0] * ranges[indices[0]].frequency,
                                      result[1])
            return memoized_costs[bounds]
 
        # Try the linear model first and use that as the best estimate so far.
        best_cost = _compile_ranges_linear(ranges[slice(*indices)],
                                           accept_action, reject_action)
 
        # Now recursively go through all possible partitions of the interval
        # currently being considered.
        previous_accumulated = ranges[indices[0]].accumulated - ranges[
            indices[0]].frequency
        bst_comparison_cost = (
            ranges[indices[1] - 1].accumulated - previous_accumulated)
        for i, entry in enumerate(ranges[slice(*indices)]):
            candidates = [entry.numbers[0]]
            if i:
                candidates.append(ranges[i - 1 + indices[0]].numbers[1])
            for cutoff_bound in candidates:
                if not bounds[0] < cutoff_bound < bounds[1]:
                    continue
                if not indices[0] < i + indices[0] < indices[1]:
                    continue
                left_subtree = _generate_syscall_bst(
                    ranges, (indices[0], i + indices[0]),
                    (bounds[0], cutoff_bound))
                right_subtree = _generate_syscall_bst(
                    ranges, (i + indices[0], indices[1]),
                    (cutoff_bound, bounds[1]))
                best_cost = min(
                    best_cost,
                    (bst_comparison_cost + left_subtree[0] + right_subtree[0],
                     bpf.SyscallEntry(
                         cutoff_bound,
                         right_subtree[1],
                         left_subtree[1],
                         op=bpf.BPF_JGE)))
 
        memoized_costs[bounds] = best_cost
        return memoized_costs[bounds]
 
    return _generate_syscall_bst(ranges, (0, len(ranges)))[1]
 
 
class PolicyCompiler:
    """A parser for the Minijail seccomp policy file format."""
 
    def __init__(self, arch):
        self._arch = arch
 
    def compile_file(self,
                     policy_filename,
                     *,
                     optimization_strategy,
                     kill_action,
                     include_depth_limit=10,
                     override_default_action=None):
        """Return a compiled BPF program from the provided policy file."""
        policy_parser = parser.PolicyParser(
            self._arch,
            kill_action=kill_action,
            include_depth_limit=include_depth_limit,
            override_default_action=override_default_action)
        parsed_policy = policy_parser.parse_file(policy_filename)
        entries = [
            self.compile_filter_statement(
                filter_statement, kill_action=kill_action)
            for filter_statement in parsed_policy.filter_statements
        ]
 
        visitor = bpf.FlatteningVisitor(
            arch=self._arch, kill_action=kill_action)
        accept_action = bpf.Allow()
        reject_action = parsed_policy.default_action
        if entries:
            if optimization_strategy == OptimizationStrategy.BST:
                next_action = _compile_entries_bst(entries, accept_action,
                                                   reject_action)
            else:
                next_action = _compile_entries_linear(entries, accept_action,
                                                      reject_action)
            next_action.accept(bpf.ArgFilterForwardingVisitor(visitor))
            reject_action.accept(visitor)
            accept_action.accept(visitor)
            bpf.ValidateArch(next_action).accept(visitor)
        else:
            reject_action.accept(visitor)
            bpf.ValidateArch(reject_action).accept(visitor)
        return visitor.result
 
    def compile_filter_statement(self, filter_statement, *, kill_action):
        """Compile one parser.FilterStatement into BPF."""
        policy_entry = SyscallPolicyEntry(filter_statement.syscall.name,
                                          filter_statement.syscall.number,
                                          filter_statement.frequency)
        # In each step of the way, the false action is the one that is taken if
        # the immediate boolean condition does not match. This means that the
        # false action taken here is the one that applies if the whole
        # expression fails to match.
        false_action = filter_statement.filters[-1].action
        if false_action == bpf.Allow():
            return policy_entry
        # We then traverse the list of filters backwards since we want
        # the root of the DAG to be the very first boolean operation in
        # the filter chain.
        for filt in filter_statement.filters[:-1][::-1]:
            for disjunction in filt.expression:
                # This is the jump target of the very last comparison in the
                # conjunction. Given that any conjunction that succeeds should
                # make the whole expression succeed, make the very last
                # comparison jump to the accept action if it succeeds.
                true_action = filt.action
                for atom in disjunction:
                    block = bpf.Atom(atom.argument_index, atom.op, atom.value,
                                     true_action, false_action)
                    true_action = block
                false_action = true_action
        policy_filter = false_action
 
        # Lower all Atoms into WideAtoms.
        lowering_visitor = bpf.LoweringVisitor(arch=self._arch)
        policy_filter = lowering_visitor.process(policy_filter)
 
        # Flatten the IR DAG into a single BasicBlock.
        flattening_visitor = bpf.FlatteningVisitor(
            arch=self._arch, kill_action=kill_action)
        policy_filter.accept(flattening_visitor)
        policy_entry.filter = flattening_visitor.result
        return policy_entry