
Common Server Python

Common functions that will be appended to the code of each integration/script before being executed.

AutoFocusKeyRetriever#

AutoFocus API Key management class

Arguments:

  • api_key str: AutoFocus API key coming from the integration parameters
  • override_default_credentials bool: Whether to override the default credentials and use the AutoFocus API key provided by Cortex XSOAR

BarColumnPieWidget#

Bar/Column/Pie Widget representation

Arguments:

  • categories list: a list of categories to display (it is better to use the add_category function to populate the data).

add_category#

| BarColumnPieWidget.add_category(name, number)

Add a category to widget.

Arguments:

  • name str: the name of the category to add.

  • number int: the number value of the category.

to_display#

| BarColumnPieWidget.to_display()
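
A minimal usage sketch for a widget script, assuming the widget is returned via return_results (the category names, values, and the final return call are illustrative):

>>> widget = BarColumnPieWidget()
>>> widget.add_category('Low', 10)
>>> widget.add_category('Medium', 6)
>>> widget.add_category('High', 3)
>>> return_results(widget.to_display())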

CommandResults#

CommandResults class - used to return results to the War Room

Arguments:

  • outputs_prefix str: should be identical to the prefix of the contextPath in the yml file. For example: CortexXDR.Incident

  • outputs_key_field str or list[str]: primary key field in the main object. If the command returns Incidents, and one of the properties of Incident is incident_id, then outputs_key_field='incident_id'. If the object has multiple unique keys, a list of strings is supported: outputs_key_field=['id1', 'id2']

  • outputs list or dict: the data to be returned, which will be set into the context

  • indicators list: DEPRECATED: use 'indicator' instead.

  • indicator Common.Indicator: single indicator like Common.IP, Common.URL, Common.File, etc.

  • readable_output str: (Optional) markdown string that will be presented in the War Room; should be human readable (HumanReadable). If not set, a readable output will be generated automatically

  • raw_response dict | list: must be a dictionary or list; if not provided, it will be equal to outputs. Usually the original raw response from the 3rd party service (originally Contents)

  • indicators_timeline IndicatorsTimeline: must be an IndicatorsTimeline. Used by the server to populate an indicator's timeline.

  • ignore_auto_extract bool: must be a boolean, default value is False. Used to prevent AutoExtract on output.

  • relationships list of EntityRelationship: List of relationships of the indicator.

  • mark_as_note bool: must be a boolean, default value is False. Used to mark entry as note.

  • tags list: must be a list, default value is None. Used to tag war room entries.

  • entry_type int code of EntryType: type of return value, see EntryType

  • scheduled_command ScheduledCommand: manages the way the command should be polled.

  • execution_metrics ExecutionMetrics: contains metric data about a command's execution

  • replace_existing bool: Replace the context value at outputs_prefix if it exists. Works only if outputs_prefix is a path to a nested value i.e., contains a period. For example, the "next token" result should always be overwritten. This response can be returned as follows:

Examples:

>>> CommandResults(
>>> readable_output=f'Next Token: {next_token}',
>>> outputs=next_token,
>>> outputs_prefix='Path.To.NextToken',
>>> replace_existing=True,
>>> )
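
More generally, a minimal sketch of building a CommandResults and returning it to the War Room (the prefix, key field, and sample outputs are illustrative):

>>> results = CommandResults(
>>>     outputs_prefix='Example.Item',
>>>     outputs_key_field='id',
>>>     outputs=[{'id': '1', 'name': 'test'}],
>>>     readable_output='Found 1 item',
>>> )
>>> return_results(results)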

to_context#

| CommandResults.to_context()

CommandRunner#

Class for executing multiple commands and saving the results of each command.

Command#

Data class with the data required to execute a command.

Arguments:

  • commands str or List[str]: The command list or a single command
  • args_lst Dict or List[Dict]: The args list or a single args dict
  • brand str: The brand to use
  • instance str: The instance to use

_is_valid#

| Command._is_valid(commands, args_lst)

Error handling of the given arguments

Arguments:

  • commands __: list of commands
  • args_lst __: list of args

Result#

Class for the result of the command.

Arguments:

  • command str: The command that was run.
  • args dict: The args that were run.
  • brand str: The brand that was used.
  • instance str: The instance that was used.
  • result str: The result of the command.

execute_commands#

| CommandRunner.execute_commands(command, extract_contents=True)

Runs the demisto.executeCommand() and gets all the results, including the errors, returned from the command.

Arguments:

  • command Command: The commands to run. (required)

  • extract_contents bool: Whether to extract Contents part of the results. Default is True.

Returns:

  • Tuple[List[ResultWrapper], List[ResultWrapper]] - A tuple of two lists: the command results list and command errors list.

run_commands_with_summary#

| CommandRunner.run_commands_with_summary(commands)

Given a list of commands, return a list of results (to pass to return_results). In addition, it creates a CommandResults containing a summary of the commands as readable_output.

Arguments:

  • commands __: A list of commands.
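
A minimal sketch of running a command through CommandRunner and returning the results together with the generated summary (the command name and arguments are illustrative):

>>> commands = [CommandRunner.Command(commands='ip', args_lst={'ip': '8.8.8.8'})]
>>> return_results(CommandRunner.run_commands_with_summary(commands))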

get_results_summary#

| CommandRunner.get_results_summary(results, errors)

Get a Human Readable result for all the results of the commands.

Arguments:

  • results __: list of returned results
  • errors __: list of returned errors

ConfKey#

YML configuration key fields.

This is an empty class, used for code autocompletion when using demisto-sdk generate_yml_from_python command syntax. For more information, visit the command's README.md.

Returns:

  • ConfKey - The ConfKey object

DBotScoreReliability#

Enum: Source reliability levels. Values are case sensitive.

  • A_PLUS

  • A

  • B

  • C

  • D

  • E

  • F

is_valid_type#

| DBotScoreReliability.is_valid_type(_type)

get_dbot_score_reliability_from_str#

| DBotScoreReliability.get_dbot_score_reliability_from_str(reliability_str)

DBotScoreType#

Enum: contains all the indicator types, e.g. DBotScoreType.IP, DBotScoreType.FILE, DBotScoreType.DOMAIN, DBotScoreType.URL, DBotScoreType.CVE, DBotScoreType.ACCOUNT, DBotScoreType.CRYPTOCURRENCY, DBotScoreType.EMAIL, DBotScoreType.ATTACKPATTERN, DBotScoreType.CUSTOM.

  • IP

  • FILE

  • DOMAIN

  • URL

  • CVE

  • ACCOUNT

  • CIDR

  • DOMAINGLOB

  • CERTIFICATE

  • CRYPTOCURRENCY

  • EMAIL

  • ATTACKPATTERN

  • CUSTOM

is_valid_type#

| DBotScoreType.is_valid_type(cls, _type)

DebugLogger#

Wrapper to initiate logging at logging.DEBUG level. Is used when debug-mode=True.

__del__#

| DebugLogger.__del__()

log_start_debug#

| DebugLogger.log_start_debug()

Utility function to log start of debug mode logging

DemistoHandler#

Handler to route logging messages to an IntegrationLogger or demisto.debug if not supplied

emit#

| DemistoHandler.emit(record)

EntityRelationship#

XSOAR entity relationship.

Arguments:

  • name str: Relationship name.

  • relationship_type str: Relationship type. (e.g. IndicatorToIndicator...).

  • entity_a str: The value of entity A, the source of the relationship.

  • entity_a_family str: The entity family of A, the source of the relationship (e.g. Indicator).

  • entity_a_type str: The type of entity A, the source of the relationship (e.g. IP, URL).

  • entity_b str: The value of entity B, the destination of the relationship.

  • entity_b_family str: The entity family of B, the destination of the relationship (e.g. Indicator).

  • entity_b_type str: The type of entity B, the destination of the relationship (e.g. IP, URL).

  • source_reliability str: Source reliability.

  • fields dict: Custom fields. (Optional)

  • brand str: Source brand name. (Optional)
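
A minimal sketch of creating a relationship and attaching it to a CommandResults (the entity values and the relationship name are illustrative):

>>> relationship = EntityRelationship(
>>>     name='related-to',
>>>     entity_a='1.1.1.1',
>>>     entity_a_type='IP',
>>>     entity_b='example.com',
>>>     entity_b_type='Domain',
>>> )
>>> return_results(CommandResults(readable_output='Relationship created', relationships=[relationship]))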

RelationshipsTypes#

Relationships Types objects.

RELATIONSHIP_TYPES#

is_valid_type#

| RelationshipsTypes.is_valid_type(_type)

RelationshipsFamily#

Relationships Family object list.

INDICATOR#

is_valid_type#

| RelationshipsFamily.is_valid_type(_type)

Relationships#

Enum: Relations names and their reverse

  • APPLIED

  • ATTACHMENT_OF

  • ATTACHES

  • ATTRIBUTE_OF

  • ATTRIBUTED_BY

  • ATTRIBUTED_TO

  • AUTHORED_BY

  • BEACONS_TO

  • BUNDLED_IN

  • BUNDLES

  • COMMUNICATED_WITH

  • COMMUNICATED_BY

  • COMMUNICATES_WITH

  • COMPROMISES

  • CONTAINS

  • CONTROLS

  • CREATED_BY

  • CREATES

  • DELIVERED_BY

  • DELIVERS

  • DETECTS

  • DETECTED_BY

  • DOWNLOADS

  • DOWNLOADS_FROM

  • DROPPED_BY

  • DROPS

  • DUPLICATE_OF

  • EMBEDDED_IN

  • EMBEDS

  • EXECUTED

  • EXECUTED_BY

  • EXFILTRATES_TO

  • EXPLOITS

  • HAS

  • HOSTED_ON

  • HOSTS

  • IMPERSONATES

  • INDICATED_BY

  • INDICATOR_OF

  • INJECTED_FROM

  • INJECTS_INTO

  • INVESTIGATES

  • IS_ALSO

  • LOCATED_AT

  • MITIGATED_BY

  • MITIGATES

  • ORIGINATED_FROM

  • OWNED_BY

  • OWNS

  • PART_OF

  • RELATED_TO

  • REMEDIATES

  • RESOLVED_BY

  • RESOLVED_FROM

  • RESOLVES_TO

  • SEEN_ON

  • SENT

  • SENT_BY

  • SENT_FROM

  • SENT_TO

  • SIMILAR_TO

  • SUB_DOMAIN_OF

  • SUB_TECHNIQUE_OF

  • PARENT_TECHNIQUE_OF

  • SUPRA_DOMAIN_OF

  • TARGETED_BY

  • TARGETS

  • TYPES

  • UPLOADED_TO

  • USED_BY

  • USED_ON

  • USES

  • VARIANT_OF

  • RELATIONSHIPS_NAMES

is_valid#

| Relationships.is_valid(_type)

Arguments:

  • _type str: the relationship name to check

Returns:

  • bool - Is the given type supported

get_reverse#

| Relationships.get_reverse(name)

Arguments:

  • name str: Relationship name

Returns:

  • str - Returns the reversed relationship name

to_entry#

| EntityRelationship.to_entry()

Convert object to XSOAR entry

Returns:

  • dict - XSOAR entry representation.

to_indicator#

| EntityRelationship.to_indicator()

Convert object to XSOAR entry

Returns:

  • dict - XSOAR entry representation.

to_context#

| EntityRelationship.to_context()

Convert object to XSOAR context

Returns:

  • dict - XSOAR context representation.

EntryFormat#

Enum: contains all the entry formats (e.g. HTML, TABLE, JSON, etc.)

  • HTML

  • TABLE

  • JSON

  • TEXT

  • DBOT_RESPONSE

  • MARKDOWN

is_valid_type#

| EntryFormat.is_valid_type(cls, _type)

EntryType#

Enum: contains all the entry types (e.g. NOTE, ERROR, WARNING, FILE, etc.)

  • NOTE

  • DOWNLOAD_AGENT

  • FILE

  • ERROR

  • PINNED

  • USER_MANAGEMENT

  • IMAGE

  • PLAYGROUND_ERROR

  • ENTRY_INFO_FILE

  • VIDEO_FILE

  • WARNING

  • STATIC_VIDEO_FILE

  • MAP_ENTRY_TYPE

  • WIDGET

  • EXECUTION_METRICS

ErrorTypes#

Enum: contains all the available error types

  • SUCCESS

  • QUOTA_ERROR

  • GENERAL_ERROR

  • AUTH_ERROR

  • SERVICE_ERROR

  • CONNECTION_ERROR

  • PROXY_ERROR

  • SSL_ERROR

  • TIMEOUT_ERROR

  • RETRY_ERROR

ExecutionMetrics#

ExecutionMetrics is used to collect and format metric data to be reported to the XSOAR server.

is_supported#

| ExecutionMetrics.is_supported()

success#

| ExecutionMetrics.success()

success#

| ExecutionMetrics.success(value)

quota_error#

| ExecutionMetrics.quota_error()

quota_error#

| ExecutionMetrics.quota_error(value)

general_error#

| ExecutionMetrics.general_error()

general_error#

| ExecutionMetrics.general_error(value)

auth_error#

| ExecutionMetrics.auth_error()

auth_error#

| ExecutionMetrics.auth_error(value)

service_error#

| ExecutionMetrics.service_error()

service_error#

| ExecutionMetrics.service_error(value)

connection_error#

| ExecutionMetrics.connection_error()

connection_error#

| ExecutionMetrics.connection_error(value)

proxy_error#

| ExecutionMetrics.proxy_error()

proxy_error#

| ExecutionMetrics.proxy_error(value)

ssl_error#

| ExecutionMetrics.ssl_error()

ssl_error#

| ExecutionMetrics.ssl_error(value)

timeout_error#

| ExecutionMetrics.timeout_error()

timeout_error#

| ExecutionMetrics.timeout_error(value)

retry_error#

| ExecutionMetrics.retry_error()

retry_error#

| ExecutionMetrics.retry_error(value)

get_metric_list#

| ExecutionMetrics.get_metric_list()

update_metrics#

| ExecutionMetrics.update_metrics(metric_type, metric_value)
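
A minimal sketch of collecting metrics during a command run and appending them to the returned results (the increments and results list are illustrative; append_metrics is documented further down this page):

>>> results = [CommandResults(readable_output='Command succeeded')]
>>> metrics = ExecutionMetrics()
>>> metrics.success += 1
>>> metrics.quota_error += 1
>>> results = append_metrics(metrics, results)
>>> return_results(results)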

FeedIndicatorType#

Type of Indicator (Reputations), used in TIP integrations

Account#

CVE#

Domain#

DomainGlob#

Email#

File#

FQDN#

Host#

IP#

CIDR#

IPv6#

IPv6CIDR#

Registry#

SSDeep#

URL#

AS#

MUTEX#

Malware#

Identity#

Location#

Software#

X509#

is_valid_type#

| FeedIndicatorType.is_valid_type(_type)

list_all_supported_indicators#

| FeedIndicatorType.list_all_supported_indicators()

ip_to_indicator_type#

| FeedIndicatorType.ip_to_indicator_type(ip)

Returns the indicator type of the input IP.

Arguments:

  • ip str: IP address to get its indicator type.

Returns:

  • str - Indicator type from FeedIndicatorType, or None if the IP address is invalid.
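
For example:

>>> FeedIndicatorType.ip_to_indicator_type('192.0.2.1')
'IP'
>>> FeedIndicatorType.ip_to_indicator_type('2001:db8::1')
'IPv6'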

indicator_type_by_server_version#

| FeedIndicatorType.indicator_type_by_server_version(indicator_type)

Returns the indicator type of the input by the server version. If the server version is 6.2 or greater, removes the STIX prefix of the type.

Arguments:

  • indicator_type str: Type of an indicator.

Returns:

  • str - Indicator type.

FileAttachmentType#

Enum: contains the file attachment types. Used to add metadata to the description of the attachment, indicating whether the file content is expected to be inline or attached as a file.

Returns:

  • str - The file attachment type

  • ATTACHED

FormatADTimestamp#

FormatADTimestamp(ts)

Formats an Active Directory timestamp into a human-readable time representation

Arguments:

  • ts int: The timestamp to be formatted (required)

Returns:

  • str - A string representing the time

FormatIso8601#

FormatIso8601(t)

Convert a time expressed in seconds to ISO 8601 time format string

Arguments:

  • t int: Time expressed in seconds (required)

Returns:

  • str - An ISO 8601 time format string

GetDemistoVersion#

Callable class to replace get_demisto_version function

__call__#

| GetDemistoVersion.__call__()

Returns the Demisto version and build number.

Returns:

  • dict - Demisto version object if Demisto class has attribute demistoVersion, else raises AttributeError

GetMappingFieldsResponse#

Handler for the mapping fields object.

Arguments:

  • scheme_types_mapping list: List of all the mappings in the remote system.

add_scheme_type#

| GetMappingFieldsResponse.add_scheme_type(scheme_type_mapping)

Add another incident type mapping.

Arguments:

  • scheme_type_mapping dict: mapping of a singular field.

extract_mapping#

| GetMappingFieldsResponse.extract_mapping()

Extracts the mapping into XSOAR mapping screen.

Returns:

  • dict - the mapping object for the current field.
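
A minimal sketch of building a mapping response for a get-mapping-fields command (the incident type and field names are illustrative; SchemeTypeMapping is documented further down this page):

>>> mapping = SchemeTypeMapping(type_name='My Incident Type')
>>> mapping.add_field(name='severity', description='The incident severity')
>>> mapping_response = GetMappingFieldsResponse()
>>> mapping_response.add_scheme_type(mapping)
>>> return_results(mapping_response)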

GetModifiedRemoteDataArgs#

get-modified-remote-data args parser

Arguments:

  • args dict: arguments for the command.

GetModifiedRemoteDataResponse#

get-modified-remote-data response parser

Arguments:

  • modified_incident_ids list: The incidents that were modified since the last check.

to_entry#

| GetModifiedRemoteDataResponse.to_entry()

Extracts the response

Returns:

  • list - List of incidents to run the get-remote-data command on.

GetRemoteDataArgs#

get-remote-data args parser

Arguments:

  • args dict: arguments for the command.

GetRemoteDataResponse#

get-remote-data response parser

Arguments:

  • mirrored_object dict: The object you are mirroring, in most cases the incident.

  • entries list: The entries you want to add to the war room.

extract_for_local#

| GetRemoteDataResponse.extract_for_local()

Extracts the response into the mirrored incident.

Returns:

  • list - List of details regarding the mirrored incident.

IncidentSeverity#

Enum: contains all the incident severity types

  • UNKNOWN

  • INFO

  • LOW

  • MEDIUM

  • HIGH

  • CRITICAL

IncidentStatus#

Enum: contains all the incidents status types (e.g. pending, active, done, archive)

  • PENDING

  • ACTIVE

  • DONE

  • ARCHIVE

IndicatorsSearcher#

Used to search indicators using either paging or the searchAfter param

Arguments:

  • page int: the page number from which to start searching for indicators.

  • filter_fields Optional[str]: comma separated fields to filter (e.g. "value,type")

  • from_date Optional[str]: the start date to search from.

  • query Optional[str]: indicator search query

  • to_date Optional[str]: the end date to search until.

  • value str: the indicator value to search.

  • limit Optional[int]: the current upper limit of the search (can be updated after init)

  • sort List[Dict]: An array of sort params ordered by importance. Item structure: {"field": string, "asc": boolean}

SEARCH_AFTER_TITLE#

__iter__#

| IndicatorsSearcher.__iter__()

next#

| IndicatorsSearcher.next()

__next__#

| IndicatorsSearcher.__next__()

page#

| IndicatorsSearcher.page()

total#

| IndicatorsSearcher.total()

limit#

| IndicatorsSearcher.limit()

limit#

| IndicatorsSearcher.limit(value)

is_search_done#

| IndicatorsSearcher.is_search_done()

Returns True if one of these conditions is met (else False):

  1. self.limit is set and has been updated to be less than or equal to zero.
  2. For search_after: self.total was populated by a previous search, but self._search_after_param is empty.
  3. For page: self.total was populated by a previous search, but the page is too large.

search_indicators_by_version#

| IndicatorsSearcher.search_indicators_by_version(from_date=None, query='', size=100, to_date=None, value='')

There are 2 cases, depending on the server version:

  1. Search indicators using paging, raise the page number in each call.
  2. Search indicators using searchAfter param, update the _search_after_param in each call.

Arguments:

  • from_date Optional[str]: the start date to search from.

  • query Optional[str]: indicator search query

  • size int: limit the number of returned results.

  • to_date Optional[str]: the end date to search until.

  • value str: the indicator value to search.

Returns:

  • dict - an object containing the search results
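
A minimal iteration sketch (the query is illustrative, and the 'iocs' key is an assumption about the shape of the search results returned by the server):

>>> search = IndicatorsSearcher(query='type:IP and tags:my_tag', limit=500)
>>> for ioc_res in search:
>>>     for ioc in ioc_res.get('iocs') or []:
>>>         demisto.debug(ioc.get('value'))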

IndicatorsTimeline#

IndicatorsTimeline class - used to return an Indicator Timeline object to be used in CommandResults

Arguments:

  • indicators list: expects a list of indicators.

  • category str: indicator category.

  • message str: indicator message.

InputArgument#

YML input argument for a command.

This is an empty class, used for code autocompletion when using demisto-sdk generate_yml_from_python command syntax. For more information, visit the command's README.md.

Returns:

  • InputArgument - The InputArgument object

IntegrationLogger#

A logger for Python integrations: use LOG(<message>) to add a record to the logger (message can be any object with a string representation), use LOG.print_log(verbose=True/False) to display all records in the War Room (if verbose) and the server log, and use add_replace_strs to add sensitive strings that should be replaced before going to the log.

Arguments:

  • message str: The message to be logged

_iter_sensistive_dict_obj#

| IntegrationLogger._iter_sensistive_dict_obj(dict_obj, sensitive_params)

encode#

| IntegrationLogger.encode(message)

__call__#

| IntegrationLogger.__call__(message)

add_replace_strs#

| IntegrationLogger.add_replace_strs(*args)

Add strings which will be replaced when logging. Meant for avoiding passwords and so forth in the log.

set_buffering#

| IntegrationLogger.set_buffering(state)

Set whether the logger buffers messages or writes straight to the demisto log

Arguments:

  • state __: True/False

print_log#

| IntegrationLogger.print_log(verbose=False)

build_curl#

| IntegrationLogger.build_curl(text)

Parses the HTTP client "send" log messages and generates cURL queries out of them.

Arguments:

  • text str: The HTTP client log message.

write#

| IntegrationLogger.write(msg)

print_override#

| IntegrationLogger.print_override(*args, **kwargs)

JsonTransformer#

A class that defines how to transform a JSON object (used, for example, by formatCell to format table cell data).

Arguments:

  • flatten bool: Should we flatten the json using flattenCell (for BC)

  • keys Set[str]: Set of keys to keep

  • is_nested bool: Whether to look for nested keys

  • func Callable: A function to parse the json

Constructor for JsonTransformer

Arguments:

  • flatten bool: Should we flatten the json using flattenCell (for BC)

  • keys Iterable[str]: an iterable of the relevant keys from the json. Note that it is saved as a set in the class

  • is_nested bool: Whether to search in nested keys or not

  • func Callable: A function to parse the json

json_to_str#

| JsonTransformer.json_to_str(json_input, is_pretty=True)

json_to_path_generator#

| JsonTransformer.json_to_path_generator(json_input, path=None)

Arguments:

  • json_input list or dict: The json input to transform
  • path List[str + int]: The path of the key, value pair inside the json

Return type: Tuple[List[str + int], str, str]
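
A minimal sketch of using a JsonTransformer to control how a nested value is rendered, here through formatCell (documented further down this page); the sample data and keys are illustrative:

>>> transformer = JsonTransformer(keys={'id', 'name'}, is_nested=True)
>>> formatCell({'id': 1, 'name': 'test', 'details': {'extra': 'data'}}, json_transform=transformer)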

LineWidget#

Line Widget representation

Arguments:

  • categories Any: a list of categories to display (it is better to use the add_category function to populate the data).

add_category#

| LineWidget.add_category(name, number, group)

Add a category to widget.

Arguments:

  • name str: the name of the category to add.

  • number int: the number value of the category.

  • group str: the name of the relevant group.

to_display#

| LineWidget.to_display()

NormalizeRegistryPath#

NormalizeRegistryPath(strRegistryPath)

Normalizes a registry path string

Arguments:

  • strRegistryPath str: The registry path (required)

Returns:

  • str - The normalized string

NumberWidget#

Number Widget representation

Arguments:

  • number int: The number for the widget to display.

to_display#

| NumberWidget.to_display()

OutputArgument#

YML output argument.

This is an empty class, used for code autocompletion when using demisto-sdk generate_yml_from_python command syntax. For more information, visit the command's README.md.

Returns:

  • OutputArgument - The OutputArgument object

ParameterTypes#

YML ConfKey key_type type.

This is an empty class, used for code autocompletion when using demisto-sdk generate_yml_from_python command syntax. For more information, visit the command's README.md.

Returns:

  • ParameterTypes - The ParameterTypes enum

STRING#

NUMBER#

ENCRYPTED#

BOOLEAN#

AUTH#

DOWNLOAD_LINK#

TEXT_AREA#

INCIDENT_TYPE#

TEXT_AREA_ENCRYPTED#

SINGLE_SELECT#

MULTI_SELECT#

PollResult#

The response object for polling functions. This object contains information about whether to run again, and what the CommandResults are in case of success, or failure.

Returns:

  • PollResult - PollResult

Constructor for PollResult

Arguments:

  • response Any: The response of the command in the event of success, or in case of failure but Polling is false

  • continue_to_poll Union[bool, Callable]: Whether to continue polling. Can be a boolean, or a callable that is evaluated to decide whether to poll again

  • args_for_next_run Dict: The arguments to use in the next iteration. Will use the input args in case of None

  • partial_result CommandResults: CommandResults to return, even though we will poll again

PrettifyCompactedTimestamp#

PrettifyCompactedTimestamp(x)

Formats a compacted timestamp string into a human-readable time representation

Arguments:

  • x str: The timestamp to be formatted (required)

Returns:

  • str - A string representing the time

ScheduledCommand#

ScheduledCommand configuration class. Holds the scheduled command configuration for the command result, managing the way the command should be polled.

Arguments:

  • command str: The command that'll run after next_run_in_seconds has passed.

  • next_run_in_seconds int: How long to wait before executing the command.

  • args Optional[Dict[str, Any]]: Arguments to use when executing the command.

  • timeout_in_seconds Optional[int]: Number of seconds until the polling sequence will timeout.

  • items_remaining Optional[int]: Number of items that are remaining to be polled.

VERSION_MISMATCH_ERROR#

raise_error_if_not_supported#

| ScheduledCommand.raise_error_if_not_supported()

supports_polling#

| ScheduledCommand.supports_polling()

Check if the integration supports polling (if the server version is greater than 6.2.0).

Returns:

  • Boolean

to_results#

| ScheduledCommand.to_results()

Returns the result dictionary of the polling command
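
A minimal sketch of scheduling the next poll from a polling command (the command name, arguments, and intervals are illustrative):

>>> ScheduledCommand.raise_error_if_not_supported()
>>> scheduled = ScheduledCommand(
>>>     command='my-polling-command',
>>>     next_run_in_seconds=30,
>>>     args={'job_id': '1234'},
>>>     timeout_in_seconds=600,
>>> )
>>> return_results(CommandResults(readable_output='Job is still running', scheduled_command=scheduled))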

SchemeTypeMapping#

Scheme type mappings builder.

Arguments:

  • type_name str: The name of the remote incident type.

  • fields dict: The dict of fields to their description.

add_field#

| SchemeTypeMapping.add_field(name, description='')

Adds a field to the incident type mapping.

Arguments:

  • name str: The name of the field.

  • description str: The description for that field.

extract_mapping#

| SchemeTypeMapping.extract_mapping()

Extracts the mapping into XSOAR mapping screen.

Returns:

  • dict - the mapping object for the current field.

SmartGetDict#

A dict that when called with get(key, default) will return the default passed value, even if there is a value of "None" in the place of the key.

Examples:

Example with built-in dict:

>>> d = {}
>>> d['test'] = None
>>> d.get('test', 1)
>>> print(d.get('test', 1))
None

Example with SmartGetDict:

>>> d = SmartGetDict()
>>> d['test'] = None
>>> d.get('test', 1)
>>> print(d.get('test', 1))
1

Returns:

  • SmartGetDict - SmartGetDict

get#

| SmartGetDict.get(key, default=None)

TableOrListWidget#

Table/List Widget representation

Arguments:

  • data Any: a list of data to display (it is better to use the add_row function to populate the data).

add_row#

| TableOrListWidget.add_row(data)

Add a row to the widget.

Arguments:

  • data Any: the data to add to the list/table.

to_display#

| TableOrListWidget.to_display()

TextWidget#

Text Widget representation

Arguments:

  • text str: The text for the widget to display

to_display#

| TextWidget.to_display()

ThreatIntel#

XSOAR Threat Intel Objects

ObjectsNames#

Enum: Threat Intel Objects names.

  • CAMPAIGN

  • ATTACK_PATTERN

  • REPORT

  • MALWARE

  • COURSE_OF_ACTION

  • INTRUSION_SET

  • TOOL

  • THREAT_ACTOR

  • INFRASTRUCTURE

ObjectsScore#

Enum: Threat Intel Objects Score.

  • CAMPAIGN

  • ATTACK_PATTERN

  • REPORT

  • MALWARE

  • COURSE_OF_ACTION

  • INTRUSION_SET

  • TOOL

  • THREAT_ACTOR

  • INFRASTRUCTURE

KillChainPhases#

Enum: Kill Chain Phases names.

  • BUILD_CAPABILITIES

  • PRIVILEGE_ESCALATION

  • ADVERSARY_OPSEC

  • CREDENTIAL_ACCESS

  • EXFILTRATION

  • LATERAL_MOVEMENT

  • DEFENSE_EVASION

  • PERSISTENCE

  • COLLECTION

  • IMPACT

  • INITIAL_ACCESS

  • DISCOVERY

  • EXECUTION

  • INSTALLATION

  • DELIVERY

  • WEAPONIZATION

  • ACT_ON_OBJECTIVES

  • COMMAND_AND_CONTROL

TrendWidget#

Trend Widget representation

Arguments:

  • current_number int: The Current number in the trend.

  • previous_number int: The previous number in the trend.

to_display#

| TrendWidget.to_display()

UpdateRemoteSystemArgs#

update-remote-system args parser

Arguments:

  • args dict: arguments for the command.

YMLMetadataCollector#

The YMLMetadataCollector class provides decorators for integration functions which contain details relevant to yml generation.

This is an empty class, used for code autocompletion when using demisto-sdk generate_yml_from_python command syntax. For more information, visit the command's README.md.

Returns:

  • YMLMetadataCollector - The YMLMetadataCollector object

command#

| YMLMetadataCollector.command(command_name, outputs_prefix=None, outputs_list=None, inputs_list=None, execution=None, file_output=False, multiple_output_prefixes=False, deprecated=False, restore=False, description=None)

_find_relevant_module#

_find_relevant_module(line)

Find which module contains the given line number.

Arguments:

  • line int: Line number to search. (required)

Returns:

  • str - The name of the module.

add_http_prefix_if_missing#

add_http_prefix_if_missing(address='')

This function adds http:// prefix to the proxy address in case it is missing.

Arguments:

  • address string: Proxy address.

Returns:

  • string - proxy address after the 'http://' prefix was added, if needed.

add_sensitive_log_strs#

add_sensitive_log_strs(sensitive_str)

Adds the received string to both LOG and DebugLogger. The logger will mask the string each time it encounters it.

Arguments:

  • sensitive_str str: The string to be replaced.

appendContext#

appendContext(key, data, dedup=False)

Append data to the investigation context

Arguments:

  • key str: The context path (required)

  • data any: Data to be added to the context (required)

  • dedup bool: True if de-duplication is required. Default is False.

append_metrics#

append_metrics(execution_metrics, results)

Returns a 'CommandResults' list appended with metrics.

Arguments:

  • execution_metrics ExecutionMetrics: Metrics object to be added to the CommandResults list (optional).

  • results list: 'CommandResults' list to append metrics to (required).

Returns:

  • list - results appended with the metrics if the server version is supported.

argToBoolean#

argToBoolean(value)

Boolean-ish arguments that are passed through demisto.args() could be type bool or type string. This command removes the guesswork and returns a value of type bool, regardless of the input value's type. It will also return True for 'yes' and False for 'no'.

Arguments:

  • value __: the value to evaluate

Returns:

  • bool - a boolean representation of 'value'
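
For example:

>>> argToBoolean('yes')
True
>>> argToBoolean('false')
False
>>> argToBoolean(True)
True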

argToList#

argToList(arg, separator=',', transform=None)

Converts a string representation of args to a python list

Arguments:

  • arg str or list: Args to be converted (required)

  • separator str: A string separator to separate the strings, the default is a comma.

  • transform callable: A transform function to apply to each element of the returned list.

Returns:

  • list - A python list of args
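
For example:

>>> argToList('a,b,c')
['a', 'b', 'c']
>>> argToList(['a', 'b'])
['a', 'b']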

arg_to_datetime#

arg_to_datetime(arg, arg_name=None, is_utc=True, required=False, settings=None)

Converts an XSOAR argument to a datetime

This function is used to quickly validate an argument provided to XSOAR via demisto.args() and convert it into a datetime. It will throw a ValueError if the input is invalid. If the input is None, it will throw a ValueError if required is True, or return None if required is False.

Arguments:

  • arg Any: argument to convert

  • arg_name str: argument name

  • is_utc bool: if True then date converted as utc timezone, otherwise will convert with local timezone.

  • required bool: throws exception if True and argument provided is None

  • settings dict: If provided, passed to dateparser.parse function.

Returns a datetime if the conversion works; returns None if arg is None and required is set to False; otherwise throws an Exception.

Returns:

  • Optional[datetime] - The parsed datetime, or None.
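
A minimal usage sketch when parsing command arguments (the argument name is illustrative; relative expressions are handled through dateparser):

>>> since = arg_to_datetime(demisto.args().get('since'), arg_name='since', required=False)
>>> first_fetch = arg_to_datetime('3 days ago')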

arg_to_number#

arg_to_number(arg, arg_name=None, required=False)

Converts an XSOAR argument to a Python int

This function is used to quickly validate an argument provided to XSOAR via demisto.args() and convert it into an int. It will throw a ValueError if the input is invalid. If the input is None, it will throw a ValueError if required is True, or return None if required is False.

Arguments:

  • arg Any: argument to convert

  • arg_name str: argument name

  • required bool: throws exception if True and argument provided is None

Returns an int if arg can be converted; returns None if arg is None and required is set to False; otherwise throws an Exception.

Returns:

  • Optional[int] - The converted int, or None.
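
For example, validating a numeric command argument (the argument name is illustrative):

>>> limit = arg_to_number(demisto.args().get('limit'), arg_name='limit', required=False)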

assign_params#

assign_params(keys_to_ignore=None, values_to_ignore=None, **kwargs)

Creates a dictionary from the given kwargs without empty values. Empty values are: None, '', [], {}, ()

Examples:

>>> assign_params(a='1', b=True, c=None, d='')
{'a': '1', 'b': True}
>>> since_time = 'timestamp'
>>> assign_params(values_to_ignore=(15, ), sinceTime=since_time, b=15)
{'sinceTime': 'timestamp'}
>>> item_id = '1236654'
>>> assign_params(keys_to_ignore=['rnd'], ID=item_id, rnd=15)
{'ID': '1236654'}

Arguments:

  • keys_to_ignore tuple or list: Keys to ignore if exists

  • values_to_ignore tuple or list: Values to ignore if exists

  • kwargs kwargs: kwargs to filter

Returns:

  • dict - dict without empty values

auto_detect_indicator_type#

auto_detect_indicator_type(indicator_value)

Infer the type of the indicator.

Arguments:

  • indicator_value str: The indicator whose type we want to check. (required)

Returns:

  • str - The type of the indicator.

aws_table_to_markdown#

aws_table_to_markdown(response, table_header)

Converts a raw response from AWS into a markdown formatted table. This function checks to see if there is only one nested dict in the top level of the dictionary and will use the nested data.

Arguments:

  • response dict: Raw response from AWS
  • table_header str: The header string to use for the table.

Returns:

  • str - Markdown formatted table as a string.

b64_encode#

b64_encode(text)

Base64 encode a string. Wrapper function around base64.b64encode which will accept a string. In Python 3 it will encode the string to bytes using utf-8 encoding and return the result decoded using utf-8.

Arguments:

  • text __: string to encode

Returns:

  • str - encoded string

batch#

batch(iterable, batch_size=1)

Gets an iterable and yields slices of it.

Arguments:

  • iterable list: list or other iterable object.

  • batch_size int: the size of batches to fetch

Returns:

  • list - Slices of the iterable, each of size batch_size (the last slice may be smaller).
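
For example:

>>> for chunk in batch([1, 2, 3, 4, 5], batch_size=2):
>>>     print(chunk)
[1, 2]
[3, 4]
[5]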

build_dbot_entry#

build_dbot_entry(indicator, indicator_type, vendor, score, description=None, build_malicious=True)

Build a dbot entry. If the score is 3, adds a malicious entry.

Examples:

>>> build_dbot_entry('user@example.com', 'Email', 'Vendor', 1)
{'DBotScore': {'Indicator': 'user@example.com', 'Type': 'email', 'Vendor': 'Vendor', 'Score': 1}}
>>> build_dbot_entry('user@example.com', 'Email', 'Vendor', 3, build_malicious=False)
{'DBotScore': {'Indicator': 'user@example.com', 'Type': 'email', 'Vendor': 'Vendor', 'Score': 3}}
>>> build_dbot_entry('user@example.com', 'email', 'Vendor', 3, 'Malicious email')
{'DBotScore': {'Vendor': 'Vendor', 'Indicator': 'user@example.com', 'Score': 3, 'Type': 'email'}, \
'Account.Email(val.Address && val.Address == obj.Address)': {'Malicious': {'Vendor': 'Vendor', 'Description': \
'Malicious email'}, 'Address': 'user@example.com'}}
>>> build_dbot_entry('md5hash', 'md5', 'Vendor', 1)
{'DBotScore': {'Indicator': 'md5hash', 'Type': 'file', 'Vendor': 'Vendor', 'Score': 1}}

Arguments:

  • indicator str: indicator field. if using file hashes, can be dict

  • indicator_type str: type of indicator ('url', 'domain', 'ip', 'cve', 'email', 'md5', 'sha1', 'sha256', 'crc32', 'sha512', 'ctph')

  • vendor str: Integration ID

  • score int: DBot score (0-3)

  • description str or None: description (will be added to malicious if dbot_score is 3). can be None

  • build_malicious bool: if True, will add a malicious entry

Returns:

  • dict - dbot entry

build_malicious_dbot_entry#

build_malicious_dbot_entry(indicator, indicator_type, vendor, description=None)

Build Malicious dbot entry

Examples:

>>> build_malicious_dbot_entry('8.8.8.8', 'ip', 'Vendor', 'Google DNS')
{'IP(val.Address && val.Address == obj.Address)': {'Malicious': {'Vendor': 'Vendor', 'Description': 'Google DNS\
'}, 'Address': '8.8.8.8'}}
>>> build_malicious_dbot_entry('md5hash', 'MD5', 'Vendor', 'Malicious File')
{'File(val.MD5 && val.MD5 == obj.MD5 || val.SHA1 && val.SHA1 == obj.SHA1 || val.SHA256 && val.SHA256 == obj.SHA\
256 || val.SHA512 && val.SHA512 == obj.SHA512 || val.CRC32 && val.CRC32 == obj.CRC32 || val.CTPH && val.CTPH == obj.CTP\
H || val.SSDeep && val.SSDeep == obj.SSDeep)': {'Malicious': {'Vendor': 'Vendor', 'Description': 'Malicious File'}\
, 'MD5': 'md5hash'}}

Arguments:

  • indicator str: Value (e.g. 8.8.8.8)

  • indicator_type str: e.g. 'IP'

  • vendor str: Integration ID

  • description str: Why it's malicious

Returns:

  • dict - A malicious DBot entry

calculate_new_offset#

calculate_new_offset(old_offset, num_incidents, total_incidents)

This calculates the new offset based on the response

Arguments:

  • old_offset int: The offset from the previous run

  • num_incidents int: The number of incidents returned by the API.

  • total_incidents int: The total number of incidents returned by the API.

Returns:

  • int - The new offset for the next run.

camel_case_to_underscore#

camel_case_to_underscore(s)

Converts a camelCase string to snake_case

Arguments:

  • s str: The string to convert (e.g. helloWorld) (required)

Returns:

  • str - The converted string (e.g. hello_world)

camelize#

camelize(src, delim=' ', upper_camel=True)

Convert all keys of a dictionary (or list of dictionaries) to CamelCase (with capital first letter)

Arguments:

  • src dict or list: The dictionary (or list of dictionaries) to convert the keys for. (required)

  • delim str: The delimiter between two words in the key (e.g. delim=' ' for "Start Date"). Default ' '.

  • upper_camel bool: When True then transforms dictionary keys to camel case with the first letter capitalised (for example: demisto_content to DemistoContent), otherwise the first letter will not be capitalised (for example: demisto_content to demistoContent).

Returns:

  • dict or list - The dictionary (or list of dictionaries) with the keys in CamelCase.
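
For example, with the default upper_camel=True:

>>> camelize({'first_name': 'Alice', 'last_name': 'Smith'}, delim='_')
{'FirstName': 'Alice', 'LastName': 'Smith'}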

camelize_string#

camelize_string(src_str, delim='_', upper_camel=True)

Transform snake_case to CamelCase

Arguments:

  • src_str str: snake_case string to convert.

  • delim str: the delimiter between two words in the source string. Default '_'.

  • upper_camel bool: When True then transforms string to camel case with the first letter capitalised (for example: demisto_content to DemistoContent), otherwise the first letter will not be capitalised (for example: demisto_content to demistoContent).

Returns:

  • str - A CamelCase string.

censor_request_logs#

censor_request_logs(request_log)

Censors the request logs generated from the urllib library directly by replacing sensitive information such as tokens and cookies with a mask. In most cases, the sensitive value is the first word after the keyword, but in some cases, it is the second one.

Arguments:

  • request_log __: The request log to censor

Returns:

  • str - The censored request log

comma_separated_mapping_to_dict#

comma_separated_mapping_to_dict(raw_text)

Transforming a textual comma-separated mapping into a dictionary object.

Arguments:

  • raw_text str: Comma-separated mapping, e.g. 'key1=value1,key2=value2'

Returns:

  • dict -

convert_dict_values_bytes_to_str#

convert_dict_values_bytes_to_str(input_dict)

Converts byte dict values to str

Arguments:

  • input_dict dict: dict to converts its values.

Returns:

  • dict - dict contains str instead of bytes.

createContext#

createContext(data, id=None, keyTransform=None, removeNull=False)

Receives a dict with flattened key values, and converts them into nested dicts

Arguments:

  • data dict or list: The data to be added to the context (required)

  • id str: The ID of the context entry

  • keyTransform function: A formatting function for the markdown table headers

  • removeNull bool: True if empty columns should be removed, false otherwise

Returns:

  • list - The converted context list

createContextSingle#

createContextSingle(obj, id=None, keyTransform=None, removeNull=False)

Receives a dict with flattened key values, and converts them into nested dicts

Arguments:

  • obj dict or list: The data to be added to the context (required)

  • id str: The ID of the context entry

  • keyTransform function: A formatting function for the markdown table headers

  • removeNull bool: True if empty columns should be removed, false otherwise

Returns:

  • list - The converted context list

create_clickable_url#

create_clickable_url(url, text=None)

Makes the given url clickable in markdown format by wrapping it with the proper brackets.

Arguments:

  • url Union[List[str], str]: the url of interest or a list of urls

  • text Union[List[str], str, None]: the text of the url or a list of texts of urls.

Returns:

  • Union[List[str], str] - Markdown format for clickable url

create_indicator_result_with_dbotscore_unknown#

create_indicator_result_with_dbotscore_unknown(indicator, indicator_type, reliability=None, context_prefix=None, address_type=None, relationships=None)

Used for cases where the API response for an indicator is not found. Returns CommandResults with a generic readable_output and an indicator with an Unknown DBotScore.

Arguments:

  • indicator str: The value of the indicator

  • indicator_type DBotScoreType: use the DBotScoreType class [unsupported for the types CVE and ATTACKPATTERN]

  • reliability DBotScoreReliability: use DBotScoreReliability class

  • context_prefix str: Use only in case that the indicator is CustomIndicator

  • address_type str: Use only in case that the indicator is Cryptocurrency

  • relationships list of EntityRelationship: List of relationships of the indicator.

Returns:

  • CommandResults -

create_updated_last_run_object#

create_updated_last_run_object(last_run, incidents, fetch_limit, look_back, start_fetch_time, end_fetch_time, created_time_field, date_format='%Y-%m-%dT%H:%M:%S', increase_last_run_time=False, new_offset=None)

Calculates the next fetch time and limit depending on the incidents result and creates an updated LastRun object with the new time and limit.

Arguments:

  • last_run dict: The LastRun object

  • incidents list: List of the incidents result

  • fetch_limit int: The fetch limit

  • look_back int: The time to look back in fetch in minutes

  • start_fetch_time str: The time the fetch started to fetch from

  • end_fetch_time str: The end time in which the fetch incidents ended

  • created_time_field str: The incident created time field

  • date_format str: The date format

  • increase_last_run_time bool: Whether to increase the last run time with one millisecond

  • new_offset int | None: The new offset to set in the last run

Returns:

  • Dict - The new LastRun object

date_to_timestamp#

date_to_timestamp(date_str_or_dt, date_format='%Y-%m-%dT%H:%M:%S')

Parses date_str_or_dt in the given format (default: %Y-%m-%dT%H:%M:%S) to milliseconds. Example: '2018-11-06T08:56:41'.

Arguments:

  • date_str_or_dt str or datetime.datetime: The date to be parsed. (required)

  • date_format str: The date format of the date string (will be ignored if date_str_or_dt is of type datetime.datetime). (optional)

Returns:

  • int - The parsed timestamp.

datetime_to_string#

datetime_to_string(datetime_obj)

Converts a datetime object into a string. When used with json.dumps() for the default parameter, e.g. json.dumps(response, default=datetime_to_string) datetime_to_string allows entire JSON objects to be safely added to context without causing any datetime marshalling errors.

Arguments:

  • datetime_obj __: Datetime object.

Returns:

  • str - String representation of a datetime object.

detect_file_indicator_type#

detect_file_indicator_type(indicator_value)

Detect the type of the file indicator.

Arguments:

  • indicator_value str: The indicator whose type we want to check. (required)

Returns:

  • str - The type of the indicator.

dict_safe_get#

dict_safe_get(dict_object, keys, default_return_value=None, return_type=None, raise_return_type=True)

Recursive safe get query (for nested dicts and lists). If the keys are found, returns the value; otherwise returns None or the default value.

Examples:

>>> data = {"something" : {"test": "A"}}
>>> dict_safe_get(data, ['something', 'test'])
>>> 'A'
>>> dict_safe_get(data, ['something', 'else'], 'default value')
>>> 'default value'

Arguments:

  • dict_object dict: dictionary to query.

  • keys list: keys for recursive get.

  • default_return_value object: Value to return when no key available.

  • return_type type: Expected return type.

  • raise_return_type bool: Whether to raise an error when the value didn't match the expected return type.

Returns:

  • object -

elem2json#

elem2json(elem, options, strip_ns=1, strip=1)

Convert an ElementTree or Element into a JSON string.

elem_to_internal#

elem_to_internal(elem, strip_ns=1, strip=1)

Convert an Element into an internal dictionary (not JSON!).

encode_string_results#

encode_string_results(text)

Encode string as utf-8, if any unicode character exists.

Arguments:

  • text __: string to encode

Returns:

  • str - encoded string

ensure_proxy_has_http_prefix#

ensure_proxy_has_http_prefix()

The function checks if proxy environment vars are missing http/https prefixes, and adds http if so.

execute_command#

execute_command(command, args, extract_contents=True, fail_on_error=True)

Runs the demisto.executeCommand() function and checks for errors.

Arguments:

  • command str: The command to run. (required)

  • args dict: The command arguments. (required)

  • extract_contents bool: Whether to return only the Contents part of the results. Default is True.

  • fail_on_error bool: Whether to fail the command when receiving an error from the command. Default is True.

Returns:

  • The command results.
  • When fail_on_error is True - list or dict or str.
  • When fail_on_error is False - bool and str.

Note: For backward compatibility, two values are returned only when fail_on_error is set to False.
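
A minimal usage sketch (the command name and arguments are illustrative):

>>> contents = execute_command('getIncidents', {'query': 'status:Pending'})
>>> ok, res = execute_command('getIncidents', {'query': 'status:Pending'}, fail_on_error=False)
>>> if not ok:
>>>     return_error(res)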

fileResult#

fileResult(filename, data, file_type=None)

Creates a file from the given data

Arguments:

  • filename str: The name of the file to be created (required)

  • data str or bytes: The file data (required)

  • file_type str: one of the entryTypes file or entryInfoFile (optional)

Returns:

  • dict - A Demisto war room entry

file_result_existing_file#

file_result_existing_file(filename, saveFilename=None)

Rename an existing file

Arguments:

  • filename str: The name of the file to be modified (required)

  • saveFilename str: The new file name

Returns:

  • dict - A Demisto war room entry

filter_incidents_by_duplicates_and_limit#

filter_incidents_by_duplicates_and_limit(incidents_res, last_run, fetch_limit, id_field)

Removes duplicate incidents from the response and returns the incidents up to the limit. The function should be called after getting the get-incidents API response; by passing the id_field, it filters out incidents that were already fetched, by checking the incident IDs saved from the previous fetch in the last run object.

Arguments:

  • incidents_res list: The incidents from the API response

  • last_run dict: The LastRun object

  • fetch_limit int: The incidents limit to return

  • id_field str: The incident id field

Returns:

  • list - List of incidents after filtering duplicates when len(incidents) <= limit
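
For context, a minimal sketch of how this helper is typically combined with get_fetch_run_time_range and create_updated_last_run_object (both documented elsewhere on this page) in a fetch-incidents flow; the client call and field names are illustrative:

>>> last_run = demisto.getLastRun()
>>> start_time, end_time = get_fetch_run_time_range(last_run=last_run, first_fetch='3 days', look_back=30)
>>> incidents_res = client.get_incidents(start_time, end_time)
>>> incidents = filter_incidents_by_duplicates_and_limit(
>>>     incidents_res=incidents_res, last_run=last_run, fetch_limit=50, id_field='id')
>>> last_run = create_updated_last_run_object(
>>>     last_run, incidents, fetch_limit=50, look_back=30, start_fetch_time=start_time,
>>>     end_fetch_time=end_time, created_time_field='created')
>>> demisto.setLastRun(last_run)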

fix_traceback_line_numbers#

fix_traceback_line_numbers(trace_str)

Fixes the given traceback line numbers.

Arguments:

  • trace_str str: The traceback string to edit. (required)

Returns:

  • str - The new formatted traceback.

flattenCell#

flattenCell(data, is_pretty=True)

Flattens a markdown table cell content into a single string

Arguments:

  • data str or list: The cell content (required)

  • is_pretty bool: Should cell content be prettified (default is True)

Returns:

  • str - A string representation of the cell content

flattenRow#

flattenRow(rowDict)

Flatten each element in the given rowDict

Arguments:

  • rowDict dict: The dict to be flattened (required)

Returns:

  • dict - A flattened dict

flattenTable#

flattenTable(tableDict)

Flatten each row in the given tableDict

Arguments:

  • tableDict dict: The table to be flattened (required)

Returns:

  • dict - A flattened table

formatAllArgs#

formatAllArgs(args, kwds)

makes a nice string representation of all the arguments

Arguments:

  • args list: function arguments (required)

  • kwds dict: function keyword arguments (required)

Returns:

  • string - string representation of all the arguments

formatCell#

formatCell(data, is_pretty=True, json_transform=None)

Converts a given object to markdown while descending multiple levels

Arguments:

  • data str or list or dict: The cell content (required)

  • is_pretty bool: Should cell content be prettified (default is True)

  • json_transform JsonTransformer: The Json transform object to transform the data

Returns:

  • str - The formatted cell content as a string

formatEpochDate#

formatEpochDate(t)

Convert a time expressed in seconds since the epoch to a string representing local time

Arguments:

  • t int: Time represented in seconds (required)

Returns:

  • str - A string representing local time

generic_http_request#

generic_http_request(method, server_url, timeout=60, verify=True, proxy=False, client_headers=None, headers=None, url_suffix=None, data=None, ok_codes=None, auth=None, error_handler=None, files=None, params=None, retries=0, resp_type='json', status_list_to_retry=None, json_data=None, return_empty_response=False, backoff_factor=5, raise_on_redirect=False, raise_on_status=False, empty_valid_codes=None, params_parser=None, with_metrics=False, **kwargs)

A wrapper for the BaseClient._http_request() method that allows performing HTTP requests without instantiating a BaseClient object. Note: Avoid using this method if unnecessary; it is recommended to use the BaseClient class instead.

Arguments:

  • method str: HTTP request method (e.g., GET, POST, PUT, DELETE).
  • server_url str: Base URL of the server.
  • timeout int: Timeout in seconds for the request (defaults to 10).
  • verify bool: Whether to verify SSL certificates (defaults to True).
  • proxy bool or str: Use a proxy server. Can be a boolean (defaults to False) or a proxy URL string.
  • client_headers dict: Additional headers to be included in all requests made by the client (overrides the headers argument).
  • headers dict: Additional headers for this specific request.
  • url_suffix str: Path suffix to be appended to the server URL.
  • data object: Data to be sent in the request body (e.g., a dictionary for POST requests).
  • ok_codes list of int: A list of HTTP status codes that are considered successful responses (defaults to [200]).
  • auth tuple: Authentication credentials (username, password) for the request.
  • error_handler callable: Function to handle request errors.
  • files dict: Dictionary of files to be uploaded (for multipart/form-data requests).
  • params dict: URL parameters to be included in the request.
  • retries int: Number of times to retry the request on failure (defaults to 0).
  • status_list_to_retry set of int: A set of integer HTTP status codes to force a retry on. A retry is initiated if the request method is in ['GET', 'POST', 'PUT'] and the response status code is in status_list_to_retry.
  • resp_type str: Determines which data format to return from the HTTP request. The default is 'json'.
  • json_data dict: The dictionary to send in a 'POST' request.
  • backoff_factor float: A backoff factor to apply between attempts after the second try (most errors are resolved immediately by a second try without a delay). urllib3 will sleep for {backoff factor} * (2 ** ({number of total retries} - 1)) seconds. If the backoff_factor is 0.1, sleep will wait [0.0s, 0.2s, 0.4s, ...] between retries. It will never be longer than Retry.BACKOFF_MAX. By default, backoff_factor is set to 5.
  • raise_on_redirect bool: Whether, if the number of redirects is exhausted, to raise a MaxRetryError, or to return a response with a response code in the 3xx range.
  • raise_on_status bool: Similar meaning to raise_on_redirect: whether to raise an exception, or return a response, if the status falls in the status_forcelist range and retries have been exhausted.
  • empty_valid_codes list: A list of all valid status codes of empty responses (usually only 204, but can vary).
  • return_empty_response bool: Whether to return an empty response body if the response code is in empty_valid_codes.
  • params_parser callable: How to quote the params. By default, spaces are replaced with + and / with %2F. See https://docs.python.org/3/library/urllib.parse.html#urllib.parse.urlencode for more info. Note: supported only in Python 3.
  • with_metrics bool: Whether or not to calculate execution metrics from the response.

Returns:

  • dict or str or bytes or xml.etree.ElementTree.Element or requests.Response - Depends on the resp_type parameter

Raises:

  • exceptions.RequestException - If an error occurs during the request.

get_current_time#

get_current_time(time_zone=0)

Gets the current time in a given timezone, as a timezone-aware datetime.

Arguments:

  • time_zone int: The time zone offset in hours.

Returns:

  • datetime - The current time.

get_demisto_version_as_str#

get_demisto_version_as_str()

Get the Demisto Server version as a string <version>-<build>. If unknown, will return: 'Unknown'. Meant to be used in places where we want to display the version. If you want to perform logic based upon the version, use is_demisto_version_ge.

Returns:

  • str - Demisto version as a string

get_error#

get_error(execute_command_result)

execute_command_result must contain an error entry - check the result first with the is_error function. If there is no error entry in the result, it will raise an Exception.

Arguments:

  • execute_command_result dict or list: result of demisto.executeCommand()

Returns:

  • string - Error message extracted from the demisto.executeCommand() result

get_feed_last_run#

get_feed_last_run()

This function gets the feed's last run: using demisto.getLastRun().

Returns:

  • dict -

get_fetch_run_time_range#

get_fetch_run_time_range(last_run, first_fetch, look_back=0, timezone=0, date_format='%Y-%m-%dT%H:%M:%S')

Calculates the time range for fetch depending on the look_back argument and the previous fetch start time given from the last_run object.

Arguments:

  • last_run dict: The LastRun object

  • first_fetch str: The first time to fetch, used in the first fetch of an instance

  • look_back int: The time to look back in fetch in minutes

  • timezone int: The time zone offset in hours

  • date_format str: The date format

Returns:

  • Tuple - The time range (start_time, end_time) of the creation date for the incidents to fetch in the current run.

get_found_incident_ids#

get_found_incident_ids(last_run, incidents, look_back, id_field, remove_incident_ids)

Gets the found incident ids from the last run object and adds the newly fetched incident IDs.

Arguments:

  • last_run dict: The LastRun object

  • incidents list: List of incidents to add

  • look_back int: The look back time in minutes

  • id_field str: The incident id field

Returns:

  • dict - The new incident ids.

get_hash_type#

get_hash_type(hash_file)

Checks the type of the given hash. Returns 'md5', 'sha1', 'sha256' or 'Unknown'.

Arguments:

  • hash_file str: The hash to be checked (required)

Returns:

  • str - The hash type
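
For example:

>>> get_hash_type('d41d8cd98f00b204e9800998ecf8427e')
'md5'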

get_integration_context#

get_integration_context(sync=True, with_version=False)

Gets the integration context.

Arguments:

  • sync bool: Whether to get the integration context directly from the DB.

  • with_version bool: Whether to return the version.

Returns:

  • dict -

get_integration_context_with_version#

get_integration_context_with_version(sync=True)

Get the latest integration context with version, if available.

Arguments:

  • sync bool: Whether to get the context directly from the DB.

Returns:

  • tuple -

get_integration_instance_name#

get_integration_instance_name()

Getting calling integration instance name

Returns:

  • str - Calling integration instance name

get_integration_name#

get_integration_name()

Getting calling integration's name

Returns:

  • str - Calling integration's name

get_last_mirror_run#

get_last_mirror_run()

This function gets the last run of the mirror (supported from XSOAR version 6.6.0), using demisto.getLastMirrorRun(). Before XSOAR version 6.6.0, the data is not returned and an exception will be raised.

Returns:

  • dict - The last mirror run object (or None if we did not set anything yet).

get_latest_incident_created_time#

get_latest_incident_created_time(incidents, created_time_field, date_format='%Y-%m-%dT%H:%M:%S', increase_last_run_time=False)

Gets the latest incident created time

Arguments:

  • incidents list: List of incidents

  • created_time_field str: The incident created time field

  • date_format str: The date format

  • increase_last_run_time bool: Whether to increase the last run time with one millisecond

Returns:

  • str - The latest incident time

get_message_classes_dump#

get_message_classes_dump(classes_as_list)

A function that returns the printable message about classes dump

Arguments:

  • classes_as_list list: The classes to print to the log

Returns:

  • str - Message to print.

get_message_global_vars#

get_message_global_vars()

A function that returns the printable message about global variables

Returns:

  • str - Message to print.

get_message_local_vars#

get_message_local_vars()

A function that returns the printable message about local variables

Returns:

  • str - Message to print.

get_message_memory_dump#

get_message_memory_dump(_sig, _frame)

Listener function to dump the memory to log info

Arguments:

  • _sig int: The signal number

  • _frame Any: The current stack frame

Returns:

  • str - Message to print.

get_message_modules_sizes#

get_message_modules_sizes()

A function that returns the printable message about the loaded modules by size

Returns:

  • str - Message to print.

get_message_threads_dump#

get_message_threads_dump(_sig, _frame)

Listener function to dump the threads to log info

Arguments:

  • _sig int: The signal number

  • _frame Any: The current stack frame

Returns:

  • str - Message to print.

get_pack_version#

get_pack_version(pack_name='')

Get a pack version. The version can be retrieved either by a pack name or from the pack that the calling script/integration is part of.

To get the version of the pack that the calling script/integration is part of, call the function without pack_name.

Arguments:

  • pack_name str: The pack name as mentioned in the pack metadata file to query its version. Use only when querying by a pack name.

Returns:

  • str - The version of the pack that the calling integration/script is part of, or the version of the requested pack name if provided. Returns an empty string if the version is not found.

get_schedule_metadata#

get_schedule_metadata(context)

Get the entry schedule metadata if available

Arguments:

  • context dict: Context in which the command was executed.

Returns:

  • dict - Dict with metadata of scheduled entry

get_script_name#

get_script_name()

Gets the calling script name.

Returns:

  • str - Calling script name

get_server_config#

get_server_config()

Retrieves XSOAR server configuration.

Returns:

  • dict - The server configuration.

get_size_of_object#

get_size_of_object(input_object)

A function that recursively iterates over an object and its members to sum their total size.

Arguments:

  • input_object Any: The object to calculate its memory footprint

Returns:

  • int - Size of input_object in bytes, or -1 if cannot determine the size.

get_tenant_account_name#

get_tenant_account_name()

Gets the tenant name from the server url.

Returns:

  • str - The account name.

get_x_content_info_headers#

get_x_content_info_headers()

Get the X-Content-* headers to send in outgoing requests to external services such as oproxy.

Returns:

  • dict - headers dict

handle_proxy#

handle_proxy(proxy_param_name='proxy', checkbox_default_value=False, handle_insecure=True, insecure_param_name=None)

Handle logic for routing traffic through the system proxy. Should usually be called at the beginning of the integration, depending on proxy checkbox state.

Additionally, will unset the env variables REQUESTS_CA_BUNDLE and CURL_CA_BUNDLE if handle_insecure is specified (default). This is needed because when these variables are set and a requests.Session object is used, requests will ignore the Session.verify setting. See: https://github.com/psf/requests/blob/master/requests/sessions.py#L703

Arguments:

  • proxy_param_name string: name of the "use system proxy" integration parameter

  • checkbox_default_value bool: Default value of the proxy param checkbox

  • handle_insecure bool: Whether to check the insecure param and unset env variables

  • insecure_param_name string: Name of the insecure param. If None, will search for 'insecure' and 'unsecure'

Returns:

  • dict - proxies dict for the 'proxies' parameter of 'requests' functions
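
A typical usage sketch at the start of an integration (assumes the default 'proxy' and insecure parameter names):

>>> proxies = handle_proxy()
>>> # pass the result to requests, e.g. requests.get(url, proxies=proxies)  # 'url' is hypothetical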

handle_proxy_for_long_running#

handle_proxy_for_long_running(proxy_param_name='proxy', checkbox_default_value=False, handle_insecure=True, insecure_param_name=None)

Handle logic for a long running integration routing traffic through the system proxy. Should usually be called at the beginning of the integration, depending on the proxy checkbox state. Long running integrations on hosted tenants (XSOAR 8 and XSIAM) have a dedicated environment variable: CRTX_HTTP_PROXY. Falls back to handle_proxy when the long running integration runs on an engine or on XSOAR 6.

Arguments:

  • proxy_param_name string: name of the "use system proxy" integration parameter

  • checkbox_default_value bool: Default value of the proxy param checkbox

  • handle_insecure bool: Whether to check the insecure param and unset env variables

  • insecure_param_name string: Name of the insecure param. If None, will search for 'insecure' and 'unsecure'

Returns:

  • Tuple[dict, boolean] - proxies dict for the 'proxies' parameter of 'requests' functions and use_ssl boolean

has_passed_time_threshold#

has_passed_time_threshold(timestamp_str, seconds_threshold)

Checks if the time difference between the current time and the timestamp is greater than the threshold.

Arguments:

  • timestamp_str str: The timestamp to compare the current time to.
  • seconds_threshold int: The threshold in seconds.

Returns:

  • bool - True if the time difference is greater than the threshold, otherwise False.
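
An illustrative example (the timestamp is arbitrary; the result depends on the current time):

>>> has_passed_time_threshold('2023-01-01T00:00:00Z', seconds_threshold=3600)
True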

hash_djb2#

hash_djb2(s, seed=5381)

Hash string with djb2 hash function

Arguments:

  • s str: The input string to hash

  • seed int: The seed for the hash function (default is 5381)

Returns:

  • int - The hashed value

indicators_value_to_clickable#

indicators_value_to_clickable(indicators)

Function to get the indicator URL link for the given indicators.

Arguments:

  • indicators dict + List[dict]: An indicator or a list of indicators

Returns:

  • dict - Key is the indicator value; value is its URL in the server.

internal_to_elem#

internal_to_elem(pfsh, factory=ET.Element)

Convert an internal dictionary (not JSON!) into an Element. Whatever Element implementation we could import will be used by default; if you want to use something else, pass the Element class as the factory parameter.

isCommandAvailable#

isCommandAvailable(cmd)

Check the list of available modules to see whether a command is currently available to be run.

Arguments:

  • cmd str: The command to check (required)

Returns:

  • bool - True if command is available, False otherwise

is_debug_mode#

is_debug_mode()

Returns whether this script/command was passed the debug-mode=true option.

Returns:

  • bool - true if debug-mode is enabled

is_demisto_version_ge#

is_demisto_version_ge(version, build_number='')

Utility function to check whether the currently running integration is on a server version greater than or equal to the passed version.

Arguments:

  • version str: Version to check

  • build_number str: Build number to check

Returns:

  • bool - True if running on a server version greater than or equal to the passed version
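
A common usage sketch (the version gate is an example):

>>> if is_demisto_version_ge('6.5.0'):
>>>     pass  # use functionality available only on server 6.5.0 and above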

is_error#

is_error(execute_command_result)

Check if the given execute_command_result has an error entry

Arguments:

  • execute_command_result dict or list: Demisto entry (required) or result of demisto.executeCommand()

Returns:

  • bool - True if the execute_command_result has an error entry, false otherwise
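
A usage sketch together with get_error (the command name and argument are hypothetical):

>>> res = demisto.executeCommand('some-command', {'arg': 'value'})
>>> if is_error(res):
>>>     return_error(get_error(res))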

is_filename_valid#

is_filename_valid(filename)

Checks whether the file name contains invalid characters.

Arguments:

  • filename __: The file name

Returns:

  • bool - True if valid otherwise False.

is_integration_command_execution#

is_integration_command_execution()

This function determines whether the current execution is a script execution or an integration command execution.

Returns:

  • bool - True if the current execution is an integration command execution, False if it is a script execution.

is_ip_valid#

is_ip_valid(s, accept_v6_ips=False)

Checks if the given string represents a valid IP address. By default, will only return 'True' for IPv4 addresses.

Arguments:

  • s str: The string to be checked (required)
  • accept_v6_ips bool: A boolean determining whether the function should accept IPv6 addresses

Returns:

  • bool - True if the given string represents a valid IP address, False otherwise
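
Illustrative examples (not taken from the source docstring):

>>> is_ip_valid('192.168.1.1')
True
>>> is_ip_valid('2001:db8::1')
False
>>> is_ip_valid('2001:db8::1', accept_v6_ips=True)
True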

is_ipv6_valid#

is_ipv6_valid(address)

Checks if the given string represents a valid IPv6 address.

Arguments:

  • address str: The string to check.

Returns:

  • bool - True if the given string represents a valid IPv6 address.

is_mac_address#

is_mac_address(mac)

Tests for a valid MAC address.

Arguments:

  • mac str: MAC address in the form of AA:BB:CC:00:11:22

Returns:

  • bool - True if the given string is a valid MAC address, False otherwise
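
Illustrative examples (values are arbitrary):

>>> is_mac_address('AA:BB:CC:00:11:22')
True
>>> is_mac_address('not-a-mac')
False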

is_scheduled_command_retry#

is_scheduled_command_retry()

Determines if the current command is a polling retry command. This is useful if some actions should not be performed when a command is polling for a response such as submitting data for processing.

Returns:

  • Bool - True if the command is part of a polling retry, otherwise false

is_time_sensitive#

is_time_sensitive()

Checks whether the reputation (auto-enrichment) command was called with auto-extract=inline. This function checks whether the 'isTimeSensitive' attribute exists on the 'demisto' object and is set to True.

Returns:

  • bool - True if the execution is time sensitive, False otherwise.

is_using_engine#

is_using_engine()

Determines whether or not the platform is using an engine.

Returns:

  • bool - True iff the platform is using an engine.

is_versioned_context_available#

is_versioned_context_available()

Determines whether versioned integration context is available according to the server version.

Returns:

  • bool - True if a versioned integration context is available, False otherwise.

is_xsiam#

is_xsiam()

Determines whether or not the platform is XSIAM.

Returns:

  • bool - True iff the platform is XSIAM.

is_xsiam_or_xsoar_saas#

is_xsiam_or_xsoar_saas()

Determines whether or not the platform is XSIAM or XSOAR SAAS.

Returns:

  • bool - True iff the platform is XSIAM or XSOAR SAAS.

is_xsoar#

is_xsoar()

Determines whether or not the platform is XSOAR.

Returns:

  • bool - True iff the platform is XSOAR.

is_xsoar_hosted#

is_xsoar_hosted()

Determines whether or not the platform is XSOAR hosted.

Returns:

  • bool - True iff the platform is XSOAR hosted.

is_xsoar_on_prem#

is_xsoar_on_prem()

Determines whether or not the platform is XSOAR on-prem.

Returns:

  • bool - True iff the platform is XSOAR on-prem.

is_xsoar_saas#

is_xsoar_saas()

Determines whether or not the platform is XSOAR SAAS.

Returns:

  • bool - True iff the platform is XSOAR SAAS.

json2elem#

json2elem(json_data, factory=ET.Element)

Convert a JSON string into an Element. Whatever Element implementation we could import will be used by default; if you want to use something else, pass the Element class as the factory parameter.

json2xml#

json2xml(json_data, factory=ET.Element)

Convert a JSON string into an XML string. Whatever Element implementation we could import will be used by default; if you want to use something else, pass the Element class as the factory parameter.

logger#

logger(func)

decorator function to log the function call using LOG

Arguments:

  • func function: function to call (required)

Returns:

  • any - returns the func return value.

merge_lists#

merge_lists(original_list, updated_list, key)

Replace values in a list with those in an updated list, matching elements by the given key.

Examples:

>>> original = [{'id': '1', 'updated': 'n'}, {'id': '2', 'updated': 'n'}, {'id': '11', 'updated': 'n'}]
>>> updated = [{'id': '1', 'updated': 'y'}, {'id': '3', 'updated': 'y'}, {'id': '11', 'updated': 'n',
>>> 'remove': True}]
>>> result = [{'id': '1', 'updated': 'y'}, {'id': '2', 'updated': 'n'}, {'id': '3', 'updated': 'y'}]

Arguments:

  • original_list list: The original list.

  • updated_list list: The updated list.

  • key str: The key to replace elements by.

Returns:

  • list - The merged list.

parse_date_range#

parse_date_range(date_range, date_format=None, to_timestamp=False, timezone=0, utc=True)

THIS FUNCTION IS DEPRECATED - USE dateparser.parse instead

Parses a date_range string to a tuple of date strings (start, end). Input must be in the format 'number date_range_unit'. Examples: 2 hours, 4 minutes, 6 months, 1 day, etc.

Arguments:

  • date_range str: The date range to be parsed (required)

  • date_format str: Date format to convert the date_range to. (optional)

  • to_timestamp bool: If set to True, then will return a timestamp rather than a datetime.datetime. (optional)

  • timezone int: timezone should be passed in hours (e.g if +0300 then pass 3, if -0200 then pass -2).

  • utc bool: If set to True, utc time will be used, otherwise local time.

Returns:

  • (datetime.datetime, datetime.datetime) or (int, int) or (str, str) - The parsed date range.

parse_date_string#

parse_date_string(date_string, date_format='%Y-%m-%dT%H:%M:%S')

Parses the date_string function to the corresponding datetime object. Note: If possible (e.g. running Python 3), it is suggested to use dateutil.parser.parse or dateparser.parse functions instead.

Examples:

>>> parse_date_string('2019-09-17T06:16:39Z')
datetime.datetime(2019, 9, 17, 6, 16, 39)
>>> parse_date_string('2019-09-17T06:16:39.22Z')
datetime.datetime(2019, 9, 17, 6, 16, 39, 220000)
>>> parse_date_string('2019-09-17T06:16:39.4040+05:00', '%Y-%m-%dT%H:%M:%S+02:00')
datetime.datetime(2019, 9, 17, 6, 16, 39, 404000)

Arguments:

  • date_string str: The date string to parse. (required)

  • date_format str: The date format of the date string. If the date format is known, it should be provided. (optional)

Returns:

  • datetime.datetime - The parsed datetime.

parse_json_string#

parse_json_string(json_string)

Parse a JSON string into a Python dictionary.

Arguments:

  • json_string str: The JSON string to be parsed.

Returns:

  • dict - The parsed JSON as a Python dictionary.

pascalToSpace#

pascalToSpace(s)

Converts pascal strings to human readable (e.g. "ThreatScore" -> "Threat Score", "thisIsIPAddressName" -> "This Is IP Address Name"). Could be used as headerTransform

Arguments:

  • s str: The string to be converted (required)

Returns:

  • str - The converted string

polling_function#

polling_function(name, interval=30, timeout=600, poll_message='Fetching Results:', polling_arg_name="polling", requires_polling_arg=True)

To be used on a function that should rerun itself. Commands that use this decorator must have a polling argument (polling: true in the YAML) and a hidden hide_polling_output argument. Commands that use this decorator should return a PollResult. Will raise a DemistoException if the server version doesn't support Scheduled Commands (< 6.2.0).

Arguments:

  • name str: The name of the command

  • interval int: How many seconds until the next run

  • timeout int: How many seconds until polling times out

  • poll_message str: The message to display in the war room while polling

  • requires_polling_arg bool: Whether a polling argument should be expected as one of the demisto args

Returns:

  • Function - Decorator for polling functions
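
A decorator usage sketch (the command name, argument, and client method are hypothetical; assumes the decorated function returns a PollResult as described above):

>>> @polling_function(name='my-scan-status', interval=60, timeout=600)
>>> def my_scan_status_command(args, client):
>>>     if client.is_scan_done(args['scan_id']):  # hypothetical client method
>>>         return PollResult(response=CommandResults(readable_output='Scan complete'))
>>>     return PollResult(response=None, continue_to_poll=True, args_for_next_run=args)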

positiveFile#

positiveFile(entry)

Checks if the given entry from a file reputation query is positive (known bad) (deprecated)

Arguments:

  • entry dict: File entry (required)

Returns:

  • bool - True if bad, false otherwise

positiveIp#

positiveIp(entry)

Checks if the given entry from an IP reputation query is positive (known bad) (deprecated)

Arguments:

  • entry dict: IP entry (required)

Returns:

  • bool - True if bad, false otherwise

positiveUrl#

positiveUrl(entry)

Checks if the given entry from a URL reputation query is positive (known bad) (deprecated)

Arguments:

  • entry dict: URL entry (required)

Returns:

  • bool - True if bad, false otherwise

register_module_line#

register_module_line(module_name, start_end, line, wrapper=0)

Register a module in the line mapping for the traceback line correction algorithm.

Arguments:

  • module_name str: The name of the module. (required)

  • start_end str: Whether to register the line as the start or the end of the module. Possible values: start, end. (required)

  • line int: the line number to record. (required)

  • wrapper int: Wrapper size (used for inline replacements with headers such as ApiModules). (optional)

register_signal_handler_profiling_dump#

register_signal_handler_profiling_dump(signal_type=None, profiling_dump_rows_limit=PROFILING_DUMP_ROWS_LIMIT)

Function that registers the threads and memory dump signal listener

Arguments:

  • profiling_dump_rows_limit int: The max number of profiling related rows to print to the log

remove_duplicates_from_list_arg#

remove_duplicates_from_list_arg(args, field)

Removes duplicates from a list after calling argToList. For example, given args={'ids': '1,2,1'} and field='ids', the return value will be ['1', '2'].

Arguments:

  • args dict: Args to be converted (required)

  • field str: Field in args to be converted into list without duplicates (required)

Returns:

  • list - A python list of args without duplicates

remove_empty_elements#

remove_empty_elements(d)

Recursively remove empty lists, empty dicts, or None elements from a dictionary.

Arguments:

  • d __: Input dictionary.

Returns:

  • dict - Dictionary with all empty lists, empty dictionaries, and None values removed.
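
An illustrative example (values are arbitrary):

>>> remove_empty_elements({'a': 1, 'b': None, 'c': [], 'd': {'e': {}}})
{'a': 1}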

remove_nulls_from_dictionary#

remove_nulls_from_dictionary(data)

Removes null values from a dictionary (modifies the given dictionary in place).

Arguments:

  • data dict: The data to be added to the context (required)

remove_old_incidents_ids#

remove_old_incidents_ids(found_incidents_ids, current_time, look_back)

Removes old incident ids from the last run object to avoid overloading.

Arguments:

  • found_incidents_ids dict: Dict of incidents ids

  • current_time int: The current epoch time to compare with the existing IDs added time

  • look_back int: The look back time in minutes

Returns:

  • dict - The new incidents ids

replace_in_keys#

replace_in_keys(src, existing='.', new='_')

Replace a substring in all of the keys of a dictionary (or list of dictionaries)

Arguments:

  • src dict or list: The dictionary (or list of dictionaries) with keys that need replacement. (required)

  • existing str: substring to replace.

  • new str: new substring that will replace the existing substring.

Returns:

  • dict or list - The dictionary (or list of dictionaries) with keys after substring replacement.
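
An illustrative example with the default replacement (dots to underscores):

>>> replace_in_keys({'some.key': 1})
{'some_key': 1}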

replace_spaces_in_credential#

replace_spaces_in_credential(credential)

This function is used when a credential of type 9 arrives in the wrong format: one line with spaces instead of multiple lines.

Arguments:

  • credential str or None: The credential to replace spaces in.

Returns:

  • str or None - The credential with spaces replaced by new lines if it is in the expected format; otherwise the credential is returned as is.

response_to_context#

response_to_context(reponse_obj, user_predefiend_keys=None)

Recursively creates a data dictionary where all keys start with capital letters. If a key includes underscores, removes them and capitalizes every word. Example: "one_two" to "OneTwo".

Arguments:

  • reponse_obj Any: The response object to update.

  • user_predefiend_keys dict: An optional dict of predefined keys, where each key is a key in the response and its value is the key to convert it into.

Returns:

  • Any - A response with all keys (if there are any) starting with a capital letter.

retry#

retry(times=3, delay=1, exceptions=Exception)

Retries executing a function until it no longer raises an exception.

Arguments:

  • times int: The number of times to trigger the retry mechanism.

  • delay int: The time in seconds to sleep between attempts

  • exceptions Exception: The exceptions that should be caught when executing the function (Union[tuple[type[Exception], ...], type[Exception]])

Returns:

  • Any - The decorated function's return value.
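
A decorator usage sketch (the function body and exception type are assumptions):

>>> @retry(times=5, delay=2, exceptions=(ConnectionError,))
>>> def call_api():
>>>     pass  # perform a request that may fail transiently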

return_error#

return_error(message, error='', outputs=None)

Returns error entry with given message and exits the script

Arguments:

  • message str: The message to return to the entry (required)

  • error str or Exception: The raw error message to log (optional)

  • outputs dict or None: the outputs that will be returned to playbook/investigation context (optional)

Returns:

  • dict - Error entry object

return_outputs#

return_outputs(readable_output, outputs=None, raw_response=None, timeline=None, ignore_auto_extract=False)

DEPRECATED: use return_results() instead

This function wraps demisto.results() and makes returning results to the user more intuitive.

Arguments:

  • readable_output str | int: markdown string that will be presented in the warroom, should be human readable - (HumanReadable)

  • outputs dict: the outputs that will be returned to playbook/investigation context (originally EntryContext)

  • raw_response dict | list | str: must be dictionary, if not provided then will be equal to outputs. usually must be the original raw response from the 3rd party service (originally Contents)

  • timeline dict | list: expects a list; if a dict is passed it will be put into a list. Used by the server to populate an indicator's timeline. If the 'Category' field is not present in the timeline dict(s), it will automatically be added to the dict(s) with its value set to 'Integration Update'.

  • ignore_auto_extract bool: expects a bool value. if true then the warroom entry readable_output will not be auto enriched.

return_results#

return_results(results)

This function wraps demisto.results() and supports multiple result types.

Arguments:

  • results CommandResults or PollResult or str or dict or BaseWidget or list or GetMappingFieldsResponse or GetModifiedRemoteDataResponse or GetRemoteDataResponse: A result object to return as a War-Room entry.
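
A usage sketch (the output prefix, key field, and data are hypothetical):

>>> return_results(CommandResults(
>>>     outputs_prefix='Example.Item',
>>>     outputs_key_field='id',
>>>     outputs={'id': '1', 'name': 'foo'},
>>> ))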

return_warning#

return_warning(message, exit=False, warning='', outputs=None, ignore_auto_extract=False)

Returns a warning entry with the specified message and, if exit is True, exits the script.

Arguments:

  • message str: The message to return in the entry (required).

  • exit bool: Determines if the program will terminate after the command is executed. Default is False.

  • warning str: The warning message (raw) to log (optional).

  • outputs dict or None: The outputs that will be returned to playbook/investigation context (optional).

  • ignore_auto_extract bool: Determines if the War Room entry will be auto-enriched. Default is false.

Returns:

  • dict - Warning entry object

safe_load_json#

safe_load_json(json_object)

Safely loads a JSON object from an argument. Allows the argument to accept either a JSON in string form, or an entry ID corresponding to a JSON file.

Arguments:

  • json_object __: Entry ID or JSON string.

Returns:

  • dict - Dictionary object from a parsed JSON file or string.
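
An illustrative example (a JSON string is passed; an entry ID of a JSON file is also accepted):

>>> safe_load_json('{"key": "value"}')
{'key': 'value'}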

safe_sleep#

safe_sleep(duration_seconds)

Sleeps for the given duration, but raises an error if it would exceed the TTL.

Arguments:

  • duration_seconds float: The desired sleep duration in seconds.

scoreToReputation#

scoreToReputation(score)

Converts score (in number format) to human readable reputation format

Arguments:

  • score int: The score to be formatted (required)

Returns:

  • str - The formatted score
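
Illustrative examples (assuming the standard DBot score mapping):

>>> scoreToReputation(3)
'Bad'
>>> scoreToReputation(0)
'None'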

sectionsToMarkdown#

sectionsToMarkdown(root)

Converts a list of Demisto JSON tables to markdown string of tables

Arguments:

  • root dict or list: The JSON table - List of dictionaries with the same keys or a single dictionary (required)

Returns:

  • str - A string representation of the markdown table

send_data_to_xsiam#

send_data_to_xsiam(data, vendor, product, data_format=None, url_key='url', num_of_attempts=3, chunk_size=XSIAM_EVENT_CHUNK_SIZE, data_type=EVENTS, should_update_health_module=True, add_proxy_to_request=False, snapshot_id='', items_count=None)

Send the supported fetched data types into the XDR data-collector private api.

Arguments:

  • data Union[str, list]: The data to send to XSIAM server. Should be of the following:
  1. List of strings or dicts where each string or dict represents an event or asset.
  2. String containing raw events separated by a new line.
  • vendor str: The vendor corresponding to the integration that originated the data.

  • product str: The product corresponding to the integration that originated the data.

  • data_format str: Should only be filled in case the 'events' parameter contains a string of raw events in the format of 'leef' or 'cef'. In other cases the data_format will be set automatically.

  • url_key str: The param dict key where the integration url is located at. the default is 'url'.

  • num_of_attempts int: The num of attempts to do in case there is an api limit (429 error codes)

  • chunk_size int: Advanced - The maximal size of each chunk sent to the API. A limit of 9 MB will be enforced.

  • data_type str: Type of data to send to Xsiam, events or assets.

  • should_update_health_module bool: Whether to trigger the health module showing how many events were sent to XSIAM. This can be useful when using send_data_to_xsiam in batches for the same fetch.

  • add_proxy_to_request bool: Whether to add a proxy to the send events request.

  • snapshot_id str: the snapshot id.

  • items_count str: the asset snapshot items count.

send_events_to_xsiam#

send_events_to_xsiam(events, vendor, product, data_format=None, url_key='url', num_of_attempts=3, chunk_size=XSIAM_EVENT_CHUNK_SIZE, should_update_health_module=True, add_proxy_to_request=False)

Send the fetched events into the XDR data-collector private api.

Arguments:

  • events Union[str, list]: The events to send to XSIAM server. Should be of the following:
  1. List of strings or dicts where each string or dict represents an event.
  2. String containing raw events separated by a new line.
  • vendor str: The vendor corresponding to the integration that originated the events.

  • product str: The product corresponding to the integration that originated the events.

  • data_format str: Should only be filled in case the 'events' parameter contains a string of raw events in the format of 'leef' or 'cef'. In other cases the data_format will be set automatically.

  • url_key str: The param dict key where the integration url is located at. the default is 'url'.

  • num_of_attempts int: The num of attempts to do in case there is an api limit (429 error codes)

  • chunk_size int: Advanced - The maximal size of each chunk sent to the API. A limit of 9 MB will be enforced.

  • should_update_health_module bool: Whether to trigger the health module showing how many events were sent to XSIAM

  • add_proxy_to_request bool: Whether to add a proxy to the send events request.

set_feed_last_run#

set_feed_last_run(last_run_indicators)

This function sets the feed's last run using demisto.setLastRun().

Arguments:

  • last_run_indicators dict: Indicators to save in "lastRun" object.

set_integration_context#

set_integration_context(context, sync=True, version=-1)

Sets the integration context.

Arguments:

  • context dict: The context to set.

  • sync bool: Whether to save the context directly to the DB.

  • version Any: The version of the context to set.

Returns:

  • dict - The updated integration context.

set_last_mirror_run#

set_last_mirror_run(last_mirror_run)

This function sets the last run of the mirror using demisto.setLastMirrorRun(), available from XSOAR version 6.6.0. On earlier XSOAR versions the data is not set and an exception will be raised.

Arguments:

  • last_mirror_run dict: Data to save in the "LastMirrorRun" object.

set_to_integration_context_with_retries#

set_to_integration_context_with_retries(context, object_keys=None, sync=True, max_retry_times=CONTEXT_UPDATE_RETRY_TIMES)

Update the integration context with a dictionary of keys and values, with multiple attempts. The function supports merging the context keys using the provided object_keys parameter. If the version is too old by the time the context is set, another attempt will be made, after a random sleep, until the retry limit is reached.

Arguments:

  • context dict: A dictionary of keys and values to set.

  • object_keys dict: A dictionary to map between context keys and their unique ID for merging them.

  • sync bool: Whether to save the context directly to the DB.

  • max_retry_times int: The maximum number of attempts to try.

shortCrowdStrike#

shortCrowdStrike(entry)

Display CrowdStrike Intel results in Markdown (deprecated)

Arguments:

  • entry dict: CrowdStrike result entry (required)

Returns:

  • dict - A Demisto entry containing the shortened CrowdStrike info

shortDomain#

shortDomain(entry)

Formats a domain reputation entry into a short table (deprecated)

Arguments:

  • entry dict: Domain result entry (required)

Returns:

  • dict - A Demisto entry containing the shortened domain info

shortFile#

shortFile(entry)

Formats a file reputation entry into a short table (deprecated)

Arguments:

  • entry dict: File result entry (required)

Returns:

  • dict - A Demisto entry containing the shortened file info

shortIp#

shortIp(entry)

Formats an IP reputation entry into a short table (deprecated)

Arguments:

  • entry dict: IP result entry (required)

Returns:

  • dict - A Demisto entry containing the shortened IP info

shortUrl#

shortUrl(entry)

Formats a URL reputation entry into a short table (deprecated)

Arguments:

  • entry dict: URL result entry (required)

Returns:

  • dict - A Demisto entry containing the shortened URL info

shorten_string_for_printing#

shorten_string_for_printing(source_string, max_length=64)

Function that removes the middle of a long string, for printing or logging. If needed, it will replace the middle with '...'.

Examples:

>>> shorten_string_for_printing('123456789', 9)
'123456789'
>>> shorten_string_for_printing('1234567890', 9)
'123...890'
>>> shorten_string_for_printing('123456789012', 10)
'1234...012'

Arguments:

  • source_string str: A long str that needs shortening.

  • max_length int: Maximum length of the returned str, should be higher than 0. Default is 64.

Returns:

  • str - A string no longer than max_length.

signal_handler_profiling_dump#

signal_handler_profiling_dump(_sig, _frame)

Listener function to dump the threads and memory to log info

Arguments:

  • _sig int: The signal number

  • _frame Any: The current stack frame

skip_cert_verification#

skip_cert_verification()

The function deletes the self-signed certificate environment variables so that HTTP requests skip certificate validation.

skip_proxy#

skip_proxy()

The function deletes the proxy environment variables so that HTTP requests skip routing through the proxy.

snakify#

snakify(src)

Convert all keys of a dictionary to snake_case (underscore separated)

Arguments:

  • src dict: The dictionary to convert the keys for. (required)

Returns:

  • dict - The dictionary (or list of dictionaries) with the keys in snake_case.

split_data_to_chunks#

split_data_to_chunks(data, target_chunk_size)

Splits a string of data into chunks of an approximately specified size. The actual size can be lower.

Arguments:

  • data list or a string: A list of data or a string delimited with \n to split to chunks.
  • target_chunk_size int: The maximum size of each chunk. The maximal size allowed is 9MB.

Returns:

  • collections.Iterable[list] - An iterable of lists where each list contains events with approx size of chunk size.
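
A usage sketch (the data and target chunk size are assumptions):

>>> data = ['event1', 'event2', 'event3']
>>> for chunk in split_data_to_chunks(data, target_chunk_size=2 * 10 ** 6):
>>>     pass  # send each chunk separately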

stringEscape#

stringEscape(st)

Escape newline chars in the given string.

Arguments:

  • st str: The string to be modified (required).

Returns:

  • str - A modified string.

stringEscapeMD#

stringEscapeMD(st, minimal_escaping=False, escape_multiline=False)

Escape any chars that might break a markdown string

Arguments:

  • st str: The string to be modified (required)

  • minimal_escaping bool: Whether replace all special characters or table format only (optional)

  • escape_multiline bool: Whether to convert line-ending characters (optional)

Returns:

  • str - A modified string

stringUnEscape#

stringUnEscape(st)

Unescape newline chars in the given string.

Arguments:

  • st str: The string to be modified (required).

Returns:

  • str - A modified string.

string_to_context_key#

string_to_context_key(string)

Checks if string, removes underscores, capitalize every word. Example: "one_two" to "OneTwo"

Arguments:

  • string str: The string to be converted (required)

Returns:

  • str - The converted string
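
An illustrative example, mirroring the description above:

>>> string_to_context_key('one_two')
'OneTwo'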

string_to_table_header#

string_to_table_header(string)

Checks if string, change underscores to spaces, capitalize every word. Example: "one_two" to "One Two"

Arguments:

  • string str: The string to be converted (required)

Returns:

  • str - The converted string

support_multithreading#

support_multithreading()

Adds a lock on the calls to the Cortex XSOAR server from the Demisto object, to support integrations which use multithreading.

tableToMarkdown#

tableToMarkdown(name, t, headers=None, headerTransform=None, removeNull=False, metadata=None, url_keys=None, date_fields=None, json_transform_mapping=None, is_auto_json_transform=False, sort_headers=True)

Converts a demisto table in JSON form to a Markdown table

Arguments:

  • name str: The name of the table (required)

  • t dict or list: The JSON table - List of dictionaries with the same keys or a single dictionary (required)

  • headers list or string: A list of headers to be presented in the output table (by order). If a string is passed, the table will have a single header. The default includes all available headers.

  • headerTransform function: A function that formats the original data headers (optional)

  • removeNull bool: Remove empty columns from the table. Default is False

  • metadata str: Metadata about the table contents

  • url_keys list: A list of keys in the given JSON table that should be turned into clickable links

  • date_fields list: A list of date fields to format the value to human-readable output.

  • json_transform_mapping Dict[str, JsonTransformer]: A mapping between a header key to corresponding JsonTransformer

  • is_auto_json_transform bool: Boolean to try to auto transform complex json

  • sort_headers bool: Sorts the table based on its headers only if the headers parameter is not specified

Returns:

  • str - A string representation of the markdown table
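
A usage sketch (the table name and rows are hypothetical; the returned markdown string is omitted):

>>> md = tableToMarkdown(
>>>     'Results',
>>>     [{'id': '1', 'name': 'foo'}, {'id': '2', 'name': 'bar'}],
>>>     headers=['id', 'name'],
>>>     headerTransform=string_to_table_header,
>>> )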

timestamp_to_datestring#

timestamp_to_datestring(timestamp, date_format="%Y-%m-%dT%H:%M:%S.000Z", is_utc=False)

Parses timestamp (milliseconds) to a date string in the provided date format (by default: ISO 8601 format) Examples: (1541494441222, 1541495441000, etc.)

Arguments:

  • timestamp int or str: The timestamp to be parsed (required)

  • date_format str: The date format the timestamp should be parsed to. (optional)

  • is_utc bool: Should the string representation of the timestamp use UTC time or the local machine time

Returns:

  • str - The parsed timestamp in the date_format
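
An illustrative example (a millisecond timestamp with the default date format and UTC output):

>>> timestamp_to_datestring(1541494441222, is_utc=True)
'2018-11-06T08:54:01.000Z'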

underscoreToCamelCase#

underscoreToCamelCase(s, upper_camel=True)

Convert an underscore separated string to camel case

Arguments:

  • s str: The string to convert (e.g. hello_world) (required)

  • upper_camel bool: When True, transforms the string to camel case with the first letter capitalised (for example: demisto_content to DemistoContent); otherwise the first letter will not be capitalised (for example: demisto_content to demistoContent).

Returns:

  • str - The converted string (e.g. HelloWorld)
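
Illustrative examples, mirroring the description above:

>>> underscoreToCamelCase('demisto_content')
'DemistoContent'
>>> underscoreToCamelCase('demisto_content', upper_camel=False)
'demistoContent'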

update_integration_context#

update_integration_context(context, object_keys=None, sync=True)

Update the integration context with a given dictionary after merging it with the latest integration context.

Arguments:

  • context dict: The keys and values to update in the integration context.

  • object_keys dict: A dictionary to map between context keys and their unique ID for merging them with the latest context.

  • sync bool: Whether to use the context directly from the DB.

Returns:

  • tuple - The updated integration context and its version.

update_last_run_object#

update_last_run_object(last_run, incidents, fetch_limit, start_fetch_time, end_fetch_time, look_back, created_time_field, id_field, date_format='%Y-%m-%dT%H:%M:%S', increase_last_run_time=False, new_offset=None)

Updates the LastRun object with the next fetch time and limit and with the new fetched incident IDs.

Arguments:

  • last_run dict: The LastRun object

  • incidents list: List of the incidents result

  • fetch_limit int: The fetch limit

  • start_fetch_time str: The time the fetch started to fetch from

  • end_fetch_time str: The end time in which the fetch incidents ended

  • look_back int: The time to look back in fetch in minutes

  • created_time_field str: The incident created time field

  • id_field str: The incident id field

  • date_format str: The date format

  • increase_last_run_time bool: Whether to increase the last run time by one millisecond

  • new_offset int | None: The new offset to set in the last run

Returns:

  • Dict - The updated LastRun object
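
A fetch-incidents usage sketch (all values are assumptions):

>>> last_run = update_last_run_object(
>>>     last_run=demisto.getLastRun(),
>>>     incidents=incidents,
>>>     fetch_limit=50,
>>>     start_fetch_time=start_time,
>>>     end_fetch_time=end_time,
>>>     look_back=30,
>>>     created_time_field='created',
>>>     id_field='id',
>>> )
>>> demisto.setLastRun(last_run)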

url_to_clickable_markdown#

url_to_clickable_markdown(data, url_keys)

Turns the given URL fields into clickable URLs, used for the markdown table.

Arguments:

  • data [Union[str, List[Any], Dict[str, Any]]]: a dictionary or a list containing data with some values that are urls

  • url_keys List[str]: The keys of the URLs to turn clickable

Returns:

  • [Union[str, List[Any], Dict[str, Any]]] - markdown format for clickable url

urljoin#

urljoin(url, suffix="")

Will join url and its suffix

Examples:

  "https://google.com/", "/"    => "https://google.com/"
  "https://google.com", "/"     => "https://google.com/"
  "https://google.com", "api"   => "https://google.com/api"
  "https://google.com", "/api"  => "https://google.com/api"
  "https://google.com/", "api"  => "https://google.com/api"
  "https://google.com/", "/api" => "https://google.com/api"

Arguments:

  • url string: URL string (required)

  • suffix string: the second part of the url

Returns:

  • string - Full joined url

vtCountPositives#

vtCountPositives(entry)

Counts the number of detected URLs in the entry

Arguments:

  • entry dict: Demisto entry (required)

Returns:

  • int - The number of detected URLs

xml2json#

xml2json(xmlstring, options={}, strip_ns=1, strip=1)

Convert an XML string into a JSON string.

Arguments:

  • xmlstring str: The string to be converted (required)

Returns:

  • str - The converted JSON string

xsiam_api_call_with_retries#

xsiam_api_call_with_retries(client, xsiam_url, zipped_data, headers, num_of_attempts, events_error_handler=None, error_msg='', is_json_response=False, data_type=EVENTS)

Send the fetched events or assets into the XDR data-collector private api.

Arguments:

  • client BaseClient: base client containing the XSIAM url.

  • xsiam_url str: The URL of XSIAM to send the api request.

  • zipped_data bytes: encoded events

  • headers dict: headers for the request

  • error_msg str: The error message prefix in case of an error.

  • num_of_attempts int: The num of attempts to do in case there is an api limit (429 error codes).

  • events_error_handler callable: error handler function

  • data_type str: events or assets

Returns:

  • requests.Response or DemistoException - Response object or DemistoException