Source code for mobly.controllers.sniffer_lib.local.local_base

# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Class for Local sniffers - i.e. running on the local machine.

This class provides configuration for local interfaces but leaves
the actual capture (sniff) to sub-classes.
"""

import os
import shutil
import signal
import subprocess
import tempfile
from mobly import logger
from mobly import utils
from mobly.controllers import sniffer


[docs]class SnifferLocalBase(sniffer.Sniffer): """This class defines the common behaviors of WLAN sniffers running on WLAN interfaces of the local machine. Specific mechanisms to capture packets over the local WLAN interfaces are implemented by sub-classes of this class - i.e. it is not a final class. """ def __init__(self, interface, logger, base_configs=None): """See base class documentation """ self._base_configs = None self._capture_file_path = "" self._interface = "" self._logger = logger self._process = None self._temp_capture_file_path = "" if interface == "": raise sniffer.InvalidDataError("Empty interface provided") self._interface = interface self._base_configs = base_configs try: subprocess.check_call(['ifconfig', self._interface, 'down']) subprocess.check_call( ['iwconfig', self._interface, 'mode', 'monitor']) subprocess.check_call(['ifconfig', self._interface, 'up']) except Exception as err: raise sniffer.ExecutionError(err)
[docs] def get_interface(self): """See base class documentation """ return self._interface
[docs] def get_type(self): """See base class documentation """ return "local"
[docs] def get_capture_file(self): return self._capture_file_path
def _pre_capture_config(self, override_configs=None): """Utility function which configures the wireless interface per the specified configurations. Operation is performed before every capture start using baseline configurations (specified when sniffer initialized) and override configurations specified here. """ final_configs = {} if self._base_configs: final_configs.update(self._base_configs) if override_configs: final_configs.update(override_configs) if sniffer.Sniffer.CONFIG_KEY_CHANNEL in final_configs: try: subprocess.check_call([ 'iwconfig', self._interface, 'channel', str(final_configs[sniffer.Sniffer.CONFIG_KEY_CHANNEL]) ]) except Exception as err: raise sniffer.ExecutionError(err) def _get_command_line(self, additional_args=None, duration=None, packet_count=None): """Utility function to be implemented by every child class - which are the concrete sniffer classes. Each sniffer-specific class should derive the command line to execute its sniffer based on the specified arguments. """ raise NotImplementedError("Base class should not be called directly!") def _post_process(self): """Utility function which is executed after a capture is done. It moves the capture file to the requested location. """ self._process = None shutil.move(self._temp_capture_file_path, self._capture_file_path)
[docs] def start_capture(self, override_configs=None, additional_args=None, duration=None, packet_count=None): """See base class documentation """ if self._process is not None: raise sniffer.InvalidOperationError( "Trying to start a sniff while another is still running!") capture_dir = os.path.join(self._logger.log_path, "Sniffer-{}".format(self._interface)) os.makedirs(capture_dir, exist_ok=True) self._capture_file_path = os.path.join( capture_dir, "capture_{}.pcap".format(logger.get_log_file_timestamp())) self._pre_capture_config(override_configs) _, self._temp_capture_file_path = tempfile.mkstemp(suffix=".pcap") cmd = self._get_command_line(additional_args=additional_args, duration=duration, packet_count=packet_count) self._process = utils.start_standing_subprocess(cmd) return sniffer.ActiveCaptureContext(self, duration)
[docs] def stop_capture(self): """See base class documentation """ if self._process is None: raise sniffer.InvalidOperationError( "Trying to stop a non-started process") utils.stop_standing_subprocess(self._process) self._post_process()
[docs] def wait_for_capture(self, timeout=None): """See base class documentation """ if self._process is None: raise sniffer.InvalidOperationError( "Trying to wait on a non-started process") try: utils.wait_for_standing_subprocess(self._process, timeout) self._post_process() except subprocess.TimeoutExpired: self.stop_capture()