Executors
Runner (Base)
clean_output(self, data)
Decodes data and strips CLI garbage from returned outputs and errors
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
str |
An output or error returned from subprocess |
required |
Returns:
Type | Description |
---|---|
str |
A cleaned string which will be displayed on the console and in logs |
Source code in atomic_operator/execution/runner.py
def clean_output(self, data):
    """Decodes data and strips CLI garbage from returned outputs and errors

    Args:
        data (bytes | str): An output or error returned from subprocess.
            Bytes are decoded as UTF-8 with undecodable bytes ignored;
            text that is already a ``str`` is used as-is.

    Returns:
        str | None: A cleaned string which will be displayed on the console
        and in logs, or ``None`` when ``data`` is empty or ``None``.
    """
    if not data:
        return None
    # subprocess hands back bytes, but accept already-decoded text as well
    # instead of failing on the missing ``decode`` method.
    if isinstance(data, (bytes, bytearray)):
        data = data.decode("utf-8", "ignore")
    # Remove Windows CLI garbage (the banner plus the first prompt)
    data = re.sub(r"Microsoft\ Windows\ \[version .+\]\r?\nCopyright.*(\r?\n)+[A-Z]\:.+?\>", "", data)
    # Strip remaining drive-letter prompts together with the newlines
    # that precede them
    return re.sub(r"(\r?\n)*[A-Z]\:.+?\>", "", data)
execute(self, host_name='localhost', executor=None, host=None)
The main method which runs a single AtomicTest object on a local system.
Source code in atomic_operator/execution/runner.py
def execute(self, host_name='localhost', executor=None, host=None):
    """Run this runner's AtomicTest and collect the responses for one host.

    When an executor is given, the dependency checks (if enabled and the
    test has any), the test command and the cleanup command (if enabled
    and the test has one) are run in that order through
    ``execute_process``. Without an executor nothing is run.

    Args:
        host_name (str): Key under which the results are returned.
        executor (str): Executor passed through to ``execute_process``.
        host: Host object passed through to ``execute_process``.

    Returns:
        dict: ``{host_name: results}`` where ``results`` may contain
        dependency results plus the ``'command'`` and ``'cleanup'`` responses.
    """
    results = {}
    self.show_details(f"Using {executor} as executor.")
    if not executor:
        return {host_name: results}

    if Base.CONFIG.check_prereqs and self.test.dependencies:
        dependency_results = self._run_dependencies(host=host, executor=executor)
        results.update(dependency_results)

    self.show_details("Running command")
    results['command'] = self.execute_process(
        command=self.test.executor.command,
        executor=executor,
        host=host,
        cwd=self.test_path
    )

    if Runner.CONFIG.cleanup and self.test.executor.cleanup_command:
        self.show_details("Running cleanup command")
        results['cleanup'] = self.execute_process(
            command=self.test.executor.cleanup_command,
            executor=executor,
            host=host,
            cwd=self.test_path
        )
    return {host_name: results}
print_process_output(self, command, return_code, output, errors)
Writes the appropriate outputs, if they exist, to the console and log files
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
str |
The command which was run by subprocess |
required |
return_code |
int |
The return code from subprocess |
required |
output |
bytes |
Output from subprocess which is typically in bytes |
required |
errors |
bytes |
Errors from subprocess which is typically in bytes |
required |
Source code in atomic_operator/execution/runner.py
def print_process_output(self, command, return_code, output, errors):
    """Writes the appropriate outputs, if they exist, to the console and log files

    Args:
        command (str): The command which was run by subprocess
        return_code (int): The return code from subprocess
        output (bytes): Output from subprocess which is typically in bytes
        errors (bytes): Errors from subprocess which is typically in bytes

    Returns:
        dict: ``{'output': ...}`` when there is output, ``{'error': ...}``
        when the return code is 127 or when only errors were produced,
        and an empty dict when there is nothing to report.
    """
    return_dict = {}
    if return_code == 127:
        # Reported as "command not found". The separator before "Output"
        # is a real newline (previously the literal text "/n") and the
        # output is cleaned like the errors instead of shown as raw bytes.
        return_dict['error'] = (
            f"\n\nCommand Not Found: {command} returned exit code {return_code}: "
            f"\nErrors: {self.clean_output(errors)}\nOutput: {self.clean_output(output)}"
        )
        self.__logger.warning(return_dict['error'])
        return return_dict
    if output or errors:
        if output:
            return_dict['output'] = self.clean_output(output)
            self.__logger.info("\n\nOutput: {}".format(return_dict['output']))
        else:
            return_dict['error'] = f"\n\nCommand: {command} returned exit code {return_code}: \n{self.clean_output(errors)}"
            self.__logger.warning(return_dict['error'])
    else:
        self.__logger.info("(No output)")
    return return_dict
LocalRunner (Runner)
Runs AtomicTest objects locally
__init__(self, atomic_test, test_path)
special
A single AtomicTest object is provided and run on the local system
Parameters:
Name | Type | Description | Default |
---|---|---|---|
atomic_test |
AtomicTest |
A single AtomicTest object. |
required |
test_path |
Atomic |
A path where the AtomicTest object resides |
required |
Source code in atomic_operator/execution/localrunner.py
def __init__(self, atomic_test, test_path):
    """Store the AtomicTest and its path, and record the local platform.

    Args:
        atomic_test (AtomicTest): A single AtomicTest object.
        test_path: A path where the AtomicTest object resides
    """
    self.test, self.test_path = atomic_test, test_path
    # Looked up once at construction time and kept on the instance.
    self.__local_system_platform = self.get_local_system_platform()
execute_process(self, command, executor=None, host=None, cwd=None)
Executes commands using subprocess
Parameters:
Name | Type | Description | Default |
---|---|---|---|
executor |
str |
An executor or shell used to execute the provided command(s) |
None |
command |
str |
The commands to run using subprocess |
required |
cwd |
str |
A string which indicates the current working directory to run the command |
None |
Returns:
Type | Description |
---|---|
dict |
A dictionary of either outputs or errors from subprocess (empty if the command timed out) |
Source code in atomic_operator/execution/localrunner.py
def execute_process(self, command, executor=None, host=None, cwd=None):
    """Executes commands using subprocess

    The executor is started as a child process and the command text,
    followed by a newline, is written to its stdin. stderr is merged into
    stdout.

    Args:
        command (str): The commands to run using subprocess
        executor (str): An executor or shell used to execute the provided command(s)
        host: Not used by this runner.
        cwd (str): A string which indicates the current working directory to run the command

    Returns:
        dict: The result of ``print_process_output`` for the finished
        process, or an empty dict when the command timed out.
    """
    p = subprocess.Popen(
        executor,
        shell=False,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        env=os.environ,
        cwd=cwd
    )
    try:
        outs, errs = p.communicate(
            bytes(command, "utf-8") + b"\n",
            timeout=Runner.CONFIG.command_timeout
        )
    except subprocess.TimeoutExpired as e:
        # Display output if it exists. ``e.stdout`` is an alias of
        # ``e.output``, so it is logged only once.
        if e.output:
            self.__logger.warning(e.output)
        if e.stderr:
            self.__logger.warning(e.stderr)
        self.__logger.warning("Command timed out!")
        # communicate() does not stop the child on timeout: kill it, then
        # wait so the terminated process is reaped instead of lingering.
        p.kill()
        p.wait()
        return {}
    return self.print_process_output(command, p.returncode, outs, errs)
RemoteRunner (Runner)
__init__(self, atomic_test, test_path, supporting_files=None)
special
A single AtomicTest object is provided and run on a remote system
Parameters:
Name | Type | Description | Default |
---|---|---|---|
atomic_test |
AtomicTest |
A single AtomicTest object. |
required |
test_path |
Atomic |
A path where the AtomicTest object resides |
required |
Source code in atomic_operator/execution/remoterunner.py
def __init__(self, atomic_test, test_path, supporting_files=None):
    """Store the AtomicTest, its path and any supporting files.

    Args:
        atomic_test (AtomicTest): A single AtomicTest object.
        test_path: A path where the AtomicTest object resides
        supporting_files: Kept on the instance and later handed to the
            invocation state. Defaults to None.
    """
    self.test, self.test_path = atomic_test, test_path
    self.supporting_files = supporting_files
execute_process(self, command, executor=None, host=None, cwd=None)
Main method to execute commands using state machine
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
str |
The command to run remotely on the desired systems |
required |
executor |
str |
An executor that can be passed to state machine. Defaults to None. |
None |
host |
str |
A host to run remote commands on. Defaults to None. |
None |
Source code in atomic_operator/execution/remoterunner.py
def execute_process(self, command, executor=None, host=None, cwd=None):
    """Main method to execute commands using state machine

    Starts from a CreationState and keeps advancing ``self.state`` until a
    ParseResultsState yields the final result. Connection and
    authentication failures are logged and returned instead of raised.

    Args:
        command (str): The command to run remotely on the desired systems
        executor (str): An executor that can be passed to state machine. Defaults to None.
        host: A host to run remote commands on; its ``hostname`` attribute
            is used in log messages. Defaults to None.
        cwd: Not used by this runner.

    Returns:
        The value produced by the ParseResultsState on success, the caught
        exception when an error occurred, or None when the state machine
        could not advance past the CreationState.
    """
    # Resolve the name once so the error handlers below cannot themselves
    # fail with AttributeError when no host object was supplied.
    hostname = getattr(host, 'hostname', host)
    self.state = CreationState()
    final_state = None
    try:
        finished = False
        while not finished:
            if str(self.state) == 'CreationState':
                self.__logger.debug('Running CreationState on_event')
                next_state = self.state.on_event(executor, command)
                if next_state is self.state:
                    # The creation state handed itself back, i.e. it did
                    # not recognise the executor. Looping again would
                    # repeat the same call forever, so stop here.
                    self.__logger.warning(f"Unsupported executor '{executor}': unable to run command on host {hostname}")
                    break
                self.state = next_state
            if str(self.state) == 'InnvocationState':
                self.__logger.debug('Running InnvocationState on_event')
                self.state = self.state.invoke(host, executor, command, input_arguments=self.test.input_arguments, supporting_files=self.supporting_files, test_path=self.test_path)
            if str(self.state) == 'ParseResultsState':
                self.__logger.debug('Running ParseResultsState on_event')
                final_state = self.state.on_event()
                self.__logger.info(final_state)
                finished = True
    except NoValidConnectionsError as ec:
        self.__logger.warning(f'SSH Error: Unable to connect to {hostname}: {ec}')
        final_state = ec
    # NOTE(review): assuming these are paramiko's exception classes, the
    # next two derive from AuthenticationException and must be handled
    # before it, otherwise their handlers are unreachable - confirm.
    except BadAuthenticationType as eb:
        self.__logger.warning(f'SSH Error: Unable to use provided authentication type to {hostname}: {eb}')
        final_state = eb
    except PasswordRequiredException as ep:
        self.__logger.warning(f'SSH Error: Must provide a password to authenticate to {hostname}: {ep}')
        final_state = ep
    except AuthenticationException as ea:
        self.__logger.warning(f'SSH Error: Unable to authenticate to host {hostname}: {ea}')
        final_state = ea
    except AuthenticationError as ewa:
        self.__logger.warning(f'Windows Error: Unable to authenticate to host {hostname}: {ewa}')
        final_state = ewa
    except WinRMTransportError as ewt:
        self.__logger.warning(f'Windows Error: Error occurred during transport on host {hostname}: {ewt}')
        final_state = ewt
    except WSManFaultError as ewf:
        self.__logger.warning(f'Windows Error: Received WSManFault information from host {hostname}: {ewf}')
        final_state = ewf
    except Exception as ex:
        # Last-resort boundary: anything else is logged and returned.
        self.__logger.warning(f"Unknown Error: Received an unknown error from host {hostname}: {ex}")
        final_state = ex
    return final_state
start(self, host=None, executor=None)
The main method which runs a single AtomicTest object remotely on one remote host.
Source code in atomic_operator/execution/remoterunner.py
def start(self, host=None, executor=None):
    """Run the AtomicTest on one remote host via ``execute``.

    Args:
        host: Host object to run against; its ``hostname`` attribute is
            used as the key of the returned results.
        executor (str): Executor passed through to ``execute``.

    Returns:
        Whatever ``execute`` returns for this host.
    """
    target_name = host.hostname
    return self.execute(host_name=target_name, executor=executor, host=host)
AWSRunner (Runner)
Runs AtomicTest objects against AWS using the aws-cli
__init__(self, atomic_test, test_path)
special
A single AtomicTest object is provided and run using the aws-cli
Parameters:
Name | Type | Description | Default |
---|---|---|---|
atomic_test |
AtomicTest |
A single AtomicTest object. |
required |
test_path |
Atomic |
A path where the AtomicTest object resides |
required |
Source code in atomic_operator/execution/awsrunner.py
def __init__(self, atomic_test, test_path):
    """Store the AtomicTest and its path, and record the local platform.

    Args:
        atomic_test (AtomicTest): A single AtomicTest object.
        test_path: A path where the AtomicTest object resides
    """
    self.test, self.test_path = atomic_test, test_path
    # Looked up once at construction time and kept on the instance.
    self.__local_system_platform = self.get_local_system_platform()
execute_process(self, command, executor=None, host=None, cwd=None)
Executes commands using subprocess
Parameters:
Name | Type | Description | Default |
---|---|---|---|
executor |
str |
An executor or shell used to execute the provided command(s) |
None |
command |
str |
The commands to run using subprocess |
required |
cwd |
str |
A string which indicates the current working directory to run the command |
None |
Returns:
Type | Description |
---|---|
dict |
A dictionary of either outputs or errors from subprocess (empty if the command timed out) |
Source code in atomic_operator/execution/awsrunner.py
def execute_process(self, command, executor=None, host=None, cwd=None):
    """Executes commands using subprocess

    The executor is started as a child process and the command text,
    followed by a newline, is written to its stdin. stderr is merged into
    stdout.

    Args:
        command (str): The commands to run using subprocess
        executor (str): An executor or shell used to execute the provided command(s)
        host: Not used by this runner.
        cwd (str): A string which indicates the current working directory to run the command

    Returns:
        dict: The result of ``print_process_output`` for the finished
        process, or an empty dict when the command timed out.
    """
    p = subprocess.Popen(
        executor,
        shell=False,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        env=os.environ,
        cwd=cwd
    )
    try:
        outs, errs = p.communicate(
            bytes(command, "utf-8") + b"\n",
            timeout=Runner.CONFIG.command_timeout
        )
    except subprocess.TimeoutExpired as e:
        # Display output if it exists. ``e.stdout`` is an alias of
        # ``e.output``, so it is logged only once.
        if e.output:
            self.__logger.warning(e.output)
        if e.stderr:
            self.__logger.warning(e.stderr)
        self.__logger.warning("Command timed out!")
        # communicate() does not stop the child on timeout: kill it, then
        # wait so the terminated process is reaped instead of lingering.
        p.kill()
        p.wait()
        return {}
    return self.print_process_output(command, p.returncode, outs, errs)
CreationState (State)
The state which is used to modify commands
on_event(self, command_type, command)
Handle events that are delegated to this State.
Source code in atomic_operator/execution/statemachine.py
def on_event(self, command_type, command):
    """Pick the handler matching the command type.

    Args:
        command_type (str): One of 'powershell', 'cmd', 'ssh', 'sh' or 'bash'.
        command (str): The command; only forwarded to the powershell handler.

    Returns:
        The result of the matching handler, or this state itself when the
        command type is not recognised.
    """
    if command_type == 'powershell':
        return self.powershell(command)
    if command_type == 'cmd':
        return self.cmd()
    # All shell-style executors share the ssh handler.
    if command_type in ('ssh', 'sh', 'bash'):
        return self.ssh()
    return self
ParseResultsState (State, Runner)
The state which is used to parse the results
on_event(self)
Handle events that are delegated to this State.
Source code in atomic_operator/execution/statemachine.py
def on_event(self):
    """Return the result held by this state."""
    outcome = self.result
    return outcome
State
We define a state object which provides some utility functions for the individual states within the state machine.
on_event(self, event)
Handle events that are delegated to this State.
Source code in atomic_operator/execution/statemachine.py
def on_event(self, event):
    """Handle events that are delegated to this State.

    The base implementation does nothing and returns None.
    """
    return None
SuppressFilter (Filter)
filter(self, record)
Determine if the specified record is to be logged.
Is the specified record to be logged? Returns 0 for no, nonzero for yes. If deemed appropriate, the record may be modified in-place.
Source code in atomic_operator/execution/statemachine.py
def filter(self, record):
    """Suppress log records whose message mentions 'wsman'.

    Args:
        record: A log record providing ``getMessage()``.

    Returns:
        bool: False when the message contains 'wsman', True otherwise.
    """
    message = record.getMessage()
    mentions_wsman = 'wsman' in message
    return not mentions_wsman