Source code for mozvpn.wireguard

"""
Functions for interacting with wireguard command line tools.
"""
import shutil
import logging
import tempfile
import subprocess

import requests

logger = logging.getLogger(__name__)

# ### The following setup currently works for Linux (and probably MacOS) only.
# ### This should be refactored in order to support Windows as well.
# Wireguard commands:
WIREGUARD_QUICK_CMD = 'sudo -n wg-quick {cmd} {cfg}'
# 'wg show' requires sudo:
# WIREGUARD_SHOW_CMD = 'sudo -n wg show'
# 'wg show interfaces' does not require sudo:
WIREGUARD_SHOW_INTERFACES_CMD = 'wg show interfaces'
WIREGUARD_ETC_DIR = '/etc/wireguard'
WIREGUARD_LOCATIONS_FILE = '/etc/wireguard/locations.csv'


[docs]class WireguardError(Exception): """Raised when external wireguard or wg-quick command reported an error."""
[docs]class CommandError(Exception): def __init__(self, msg, cmd): super().__init__(msg) self.cmd = cmd @property def msg(self): return f'Command failed:\n"{self.cmd}"\nMessage: {str(self)}'
[docs]class ControlledExit(Exception): """Raise when error handling is finished and program can gracefully exit."""
[docs]def run_command(cmd: str, shell: bool = False, verbose: bool = False, dry_run: bool = False) -> str: """Run external command, and collect results or errors. Args: cmd: The command to be executed shell: if True run command via shell. verbose: if True print command to stdout. dry_run: if True then the commands will only be written to stdout only. and not executed. Raises: CommandError in case of failling command execution. """ run_cmd = cmd if shell else cmd.split() if verbose or dry_run: print(cmd) if dry_run: return try: proc = subprocess.run(run_cmd, timeout=20, shell=shell, capture_output=True) except FileNotFoundError as exc: logger.exception(f'Running "{cmd}" failed. Details:') raise CommandError(exc, cmd) from exc except subprocess.SubprocessError as exc: logger.exception(exc) raise CommandError(f'Unexpected error: {str(exc)}', cmd) from exc if proc.returncode: err = proc.stderr.decode('utf8') logger.error('Command "%s" failed: %s', cmd, err) raise CommandError(err, cmd) return proc.stdout.decode('utf8').strip()
[docs]def connect(conf_or_if: str): """Establish connection to VPN server via wg-quick command. Args: conf_or_if: Either - Name of wireguard conf file, e.g. "/path/to/us122-wireguard.conf" - Name of wireguard interface, e.g. "us122-wireguard" In this case the corresponding configuration file has to exist in the /etc/wireguard/ directory. """ wg_quick_cmd = WIREGUARD_QUICK_CMD.format(cmd='up', cfg=conf_or_if) run_command(wg_quick_cmd)
[docs]def disconnect(conf_or_if: str): """Shut down connection to VPN server via wg-quick command. Args: conf_or_if: Either - Path to wireguard conf file, e.g. "/path/to/us122-wireguard.conf" - Name of wireguard interface, e.g. "us122-wireguard" In this case the corresponding configuration file has to exist at /etc/wireguard/INTERFACE.conf. """ wg_quick_cmd = WIREGUARD_QUICK_CMD.format(cmd='down', cfg=conf_or_if) run_command(wg_quick_cmd)
[docs]def ipinfo(): """Obtain externally visible IP information from https://ipinfo.io Returns: JSON like { "ip": "185.213.155.160", "city": "Frankfurt am Main", "region": "Hesse", "country": "DE", "loc": "50.1155,8.6842", "org": "AS39351 31173 Services AB", "postal": "60311", "timezone": "Europe/Berlin", "readme": "https://ipinfo.io/missingauth" } """ res = requests.get('https://ipinfo.io') return res.json()
[docs]def mullvad_info(): """Obtain externally visible IP information from https://am.i.mullvad.net/json Note: Mullvad is the provider behind MozillaVPN. Returns: JSON like { "ip": "212.14.256.33", "country": "Germany", "city": "Stadt", "longitude": 8.2, "latitude": 44.4, "mullvad_exit_ip": false, "blacklisted": { ... }, "organization": "Telecom" } """ res = requests.get('https://am.i.mullvad.net/json') return res.json()
[docs]def status(ip: bool = False) -> str: """Show status of VPN connection. Args: ip: if given add currenlty visible external IP address to connection status. Returns: String telling if VPN connection is up, and if so, which server is currently is used. The IP address will optionally be added. Examples: - 'Not connected' - 'Connected to de10-wireguard' - 'Connected to de10-wireguard, ip: 234.12.642.0' """ iface = interface() if iface: if ip: ip_addr = ipinfo()["ip"] ip_info = f', ip: {ip_addr}' else: ip_info = '' stat_info = f'Connected to: {iface}{ip_info}' else: stat_info = 'Not connected' return stat_info
[docs]def interface() -> str: """Return interface of VPN connection, if available. Returns: Name of connected interface (e.g. 'de12-wireguard), otherwise None, if not connected. """ iface = run_command(WIREGUARD_SHOW_INTERFACES_CMD) return iface if iface else None
[docs]def check_wireguard_commands() -> dict: """Check absolute path to 'wg' and 'wg-quick' commands if they are installed and executable. Returns: {'wg': '/usr/bin/wg', 'wg-quick': '/usr/bin/wg-quick'} Raises: RuntimeError if path to wireguard command could not be determined. """ # Restrict possible path names to secure ones on Linux/macOS systems. This # prevents that a user has a local program called 'wg' to which 'sudo root' # would be granted. allowed_cmd_paths = "/usr/bin:/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin" cmds = ['wg', 'wg-quick'] full_cmds = {} for cmd in cmds: cmd_path = shutil.which(cmd, path=allowed_cmd_paths) if not cmd_path: logger.error('Could not find wireguard command %s', cmd) raise RuntimeError('Could not find wireguard command "%s"' % cmd) full_cmds[cmd] = cmd_path return full_cmds
NON_ROOT_SETUP_COMMANDS_LINUX = [ 'chmod 700 {tmp_dir}', ('echo "%mozvpn ALL = (root) NOPASSWD: {wg-quick} up *-wireguard, {wg-quick} down *-wireguard" ' '> {tmp_dir}/mozvpn.sudo'), 'mozwire relay save -o {tmp_dir} -n {limit}', 'mozvpn geolocate {tmp_dir} -o {tmp_dir}/locations.csv', ] ROOT_SETUP_COMMANDS_LINUX = [ 'groupadd -f mozvpn', 'usermod -a -G mozvpn {user}', 'chown root.root {tmp_dir}/*', 'mv {tmp_dir}/mozvpn.sudo /etc/sudoers.d/mozvpn', 'chmod 440 /etc/sudoers.d/mozvpn', 'mkdir -p /etc/wireguard', 'mv {tmp_dir}/* {wireguard_etc_dir}', 'chmod 440 {wireguard_etc_dir}/*.conf', ]
[docs]def setup_wireguard_configuration(user: str, verbose: bool, dry_run: bool, limit: int): """Setup configurations needed to operate wireguard. Args: user: name of primary user who should be allowed to use MozVPN. verbose: if True print command to stdout. dry_run: if True then the commands will only be written to stdout only. limit: Limit the number of servers downloaded via mozwire and not executed. """ params = check_wireguard_commands() params['user'] = user params['limit'] = limit params['wireguard_etc_dir'] = WIREGUARD_ETC_DIR with tempfile.TemporaryDirectory() as tmp_dir: params['tmp_dir'] = tmp_dir for cmd in NON_ROOT_SETUP_COMMANDS_LINUX: scmd = cmd.format(**params) run_command(scmd, shell=True, verbose=verbose, dry_run=dry_run) for cmd in ROOT_SETUP_COMMANDS_LINUX: scmd = 'sudo ' + cmd.format(**params) run_command(scmd, shell=True, verbose=verbose, dry_run=dry_run) print( f"The only user currently allowed to use mozvpn is '{user}'.\n" "Please add more users to group 'mozvpn' for those who should be allowed to use mozvpn. " "You and all added users have logout and login again in order to activate this new group." )