Skip to content

Executors

Runner (Base)

clean_output(self, data)

Decodes data and strips CLI garbage from returned outputs and errors

Parameters:

Name Type Description Default
data str

An output or error returned from subprocess

required

Returns:

Type Description
str

A cleaned string which will be displayed on the console and in logs

Source code in atomic_operator/execution/runner.py
def clean_output(self, data):
    """Decode subprocess output and strip Windows CLI banner/prompt noise.

    Args:
        data (bytes | str): Output or error text returned from subprocess.
            A str is used as-is; anything else is decoded as UTF-8 with
            undecodable bytes dropped.

    Returns:
        str | None: The cleaned text which will be displayed on the console
        and in logs, or None when ``data`` is empty or None.
    """
    if not data:
        return None
    if not isinstance(data, str):
        data = data.decode("utf-8", "ignore")
    # Remove the Windows CLI banner together with the first prompt. The
    # banner is matched with either capitalisation of "Version" and with
    # either a "Copyright ..." or a "(c) ..." second line.
    data = re.sub(
        r"Microsoft Windows \[[Vv]ersion .+\]\r?\n(?:Copyright|\(c\)).*(\r?\n)+[A-Z]:.+?>",
        "",
        data,
    )
    # Remove any remaining prompts (e.g. C:\Users\me>) along with the line
    # breaks that immediately precede them.
    return re.sub(r"(\r?\n)*[A-Z]:.+?>", "", data)

execute(self, host_name='localhost', executor=None, host=None)

The main method which runs a single AtomicTest object on a local system.

Source code in atomic_operator/execution/runner.py
def execute(self, host_name='localhost', executor=None, host=None):
    """Run this runner's AtomicTest and return the results keyed by host name.

    Args:
        host_name (str): Key under which the results are returned.
            Defaults to 'localhost'.
        executor (str): Executor handed to ``execute_process``. When falsy,
            nothing is run and the results are empty. Defaults to None.
        host: Host object forwarded to ``execute_process`` and to the
            dependency check. Defaults to None.

    Returns:
        dict: ``{host_name: results}``; ``results`` holds whatever
        ``_run_dependencies`` returned (when prerequisites are checked),
        a 'command' entry and, when cleanup runs, a 'cleanup' entry.
    """
    results = {}
    self.show_details(f"Using {executor} as executor.")
    if not executor:
        # Without an executor there is nothing to run.
        return {host_name: results}

    if Base.CONFIG.check_prereqs and self.test.dependencies:
        results.update(self._run_dependencies(host=host, executor=executor))

    self.show_details("Running command")
    results['command'] = self.execute_process(
        command=self.test.executor.command,
        executor=executor,
        host=host,
        cwd=self.test_path,
    )

    if Runner.CONFIG.cleanup and self.test.executor.cleanup_command:
        self.show_details("Running cleanup command")
        results['cleanup'] = self.execute_process(
            command=self.test.executor.cleanup_command,
            executor=executor,
            host=host,
            cwd=self.test_path,
        )
    return {host_name: results}

print_process_output(self, command, return_code, output, errors)

Outputs the appropriate outputs, if they exist, to the console and log files

Parameters:

Name Type Description Default
command str

The command which was run by subprocess

required
return_code int

The return code from subprocess

required
output bytes

Output from subprocess which is typically in bytes

required
errors bytes

Errors from subprocess which is typically in bytes

required
Source code in atomic_operator/execution/runner.py
def print_process_output(self, command, return_code, output, errors):
    """Log a finished command's output or errors and return them as a dict.

    Args:
        command (str): The command which was run by subprocess
        return_code (int): The return code from subprocess
        output (bytes): Output from subprocess which is typically in bytes
        errors (bytes): Errors from subprocess which is typically in bytes

    Returns:
        dict: ``{'error': message}`` when the return code is 127 or when only
        errors are present, ``{'output': text}`` when output is present, and
        an empty dict when there is neither output nor errors.
    """
    return_dict = {}
    if return_code == 127:
        # Both streams are cleaned so the message holds readable text rather
        # than a bytes repr, and the two parts are separated by a real newline.
        return_dict['error'] = (
            f"\n\nCommand Not Found: {command} returned exit code {return_code}: "
            f"\nErrors: {self.clean_output(errors)}\nOutput: {self.clean_output(output)}"
        )
        self.__logger.warning(return_dict['error'])
        return return_dict
    if output or errors:
        if output:
            return_dict['output'] = self.clean_output(output)
            self.__logger.info("\n\nOutput: {}".format(return_dict['output']))
        else:
            # Errors are only reported when there is no output at all.
            return_dict['error'] = f"\n\nCommand: {command} returned exit code {return_code}: \n{self.clean_output(errors)}"
            self.__logger.warning(return_dict['error'])
    else:
        self.__logger.info("(No output)")
    return return_dict

LocalRunner (Runner)

Runs AtomicTest objects locally

__init__(self, atomic_test, test_path) special

A single AtomicTest object is provided and run on the local system

Parameters:

Name Type Description Default
atomic_test AtomicTest

A single AtomicTest object.

required
test_path Atomic

A path where the AtomicTest object resides

required
Source code in atomic_operator/execution/localrunner.py
def __init__(self, atomic_test, test_path):
    """Store the AtomicTest to run locally and record the local platform.

    Args:
        atomic_test (AtomicTest): The single test this runner executes.
        test_path: Path where the AtomicTest object resides.
    """
    self.test, self.test_path = atomic_test, test_path
    # Cache whatever get_local_system_platform() reports for later use.
    self.__local_system_platform = self.get_local_system_platform()

execute_process(self, command, executor=None, host=None, cwd=None)

Executes commands using subprocess

Parameters:

Name Type Description Default
executor str

An executor or shell used to execute the provided command(s)

None
command str

The commands to run using subprocess

required
cwd str

A string which indicates the current working directory to run the command

None

Returns:

Type Description
dict

A dictionary of either outputs or errors from subprocess (empty if the command timed out)

Source code in atomic_operator/execution/localrunner.py
def execute_process(self, command, executor=None, host=None, cwd=None):
    """Executes commands using subprocess

    The executor is started as a child process and the command is written
    to its standard input, followed by a newline.

    Args:
        command (str): The commands to run using subprocess
        executor (str): An executor or shell used to execute the provided command(s)
        host: Unused here; accepted so the signature matches the other runners.
        cwd (str): A string which indicates the current working directory to run the command

    Returns:
        dict: The result of ``print_process_output`` for the finished
        command, or an empty dict when the command timed out.
    """
    p = subprocess.Popen(
        executor,
        shell=False,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        # stderr is merged into stdout, so communicate() returns None for it.
        stderr=subprocess.STDOUT,
        env=os.environ,
        cwd=cwd
    )
    try:
        outs, errs = p.communicate(
            bytes(command, "utf-8") + b"\n",
            timeout=Runner.CONFIG.command_timeout
        )
    except subprocess.TimeoutExpired as e:
        # Display output if it exists. TimeoutExpired.output is an alias of
        # TimeoutExpired.stdout, so it is logged only once.
        if e.stdout:
            self.__logger.warning(e.stdout)
        if e.stderr:
            self.__logger.warning(e.stderr)
        self.__logger.warning("Command timed out!")

        # communicate() does not kill the child on timeout. Kill it and
        # wait so the process is reaped; wait() is used instead of a second
        # communicate() because processes started by the executor may keep
        # the output pipe open and block the read indefinitely.
        p.kill()
        p.wait()
        return {}
    return self.print_process_output(command, p.returncode, outs, errs)

RemoteRunner (Runner)

__init__(self, atomic_test, test_path, supporting_files=None) special

A single AtomicTest object is provided and run on a remote system

Parameters:

Name Type Description Default
atomic_test AtomicTest

A single AtomicTest object.

required
test_path Atomic

A path where the AtomicTest object resides

required
Source code in atomic_operator/execution/remoterunner.py
def __init__(self, atomic_test, test_path, supporting_files=None):
    """Store the AtomicTest and the related data needed to run it remotely.

    Args:
        atomic_test (AtomicTest): The single test this runner executes.
        test_path: Path where the AtomicTest object resides.
        supporting_files: Kept on the instance and passed to the invocation
            state by ``execute_process``. Defaults to None.
    """
    self.test, self.test_path, self.supporting_files = (
        atomic_test,
        test_path,
        supporting_files,
    )

execute_process(self, command, executor=None, host=None, cwd=None)

Main method to execute commands using state machine

Parameters:

Name Type Description Default
command str

The command to run remotely on the desired systems

required
executor str

An executor that can be passed to state machine. Defaults to None.

None
host str

A host to run remote commands on. Defaults to None.

None
Source code in atomic_operator/execution/remoterunner.py
def execute_process(self, command, executor=None, host=None, cwd=None):
    """Main method to execute commands using state machine

    Starts in ``CreationState`` and advances through ``InnvocationState``
    to ``ParseResultsState``. Connection, authentication and transport
    errors are logged as warnings and returned instead of being raised.

    Args:
        command (str): The command to run remotely on the desired systems
        executor (str): An executor that can be passed to state machine. Defaults to None.
        host (str): A host to run remote commands on. Defaults to None.
        cwd: Unused here; accepted so the signature matches the other runners.

    Returns:
        The value produced by ``ParseResultsState.on_event()``, the caught
        exception when an error occurred, or None when the state machine
        could not advance (for example an unsupported executor).
    """
    # Resolve the name used in log messages once; this also tolerates
    # host=None instead of failing inside an exception handler.
    hostname = getattr(host, 'hostname', host)
    self.state = CreationState()
    final_state = None
    try:
        finished = False
        while not finished:
            if str(self.state) == 'CreationState':
                self.__logger.debug('Running CreationState on_event')
                next_state = self.state.on_event(executor, command)
                if next_state is self.state:
                    # CreationState hands itself back for executors it does
                    # not handle; repeating the call would loop forever.
                    self.__logger.warning(f"Unsupported executor '{executor}': unable to run command on {hostname}")
                    break
                self.state = next_state
            if str(self.state) == 'InnvocationState':
                self.__logger.debug('Running InnvocationState on_event')
                self.state = self.state.invoke(host, executor, command, input_arguments=self.test.input_arguments, supporting_files=self.supporting_files, test_path=self.test_path)
            if str(self.state) == 'ParseResultsState':
                self.__logger.debug('Running ParseResultsState on_event')
                final_state = self.state.on_event()
                self.__logger.info(final_state)
                finished = True
            elif str(self.state) not in ('CreationState', 'InnvocationState'):
                # No branch above can advance an unrecognised state.
                self.__logger.warning(f"Unexpected state '{self.state}': unable to run command on {hostname}")
                break
    except NoValidConnectionsError as ec:
        self.__logger.warning(f'SSH Error: Unable to connect to {hostname}: {ec}')
        final_state = ec
    # NOTE(review): presumably these are paramiko's exceptions, where
    # BadAuthenticationType and PasswordRequiredException subclass
    # AuthenticationException; they are listed first so they stay reachable.
    except BadAuthenticationType as eb:
        self.__logger.warning(f'SSH Error: Unable to use provided authentication type to {hostname}: {eb}')
        final_state = eb
    except PasswordRequiredException as ep:
        self.__logger.warning(f'SSH Error: Must provide a password to authenticate to {hostname}: {ep}')
        final_state = ep
    except AuthenticationException as ea:
        self.__logger.warning(f'SSH Error: Unable to authenticate to host {hostname}: {ea}')
        final_state = ea
    except AuthenticationError as ewa:
        self.__logger.warning(f'Windows Error: Unable to authenticate to host {hostname}: {ewa}')
        final_state = ewa
    except WinRMTransportError as ewt:
        self.__logger.warning(f'Windows Error: Error occurred during transport on host {hostname}: {ewt}')
        final_state = ewt
    except WSManFaultError as ewf:
        self.__logger.warning(f'Windows Error: Received WSManFault information from host {hostname}: {ewf}')
        final_state = ewf
    except Exception as ex:
        # Last-resort boundary: any other failure is reported, not raised.
        self.__logger.warning(f"Uknown Error: Received an unknown error from host {hostname}: {ex}")
        final_state = ex
    return final_state

start(self, host=None, executor=None)

The main method which runs a single AtomicTest object remotely on one remote host.

Source code in atomic_operator/execution/remoterunner.py
def start(self, host=None, executor=None):
    """Run the AtomicTest remotely on a single host.

    Args:
        host: Host object whose ``hostname`` attribute keys the results.
        executor (str): Executor forwarded to ``execute``. Defaults to None.

    Returns:
        Whatever ``execute`` returns for this host.
    """
    target_name = host.hostname
    return self.execute(host_name=target_name, executor=executor, host=host)

AWSRunner (Runner)

Runs AtomicTest objects against AWS using the aws-cli

__init__(self, atomic_test, test_path) special

A single AtomicTest object is provided and run using the aws-cli

Parameters:

Name Type Description Default
atomic_test AtomicTest

A single AtomicTest object.

required
test_path Atomic

A path where the AtomicTest object resides

required
Source code in atomic_operator/execution/awsrunner.py
def __init__(self, atomic_test, test_path):
    """Store the AtomicTest to run with the aws-cli and record the local platform.

    Args:
        atomic_test (AtomicTest): The single test this runner executes.
        test_path: Path where the AtomicTest object resides.
    """
    self.test, self.test_path = atomic_test, test_path
    # Cache whatever get_local_system_platform() reports for later use.
    self.__local_system_platform = self.get_local_system_platform()

execute_process(self, command, executor=None, host=None, cwd=None)

Executes commands using subprocess

Parameters:

Name Type Description Default
executor str

An executor or shell used to execute the provided command(s)

None
command str

The commands to run using subprocess

required
cwd str

A string which indicates the current working directory to run the command

None

Returns:

Type Description
dict

A dictionary of either outputs or errors from subprocess (empty if the command timed out)

Source code in atomic_operator/execution/awsrunner.py
def execute_process(self, command, executor=None, host=None, cwd=None):
    """Executes commands using subprocess

    The executor is started as a child process and the command is written
    to its standard input, followed by a newline.

    Args:
        command (str): The commands to run using subprocess
        executor (str): An executor or shell used to execute the provided command(s)
        host: Unused here; accepted so the signature matches the other runners.
        cwd (str): A string which indicates the current working directory to run the command

    Returns:
        dict: The result of ``print_process_output`` for the finished
        command, or an empty dict when the command timed out.
    """
    p = subprocess.Popen(
        executor,
        shell=False,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        # stderr is merged into stdout, so communicate() returns None for it.
        stderr=subprocess.STDOUT,
        env=os.environ,
        cwd=cwd
    )
    try:
        outs, errs = p.communicate(
            bytes(command, "utf-8") + b"\n",
            timeout=Runner.CONFIG.command_timeout
        )
    except subprocess.TimeoutExpired as e:
        # Display output if it exists. TimeoutExpired.output is an alias of
        # TimeoutExpired.stdout, so it is logged only once.
        if e.stdout:
            self.__logger.warning(e.stdout)
        if e.stderr:
            self.__logger.warning(e.stderr)
        self.__logger.warning("Command timed out!")
        # communicate() does not kill the child on timeout. Kill it and
        # wait so the process is reaped; wait() is used instead of a second
        # communicate() because processes started by the executor may keep
        # the output pipe open and block the read indefinitely.
        p.kill()
        p.wait()
        return {}
    return self.print_process_output(command, p.returncode, outs, errs)

CreationState (State)

The state which is used to modify commands

on_event(self, command_type, command)

Handle events that are delegated to this State.

Source code in atomic_operator/execution/statemachine.py
def on_event(self, command_type, command):
    """Pick the next state for the given executor type.

    Args:
        command_type (str): Executor name: 'powershell', 'cmd', 'ssh',
            'sh' or 'bash'.
        command (str): The command; only the powershell handler receives it.

    Returns:
        The value returned by the matching handler, or this state itself
        when ``command_type`` is not one of the handled names.
    """
    if command_type == 'powershell':
        return self.powershell(command)
    if command_type == 'cmd':
        return self.cmd()
    # 'ssh', 'sh' and 'bash' all share the ssh handler.
    if command_type in ('ssh', 'sh', 'bash'):
        return self.ssh()
    return self

InnvocationState (State, Base)

The state which indicates the invocation of a command

ParseResultsState (State, Runner)

The state which is used to parse the results

on_event(self)

Handle events that are delegated to this State.

Source code in atomic_operator/execution/statemachine.py
def on_event(self):
    """Handle events that are delegated to this State.

    Returns:
        The value stored in this state's ``result`` attribute.
    """
    return self.result

State

We define a state object which provides some utility functions for the individual states within the state machine.

on_event(self, event)

Handle events that are delegated to this State.

Source code in atomic_operator/execution/statemachine.py
def on_event(self, event):
    """Handle events that are delegated to this State.

    This implementation ignores ``event`` and does nothing.

    Args:
        event: The event being delegated.

    Returns:
        None
    """
    return None

SuppressFilter (Filter)

filter(self, record)

Determine if the specified record is to be logged.

Is the specified record to be logged? Returns 0 for no, nonzero for yes. If deemed appropriate, the record may be modified in-place.

Source code in atomic_operator/execution/statemachine.py
def filter(self, record):
    """Determine if the specified record is to be logged.

    Args:
        record: Log record; only the text from ``getMessage()`` is inspected.

    Returns:
        bool: False when the message contains 'wsman', True otherwise.
    """
    message = record.getMessage()
    return not ('wsman' in message)