Developing the connector

Supported in:
  1. From the IDE screen, click in the upper left hand corner to add a new IDE item. Select the Integration radio button and give the integration the name: "Email Connector".
  2. The integration will be created and listed on the left hand side with a default icon. Clicking on the will bring up the Integration Settings where the Icon, Description, Python Dependencies and Integration Parameters can be defined.
  3. Next, click and add a new IDE item. Select the Connector radio button and give the connector the name: "My Email Connector". Next, select the integration "Email Connector" to associate the connector with the integration.
  4. After creating the connector, set the following connector parameters:
    Parameter Name Description Mandatory Type Default Value Explanation
    Username IMAP User name Yes String email@gmail.com The email address from which the connector will pull the emails into Chronicle SOAR platform
    Password IMAP Password Yes Password The password associated with the email address from which the connector will pull the emails into Chronicle SOAR platform
    IMAP Port Imap port. e.g. 993 Yes Int 993 The Internet Message Access Protocol (IMAP) is a mail protocol used for accessing emails on a remote web server from a local client.
    IMAP Server Address e.g. imap.gmail.com Yes String imap.google.com The incoming mail server for an IMAP account can also be called the IMAP server. In this example, the email provider is google.com, and the incoming mail server is imap.google.com.
    Folder to check for emails Pulls emails only from the specified folder No String Inbox The folder from which the emails will be retrieved, For example: Inbox
  5. Next, in the upper right fill out the fields:
    • "Product Field Name" = device_product, determines which value from the raw fields would be assigned to the product name of the alert. You can find the related field in the code in line 57 which was defined as "Mail" (product).
      event["device_product"] = PRODUCT #The PRODUCT constant is "Mail"
    • "Event Field Name" = event_name, determines which value from the raw fields would be assigned to the event type field. You can find the related field in the code in line 56 which was defined as "Suspicious email".
      event["event_name"] = "Suspicious email"

Edit the email connector

Copy the code below created for the "My Email Connector", paste it in the IDE and follow the instructions.
from SiemplifyConnectors import SiemplifyConnectorExecution
from SiemplifyConnectorsDataModel import AlertInfo
from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime
import email, imaplib, sys, re

# CONSTANTS
CONNECTOR_NAME = "Mail"
VENDOR = "Mail"
PRODUCT = "Mail"
DEFAULT_PRIORITY = 60 # Default is Medium
RULE_GENERATOR_EXAMPLE = "Mail"
DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox"
DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"
URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)"


def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event):
    """Returns an alert which is one event that contains one unread email message"""
    siemplify.LOGGER.info(f"Started processing Alert {alert_id}")

    create_event = None
    alert_info = AlertInfo()
    # Initializes alert_info
    alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id.
    alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field. 
    alert_info.name = email_message_data['Subject']
    alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert. 
    alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime
    alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100.
    alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee) 
    alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus)

    siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
    try:
        if created_event is not None:
            alert_info.events.append(created_event)
            siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
    # Raise an exception if failed to process the event
    except Exception as e:
        siemplify.LOGGER.error(f"Failed to process event {alert_id}")
        siemplify.LOGGER.exception(e)
    return alert_info


def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
    """Returns the digested data of a single unread email"""
    siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
    event = {} 
    event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["event_name"] = "Suspicious email" 
    event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
    event["Subject"] = email_message_data["Subject"] 
    event["SourceUserName"] = email_message_data["From"] 
    event["DestinationUserName"] = email_message_data["To"] 
    event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
    siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
    return event


def find_url_in_email_message_body(siemplify, email_messages_data_list):
    """Search for a url in the email body"""
    all_found_url_in_emails_body_list = []
    for message in email_messages_data_list:
        for part in message.walk():
            if part.get_content_maintype() == 'text\plain':
                continue
            email_message_body = part.get_payload()
            all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
            for url in all_found_urls:
                if url not in all_found_url_in_emails_body_list:
                    all_found_url_in_emails_body_list.append(url)


def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
    """Returns all unread email messages"""
    email_messages_data_list = []

    # Login to email using 'imap' module
    mail = imaplib.IMAP4_SSL(imap_host, imap_port)
    mail.login(username, password)

    # Determining the default email folder to pull emails from - 'inbox'
    if folder_to_check is None:
        folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX

    # Selecting the email folder to pull the data from
    mail.select(folder_to_check)

    # Storing the email message data
    result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)

    # If there are several emails collected in the cycle it will split each
    # email message into a separate item in the list chosen_mailbox_items_list
    if len(data) > 0:
        chosen_mailbox_items_list = data[0].split()
        # Iterating each email message and appending to emails_messages_data_list
        for item in chosen_mailbox_items_list:
            typ, email_data = mail.fetch(item, '(RFC 822)')
            # Decoding from binary string to string
            raw_email = email_data[0][1].decode("utf-8")
            # Turning the email data into an email object
            email_message = email.message_from_string(raw_email)
            # Appending the email message data to email_messages_data_list
            email_messages_data_list.append(email_message)
    return email_messages_data_list


@output_handler
def main(is_test_run):
    alerts = [] # The main output of each connector run that contains the alerts data 
    siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper
    siemplify.script_name = CONNECTOR_NAME

    # In case of running a test
    if (is_test_run):
        siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run")

    # Extracting the connector's Params
    username = siemplify.extract_connector_param(param_name="Username") 
    password = siemplify.extract_connector_param(param_name="Password") 
    imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address")
    imap_port = siemplify.extract_connector_param(param_name="IMAP Port")
    folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")

    # Getting the digested email message data
    email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
    # If the email_messages_data_list is not empty
    if len(email_messages_data_list) > 0:
        for message in email_messages_data_list:
            # Converting the email message datetime from string to unix time by SiemplifyUtils functions
            datetime_email_message = message['Date']
            string_to_datetime = convert_string_to_datetime(datetime_email_message)
            datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
            found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
            # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once.
            alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
            # Creating the event by calling create_event() function 
            created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
            # Creating the alert by calling create_alert() function 
            created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
            # Checking that the created_alert is not None 
            if created_alert is not None: 
                alerts.append(created_alert) 
                siemplify.LOGGER.info(f'Added Alert {alert_id} to package results')
    # If the inbox for the user has no unread emails. 
    else: 
        siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails') 
    # Returning all the created alerts to the cases module in Siemplify 
    siemplify.return_package(alerts) 


if __name__ == '__main__':
# Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI. 
    is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True') 
    main(is_test_run)

Now that we have copied the connectors code we will go over the relevant modules that need to be imported and continue with the main function. Afterwards we will elaborate on each method that was called from the main function.

The relevant imports

A Python module has a set of functions, classes or variables defined and implemented. In order to achieve all the functions below we imported those modules into our script.

from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector 
from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class
from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time 
import email, imaplib, sys, re

Main function

The main function is the start point of the script. The Python interpreter executes the code sequentially and calls each method that is part of the code.

  1. Extract connector params – as you can see in the code copied into the IDE, we use the siemplify.extract_connector_paramfunction which extracts each of the parameters we configured for the connector (username, password, imap_host, imap_port, folder_to_check).
    # Extracting the connector's Params
    username = siemplify.extract_connector_param(param_name="Username") 
    password = siemplify.extract_connector_param(param_name="Password") 
    imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address") 
    imap_port = siemplify.extract_connector_param(param_name="IMAP Port") 
    folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
    
  2. We will use the function get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) in order to get all the information collected from the unread emails (We will elaborate on this function in another step).
    # Getting the digested email message data 
    email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) 
  3. After we have received all the information in the email we will check that the information has indeed been collected, and then we will perform a number of actions on each email:
    # If the email_messages_data_list is not empty 
    if len(email_messages_data_list) > 0:
       for message in email_messages_data_list:
           # Converting the email message datetime from string to unix time by SiemplifyUtils functions
    
    • This code extracts the message date by datetime_email_message = message['Date']and then converts this date time to Unix time using Chronicle SOAR functions:
      string_to_datetime = convert_string_to_datetime(datetime_email_message) 
      datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) 
  4. We then search for URLs (if the email has a URL we will use other products in our playbook to check if the URL is malicious) in the email message body by using the function below find_url_in_email_message_body(siemplify email_messages_data_list)(We will elaborate on this function in another step).
  5. found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
  6. Extract the unique ID of each email message, and assign it to the alert_id variable.
    # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once. 
    alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
  7. After we extracted all the necessary information for ingesting the alert into the Chronicle SOAR platform, we can create the alert and the event (We will elaborate on these functions in another step):
  8. # Creating the event by calling create_event() function 
    created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) 
    # Creating the alert by calling create_alert() function 
    created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
  9. Next we will validate the created alert and the created event. After validating we will add the alert to the alert list.
  10. # Checking that the created_alert is not None 
    if created_alert is not None: 
        alerts.append(created_alert)     
        siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")    
    
  11. In a situation that the inbox for the given user has no unread emails we have added the following code:
    else:
        siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")    
    
  12. At the end we will return the alerts list to the system and each alert will be presented as a case in the case queue.
    # Returning all the created alerts to the cases module in Siemplify 
    siemplify.return_package(alerts)
    
  13. This step is responsible for running the Main function within the times we set in the Connector configuration:
    if __name__ == "__main__": 
        # Connectors run in iterations. The interval is configurable from the Connectors UI. 
        is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True')
        main(is_test_run)  
    

Getting the unread email message

This function is responsible for connecting to the email by the "Imap" and "Email" models and retrieving the information of the email message. Finally, the function returns a list containing all the information of all the unread email messages.

  1. From the main class we will use the function: get_email_messages_data(imap_host, imap_port, username, password, folder_to_check).
    def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):
        """Returns all unread email messages"""
        email_messages_data_list = []
    
  2. After that we will connect to the email by using the 'imap' module.
    # Login to email using 'imap' module
    mail = imaplib.IMAP4_SSL(imap_host, imap_port)
    mail.login(username, password)
    
  3. We will then determine the folder in the email to check for unread messages. In this example we will extract emails from the 'inbox' folder (DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox").
  4. # Determining the default email folder to pull emails from - 'inbox'
    if folder_to_check is None:
        folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    # Selecting the email folder to pull the data from mail.select(folder_to_check)
    
  5. We then collect all the unread messages (DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN"), and then convert this data to a list.
    # Storing the email message data
    result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    # If there are several emails collected in the cycle it will split each
    # email message into a separate item in the list chosen_mailbox_items_list
    if len(data) > 0:
        chosen_mailbox_items_list = data[0].split()
        # Iterating each email message and appending to emails_messages_data_list
        for item in chosen_mailbox_items_list:
            typ, email_data = mail.fetch(item, '(RFC 822)')
            # Decoding from binary string to string
            raw_email = email_data[0][1].decode("utf-8")
            # Turning the email data into an email object
            email_message = email.message_from_string(raw_email)
            # Appending the email message data to email_messages_data_list
            email_messages_data_list.append(email_message)
    return email_messages_data_list
    

Creating the event

This function is responsible for creating the event by associating each email message component to the event fields respectively.

  1. From the main class we will create the event by using the function: create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time)
  2. def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):
        """Returns the digested data of a single unread email"""
        siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}") 
    
  3. We will create a dictionary with the event fields while the mandatory fields are: event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"] .
  4. event = {} 
    event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime 
    event["event_name"] = "Suspicious email" 
    event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from. 
    event["Subject"] = email_message_data["Subject"] 
    event["SourceUserName"] = email_message_data["From"] 
    event["DestinationUserName"] = email_message_data["To"] 
    event["found_url"] = ",".join(all_found_url_in_emails_body_list) 
    siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
    return event
    
  5. Each alert contains one or more events. In this example we will demonstrate an alert that contains only one event which is a single email message.
    Therefore, after creating the event we will create the alert that contains all the event information.

Creating the alert info and initializing the alert info characteristics fields

This function is responsible for creating the alert, each alert contains one or more events within it. In our case each alert contains one event which is basically one email message.

  1. From the main class we will create the alert by using the function: create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event)
  2. def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event): 
        """Returns an alert which is one event that contains one unread email message"""
        siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}")
        create_event = None
    

    Creating the alert info instance and initializing the alert info characteristics fields:

    # Initializes the alert_info Characteristics Fields 
    alert_info.display_id = f"{alert_id}" 
    alert_info.ticket_id = f"{alert_id}" 
    alert_info.name = email_message_data['Subject'] 
    alert_info.rule_generator = RULE_GENERATOR_EXAMPLE 
    alert_info.start_time = datetime_in_unix_time 
    alert_info.end_time = datetime_in_unix_time 
    alert_info.device_vendor = VENDOR 
    alert_info.device_product = PRODUCT 
    
  3. After creating the alert info we will validate the created event and then append the event information to the alert info characteristics.
  4. siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")
    try:
        if created_event is not None:
            alert_info.events.append(created_event)
        siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
    # Raise an exception if failed to process the event
    except Exception as e:
        siemplify.LOGGER.error(f"Failed to process event {alert_id}")
        siemplify.LOGGER.exception(e)
    return alert_info
    

    Finding the URL in the email body

    This function checks if the body of the email has one or more URLs.
    For each email message we will need to access the email body by searching the email message part that contains text or plain type information.

    def find_url_in_email_message_body(siemplify, email_messages_data_list):
        """
        Search for a url in the email body,
        """
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
                
    

    If the body contains the wanted type of information we will load this information by email_message_body = part.get_payload().

    After loading all the information we can now search the URL by using the regex format:

    
    URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    

    This extracts the URLs from the email body.

    email_message_body = part.get_payload()
    all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
    for url in all_found_urls:
        if url not in all_found_url_in_emails_body_list:
            all_found_url_in_emails_body_list.append(url)
    siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}") 
    return all_found_url_in_emails_body_list 

    We have finished going through the connector code and we will now configure a connector that will ingest cases into the platform from a selected email inbox in Gmail.