
File preloader

The tool to check the availability or syntax of domain, IP or URL.

::

██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝

Provides the interface for the preloading of a given file.

Author: Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom

Special thanks: https://pyfunceble.github.io/#/special-thanks

Contributors: https://pyfunceble.github.io/#/contributors

Project link: https://github.com/funilrys/PyFunceble

Project documentation: https://docs.pyfunceble.com

Project homepage: https://pyfunceble.github.io/

License: ::

Copyright 2017, 2018, 2019, 2020, 2022, 2023, 2024 Nissar Chababy

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

FilePreloader

Provides an interface for file preloading. The main idea behind this interface is to read every line of the given file and parse each line into something our autocontinue dataset understands.

Once everything is preloaded into the autocontinue dataset, one can use :meth:`PyFunceble.dataset.autocontinue.base.ContinueDataset.get_to_test` to get the next subject to test.

By doing this, we don't have to re-read the whole file as long as its hash has not changed.
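The hash check described above can be sketched as follows. This is only an illustration: the function names are hypothetical, and SHA-256 is an assumption, since the algorithm used by the project's own hashing helper is not stated here.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Optional, Tuple


def file_sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file content (assumed algorithm)."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        # Read in chunks so that large input lists do not fill the memory.
        for chunk in iter(lambda: stream.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def needs_preload(path: Path, previous_hash: Optional[str]) -> bool:
    """True when the file was never preloaded or has changed since then."""
    return previous_hash is None or file_sha256(path) != previous_hash


def demo() -> Tuple[bool, bool, bool]:
    """Run the check on a throwaway file: first run, unchanged, changed."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "list.txt"
        path.write_text("example.org\n", encoding="utf-8")
        first_run = needs_preload(path, None)

        stored_hash = file_sha256(path)
        unchanged = needs_preload(path, stored_hash)

        path.write_text("example.org\nexample.net\n", encoding="utf-8")
        changed = needs_preload(path, stored_hash)

    return first_run, unchanged, changed
```

Only the first and the last call of `demo()` report that a preload is needed; the unchanged file is skipped.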

Parameters:

Name | Type | Description | Default
authorized | Optional[bool] | The authorization to launch. If :py:class:`None` is given, we will try to guess the best value. | None
protocol | Optional[dict] | The protocol describing the file to test. | None

Source code in PyFunceble/cli/file_preloader.py
class FilePreloader:
    """
    Provides an interface for file preloading.
    The main idea behind this interface is to read every line of the given
    file and parse each line into something our autocontinue dataset
    understands.

    Once everything is preloaded into the autocontinue dataset, one can use
    :meth:`PyFunceble.dataset.autocontinue.base.ContinueDataset.get_to_test`
    to get the next subject to test.

    By doing this, we don't have to re-read the whole file as long as its
    hash has not changed.

    :param authorized:
        The authorization to launch.
        If :py:class:`None` is given, we will try to guess the best value.

    :param protocol:
        The protocol describing the file to test.
    """

    STD_AUTHORIZED: bool = False

    _authorized: Optional[bool] = False
    _protocol: Optional[dict] = None

    __description: list = []
    __description_file: Optional[str] = None
    __matching_index: int = 0

    continuous_integration: Optional[ContinuousIntegrationBase] = None

    continue_dataset: Optional[ContinueDatasetBase] = None

    checker_type: Optional[str] = None

    subject2complements: Optional[Subject2Complements] = None
    inputline2subject: Optional[InputLine2Subject] = None
    adblock_inputline2subject: Optional[AdblockInputLine2Subject] = None
    wildcard2subject: Optional[Wildcard2Subject] = None
    rpz_policy2subject: Optional[RPZPolicy2Subject] = None
    rpz_inputline2subject: Optional[RPZInputLine2Subject] = None
    url2netloc: Optional[Url2Netloc] = None
    cidr2subject: Optional[CIDR2Subject] = None

    def __init__(
        self,
        *,
        authorized: Optional[bool] = None,
        protocol: Optional[dict] = None,
        continuous_integration: Optional[ContinuousIntegrationBase] = None,
        checker_type: Optional[str] = None,
        adblock_inputline2subject: Optional[AdblockInputLine2Subject] = None,
        wildcard2subject: Optional[Wildcard2Subject] = None,
        rpz_policy2subject: Optional[RPZPolicy2Subject] = None,
        rpz_inputline2subject: Optional[RPZInputLine2Subject] = None,
        inputline2subject: Optional[InputLine2Subject] = None,
        subject2complements: Optional[Subject2Complements] = None,
        url2netloc: Optional[Url2Netloc] = None,
        continue_dataset: Optional[ContinueDatasetBase] = None,
        inactive_dataset: Optional[InactiveDatasetBase] = None,
        cidr2subject: Optional[CIDR2Subject] = None,
    ) -> None:
        if authorized is not None:
            self.authorized = authorized
        else:
            self.guess_and_set_authorized()

        if protocol is not None:
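            # Go through the validated ``protocol`` setter, which also
            # computes the location of the description file.
            self.protocol = protocol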

        self.continuous_integration = continuous_integration

        self.checker_type = checker_type
        self.adblock_inputline2subject = adblock_inputline2subject
        self.wildcard2subject = wildcard2subject
        self.rpz_policy2subject = rpz_policy2subject
        self.rpz_inputline2subject = rpz_inputline2subject
        self.inputline2subject = inputline2subject
        self.subject2complements = subject2complements
        self.url2netloc = url2netloc
        self.continue_dataset = continue_dataset
        self.inactive_dataset = inactive_dataset
        self.cidr2subject = cidr2subject

    def execute_if_authorized(default: Any = None):  # pylint: disable=no-self-argument
        """
        Executes the decorated method only if we are authorized to process.
        Otherwise, apply the given :code:`default`.

        .. warning::
            If :py:class:`None` is given as default value, this method will
            return the :code:`self` object.
        """

        def inner_method(func):
            @functools.wraps(func)
            def wrapper(self, *args, **kwargs):
                if self.authorized:
                    return func(self, *args, **kwargs)  # pylint: disable=not-callable
                return self if default is None else default

            return wrapper

        return inner_method

    def ensure_protocol_is_given(func):  # pylint: disable=no-self-argument
        """
        Ensures that the protocol is given before launching the decorated method.

        :raise RuntimeError:
            When the protocol is not declared yet.
        """

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if self.protocol is None:
                raise RuntimeError("<self.protocol> is not given.")

            return func(self, *args, **kwargs)  # pylint: disable=not-callable

        return wrapper

    @property
    def authorized(self) -> Optional[bool]:
        """
        Provides the current state of the :code:`_authorized` attribute.
        """

        return self._authorized

    @authorized.setter
    def authorized(self, value: bool) -> None:
        """
        Sets the value of the :code:`_authorized` attribute.

        :param value:
            The value to set.

        :raise TypeError:
            When the given :code:`value` is not a :py:class:`bool`.
        """

        if not isinstance(value, bool):
            raise TypeError(f"<value> should be {bool}, {type(value)} given.")

        self._authorized = value

    def set_authorized(self, value: bool) -> "FilePreloader":
        """
        Sets the value of the :code:`_authorized` attribute.

        :param value:
            The value to set.
        """

        self.authorized = value

        return self

    @property
    def protocol(self) -> Optional[dict]:
        """
        Provides the current state of the :code:`_protocol` attribute.
        """

        return self._protocol

    @protocol.setter
    def protocol(self, value: dict) -> None:
        """
        Sets the value of the :code:`_protocol` attribute.

        :param value:
            The value to set.

        :raise TypeError:
            When the given :code:`value` is not a :py:class:`dict`.
        :raise ValueError:
            When the given :code:`value` is empty.
        """

        if not isinstance(value, dict):
            raise TypeError(f"<value> should be {dict}, {type(value)} given.")

        self._protocol = value
        self.__description_file = os.path.join(
            self._protocol["output_dir"], PyFunceble.cli.storage.PRE_LOADER_FILE
        )

    def set_protocol(self, value: dict) -> "FilePreloader":
        """
        Sets the value of the :code:`_protocol` attribute.

        :param value:
            The value to set.
        """

        self.protocol = value

        return self

    def guess_and_set_authorized(self) -> "FilePreloader":
        """
        Try to guess and set the value of the :code:`_authorized` attribute.
        """

        if PyFunceble.facility.ConfigLoader.is_already_loaded():
            if bool(PyFunceble.storage.CONFIGURATION.cli_testing.autocontinue) and bool(
                PyFunceble.storage.CONFIGURATION.cli_testing.preload_file
            ):
                self.authorized = True
            else:
                self.authorized = self.STD_AUTHORIZED
        else:
            self.authorized = self.STD_AUTHORIZED

        return self

    @execute_if_authorized(False)
    @ensure_protocol_is_given
    def does_preloader_description_file_exists(self) -> bool:
        """
        Checks if our preloader file exists.
        """

        return FileHelper(self.__description_file).exists()

    def __load_description(self) -> "FilePreloader":
        """
        Loads the description into the interface.
        """

        def set_new_dataset(*, append: bool = False, new_index: int = 0) -> None:
            """
            Sets the default dataset into the given index.

            :param new_index:
                The index to write into.
            :param append:
                Append instead.
            """

            new_dataset = copy.deepcopy(self.protocol)
            new_dataset["previous_hash"] = None
            new_dataset["hash"] = None
            new_dataset["line_number"] = 1

            if append:
                self.__description.append(new_dataset)
            elif self.__description:
                self.__description[new_index] = new_dataset
            else:
                self.__description = [new_dataset]

        if self.does_preloader_description_file_exists():
            dataset = DictHelper().from_json_file(self.__description_file)

            if not isinstance(dataset, list):
                dataset = [dataset]

            found = False

            for index, descr in enumerate(dataset):
                if all(x in descr and descr[x] == y for x, y in self.protocol.items()):
                    self.__matching_index = index
                    found = True
                    break

            self.__description = dataset

            if not found:
                set_new_dataset(append=True)
                self.__matching_index = len(self.__description) - 1
        else:
            set_new_dataset()

        return self

    def __save_description(self) -> "FilePreloader":
        """
        Saves the description at its destination.
        """

        self.__description[self.__matching_index].update(self.protocol)

        DictHelper(self.__description).to_json_file(self.__description_file)

        # Match the declared return type.
        return self

    @execute_if_authorized(None)
    @ensure_protocol_is_given
    def start(self, print_dots: bool = False) -> "FilePreloader":
        """
        Starts the pre-loading of the currently set file path.
        """

        self.__load_description()

        broken = False
        file_helper = FileHelper(self.protocol["subject"])
        self.__description[self.__matching_index]["hash"] = HashHelper().hash_file(
            file_helper.path
        )

        if isinstance(self.continue_dataset, CSVContinueDataset):
            self.continue_dataset.set_base_directory(self.protocol["output_dir"])

        if (
            self.__description[self.__matching_index]["checker_type"]
            != self.protocol["checker_type"]
            or self.__description[self.__matching_index]["subject_type"]
            != self.protocol["subject_type"]
        ):
            try:
                self.continue_dataset.cleanup()
            except TypeError:
                self.continue_dataset.cleanup(session_id=self.protocol["session_id"])

        if (
            self.__description[self.__matching_index]["previous_hash"]
            and self.__description[self.__matching_index]["hash"]
            != self.__description[self.__matching_index]["previous_hash"]
        ):
            # Forces the reading of every line because there is no way to
            # know where something has been changed.
            self.__description[self.__matching_index]["line_number"] = 1

        if (
            self.__description[self.__matching_index]["checker_type"]
            != self.protocol["checker_type"]
            or self.__description[self.__matching_index]["subject_type"]
            != self.protocol["subject_type"]
            or self.__description[self.__matching_index]["hash"]
            != self.__description[self.__matching_index]["previous_hash"]
        ):
            try:
                with file_helper.open("r", encoding="utf-8") as file_stream:
                    line_num = 1

                    for line in file_stream:
                        if (
                            line_num
                            < self.__description[self.__matching_index]["line_number"]
                        ):
                            line_num += 1
                            continue

                        if (
                            self.continuous_integration
                            and self.continuous_integration.is_time_exceeded()
                        ):
                            broken = True
                            break

                        line = line.strip()

                        if self.rpz_policy2subject and "SOA" in line:
                            self.rpz_policy2subject.set_soa(line.split()[0])

                        for subject in get_subjects_from_line(
                            line,
                            self.checker_type,
                            subject_type=self.protocol["subject_type"],
                            adblock_inputline2subject=self.adblock_inputline2subject,
                            wildcard2subject=self.wildcard2subject,
                            rpz_policy2subject=self.rpz_policy2subject,
                            rpz_inputline2subject=self.rpz_inputline2subject,
                            inputline2subject=self.inputline2subject,
                            subject2complements=self.subject2complements,
                            url2netloc=self.url2netloc,
                            cidr2subject=self.cidr2subject,
                        ):
                            to_send = copy.deepcopy(self.protocol)
                            to_send["subject"] = subject
                            to_send["idna_subject"] = domain2idna(subject)
                            to_send["tested_at"] = datetime.now(
                                timezone.utc
                            ) - timedelta(days=365.25 * 20)

                            if self.inactive_dataset.exists(to_send):
                                print_single_line("I")
                                continue

                            if TesterWorker.should_be_ignored(
                                subject=to_send["idna_subject"]
                            ):
                                print_single_line("X")
                                continue

                            self.continue_dataset.update(to_send, ignore_if_exist=True)

                            if print_dots:
                                print_single_line()

                        self.__description[self.__matching_index]["line_number"] += 1
                        line_num += 1
            except KeyboardInterrupt as exception:
                self.__save_description()
                raise exception

        if not broken:
            self.__description[self.__matching_index]["previous_hash"] = (
                self.__description[self.__matching_index]["hash"]
            )

        self.__save_description()

        return self

authorized: Optional[bool] property writable

Provides the current state of the :code:`_authorized` attribute.

protocol: Optional[dict] property writable

Provides the current state of the :code:`_protocol` attribute.

__load_description()

Loads the description into the interface.

Source code in PyFunceble/cli/file_preloader.py
def __load_description(self) -> "FilePreloader":
    """
    Loads the description into the interface.
    """

    def set_new_dataset(*, append: bool = False, new_index: int = 0) -> None:
        """
        Sets the default dataset into the given index.

        :param new_index:
            The index to write into.
        :param append:
            Append instead.
        """

        new_dataset = copy.deepcopy(self.protocol)
        new_dataset["previous_hash"] = None
        new_dataset["hash"] = None
        new_dataset["line_number"] = 1

        if append:
            self.__description.append(new_dataset)
        elif self.__description:
            self.__description[new_index] = new_dataset
        else:
            self.__description = [new_dataset]

    if self.does_preloader_description_file_exists():
        dataset = DictHelper().from_json_file(self.__description_file)

        if not isinstance(dataset, list):
            dataset = [dataset]

        found = False

        for index, descr in enumerate(dataset):
            if all(x in descr and descr[x] == y for x, y in self.protocol.items()):
                self.__matching_index = index
                found = True
                break

        self.__description = dataset

        if not found:
            set_new_dataset(append=True)
            self.__matching_index = len(self.__description) - 1
    else:
        set_new_dataset()

    return self
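The lookup performed above can be sketched on its own, outside the class. `match_or_append` is a hypothetical helper name; it mirrors the rule that a stored description matches when it contains every key/value pair of the protocol, and that a fresh entry starting at line 1 is appended otherwise. Unlike the method above, this sketch returns a new list instead of mutating its input.

```python
import copy
from typing import List, Tuple


def match_or_append(
    descriptions: List[dict], protocol: dict
) -> Tuple[List[dict], int]:
    """Return the (possibly extended) descriptions and the matching index."""
    for index, descr in enumerate(descriptions):
        # A description matches when it carries every protocol key with the
        # same value; extra keys (hash, line_number, ...) are ignored.
        if all(key in descr and descr[key] == value for key, value in protocol.items()):
            return descriptions, index

    # Nothing matched: start a fresh entry that forces a full read.
    fresh = copy.deepcopy(protocol)
    fresh.update({"previous_hash": None, "hash": None, "line_number": 1})
    return descriptions + [fresh], len(descriptions)


stored = [
    {
        "subject": "a.list",
        "checker_type": "AVAILABILITY",
        "hash": "abc",
        "previous_hash": "abc",
        "line_number": 42,
    }
]

_, same_index = match_or_append(
    stored, {"subject": "a.list", "checker_type": "AVAILABILITY"}
)
extended, new_index = match_or_append(
    stored, {"subject": "a.list", "checker_type": "SYNTAX"}
)
```

Changing any protocol value (here the checker type) yields a new entry, so progress recorded for one configuration is not reused for another.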

__save_description()

Saves the description at its destination.

Source code in PyFunceble/cli/file_preloader.py
def __save_description(self) -> "FilePreloader":
    """
    Saves the description at its destination.
    """

    self.__description[self.__matching_index].update(self.protocol)

    DictHelper(self.__description).to_json_file(self.__description_file)

    # Match the declared return type.
    return self

does_preloader_description_file_exists()

Checks if our preloader file exists.

Source code in PyFunceble/cli/file_preloader.py
@execute_if_authorized(False)
@ensure_protocol_is_given
def does_preloader_description_file_exists(self) -> bool:
    """
    Checks if our preloader file exists.
    """

    return FileHelper(self.__description_file).exists()

ensure_protocol_is_given(func)

Ensures that the protocol is given before launching the decorated method.

Raises:

Type | Description
RuntimeError | When the protocol is not declared yet.

Source code in PyFunceble/cli/file_preloader.py
def ensure_protocol_is_given(func):  # pylint: disable=no-self-argument
    """
    Ensures that the protocol is given before launching the decorated method.

    :raise RuntimeError:
        When the protocol is not declared yet.
    """

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.protocol is None:
            raise RuntimeError("<self.protocol> is not given.")

        return func(self, *args, **kwargs)  # pylint: disable=not-callable

    return wrapper
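A minimal standalone sketch of this guard pattern follows. The decorator body is the one shown above, moved to module level; the `Loader` class is a hypothetical stand-in used only to exercise it.

```python
import functools
from typing import Optional


def ensure_protocol_is_given(func):
    """Refuse to run the decorated method while ``self.protocol`` is None."""

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.protocol is None:
            raise RuntimeError("<self.protocol> is not given.")

        return func(self, *args, **kwargs)

    return wrapper


class Loader:
    """Hypothetical class, only here to demonstrate the decorator."""

    def __init__(self, protocol: Optional[dict] = None) -> None:
        self.protocol = protocol

    @ensure_protocol_is_given
    def subject(self) -> str:
        """Return the subject declared in the protocol."""
        return self.protocol["subject"]


def call_without_protocol() -> str:
    """Return the error message raised when no protocol was declared."""
    try:
        Loader().subject()
    except RuntimeError as exception:
        return str(exception)
    return "no error"
```

Because of `functools.wraps`, the decorated method keeps its original name and docstring, which matters for generated documentation such as this page.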

execute_if_authorized(default=None)

Executes the decorated method only if we are authorized to process. Otherwise, returns the given :code:`default`.

.. warning:: If :py:class:`None` is given as the default value, the decorated method returns the :code:`self` object instead.

Source code in PyFunceble/cli/file_preloader.py
def execute_if_authorized(default: Any = None):  # pylint: disable=no-self-argument
    """
    Executes the decorated method only if we are authorized to process.
    Otherwise, apply the given :code:`default`.

    .. warning::
        If :py:class:`None` is given as default value, this method will
        return the :code:`self` object.
    """

    def inner_method(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if self.authorized:
                return func(self, *args, **kwargs)  # pylint: disable=not-callable
            return self if default is None else default

        return wrapper

    return inner_method
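The two fallback behaviours (an explicit default versus the `self` object) can be illustrated with a small sketch. The decorator is the one shown above, moved to module level; the `Task` class and its methods are hypothetical.

```python
import functools
from typing import Any


def execute_if_authorized(default: Any = None):
    """Run the decorated method only when ``self.authorized`` is truthy."""

    def inner_method(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if self.authorized:
                return func(self, *args, **kwargs)
            # None means "no explicit default": hand back the instance so
            # that chained calls keep working.
            return self if default is None else default

        return wrapper

    return inner_method


class Task:
    """Hypothetical class, only here to demonstrate the decorator."""

    def __init__(self, authorized: bool) -> None:
        self.authorized = authorized
        self.started = False

    @execute_if_authorized(False)
    def exists(self) -> bool:
        return True

    @execute_if_authorized(None)
    def start(self) -> "Task":
        self.started = True
        return self


blocked = Task(authorized=False)
allowed = Task(authorized=True)
```

With this setup `blocked.exists()` yields the explicit default `False`, while `blocked.start()` silently returns `blocked` itself without running the method body.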

guess_and_set_authorized()

Tries to guess and set the value of the :code:`_authorized` attribute.

Source code in PyFunceble/cli/file_preloader.py
def guess_and_set_authorized(self) -> "FilePreloader":
    """
    Try to guess and set the value of the :code:`_authorized` attribute.
    """

    if PyFunceble.facility.ConfigLoader.is_already_loaded():
        if bool(PyFunceble.storage.CONFIGURATION.cli_testing.autocontinue) and bool(
            PyFunceble.storage.CONFIGURATION.cli_testing.preload_file
        ):
            self.authorized = True
        else:
            self.authorized = self.STD_AUTHORIZED
    else:
        self.authorized = self.STD_AUTHORIZED

    return self
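The decision above reduces to a small truth table: preloading is authorized only when the configuration is loaded and both `autocontinue` and `preload_file` are enabled. The sketch below models the configuration with a plain `SimpleNamespace`, which is an assumption made for illustration and not the project's configuration object; `guess_authorized` is a hypothetical name.

```python
from types import SimpleNamespace
from typing import Optional

STD_AUTHORIZED = False  # Same fallback value as the class attribute.


def guess_authorized(cli_testing: Optional[SimpleNamespace]) -> bool:
    """Guess the authorization from a (possibly missing) configuration."""
    if cli_testing is None:
        # Stands in for "the configuration is not loaded yet".
        return STD_AUTHORIZED

    if bool(cli_testing.autocontinue) and bool(cli_testing.preload_file):
        return True

    return STD_AUTHORIZED


both_enabled = SimpleNamespace(autocontinue=True, preload_file=True)
only_autocontinue = SimpleNamespace(autocontinue=True, preload_file=False)
```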

set_authorized(value)

Sets the value of the :code:`_authorized` attribute.

Parameters:

Name | Type | Description | Default
value | bool | The value to set. | required

Source code in PyFunceble/cli/file_preloader.py
def set_authorized(self, value: bool) -> "FilePreloader":
    """
    Sets the value of the :code:`_authorized` attribute.

    :param value:
        The value to set.
    """

    self.authorized = value

    return self
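The setter methods return the instance so that calls can be chained, while the type check stays in the property setter. A reduced sketch of that pairing, using a hypothetical `Switch` class:

```python
class Switch:
    """Hypothetical class pairing a validated property with a fluent setter."""

    _authorized: bool = False

    @property
    def authorized(self) -> bool:
        return self._authorized

    @authorized.setter
    def authorized(self, value: bool) -> None:
        # Validation lives in one place: the property setter.
        if not isinstance(value, bool):
            raise TypeError(f"<value> should be {bool}, {type(value)} given.")

        self._authorized = value

    def set_authorized(self, value: bool) -> "Switch":
        # Delegates to the property, then returns self to allow chaining.
        self.authorized = value

        return self


def rejects_non_bool() -> bool:
    """Return True when a non-boolean value is refused."""
    try:
        Switch().set_authorized("yes")
    except TypeError:
        return True
    return False
```

Chaining then reads as `Switch().set_authorized(True).authorized`.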

set_protocol(value)

Sets the value of the :code:`_protocol` attribute.

Parameters:

Name | Type | Description | Default
value | dict | The value to set. | required

Source code in PyFunceble/cli/file_preloader.py
def set_protocol(self, value: dict) -> "FilePreloader":
    """
    Sets the value of the :code:`_protocol` attribute.

    :param value:
        The value to set.
    """

    self.protocol = value

    return self

start(print_dots=False)

Starts the pre-loading of the currently set file path.

Source code in PyFunceble/cli/file_preloader.py
@execute_if_authorized(None)
@ensure_protocol_is_given
def start(self, print_dots: bool = False) -> "FilePreloader":
    """
    Starts the pre-loading of the currently set file path.
    """

    self.__load_description()

    broken = False
    file_helper = FileHelper(self.protocol["subject"])
    self.__description[self.__matching_index]["hash"] = HashHelper().hash_file(
        file_helper.path
    )

    if isinstance(self.continue_dataset, CSVContinueDataset):
        self.continue_dataset.set_base_directory(self.protocol["output_dir"])

    if (
        self.__description[self.__matching_index]["checker_type"]
        != self.protocol["checker_type"]
        or self.__description[self.__matching_index]["subject_type"]
        != self.protocol["subject_type"]
    ):
        try:
            self.continue_dataset.cleanup()
        except TypeError:
            self.continue_dataset.cleanup(session_id=self.protocol["session_id"])

    if (
        self.__description[self.__matching_index]["previous_hash"]
        and self.__description[self.__matching_index]["hash"]
        != self.__description[self.__matching_index]["previous_hash"]
    ):
        # Forces the reading of every line because there is no way to
        # know where something has been changed.
        self.__description[self.__matching_index]["line_number"] = 1

    if (
        self.__description[self.__matching_index]["checker_type"]
        != self.protocol["checker_type"]
        or self.__description[self.__matching_index]["subject_type"]
        != self.protocol["subject_type"]
        or self.__description[self.__matching_index]["hash"]
        != self.__description[self.__matching_index]["previous_hash"]
    ):
        try:
            with file_helper.open("r", encoding="utf-8") as file_stream:
                line_num = 1

                for line in file_stream:
                    if (
                        line_num
                        < self.__description[self.__matching_index]["line_number"]
                    ):
                        line_num += 1
                        continue

                    if (
                        self.continuous_integration
                        and self.continuous_integration.is_time_exceeded()
                    ):
                        broken = True
                        break

                    line = line.strip()

                    if self.rpz_policy2subject and "SOA" in line:
                        self.rpz_policy2subject.set_soa(line.split()[0])

                    for subject in get_subjects_from_line(
                        line,
                        self.checker_type,
                        subject_type=self.protocol["subject_type"],
                        adblock_inputline2subject=self.adblock_inputline2subject,
                        wildcard2subject=self.wildcard2subject,
                        rpz_policy2subject=self.rpz_policy2subject,
                        rpz_inputline2subject=self.rpz_inputline2subject,
                        inputline2subject=self.inputline2subject,
                        subject2complements=self.subject2complements,
                        url2netloc=self.url2netloc,
                        cidr2subject=self.cidr2subject,
                    ):
                        to_send = copy.deepcopy(self.protocol)
                        to_send["subject"] = subject
                        to_send["idna_subject"] = domain2idna(subject)
                        to_send["tested_at"] = datetime.now(
                            timezone.utc
                        ) - timedelta(days=365.25 * 20)

                        if self.inactive_dataset.exists(to_send):
                            print_single_line("I")
                            continue

                        if TesterWorker.should_be_ignored(
                            subject=to_send["idna_subject"]
                        ):
                            print_single_line("X")
                            continue

                        self.continue_dataset.update(to_send, ignore_if_exist=True)

                        if print_dots:
                            print_single_line()

                    self.__description[self.__matching_index]["line_number"] += 1
                    line_num += 1
        except KeyboardInterrupt as exception:
            self.__save_description()
            raise exception

    if not broken:
        self.__description[self.__matching_index]["previous_hash"] = (
            self.__description[self.__matching_index]["hash"]
        )

    self.__save_description()

    return self
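The resume logic of the reading loop can be isolated in a short sketch: lines before the stored line number are skipped, and the loop stops early when a time budget is exhausted so that a later run can continue from the recorded position. `preload_lines` and `make_budget` are hypothetical names; the parsing of each line into subjects and the dataset updates are deliberately left out.

```python
from typing import Callable, Iterable, List, Tuple


def preload_lines(
    lines: Iterable[str], start_line: int, time_exceeded: Callable[[], bool]
) -> Tuple[List[str], int, bool]:
    """Return (collected lines, next line number to read, interrupted)."""
    collected: List[str] = []
    next_line = start_line
    interrupted = False

    for line_num, line in enumerate(lines, start=1):
        if line_num < start_line:
            # Already handled by a previous run.
            continue

        if time_exceeded():
            interrupted = True
            break

        line = line.strip()
        if line:
            collected.append(line)

        next_line += 1

    return collected, next_line, interrupted


def make_budget(max_lines: int) -> Callable[[], bool]:
    """Build a fake time check that trips after ``max_lines`` calls."""
    state = {"calls": 0}

    def exceeded() -> bool:
        state["calls"] += 1
        return state["calls"] > max_lines

    return exceeded


source = ["a.example\n", "b.example\n", "c.example\n", "d.example\n"]

first = preload_lines(source, 1, make_budget(2))
second = preload_lines(source, first[1], lambda: False)
```

The first run stops after two lines and reports line 3 as the next position; the second run picks up from there and finishes the file.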