import demistomock as demisto
from CommonServerPython import *
from CommonServerUserPython import *
import enum
import json
import urllib3
import dateparser
import traceback
from typing import Any, Dict, List, Optional, Union
urllib3.disable_warnings()
''' CONSTANTS '''
DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
MAX_INCIDENTS_TO_FETCH = 50
HELLOWORLD_SEVERITIES = ['Low', 'Medium', 'High', 'Critical']
metadata_collector = YMLMetadataCollector(integration_name="HelloWorldNoYML",
description="This is the Hello World integration for getting started.",
display="HelloWorldNoYML",
category="Utilities",
docker_image="demisto/python3:3.9.8.24399",
is_fetch=True,
long_running=False,
long_running_port=False,
is_runonce=False,
integration_subtype="python3",
integration_type="python",
fromversion="5.0.0",
default_mapper_in="HelloWorld-mapper",
default_classifier="HelloWorld",
conf=[ConfKey(name="url",
display="Server URL (e.g., https://soar.monstersofhack.com)",
required=True,
default_value="https://soar.monstersofhack.com"),
ConfKey(name="isFetch",
display="Fetch incidents",
required=False,
key_type=ParameterTypes.BOOLEAN),
ConfKey(name="incidentType",
display="Incident type",
required=False,
default_value="https://soar.monstersofhack.com",
key_type=ParameterTypes.SINGLE_SELECT),
ConfKey(name="max_fetch",
display="Maximum number of incidents per fetch",
required=False,
default_value='10'),
ConfKey(name="apikey",
display="API Key",
required=True,
key_type=ParameterTypes.TEXT_AREA_ENCRYPTED),
ConfKey(name="threshold_ip",
display="Score threshold for IP reputation command",
required=False,
default_value='65',
additional_info="Set this to determine the HelloWorld score "
"that will determine if an IP is malicious "
"(0-100)"),
ConfKey(name="threshold_domain",
display="Score threshold for domain reputation command",
required=False,
default_value='65',
additional_info="Set this to determine the HelloWorld score "
"that will determine if a domain is malicious "
"(0-100)"),
ConfKey(name="alert_status",
display="Fetch alerts with status (ACTIVE, CLOSED)",
required=False,
default_value="ACTIVE",
options=["ACTIVE", "CLOSED"],
key_type=ParameterTypes.SINGLE_SELECT),
ConfKey(name="alert_type",
display="Fetch alerts with type",
required=False,
additional_info="Comma-separated list of types of alerts to "
"fetch. Types might change over time. Some "
"examples are 'Bug' and 'Vulnerability'"),
ConfKey(name="min_severity",
display="Minimum severity of alerts to fetch",
required=True,
default_value='Low',
options=["Low", "Medium", "High", "Critical"],
key_type=ParameterTypes.SINGLE_SELECT),
ConfKey(name="first_fetch",
display="First fetch time",
required=False,
default_value="3 days"),
ConfKey(name="insecure",
display="Trust any certificate (not secure)",
required=False,
key_type=ParameterTypes.BOOLEAN),
ConfKey(name="proxy",
display="Use system proxy settings",
required=False,
key_type=ParameterTypes.BOOLEAN)
])
''' CLIENT CLASS '''
class Client(BaseClient):
"""Client class to interact with the service API
This Client implements API calls, and does not contain any Demisto logic.
It should only do requests and return data.
It inherits from BaseClient defined in CommonServer Python.
Most calls use _http_request() that handles proxy, SSL verification, etc.
For this HelloWorld implementation, no special attributes are defined.
"""
def get_ip_reputation(self, ip):
"""Gets the IP reputation using the '/ip' API endpoint
:type ip: ``str``
:param ip: IP address to get the reputation for
:return: dict containing the IP reputation as returned from the API
:rtype: ``Dict[str, Any]``
"""
return self._http_request(
method='GET',
url_suffix='/ip',
params={
'ip': ip
}
)
def get_domain_reputation(self, domain: str) -> Dict[str, Any]:
"""Gets the Domain reputation using the '/domain' API endpoint
:type domain: ``str``
:param domain: domain name to get the reputation for
:return: dict containing the domain reputation as returned from the API
:rtype: ``Dict[str, Any]``
"""
return self._http_request(
method='GET',
url_suffix='/domain',
params={
'domain': domain
}
)
def search_alerts(self, alert_status: Optional[str], severity: Optional[str],
alert_type: Optional[str], max_results: Optional[int],
start_time: Optional[int]) -> List[Dict[str, Any]]:
"""Searches for HelloWorld alerts using the '/get_alerts' API endpoint
All the parameters are passed directly to the API as HTTP POST parameters in the request
:type alert_status: ``Optional[str]``
:param alert_status: status of the alert to search for. Options are: 'ACTIVE' or 'CLOSED'
:type severity: ``Optional[str]``
:param severity:
severity of the alert to search for. Comma-separated values.
Options are: "Low", "Medium", "High", "Critical"
:type alert_type: ``Optional[str]``
:param alert_type: type of alerts to search for. There is no list of predefined types
:type max_results: ``Optional[int]``
:param max_results: maximum number of results to return
:type start_time: ``Optional[int]``
:param start_time: start timestamp (epoch in seconds) for the alert search
:return: list containing the found HelloWorld alerts as dicts
:rtype: ``List[Dict[str, Any]]``
"""
request_params: Dict[str, Any] = {}
if alert_status:
request_params['alert_status'] = alert_status
if alert_type:
request_params['alert_type'] = alert_type
if severity:
request_params['severity'] = severity
if max_results:
request_params['max_results'] = max_results
if start_time:
request_params['start_time'] = start_time
return self._http_request(
method='GET',
url_suffix='/get_alerts',
params=request_params
)
def get_alert(self, alert_id: str) -> Dict[str, Any]:
"""Gets a specific HelloWorld alert by id
:type alert_id: ``str``
:param alert_id: ID of the alert to return
:return: dict containing the alert as returned from the API
:rtype: ``Dict[str, Any]``
"""
return self._http_request(
method='GET',
url_suffix='/get_alert_details',
params={
'alert_id': alert_id
}
)
def update_alert_status(self, alert_id: str, alert_status: str) -> Dict[str, Any]:
"""Changes the status of a specific HelloWorld alert
:type alert_id: ``str``
:param alert_id: ID of the alert to return
:type alert_status: ``str``
:param alert_status: new alert status. Options are: 'ACTIVE' or 'CLOSED'
:return: dict containing the alert as returned from the API
:rtype: ``Dict[str, Any]``
"""
return self._http_request(
method='GET',
url_suffix='/change_alert_status',
params={
'alert_id': alert_id,
'alert_status': alert_status
}
)
def scan_start(self, hostname: str) -> Dict[str, Any]:
"""Starts a HelloWorld scan on a specific hostname
:type hostname: ``str``
:param hostname: hostname of the machine to scan
:return: dict containing the scan status as returned from the API
:rtype: ``Dict[str, Any]``
"""
return self._http_request(
method='GET',
url_suffix='/start_scan',
params={
'hostname': hostname
}
)
def scan_status(self, scan_id: str) -> Dict[str, Any]:
"""Gets the status of a HelloWorld scan
:type scan_id: ``str``
:param scan_id: ID of the scan to retrieve status for
:return: dict containing the scan status as returned from the API
:rtype: ``Dict[str, Any]``
"""
return self._http_request(
method='GET',
url_suffix='/check_scan',
params={
'scan_id': scan_id
}
)
def scan_results(self, scan_id: str) -> Dict[str, Any]:
"""Gets the results of a HelloWorld scan
:type scan_id: ``str``
:param scan_id: ID of the scan to retrieve results for
:return: dict containing the scan results as returned from the API
:rtype: ``Dict[str, Any]``
"""
return self._http_request(
method='GET',
url_suffix='/get_scan_results',
params={
'scan_id': scan_id
}
)
def say_hello(self, name: str) -> str:
"""Returns 'Hello {name}'
:type name: ``str``
:param name: name to append to the 'Hello' string
:return: string containing 'Hello {name}'
:rtype: ``str``
"""
return f'Hello {name}'
''' HELPER FUNCTIONS '''
def parse_domain_date(domain_date: Union[List[str], str], date_format: str = '%Y-%m-%dT%H:%M:%S.000Z') -> Optional[str]:
"""Converts WHOIS date format to an ISO8601 string
Converts the HelloWorld domain WHOIS date (YYYY-mm-dd HH:MM:SS) format
in a datetime. If a list is returned with multiple elements, takes only
the first one.
:type domain_date: ``Union[List[str],str]``
:param date_format:
a string or list of strings with the format 'YYYY-mm-DD HH:MM:SS'
:return: Parsed time in ISO8601 format
:rtype: ``Optional[str]``
"""
if isinstance(domain_date, str):
domain_date_dt = dateparser.parse(domain_date)
if domain_date_dt:
return domain_date_dt.strftime(date_format)
elif isinstance(domain_date, list) and len(domain_date) > 0 and isinstance(domain_date[0], str):
domain_date_dt = dateparser.parse(domain_date[0])
if domain_date_dt:
return domain_date_dt.strftime(date_format)
return None
def convert_to_demisto_severity(severity: str) -> int:
"""Maps HelloWorld severity to Cortex XSOAR severity
Converts the HelloWorld alert severity level ('Low', 'Medium',
'High', 'Critical') to Cortex XSOAR incident severity (1 to 4)
for mapping.
:type severity: ``str``
:param severity: severity as returned from the HelloWorld API (str)
:return: Cortex XSOAR Severity (1 to 4)
:rtype: ``int``
"""
return {
'Low': IncidentSeverity.LOW,
'Medium': IncidentSeverity.MEDIUM,
'High': IncidentSeverity.HIGH,
'Critical': IncidentSeverity.CRITICAL
}[severity]
''' COMMAND FUNCTIONS '''
def test_module(client: Client, first_fetch_time: int) -> str:
"""Tests API connectivity and authentication'
Returning 'ok' indicates that the integration works like it is supposed to.
Connection to the service is successful.
Raises exceptions if something goes wrong.
:type client: ``Client``
:param Client: HelloWorld client to use
:type name: ``str``
:param name: name to append to the 'Hello' string
:return: 'ok' if test passed, anything else will fail the test.
:rtype: ``str``
"""
try:
client.search_alerts(max_results=1, start_time=first_fetch_time, alert_status=None, alert_type=None,
severity=None)
except DemistoException as e:
if 'Forbidden' in str(e):
return 'Authorization Error: make sure API Key is correctly set'
else:
raise e
return 'ok'
@metadata_collector.command(command_name="helloworld-say-hello", outputs_prefix="HelloWorld")
def say_hello_command(client: Client, args: Dict[str, Any], **kwargs) -> CommandResults:
"""Hello command - prints hello to anyone.
Args:
client (Client): HelloWorld client to use.
name (str): The name of whom you want to say hello to.
Returns:
A ``CommandResults`` object that is then passed to ``return_results``,
that contains the hello world message
Context Outputs:
hello (str): Should be Hello **something** here.
"""
name = args.get('name', None)
if not name:
raise ValueError('name not specified')
result = client.say_hello(name)
readable_output = f'## {result}'
return CommandResults(
readable_output=readable_output,
outputs_prefix='HelloWorld.hello',
outputs_key_field='',
outputs=result
)
class SeverityEnum(enum.Enum):
"""YML configuration key types."""
Low = "Low"
Medium = "Medium"
High = "High"
Critical = "Critical"
SEARCH_ALERTS_INPUTS = [InputArgument(name='severity',
description='Filter by alert severity. Comma-separated value '
'(Low,Medium,High,Critical)',
input_type=SeverityEnum),
InputArgument(name='status',
description='Filter by alert status.',
options=['ACTIVE', 'CLOSED']),
InputArgument(name='alert_type',
description='Filter by alert type.'),
InputArgument(name='max_results',
description='Maximum results to return.'),
InputArgument(name='start_time',
description='Filter by start time. \nExamples:\n \"3 days ago\"\n '
'\"1 month\"\n \"2019-10-10T12:22:00\"\n \"2019-10-10\"')]
@metadata_collector.command(command_name='helloworld-search-alerts', inputs_list=SEARCH_ALERTS_INPUTS,
outputs_prefix='HelloWorld.Alert')
def search_alerts_command(client: Client, args: Dict[str, Any]) -> CommandResults:
"""Search HelloWorld Alerts.
:type client: ``Client``
:param Client: HelloWorld client to use
:type args: ``Dict[str, Any]``
:param args:
all command arguments, usually passed from ``demisto.args()``.
``args['status']`` alert status. Options are 'ACTIVE' or 'CLOSED'
``args['severity']`` alert severity CSV
``args['alert_type']`` alert type
``args['start_time']`` start time as ISO8601 date or seconds since epoch
``args['max_results']`` maximum number of results to return
:return:
A ``CommandResults`` object that is then passed to ``return_results``,
that contains alerts
:rtype: ``CommandResults``
Context Outputs:
alert_id (str): Alert ID.
alert_status (str): Alert status. Can be 'ACTIVE' or 'CLOSED'.
alert_type (str): Alert type. For example 'Bug' or 'Vulnerability'.
created (datetime.datetime): Alert created time. Format is ISO8601 (i.e., '2020-04-30T10:35:00.000Z').
name (str): Alert name.
severity (str): Alert severity. Can be 'Low', 'Medium', 'High' or 'Critical'.
"""
status = args.get('status')
severities: List[str] = HELLOWORLD_SEVERITIES
severity = args.get('severity', None)
if severity:
severities = severity.split(',')
if not all(s in HELLOWORLD_SEVERITIES for s in severities):
raise ValueError(
f'severity must be a comma-separated value '
f'with the following options: {",".join(HELLOWORLD_SEVERITIES)}')
alert_type = args.get('alert_type')
start_time = arg_to_datetime(
arg=args.get('start_time'),
arg_name='start_time',
required=False
)
max_results = arg_to_number(
arg=args.get('max_results'),
arg_name='max_results',
required=False
)
alerts = client.search_alerts(
severity=','.join(severities),
alert_status=status,
alert_type=alert_type,
start_time=int(start_time.timestamp()) if start_time else None,
max_results=max_results
)
for alert in alerts:
if 'created' not in alert:
continue
created_time_ms = int(alert.get('created', '0')) * 1000
alert['created'] = timestamp_to_datestring(created_time_ms)
return CommandResults(
outputs_prefix='HelloWorld.Alert',
outputs_key_field='alert_id',
outputs=alerts
)
GET_ALERT_OUTPUTS = [OutputArgument(name='alert_id', output_type=str, description='Alert ID.'),
OutputArgument(name='created', output_type=datetime,
description="Alert created time. Format is ISO8601 "
"(i.e., '2020-04-30T10:35:00.000Z').")]
@metadata_collector.command(command_name='helloworld-get-alert', outputs_prefix='HelloWorld.Alert',
outputs_list=GET_ALERT_OUTPUTS, restore=True)
def get_alert_command(client: Client, args: Dict[str, Any], outputs_prefix) -> CommandResults:
"""Retrieve alert extra data by ID.
:type client: ``Client``
:param Client: HelloWorld client to use
:param alert_id: alert ID to return
:return:
A ``CommandResults`` object that is then passed to ``return_results``,
that contains an alert
:rtype: ``CommandResults``
"""
alert_id = args.get('alert_id', None)
if not alert_id:
raise ValueError('alert_id not specified')
alert = client.get_alert(alert_id=alert_id)
if 'created' in alert:
created_time_ms = int(alert.get('created', '0')) * 1000
alert['created'] = timestamp_to_datestring(created_time_ms)
readable_output = tableToMarkdown(f'HelloWorld Alert {alert_id}', alert)
return CommandResults(
readable_output=readable_output,
outputs_prefix=outputs_prefix,
outputs_key_field='alert_id',
outputs=alert
)
class StatusEnum(enum.Enum):
"""YML configuration key types."""
ACTIVE = "ACTIVE"
CLOSED = "CLOSED"
@metadata_collector.command(command_name='helloworld-update-alert-status', outputs_prefix='HelloWorld.Alert')
def update_alert_status_command(client: Client, args: Dict[str, Any]) -> CommandResults:
"""Update the status for an alert.
Changes the status of a HelloWorld alert and returns the updated alert info.
Args:
client (Client): HelloWorld client to use.
alert_id: required. Alert ID to update.
status (StatusEnum): required. New status of the alert. Options=[ACTIVE, CLOSED].
Returns:
A ``CommandResults`` object that is then passed to ``return_results``,
that contains an updated alert.
Context Outputs:
alert_id (str): Alert ID.
updated (datetime): Alert update time. Format is ISO8601 (i.e., '2020-04-30T10:35:00.000Z').
alert_status (str): Alert status. Can be 'ACTIVE' or 'CLOSED'.
"""
alert_id = args.get('alert_id', None)
if not alert_id:
raise ValueError('alert_id not specified')
status = args.get('status', None)
if status not in ('ACTIVE', 'CLOSED'):
raise ValueError('status must be either ACTIVE or CLOSED')
alert = client.update_alert_status(alert_id, status)
if 'updated' in alert:
updated_time_ms = int(alert.get('updated', '0')) * 1000
alert['updated'] = timestamp_to_datestring(updated_time_ms)
readable_output = tableToMarkdown(f'HelloWorld Alert {alert_id}', alert)
return CommandResults(
readable_output=readable_output,
outputs_prefix='HelloWorld.Alert',
outputs_key_field='alert_id',
outputs=alert
)
def scan_results_command(client: Client, args: Dict[str, Any]) ->\
Union[Dict[str, Any], CommandResults, List[CommandResults]]:
"""helloworld-scan-results command: Returns results for a HelloWorld scan
:type client: ``Client``
:param Client: HelloWorld client to use
:type args: ``Dict[str, Any]``
:param args:
all command arguments, usually passed from ``demisto.args()``.
``args['scan_id']`` scan ID to retrieve results
``args['format']`` format of the results. Options are 'file' or 'json'
:return:
A ``CommandResults`` compatible to return ``return_results()``,
that contains a scan result when json format is selected, or
A Dict of entries also compatible to ``return_results()`` that
contains the output file when file format is selected.
:rtype: ``Union[Dict[str, Any],CommandResults]``
"""
scan_id = args.get('scan_id', None)
if not scan_id:
raise ValueError('scan_id not specified')
scan_format = args.get('format', 'file')
results = client.scan_results(scan_id=scan_id)
if scan_format == 'file':
return (
fileResult(
filename=f'{scan_id}.json',
data=json.dumps(results, indent=4),
file_type=entryTypes['entryInfoFile']
)
)
elif scan_format == 'json':
cves: List[Common.CVE] = []
command_results: List[CommandResults] = []
entities = results.get('entities', [])
for e in entities:
if 'vulns' in e.keys() and isinstance(e['vulns'], list):
cves.extend(
[Common.CVE(id=c, cvss=None, published=None, modified=None, description=None) for c in e['vulns']])
readable_output = tableToMarkdown(f'Scan {scan_id} results', entities)
command_results.append(CommandResults(
readable_output=readable_output,
outputs_prefix='HelloWorld.Scan',
outputs_key_field='scan_id',
outputs=results
))
cves = list(set(cves))
for cve in cves:
command_results.append(CommandResults(
readable_output=f"CVE {cve}",
indicator=cve
))
return command_results
else:
raise ValueError('Incorrect format, must be "json" or "file"')
''' MAIN FUNCTION '''
def main() -> None:
"""main function, parses params and runs command functions
:return:
:rtype:
"""
api_key = demisto.params().get('apikey')
base_url = urljoin(demisto.params()['url'], '/api/v1')
verify_certificate = not demisto.params().get('insecure', False)
first_fetch_time = arg_to_datetime(
arg=demisto.params().get('first_fetch', '3 days'),
arg_name='First fetch time',
required=True
)
first_fetch_timestamp = int(first_fetch_time.timestamp()) if first_fetch_time else None
assert isinstance(first_fetch_timestamp, int)
proxy = demisto.params().get('proxy', False)
demisto.debug(f'Command being called is {demisto.command()}')
try:
headers = {
'Authorization': f'Bearer {api_key}'
}
client = Client(
base_url=base_url,
verify=verify_certificate,
headers=headers,
proxy=proxy)
if demisto.command() == 'test-module':
result = test_module(client, first_fetch_timestamp)
return_results(result)
elif demisto.command() == 'helloworld-say-hello':
return_results(say_hello_command(client, demisto.args()))
elif demisto.command() == 'helloworld-search-alerts':
return_results(search_alerts_command(client, demisto.args()))
elif demisto.command() == 'helloworld-get-alert':
return_results(get_alert_command(client, demisto.args()))
elif demisto.command() == 'helloworld-update-alert-status':
return_results(update_alert_status_command(client, demisto.args()))
except Exception as e:
demisto.error(traceback.format_exc())
return_error(f'Failed to execute {demisto.command()} command.\nError:\n{str(e)}')
''' ENTRY POINT '''
if __name__ in ('__main__', '__builtin__', 'builtins'):
main()
])