Developing the connector
- From the IDE screen, click
in the upper left hand corner to add a new IDE item. Select the
Integration radio button and give the integration the name: "Email Connector". - The integration will be created and listed on the left hand side with a default icon. Clicking on it will bring up the Integration Settings, where the Icon, Description, Python Dependencies and Integration Parameters can be defined.
- Next, click
- After creating the connector, set the following connector parameters:
Parameter Name Description Mandatory Type Default Value Explanation Username IMAP User name Yes String email@gmail.com The email address from which the connector will pull the emails into the Chronicle SOAR platform Password IMAP Password Yes Password The password associated with the email address from which the connector will pull the emails into the Chronicle SOAR platform IMAP Port IMAP port. e.g. 993 Yes Int 993 The Internet Message Access Protocol (IMAP) is a mail protocol used for accessing emails on a remote mail server from a local client. IMAP Server Address e.g. imap.gmail.com Yes String imap.gmail.com The incoming mail server for an IMAP account can also be called the IMAP server. In this example, the email provider is Gmail, and the incoming mail server is imap.gmail.com. Folder to check for emails Pulls emails only from the specified folder No String Inbox The folder from which the emails will be retrieved. For example: Inbox - Next, in the upper right fill out the fields:
- "Product Field Name" = device_product, determines which value from the raw fields would be assigned to the product name of the alert. You can find the
related field in the code in line 57 which was defined as "Mail" (product).
event["device_product"] = PRODUCT #The PRODUCT constant is "Mail"
- "Event Field Name" = event_name, determines which value from the raw fields would be assigned to the event type field. You can find the related field in the code in line 56 which was defined as "Suspicious email".
event["event_name"] = "Suspicious email"
Edit the email connector
Copy the code below created for the "My Email Connector", paste it in the IDE and follow the instructions.from SiemplifyConnectors import SiemplifyConnectorExecution from SiemplifyConnectorsDataModel import AlertInfo from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime import email, imaplib, sys, re # CONSTANTS CONNECTOR_NAME = "Mail" VENDOR = "Mail" PRODUCT = "Mail" DEFAULT_PRIORITY = 60 # Default is Medium RULE_GENERATOR_EXAMPLE = "Mail" DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox" DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN" URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)" def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event): """Returns an alert which is one event that contains one unread email message""" siemplify.LOGGER.info(f"Started processing Alert {alert_id}") create_event = None alert_info = AlertInfo() # Initializes alert_info alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id. alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. alert_info.name = email_message_data['Subject'] alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. 
You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100. alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus) siemplify.LOGGER.info(f"Events creating started for alert {alert_id}") try: if created_event is not None: alert_info.events.append(created_event) siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}") # Raise an exception if failed to process the event except Exception as e: siemplify.LOGGER.error(f"Failed to process event {alert_id}") siemplify.LOGGER.exception(e) return alert_info def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time): """Returns the digested data of a single unread email""" siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") event = {} event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious email" event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
event["Subject"] = email_message_data["Subject"] event["SourceUserName"] = email_message_data["From"] event["DestinationUserName"] = email_message_data["To"] event["found_url"] = ",".join(all_found_url_in_emails_body_list) siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}") return event def find_url_in_email_message_body(siemplify, email_messages_data_list): """Search for a url in the email body""" all_found_url_in_emails_body_list = [] for message in email_messages_data_list: for part in message.walk(): if part.get_content_maintype() == 'text\plain': continue email_message_body = part.get_payload() all_found_urls = re.findall(URLS_REGEX, str(email_message_body)) for url in all_found_urls: if url not in all_found_url_in_emails_body_list: all_found_url_in_emails_body_list.append(url) def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check): """Returns all unread email messages""" email_messages_data_list = [] # Login to email using 'imap' module mail = imaplib.IMAP4_SSL(imap_host, imap_port) mail.login(username, password) # Determining the default email folder to pull emails from - 'inbox' if folder_to_check is None: folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX # Selecting the email folder to pull the data from mail.select(folder_to_check) # Storing the email message data result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in the cycle it will split each # email message into a separate item in the list chosen_mailbox_items_list if len(data) > 0: chosen_mailbox_items_list = data[0].split() # Iterating each email message and appending to emails_messages_data_list for item in chosen_mailbox_items_list: typ, email_data = mail.fetch(item, '(RFC 822)') # Decoding from binary string to string raw_email = email_data[0][1].decode("utf-8") # Turning the email data into an email object email_message = email.message_from_string(raw_email) # 
Appending the email message data to email_messages_data_list email_messages_data_list.append(email_message) return email_messages_data_list @output_handler def main(is_test_run): alerts = [] # The main output of each connector run that contains the alerts data siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper siemplify.script_name = CONNECTOR_NAME # In case of running a test if (is_test_run): siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run") # Extracting the connector's Params username = siemplify.extract_connector_param(param_name="Username") password = siemplify.extract_connector_param(param_name="Password") imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") imap_port = siemplify.extract_connector_param(param_name="IMAP Port") folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails") # Getting the digested email message data email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) # If the email_messages_data_list is not empty if len(email_messages_data_list) > 0: for message in email_messages_data_list: # Converting the email message datetime from string to unix time by SiemplifyUtils functions datetime_email_message = message['Date'] string_to_datetime = convert_string_to_datetime(datetime_email_message) datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list) # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. 
alert_id = message['Message-ID'].replace('@mail.gmail.com','') # Creating the event by calling create_event() function created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating the alert by calling create_alert() function created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event) # Checking that the created_alert is not None if created_alert is not None: alerts.append(created_alert) siemplify.LOGGER.info(f'Added Alert {alert_id} to package results') # If the inbox for the user has no unread emails. else: siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') # Returning all the created alerts to the cases module in Siemplify siemplify.return_package(alerts) if __name__ == '__main__': # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') main(is_test_run)
Now that we have copied the connectors code we will go over the relevant modules that need to be imported and continue with the main function. Afterwards we will elaborate on each method that was called from the main function.
The relevant imports
A Python module has a set of functions, classes or variables defined and implemented. In order to achieve all the functions below we imported those modules into our script.
from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time import email, imaplib, sys, re
Main function
The main function is the start point of the script. The Python interpreter executes the code sequentially and calls each method that is part of the code.
- Extract connector params – as you can see in the code
copied into the IDE, we use the
siemplify.extract_connector_param
function which extracts each of the parameters we configured for the connector (username, password, imap_host, imap_port, folder_to_check).# Extracting the connector's Params username = siemplify.extract_connector_param(param_name="Username") password = siemplify.extract_connector_param(param_name="Password") imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") imap_port = siemplify.extract_connector_param(param_name="IMAP Port") folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
- We will use the function
get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
in order to get all the information collected from the unread emails (We will elaborate on this function in another step).# Getting the digested email message data email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
- After we have received all the information in the email we will check that
the information has indeed been collected, and then we will perform a number of
actions on each email:
# If the email_messages_data_list is not empty if len(email_messages_data_list) > 0: for message in email_messages_data_list: # Converting the email message datetime from string to unix time by SiemplifyUtils functions
- This code extracts the message date by
datetime_email_message = message['Date']
and then converts this date time to Unix time using Chronicle SOAR functions:
string_to_datetime = convert_string_to_datetime(datetime_email_message) datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
- We then search for URLs (if the email has a URL we will use other products
in our playbook to check if the URL is malicious) in the email message body by
using the function below
find_url_in_email_message_body(siemplify, email_messages_data_list)
(We will elaborate on this function in another step). - Extract the unique ID of each email message, and assign it to the alert_id
variable.
# Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. alert_id = message['Message-ID'].replace('@mail.gmail.com','')
- After we extracted all the necessary information for ingesting the alert into the Chronicle SOAR platform, we can create the alert and the event (We will elaborate on these functions in another step):
- Next we will validate the created alert and the created event. After validating we will add the alert to the alert list.
- In a situation that the inbox for the given user has no unread emails we
have added the following code:
else: siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")
- At the end we will return the alerts list to the system and each alert will
be presented as a case in the case queue.
# Returning all the created alerts to the cases module in Siemplify siemplify.return_package(alerts)
- This step is responsible for running the Main function within the times we
set in the Connector configuration:
if __name__ == "__main__": # Connectors run in iterations. The interval is configurable from the Connectors UI. is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') main(is_test_run)
# Only the current message is scanned, so each alert carries the URLs of its own email rather than those of the whole batch.
found_urls_in_email_body = find_url_in_email_message_body(siemplify, [message])
# Creating the event by calling create_event() function created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating the alert by calling create_alert() function created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
# Checking that the created_alert is not None if created_alert is not None: alerts.append(created_alert) siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")
Getting the unread email message
This function is responsible for connecting to the mailbox using the "imaplib" and "email" modules and retrieving the information of the email messages. Finally, the function returns a list containing all the information of all the unread email messages.
- From the main class we will use the function:
get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
.def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check): """Returns all unread email messages""" email_messages_data_list = []
- After that we will connect to the email by using the 'imap' module.
# Login to email using 'imap' module mail = imaplib.IMAP4_SSL(imap_host, imap_port) mail.login(username, password)
- We will then determine the folder in the email to check for unread messages.
In this example we will extract emails from the 'inbox' folder
(DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox")
. - We then collect all the unread messages (DEFAULT_MESSAGES_TO_READ_UNSEEN =
"UNSEEN"), and then convert this data to a list.
# Storing the email message data result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in the cycle it will split each # email message into a separate item in the list chosen_mailbox_items_list if len(data) > 0: chosen_mailbox_items_list = data[0].split() # Iterating each email message and appending to emails_messages_data_list for item in chosen_mailbox_items_list: typ, email_data = mail.fetch(item, '(RFC 822)') # Decoding from binary string to string raw_email = email_data[0][1].decode("utf-8") # Turning the email data into an email object email_message = email.message_from_string(raw_email) # Appending the email message data to email_messages_data_list email_messages_data_list.append(email_message) return email_messages_data_list
# Determining the default email folder to pull emails from - 'inbox' if folder_to_check is None: folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX # Selecting the email folder to pull the data from mail.select(folder_to_check)
Creating the event
This function is responsible for creating the event by associating each email message component to the event fields respectively.
- From the main class we will create the event by using the
function:
create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time)
-
We will create a dictionary with the event fields while the mandatory fields are:
event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"]
. - Each alert contains one or more
events. In this example we will demonstrate an alert that contains only one
event which is a single email message.
Therefore, after creating the event we will create the alert that contains all the event information.
def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time): """Returns the digested data of a single unread email""" siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}")
event = {} event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious email" event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. event["Subject"] = email_message_data["Subject"] event["SourceUserName"] = email_message_data["From"] event["DestinationUserName"] = email_message_data["To"] event["found_url"] = ",".join(all_found_url_in_emails_body_list) siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}") return event
Creating the alert info and initializing the alert info characteristics fields
This function is responsible for creating the alert, each alert contains one or more events within it. In our case each alert contains one event which is basically one email message.
- From the main class we will create
the alert by using the function:
create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event)
- After creating the alert info we will validate the created event and then append the event information to the alert info characteristics.
def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): """Returns an alert which is one event that contains one unread email message""" siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}") create_event = None
Creating the alert info instance and initializing the alert info characteristics fields:
# Initializes the alert_info Characteristics Fields alert_info.display_id = f"{alert_id}" alert_info.ticket_id = f"{alert_id}" alert_info.name = email_message_data['Subject'] alert_info.rule_generator = RULE_GENERATOR_EXAMPLE alert_info.start_time = datetime_in_unix_time alert_info.end_time = datetime_in_unix_time alert_info.device_vendor = VENDOR alert_info.device_product = PRODUCT
siemplify.LOGGER.info(f"Events creating started for alert {alert_id}") try: if created_event is not None: alert_info.events.append(created_event) siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}") # Raise an exception if failed to process the event except Exception as e: siemplify.LOGGER.error(f"Failed to process event {alert_id}") siemplify.LOGGER.exception(e) return alert_info
Finding the URL in the email body
This function checks if the body of the email has one or more URLs.
For each email message we will need to access
the email body by searching the email message part that contains text or plain
type information.
def find_url_in_email_message_body(siemplify, email_messages_data_list):
    """
    Search for a url in the email body.
    """
    all_found_url_in_emails_body_list = []
    for message in email_messages_data_list:
        for part in message.walk():
            # Only text parts (e.g. text/plain, text/html) are scanned;
            # multipart containers and non-text parts are skipped.
            if part.get_content_maintype() != 'text':
                continue
If the body contains the wanted type of information we will
load this information by email_message_body =
part.get_payload()
.
After loading all the information we can now search the URL by using the regex format:
URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)"
This extracts the URLs from the email body.
email_message_body = part.get_payload() all_found_urls = re.findall(URLS_REGEX, str(email_message_body)) for url in all_found_urls: if url not in all_found_url_in_emails_body_list: all_found_url_in_emails_body_list.append(url) siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") return all_found_url_in_emails_body_list
We have finished going through the connector code and we will now configure a connector that will ingest cases into the platform from a selected email inbox in Gmail.