Atomic Operator
AtomicOperator (Base)
Main class used to run Atomic Red Team tests.
atomic-operator is used to run Atomic Red Team tests both locally and remotely. These tests (atomics) are predefined tests to mock or emulate a specific technique.
config_file definition: atomic-operator's run method can be supplied with a path to a configuration file (config_file) which defines specific tests and/or values for input parameters to facilitate automation of said tests. An example of this config_file can be seen below:
inventory:
linux1:
executor: ssh
authentication:
username: root
password: UR4Swimlane!
#ssk_key_path:
port: 22
timeout: 5
hosts:
# - 192.168.1.1
- 10.32.100.199
# etc.
atomic_tests:
- guid: f7e6ec05-c19e-4a80-a7e7-241027992fdb
input_arguments:
output_file:
value: custom_output.txt
input_file:
value: custom_input.txt
- guid: 3ff64f0b-3af2-3866-339d-38d9791407c3
input_arguments:
second_arg:
value: SWAPPPED argument
- guid: 32f90516-4bc9-43bd-b18d-2cbe0b7ca9b2
inventories:
- linux1
Exceptions:
Type | Description |
---|---|
ValueError |
If a provided technique is unknown we raise an error. |
get_atomics(self, desintation='/home/docs/checkouts/readthedocs.org/user_builds/atomic-operator/checkouts/latest', **kwargs)
Downloads the RedCanary atomic-red-team repository to your local system.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
desintation |
str |
A folder path to download the repositorty data to. Defaults to os.getcwd(). |
'/home/docs/checkouts/readthedocs.org/user_builds/atomic-operator/checkouts/latest' |
kwargs |
dict |
This kwargs will be passed along to Python requests library during download. Defaults to None. |
{} |
Returns:
Type | Description |
---|---|
str |
The path the data can be found at. |
Source code in atomic_operator/atomic_operator.py
def get_atomics(self, desintation=os.getcwd(), **kwargs):
"""Downloads the RedCanary atomic-red-team repository to your local system.
Args:
desintation (str, optional): A folder path to download the repositorty data to. Defaults to os.getcwd().
kwargs (dict, optional): This kwargs will be passed along to Python requests library during download. Defaults to None.
Returns:
str: The path the data can be found at.
"""
if not os.path.exists(desintation):
os.makedirs(desintation)
folder_name = self.download_atomic_red_team_repo(desintation, **kwargs)
return os.path.join(desintation, folder_name)
run(self, techniques=['all'], test_guids=[], atomics_path='/home/docs/checkouts/readthedocs.org/user_builds/atomic-operator/checkouts/latest', check_prereqs=False, get_prereqs=False, cleanup=False, copy_source_files=True, command_timeout=20, show_details=False, prompt_for_input_args=False, return_atomics=False, config_file=None, hosts=[], username=None, password=None, ssh_key_path=None, private_key_string=None, verify_ssl=False, ssh_port=22, ssh_timeout=5, **kwargs)
The main method in which we run Atomic Red Team tests.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
techniques |
list |
One or more defined techniques by attack_technique ID. Defaults to 'all'. |
['all'] |
test_guids |
list |
One or more Atomic test GUIDs. Defaults to None. |
[] |
atomics_path |
str |
The path of Atomic tests. Defaults to os.getcwd(). |
'/home/docs/checkouts/readthedocs.org/user_builds/atomic-operator/checkouts/latest' |
check_prereqs |
bool |
Whether or not to check for prereq dependencies (prereq_comand). Defaults to False. |
False |
get_prereqs |
bool |
Whether or not you want to retrieve prerequisites. Defaults to False. |
False |
cleanup |
bool |
Whether or not you want to run cleanup command(s). Defaults to False. |
False |
copy_source_files |
bool |
Whether or not you want to copy any related source (src, bin, etc.) files to a remote host. Defaults to True. |
True |
command_timeout |
int |
Timeout duration for each command. Defaults to 20. |
20 |
show_details |
bool |
Whether or not you want to output details about tests being ran. Defaults to False. |
False |
prompt_for_input_args |
bool |
Whether you want to prompt for input arguments for each test. Defaults to False. |
False |
return_atomics |
bool |
Whether or not you want to return atomics instead of running them. Defaults to False. |
False |
config_file |
str |
A path to a conifg_file which is used to automate atomic-operator in environments. Default to None. |
None |
hosts |
list |
A list of one or more remote hosts to run a test on. Defaults to []. |
[] |
username |
str |
Username for authentication of remote connections. Defaults to None. |
None |
password |
str |
Password for authentication of remote connections. Defaults to None. |
None |
ssh_key_path |
str |
Path to a SSH Key for authentication of remote connections. Defaults to None. |
None |
private_key_string |
str |
A private SSH Key string used for authentication of remote connections. Defaults to None. |
None |
verify_ssl |
bool |
Whether or not to verify ssl when connecting over RDP (windows). Defaults to False. |
False |
ssh_port |
int |
SSH port for authentication of remote connections. Defaults to 22. |
22 |
ssh_timeout |
int |
SSH timeout for authentication of remote connections. Defaults to 5. |
5 |
kwargs |
dict |
If provided, keys matching inputs for a test will be replaced. Default is None. |
{} |
Exceptions:
Type | Description |
---|---|
ValueError |
If a provided technique is unknown we raise an error. |
Source code in atomic_operator/atomic_operator.py
def run(self, techniques: list=['all'], test_guids: list=[], atomics_path=os.getcwd(),
check_prereqs=False, get_prereqs=False, cleanup=False, copy_source_files=True,
command_timeout=20, show_details=False, prompt_for_input_args=False,
return_atomics=False, config_file=None, hosts=[], username=None,
password=None, ssh_key_path=None, private_key_string=None,
verify_ssl=False, ssh_port=22, ssh_timeout=5, **kwargs) -> None:
"""The main method in which we run Atomic Red Team tests.
Args:
techniques (list, optional): One or more defined techniques by attack_technique ID. Defaults to 'all'.
test_guids (list, optional): One or more Atomic test GUIDs. Defaults to None.
atomics_path (str, optional): The path of Atomic tests. Defaults to os.getcwd().
check_prereqs (bool, optional): Whether or not to check for prereq dependencies (prereq_comand). Defaults to False.
get_prereqs (bool, optional): Whether or not you want to retrieve prerequisites. Defaults to False.
cleanup (bool, optional): Whether or not you want to run cleanup command(s). Defaults to False.
copy_source_files (bool, optional): Whether or not you want to copy any related source (src, bin, etc.) files to a remote host. Defaults to True.
command_timeout (int, optional): Timeout duration for each command. Defaults to 20.
show_details (bool, optional): Whether or not you want to output details about tests being ran. Defaults to False.
prompt_for_input_args (bool, optional): Whether you want to prompt for input arguments for each test. Defaults to False.
return_atomics (bool, optional): Whether or not you want to return atomics instead of running them. Defaults to False.
config_file (str, optional): A path to a conifg_file which is used to automate atomic-operator in environments. Default to None.
hosts (list, optional): A list of one or more remote hosts to run a test on. Defaults to [].
username (str, optional): Username for authentication of remote connections. Defaults to None.
password (str, optional): Password for authentication of remote connections. Defaults to None.
ssh_key_path (str, optional): Path to a SSH Key for authentication of remote connections. Defaults to None.
private_key_string (str, optional): A private SSH Key string used for authentication of remote connections. Defaults to None.
verify_ssl (bool, optional): Whether or not to verify ssl when connecting over RDP (windows). Defaults to False.
ssh_port (int, optional): SSH port for authentication of remote connections. Defaults to 22.
ssh_timeout (int, optional): SSH timeout for authentication of remote connections. Defaults to 5.
kwargs (dict, optional): If provided, keys matching inputs for a test will be replaced. Default is None.
Raises:
ValueError: If a provided technique is unknown we raise an error.
"""
atomics_path = self.__find_path(atomics_path)
if not atomics_path:
return AtomicsFolderNotFound('Unable to find a folder containing Atomics. Please provide a path or run get_atomics.')
Base.CONFIG = Config(
atomics_path = atomics_path,
check_prereqs = check_prereqs,
get_prereqs = get_prereqs,
cleanup = cleanup,
command_timeout = command_timeout,
show_details = show_details,
prompt_for_input_args = prompt_for_input_args,
kwargs = kwargs,
copy_source_files = copy_source_files
)
# taking inputs from both config_file and passed in values via command
# line to build a run_list of objects
self.__config_parser = ConfigParser(
config_file=config_file,
techniques=self.parse_input_lists(techniques),
test_guids=self.parse_input_lists(test_guids),
host_list=self.parse_input_lists(hosts),
username=username,
password=password,
ssh_key_path=ssh_key_path,
private_key_string=private_key_string,
verify_ssl=verify_ssl,
ssh_port=ssh_port,
ssh_timeout=ssh_timeout
)
self.__run_list = self.__config_parser.run_list
__return_atomics = []
for item in self.__run_list:
if return_atomics:
__return_atomics.append(item)
elif kwargs.get('kwargs'):
self.__run_technique(item, **kwargs.get('kwargs'))
else:
self.__run_technique(item)
if return_atomics and __return_atomics:
return __return_atomics
return self.__test_responses
Base
download_atomic_red_team_repo(self, save_path, **kwargs)
Downloads the Atomic Red Team repository from github
Parameters:
Name | Type | Description | Default |
---|---|---|---|
save_path |
str |
The path to save the downloaded and extracted ZIP contents |
required |
Returns:
Type | Description |
---|---|
str |
A string of the location the data was saved to. |
Source code in atomic_operator/base.py
def download_atomic_red_team_repo(self, save_path, **kwargs) -> str:
"""Downloads the Atomic Red Team repository from github
Args:
save_path (str): The path to save the downloaded and extracted ZIP contents
Returns:
str: A string of the location the data was saved to.
"""
response = requests.get(Base.ATOMIC_RED_TEAM_REPO, stream=True, **kwargs)
z = zipfile.ZipFile(BytesIO(response.content))
z.extractall(save_path)
return z.namelist()[0]
get_abs_path(self, value)
Formats and returns the absolute path for a path value
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
str |
A path string in many different accepted formats |
required |
Returns:
Type | Description |
---|---|
str |
The absolute path of the provided string |
Source code in atomic_operator/base.py
def get_abs_path(self, value) -> str:
"""Formats and returns the absolute path for a path value
Args:
value (str): A path string in many different accepted formats
Returns:
str: The absolute path of the provided string
"""
return os.path.abspath(os.path.expanduser(os.path.expandvars(value)))
get_local_system_platform(self)
Identifies the local systems operating system platform
Returns:
Type | Description |
---|---|
str |
The current/local systems operating system platform |
Source code in atomic_operator/base.py
def get_local_system_platform(self) -> str:
"""Identifies the local systems operating system platform
Returns:
str: The current/local systems operating system platform
"""
os_name = platform.system().lower()
if os_name == "darwin":
return "macos"
return os_name
prompt_user_for_input(self, title, input_object)
Prompts user for input values based on the provided values.
Source code in atomic_operator/base.py
def prompt_user_for_input(self, title, input_object):
"""Prompts user for input values based on the provided values.
"""
print(f"""
Inputs for {title}:
Input Name: {input_object.name}
Default: {input_object.default}
Description: {input_object.description}
""")
print(f"Please provide a value for {input_object.name} (If blank, default is used):",)
value = sys.stdin.readline()
if bool(value):
return value
return input_object.default
show_details(self, value)
Displays the provided value string if Base.CONFIG.show_details is True
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
str |
A string to display if selected in config. |
required |
Source code in atomic_operator/base.py
def show_details(self, value) -> None:
"""Displays the provided value string if Base.CONFIG.show_details is True
Args:
value (str): A string to display if selected in config.
"""
if Base.CONFIG.show_details:
self.__logger.info(value)
ConfigParser (Base)
config
property
readonly
Returns raw converted config_file passed into class
Returns:
Type | Description |
---|---|
[dict] |
Returns the converted config_file as dictionary. |
run_list
property
readonly
Returns a list of Atomic objects that will be ran.
This list combines Atomics and potentially filters
tests defined within that Atomic object based on passed
in parameters and config_file.
Additionally, a list of Host objects are added to their
defined techniques or test_guids based on config and/or
passed in parameters.
[
Atomic(
attack_technique='T1016',
display_name='System Network Configuration Discovery',
path='/Users/josh.rickard/_Swimlane2/atomic-operator/redcanaryco-atomic-red-team-22dd2fb/atomics/T1016',
atomic_tests=[
AtomicTest(
name='System Network Configuration Discovery',
description='Identify network configuration information.
Upon successful execution, ...', supported_platforms=['macos', 'linux'], auto_generated_guid='c141bbdb-7fca-4254-9fd6-f47e79447e17', executor=AtomicExecutor( name='sh', command='if [ -x "$(command -v arp)" ]; then arp -a; else echo "arp is missing from ....', cleanup_command=None, elevation_required=False, steps=None ), input_arguments=None, dependency_executor_name=None, dependencies=[] ) ], hosts=[ Host( hostname='192.168.1.1', username='username', password='some_passowrd!', verify_ssl=False, ssh_key_path=None, private_key_string=None, port=22, timeout=5 ) ], supporting_files=[ 'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/top-128.txt', 'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/qakbot.bat' ] ) ]
!!! returns
[list]: A list of modified Atomic objects that will be used to run
either remotely or locally.
__init__(self, config_file=None, techniques=None, test_guids=None, host_list=None, username=None, password=None, ssh_key_path=None, private_key_string=None, verify_ssl=False, ssh_port=22, ssh_timeout=5)
special
Parses a provided config file as well as parameters to build a run list
This list combines Atomics and potentially filters
tests defined within that Atomic object based on passed
in parameters and config_file.
Additionally, a list of Host objects are added to their
defined techniques or test_guids based on config and/or
passed in parameters.
Example: Example structure returned from provided values
[
Atomic(
attack_technique='T1016',
display_name='System Network Configuration Discovery',
path='/Users/josh.rickard/_Swimlane2/atomic-operator/redcanaryco-atomic-red-team-22dd2fb/atomics/T1016',
atomic_tests=[
AtomicTest(
name='System Network Configuration Discovery',
description='Identify network configuration information.
Upon successful execution, ...', supported_platforms=['macos', 'linux'], auto_generated_guid='c141bbdb-7fca-4254-9fd6-f47e79447e17', executor=AtomicExecutor( name='sh', command='if [ -x "$(command -v arp)" ]; then arp -a; else echo "arp is missing from ....', cleanup_command=None, elevation_required=False, steps=None ), input_arguments=None, dependency_executor_name=None, dependencies=[] ) ], hosts=[ Host( hostname='192.168.1.1', username='username', password='some_passowrd!', verify_ssl=False, ssh_key_path=None, private_key_string=None, port=22, timeout=5 ) ], supporting_files=[ 'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/top-128.txt', 'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/qakbot.bat' ] ) ]
Source code in atomic_operator/configparser.py
def __init__(self, config_file=None, techniques=None, test_guids=None,
host_list=None, username=None, password=None,
ssh_key_path=None, private_key_string=None, verify_ssl=False,
ssh_port=22, ssh_timeout=5
):
"""Parses a provided config file as well as parameters to build a run list
This list combines Atomics and potentially filters
tests defined within that Atomic object based on passed
in parameters and config_file.
Additionally, a list of Host objects are added to their
defined techniques or test_guids based on config and/or
passed in parameters.
Example: Example structure returned from provided values
[
Atomic(
attack_technique='T1016',
display_name='System Network Configuration Discovery',
path='/Users/josh.rickard/_Swimlane2/atomic-operator/redcanaryco-atomic-red-team-22dd2fb/atomics/T1016',
atomic_tests=[
AtomicTest(
name='System Network Configuration Discovery',
description='Identify network configuration information.\n\nUpon successful execution, ...',
supported_platforms=['macos', 'linux'],
auto_generated_guid='c141bbdb-7fca-4254-9fd6-f47e79447e17',
executor=AtomicExecutor(
name='sh',
command='if [ -x "$(command -v arp)" ]; then arp -a; else echo "arp is missing from ....',
cleanup_command=None,
elevation_required=False, steps=None
),
input_arguments=None,
dependency_executor_name=None,
dependencies=[]
)
],
hosts=[
Host(
hostname='192.168.1.1',
username='username',
password='some_passowrd!',
verify_ssl=False,
ssh_key_path=None,
private_key_string=None,
port=22,
timeout=5
)
],
supporting_files=[
'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/top-128.txt',
'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/qakbot.bat'
]
)
]
"""
self.__config_file = self.__load_config(config_file)
self.techniques = techniques
self.test_guids = test_guids
self.__host_list = []
if host_list:
for host in self.parse_input_lists(host_list):
self.__host_list.append(self.__create_remote_host_object(
hostname=host,
username=username,
password=password,
ssh_key_path=ssh_key_path,
private_key_string=private_key_string,
verify_ssl=verify_ssl,
ssh_port=ssh_port,
ssh_timeout=ssh_timeout
))
get_inputs(self, guid)
Retrieves any defined inputs for a given atomic test GUID
Parameters:
Name | Type | Description | Default |
---|---|---|---|
guid |
str |
An Atomic test GUID |
required |
Returns:
Type | Description |
---|---|
dict |
A dictionary of defined input arguments or empty |
Source code in atomic_operator/configparser.py
def get_inputs(self, guid: str):
"""Retrieves any defined inputs for a given atomic test GUID
Args:
guid (str): An Atomic test GUID
Returns:
dict: A dictionary of defined input arguments or empty
"""
if self.__config_file:
for item in self.__config_file['atomic_tests']:
if item['guid'] == guid:
return item.get('input_arguments', {})
return {}
is_defined(self, guid)
Checks to see if a GUID is defined within a config file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
guid |
str |
The GUID defined within a parsed config file |
required |
Returns:
Type | Description |
---|---|
[bool] |
Returns True if GUID is defined within parsed config_file |
Source code in atomic_operator/configparser.py
def is_defined(self, guid: str):
"""Checks to see if a GUID is defined within a config file
Args:
guid (str): The GUID defined within a parsed config file
Returns:
[bool]: Returns True if GUID is defined within parsed config_file
"""
if self.__config_file:
for item in self.__config_file['atomic_tests']:
if item['guid'] == guid:
return True
return False
Config
The main configuration class used across atomic-operator
Exceptions:
Type | Description |
---|---|
AtomicsFolderNotFound |
Raised when unable to find the provided atomics_path value |
__init__(self, atomics_path, check_prereqs=False, get_prereqs=False, cleanup=False, command_timeout=20, show_details=False, prompt_for_input_args=False, kwargs={}, copy_source_files=True)
special
Method generated by attrs for class Config.
Source code in atomic_operator/models.py
def __init__(self, atomics_path, check_prereqs=attr_dict['check_prereqs'].default, get_prereqs=attr_dict['get_prereqs'].default, cleanup=attr_dict['cleanup'].default, command_timeout=attr_dict['command_timeout'].default, show_details=attr_dict['show_details'].default, prompt_for_input_args=attr_dict['prompt_for_input_args'].default, kwargs=attr_dict['kwargs'].default, copy_source_files=attr_dict['copy_source_files'].default):
_setattr = _cached_setattr.__get__(self, self.__class__)
_inst_dict = self.__dict__
_inst_dict['atomics_path'] = atomics_path
_inst_dict['check_prereqs'] = check_prereqs
_inst_dict['get_prereqs'] = get_prereqs
_inst_dict['cleanup'] = cleanup
_inst_dict['command_timeout'] = command_timeout
_inst_dict['show_details'] = show_details
_inst_dict['prompt_for_input_args'] = prompt_for_input_args
_inst_dict['kwargs'] = kwargs
_inst_dict['copy_source_files'] = copy_source_files
if _config._run_validators is True:
__attr_validator_atomics_path(self, __attr_atomics_path, self.atomics_path)
self.__attrs_post_init__()
Host
__init__(self, hostname, username=None, password=None, verify_ssl=False, ssh_key_path=None, private_key_string=None, port=22, timeout=5)
special
Method generated by attrs for class Host.
Source code in atomic_operator/models.py
def __init__(self, hostname, username=attr_dict['username'].default, password=attr_dict['password'].default, verify_ssl=attr_dict['verify_ssl'].default, ssh_key_path=attr_dict['ssh_key_path'].default, private_key_string=attr_dict['private_key_string'].default, port=attr_dict['port'].default, timeout=attr_dict['timeout'].default):
self.hostname = hostname
self.username = username
self.password = password
self.verify_ssl = verify_ssl
self.ssh_key_path = ssh_key_path
self.private_key_string = private_key_string
self.port = port
self.timeout = timeout
if _config._run_validators is True:
__attr_validator_ssh_key_path(self, __attr_ssh_key_path, self.ssh_key_path)
Atomic
A single Atomic data structure. Each Atomic (technique) will contain a list of one or more AtomicTest objects.
__init__(self, attack_technique, display_name, path, atomic_tests, hosts=None, supporting_files=[])
special
Method generated by attrs for class Atomic.
Source code in atomic_operator/atomic/atomic.py
def __init__(self, attack_technique, display_name, path, atomic_tests, hosts=attr_dict['hosts'].default, supporting_files=attr_dict['supporting_files'].default):
self.attack_technique = attack_technique
self.display_name = display_name
self.path = path
self.atomic_tests = atomic_tests
self.hosts = hosts
self.supporting_files = supporting_files
self.__attrs_post_init__()
AtomicDependency
__init__(self, description, get_prereq_command=None, prereq_command=None)
special
Method generated by attrs for class AtomicDependency.
Source code in atomic_operator/atomic/atomictest.py
def __init__(self, description, get_prereq_command=attr_dict['get_prereq_command'].default, prereq_command=attr_dict['prereq_command'].default):
self.description = description
self.get_prereq_command = get_prereq_command
self.prereq_command = prereq_command
AtomicExecutor
__init__(self, name, command, cleanup_command=None, elevation_required=False, steps=None)
special
Method generated by attrs for class AtomicExecutor.
Source code in atomic_operator/atomic/atomictest.py
def __init__(self, name, command, cleanup_command=attr_dict['cleanup_command'].default, elevation_required=attr_dict['elevation_required'].default, steps=attr_dict['steps'].default):
self.name = name
self.command = command
self.cleanup_command = cleanup_command
self.elevation_required = elevation_required
self.steps = steps
AtomicTest
A single Atomic test object structure
Returns:
Type | Description |
---|---|
AtomicTest |
A single Atomic test object |
__init__(self, name, description, supported_platforms, auto_generated_guid, executor, input_arguments=None, dependency_executor_name=None, dependencies=[])
special
Method generated by attrs for class AtomicTest.
Source code in atomic_operator/atomic/atomictest.py
def __init__(self, name, description, supported_platforms, auto_generated_guid, executor, input_arguments=attr_dict['input_arguments'].default, dependency_executor_name=attr_dict['dependency_executor_name'].default, dependencies=attr_dict['dependencies'].default):
self.name = name
self.description = description
self.supported_platforms = supported_platforms
self.auto_generated_guid = auto_generated_guid
self.executor = executor
self.input_arguments = input_arguments
self.dependency_executor_name = dependency_executor_name
self.dependencies = dependencies
self.__attrs_post_init__()
AtomicTestInput
__init__(self, name, description, type, default, value=None, source=None, destination=None)
special
Method generated by attrs for class AtomicTestInput.
Source code in atomic_operator/atomic/atomictest.py
def __init__(self, name, description, type, default, value=attr_dict['value'].default, source=attr_dict['source'].default, destination=attr_dict['destination'].default):
self.name = name
self.description = description
self.type = type
self.default = default
self.value = value
self.source = source
self.destination = destination
Loader (Base)
find_atomics(self, atomics_path, pattern='**/T*/T*.yaml')
Attempts to find the atomics folder within the provided atomics_path
Parameters:
Name | Type | Description | Default |
---|---|---|---|
atomics_path |
str |
A path to the atomic-red-team directory |
required |
pattern |
str |
Pattern used to find atomics and their required yaml files. Defaults to '/T/T.yaml'. |
'**/T*/T*.yaml' |
Returns:
Type | Description |
---|---|
list |
A list of paths of all identified atomics found in the given directory |
Source code in atomic_operator/atomic/loader.py
def find_atomics(self, atomics_path, pattern='**/T*/T*.yaml') -> list:
"""Attempts to find the atomics folder within the provided atomics_path
Args:
atomics_path (str): A path to the atomic-red-team directory
pattern (str, optional): Pattern used to find atomics and their required yaml files. Defaults to '**/T*/T*.yaml'.
Returns:
list: A list of paths of all identified atomics found in the given directory
"""
result = []
path = PurePath(atomics_path)
for p in Path(path).rglob(pattern):
result.append(p.resolve())
return result
load_technique(self, path_to_dir)
Loads a provided yaml file which is typically an Atomic defintiion or configuration file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path_to_dir |
str |
A string path to a yaml formatted file |
required |
Returns:
Type | Description |
---|---|
dict |
Returns the loaded yaml file in a dictionary format |
Source code in atomic_operator/atomic/loader.py
def load_technique(self, path_to_dir) -> dict:
"""Loads a provided yaml file which is typically an Atomic defintiion or configuration file.
Args:
path_to_dir (str): A string path to a yaml formatted file
Returns:
dict: Returns the loaded yaml file in a dictionary format
"""
try:
with open(self.get_abs_path(path_to_dir), 'r', encoding="utf-8") as f:
return yaml.load(f.read(), Loader=yaml.SafeLoader)
except:
# windows does not like get_abs_path so casting to string
with open(str(path_to_dir), 'r', encoding="utf-8") as f:
return yaml.load(f.read(), Loader=yaml.SafeLoader)
load_techniques(self)
The main entrypoint when loading techniques from disk.
Exceptions:
Type | Description |
---|---|
AtomicsFolderNotFound |
Thrown when unable to find the folder containing Atomic tests |
Returns:
Type | Description |
---|---|
dict |
A dict with the key(s) as the Atomic technique ID and the val is a list of Atomic objects. |
Source code in atomic_operator/atomic/loader.py
def load_techniques(self) -> dict:
"""The main entrypoint when loading techniques from disk.
Raises:
AtomicsFolderNotFound: Thrown when unable to find the folder containing
Atomic tests
Returns:
dict: A dict with the key(s) as the Atomic technique ID and the val
is a list of Atomic objects.
"""
atomics_path = Base.CONFIG.atomics_path
if not os.path.exists(self.get_abs_path(atomics_path)):
atomics_path = self.find_atomics(self.get_abs_path(__file__))
if not atomics_path:
raise AtomicsFolderNotFound('Unable to find any atomics folder')
else:
atomics_path = self.find_atomics(atomics_path)
if not atomics_path:
raise AtomicsFolderNotFound('Unable to find any atomics folder')
for atomic_entry in atomics_path:
technique = self.__get_file_name(atomic_entry)
if not self.__techniques.get(technique):
loaded_technique = self.load_technique(str(atomic_entry))
loaded_technique.update({'path': os.path.dirname(str(atomic_entry))})
self.__techniques[technique] = Atomic(**loaded_technique)
return self.__techniques