コネクタの開発

  1. IDE 画面で、左上隅にある をクリックして新しい IDE 項目を追加します。[Integration] ラジオボタンを選択し、統合の名前として「Email Connector」を指定します。
  2. 統合が作成され、左側にデフォルトのアイコンが表示されます。 をクリックすると、統合設定が表示されます。ここで、アイコン、説明、Python の依存関係、統合パラメータを定義できます。
  3. 次に、[] をクリックして新しい IDE アイテムを追加します。[Connector] ラジオボタンを選択し、コネクタの名前「My Email Connector」を指定します。次に、統合「Email Connector」を選択して、コネクタを統合に関連付けます。
  4. コネクタを作成したら、次のコネクタ パラメータを設定します。
    パラメータ名 説明 必須 種類 デフォルト値 説明
    ユーザー名 IMAP ユーザー名 はい 文字列 email@gmail.com コネクタが Chronicle SOAR プラットフォームにメールを pull するメールアドレス
    パスワード IMAP パスワード はい パスワード コネクタが Chronicle SOAR プラットフォームにメールを pull するメールアドレスに関連付けられたパスワード
    IMAP ポート IMAP ポート。例: 993 はい Int 993 インターネット メッセージ アクセス プロトコル(IMAP)は、リモート ウェブサーバー上のメールにローカル クライアントからアクセスするために使用されるメール プロトコルです。
    IMAP サーバー アドレス 例: imap.gmail.com はい 文字列 imap.google.com IMAP アカウントの受信メールサーバーは、IMAP サーバーとも呼ばれます。この例では、メール プロバイダは google.com、受信メールサーバーは imap.google.com です。
    メールを確認するフォルダ 指定したフォルダからのみメールを pull する × 文字列 受信トレイ メールの取得元であるフォルダ。例: Inbox
  5. 次に、右上の以下のフィールドに入力します。
    • "Product Field Name"= device_product は、アラートのプロダクト名に割り当てる未加工フィールドの値を決定します。関連するフィールドは、57 行目の「Mail」(プロダクト)として定義されたコードの行にあります。
      event["device_product"] = PRODUCT #The PRODUCT constant is "Mail"
    • "Event Field Name" = event_name は、イベントタイプ フィールドに割り当てる未加工フィールドの値を決定します。関連するフィールドは、56 行目の「不審なメール」として定義されたコードで確認できます。
      event["event_name"] = "Suspicious email"

メールコネクタを編集する

「My Email Connector」用に作成した以下のコードをコピーし、IDE に貼り付けて手順に従います。

from SiemplifyConnectors import
SiemplifyConnectorExecution from SiemplifyConnectorsDataModel import AlertInfo
from SiemplifyUtils import output_handler, convert_datetime_to_unix_time,
convert_string_to_datetime import email, imaplib, sys, re
# CONSTANTS
CONNECTOR_NAME = "Mail" VENDOR = "Mail" PRODUCT = "Mail" DEFAULT_PRIORITY = 60 #
Default is Medium RULE_GENERATOR_EXAMPLE = "Mail" DEFAULT_FOLDER_TO_CHECK_INBOX
= "inbox" DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN" URLS_REGEX =
r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)"
def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time,
created_event):
    """
    Returns an alert which is one event that contains one unread email message
    """
    siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}")
    create_event = None
    alert_info = AlertInfo()
    # Initializes the alert_info Characteristics Fields
    alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique
    id, otherwise it won't create a case with the same alert id.
    alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id.
    However, if for some reason the external alert id is different from the
    display_id, you can save the original external alert id in the "ticket_id"
    field. alert_info.name = email_message_data['Subject']
    alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem
    rule which causes the creation of the alert. alert_info.start_time =
    datetime_in_unix_time # Time should be saved in UnixTime. You may use
    SiemplifyUtils.convert_datetime_to_unix_time, or
    SiemplifyUtils.convert_string_to_datetime alert_info.end_time =
    datetime_in_unix_time # Time should be saved in UnixTime. You may use
    SiemplifyUtils.convert_datetime_to_unix_time, or
    SiemplifyUtils.convert_string_to_datetime alert_info.priority = 60 #
    Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100.
    alert_info.device_vendor = VENDOR # The field will be fetched from the
    Original Alert. If you build this alert manually, state the source vendor of
    the data. (ie: Microsoft, Mcafee) alert_info.device_product = PRODUCT # The
    field will be fetched from the Original Alert. If you build this alert
    manually, state the source product of the data. (ie: ActiveDirectory,
    AntiVirus)
    # ----------------------------- Alert Fields initialization END -----------------------------#
    siemplify.LOGGER.info(f"---------- Events creating started for alert  {alert_id}")
    try:
        if created_event is not None:
            alert_info.events.append(created_event)
        siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
    # Raise an exception if failed to process the event
    except Exception as e:
        siemplify.LOGGER.error(f"Failed to process event {alert_id}")
        siemplify.LOGGER.exception(e)
    return alert_info
def create_event(siemplify, alert_id, email_message_data,
all_found_url_in_emails_body_list, datetime_in_unix_time):
    """
    Returns the digested data of a single unread email
    """
    siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} |
    event_id: {alert_id}") event = {} event["StartTime"] = datetime_in_unix_time
    # Time should be saved in UnixTime. You may use
    SiemplifyUtils.convert_datetime_to_unix_time, or
    SiemplifyUtils.convert_string_to_datetime event["EndTime"] =
    datetime_in_unix_time # Time should be saved in UnixTime. You may use
    SiemplifyUtils.convert_datetime_to_unix_time, or
    SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious
    email" event["device_product"] = PRODUCT # ie: "device_product" is the field
    name that describes the product the event originated from. event["Subject"]
    = email_message_data["Subject"] event["SourceUserName"] =
    email_message_data["From"] event["DestinationUserName"] =
    email_message_data["To"] event["found_url"] =
    ",".join(all_found_url_in_emails_body_list) siemplify.LOGGER.info(f"---
    Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")
    return event
def find_url_in_email_message_body(siemplify, email_messages_data_list):
    """
    Search for a url in the email body,
    """
    all_found_url_in_emails_body_list = []
    for message in email_messages_data_list:
        for part in message.walk():
            if part.get_content_maintype() == 'text\plain':
                continue

email_message_body = part.get_payload()
        all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
        for url in all_found_urls:
            if url not in all_found_url_in_emails_body_list:
                all_found_url_in_emails_body_list.append(url)

>def get_email_messages_data(imap_host, imap_port, username, password,
>folder_to_check):
    """
    Returns all unread email messages
    """
    email_messages_data_list = []
    # Login to email using 'imap' module
    mail = imaplib.IMAP4_SSL(imap_host, imap_port)
    mail.login(username, password)
    # Determining the default email folder to pull emails from - 'inbox'
    if folder_to_check is None:
        folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
    # Selecting the email folder to pull the data from
    mail.select(folder_to_check)
    # Storing the email message data
    result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)
    # If there are several emails collected in the cycle it will split each
    # email message into a separate item in the list chosen_mailbox_items_list
    if len(data) > 0:
        chosen_mailbox_items_list = data[0].split()
        # Iterating each email message and appending to emails_messages_data_list
        for item in chosen_mailbox_items_list:
            typ, email_data = mail.fetch(item, '(RFC 822)')
            # Decoding from binary string to string
            raw_email = email_data[0][1].decode("utf-8")
            # Turning the email data into an email object
            email_message = email.message_from_string(raw_email)
            # Appending the email message data to email_messages_data_list
            email_messages_data_list.append(email_message)
    return email_messages_data_list
@output_handler
def main(is_test_run):
    alerts = [] # The main output of each connector run that contains the alerts
    data siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper
    siemplify.script_name = CONNECTOR_NAME
    #In case of running a test
    if (is_test_run):
        siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run")
    #Extracting the connector's Params
    username = siemplify.extract_connector_param(param_name="Username") password
    = siemplify.extract_connector_param(param_name="Password") imap_host =
    siemplify.extract_connector_param(param_name="IMAP Server Address")
    imap_port = siemplify.extract_connector_param(param_name="IMAP Port")
    folder_to_check = siemplify.extract_connector_param(param_name="Folder to
    check for emails") print(folder_to_check)
    #Getting the digested email message data
    email_messages_data_list = get_email_messages_data(imap_host, imap_port,
    username, password, folder_to_check)
    #If the email_messages_data_list is not empty
    if len(email_messages_data_list) > 0:
        for message in email_messages_data_list:
            # Converting the email message datetime from string to unix time by
            # SiemplifyUtils functions:
            datetime_email_message = message['Date']
            string_to_datetime = convert_string_to_datetime(datetime_email_message)
            datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)
            found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)

# Getting the unique id of each email message and removing the suffix
# '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the
# system only once. alert_id =
# message['Message-ID'].replace('@mail.gmail.com','') # Creating the event by
# calling create_event() function created_event = create_event(siemplify,
# alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating
# the alert by calling create_alert() function created_alert =
# create_alert(siemplify, alert_id, message, datetime_in_unix_time,
# created_event) # Checking that the created_alert is not None if created_alert
# is not None: alerts.append(created_alert) siemplify.LOGGER.info(f'Added Alert
# {alert_id} to package results') # If the inbox for the user has no unread
# emails. else: siemplify.LOGGER.info(f'The inbox for user {username} has no
# unread emails') # Returning all the created alerts to the cases module in
# Siemplify siemplify.return_package(alerts) if __name__ == '__main__': #
# Connectors run in iterations. The interval is configurable from the
# ConnectorsScreen UI. is_test_run = not (len(sys.argv) < 2 or sys.argv[1] ==
# 'True') main(is_test_run)

コネクタコードをコピーしたので、インポートする必要がある関連モジュールを確認し、main 関数を続行します。その後、main 関数から呼び出された各メソッドについて詳しく説明します。

関連するインポート

Python モジュールには、一連の関数、クラス、変数が定義、実装されます。以下のすべての関数を実現するために、これらのモジュールをスクリプトにインポートしました。

from
SiemplifyConnectors import SiemplifyConnectorExecution # This module is
responsible for executing the connector from SiemplifyConnectorsDataModel import
AlertInfo #The data model that contains the alert info class from SiemplifyUtils
import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime
#The functions that convert time import email, imaplib, sys, re 

main 関数

main 関数はスクリプトの開始点です。Python インタープリタはコードを順次実行し、コードの一部である各メソッドを呼び出します。

  1. コネクタ パラメータを抽出します。IDE にコピーされたコードからわかるように、コネクタ用に構成した各パラメータ(username、password、imap_host、imap_port、folder_to_check)を抽出する siemplify.extract_connector_param 関数を使用します。
    #Extracting the connector's Params
    username = siemplify.extract_connector_param(param_name="Username")
    password = siemplify.extract_connector_param(param_name="Password")
    imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address")
    imap_port = siemplify.extract_connector_param(param_name="IMAP Port")
    folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")
    
  2. 関数 get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) を使用して、未読メールから収集されたすべての情報を取得します(この関数の詳細については、別のステップで説明します)。
    #Getting the digested email message data
    email_messages_data_list = get_email_messages_data(imap_host, imap_port,
    username, password, folder_to_check) 
  3. メールに記載されたすべての情報を受領した後、情報が実際に収集されたかどうかを確認し、各メールに対していくつかの処理を実施します。
    #If the email_messages_data_list is not empty
    if len(email_messages_data_list) > 0:
       for message in email_messages_data_list:
           # Converting the email message datetime from string to unix time by SiemplifyUtils functions
    
    • このコードでは、datetime_email_message = message['Date'] によってメッセージの日付を抽出し、Chronicle SOAR 関数を使用して、この日時を Unix 時間に変換します。
      string_to_datetime =
      convert_string_to_datetime(datetime_email_message) datetime_in_unix_time =
      convert_datetime_to_unix_time(string_to_datetime) 
  4. 次に、以下の関数 find_url_in_email_message_body(siemplify email_messages_data_list)(この関数については、別のステップで詳しく説明します)を使用して、メール メッセージ本文の URL を検索します(メールに URL がある場合は、ハンドブック内の他のプロダクトを使用して、その URL が悪意のあるものかどうかを確認します)。
  5. found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
  6. 各メール メッセージの一意の ID を抽出し、alert_id 変数に割り当てます。
    # Getting the unique id of each email message and removing the
    suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to
    the system only once. alert_id =
    message['Message-ID'].replace('@mail.gmail.com','') 
  7. アラートを Chronicle SOAR プラットフォームに取り込むために必要なすべての情報を抽出したら、アラートとイベントを作成できます(これらの機能については別のステップで詳しく説明します)。
  8. # Creating the event by calling create_event() function created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time) # Creating the alert by calling create_alert() function created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
  9. 次に、作成されたアラートと作成されたイベントを検証します。検証後、アラートがアラートリストに追加されます。
  10. # Checking that
    the created_alert is not None if created_alert is not None:
           alerts.append(created_alert)
           siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")
    
  11. ユーザーの受信トレイに未読メールがない場合は、次のコードを追加しました。
    else:
        siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")
    
  12. 最終的に、アラートリストをシステムに返し、各アラートがケースキューにケースとして表示されます。
    # Returning all the created alerts to the cases module in Siemplify
    siemplify.return_package(alerts)
    
  13. このステップにより、コネクタ構成で設定した時間内に main 関数が実行されます。
     if __name__ == "__main__":
    # Connectors run in iterations. The interval is configurable from the
    # ConnectorsScreen UI.
    is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True')
    main(is_test_run)
    

未読メール メッセージの取得

この関数は、「imap」モデルと「Email」モデルによってメールに接続し、メール メッセージの情報を取得します。最後に、この関数は未読メールに関するすべての情報を含むリストを返します。

  1. main クラスから、関数 get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) を使用します。
     def get_email_messages_data(imap_host, imap_port,
    username, password, folder_to_check): """
        Returns all unread email messages
        """
        email_messages_data_list = []
    
  2. その後、「imap」モジュールを使用してメールに接続します。
    # Login to email using 'imap' module
      mail = imaplib.IMAP4_SSL(imap_host, imap_port)
      mail.login(username, password)
    
  3. 次に、メール内のフォルダを特定して、未読メッセージを確認します。 この例では、「inbox」フォルダ (DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox") からメールを抽出します。
  4.   # Determining the default email folder to pull emails from - 'inbox'
        if folder_to_check is None:
          folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX
      # Selecting the email folder to pull the data from
      mail.select(folder_to_check)
    
  5. 次に、すべての未読メッセージ(DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN")を収集し、そのデータをリストに変換します。
    # Storing the email message data result, data = mail.search(None,
    # DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in
    # the cycle it will split each email message into a separate item in the list
    # chosen_mailbox_items_list
        if len(data) > 0:
           chosen_mailbox_items_list = data[0].split()
           # Iterating each email message and appending to emails_messages_data_list
           #
           for item in chosen_mailbox_items_list:
                typ, email_data = mail.fetch(item, '(RFC 822)') raw_email =
                email_data[0][1].decode("utf-8") # Decoding from binary string to
                string email_message = email.message_from_string(raw_email) #
                Turning the email data into an email object
                email_messages_data_list.append(email_message) # Appending the email
                message data to email_messages_data_list
        return email_messages_data_list
    

イベントの作成

この関数は、各メール メッセージ コンポーネントをそれぞれイベント フィールドに関連付けて、イベントを作成します。

  1. メインクラスから、関数 create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time) を使用してイベントを作成します。
  2.  def create_event(siemplify, alert_id, email_message_data,
    all_found_url_in_emails_body_list, datetime_in_unix_time):
          """
          Returns the digested data of a single unread email
          """
          siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id}
          | event_id: {alert_id}")
    
  3. イベント フィールドを含む辞書を作成します。必須フィールドは event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"] です。
  4.   event = {} event["StartTime"] = datetime_in_unix_time # Time should be saved
      in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or
      SiemplifyUtils.convert_string_to_datetime event["EndTime"] =
      datetime_in_unix_time # Time should be saved in UnixTime. You may use
      SiemplifyUtils.convert_datetime_to_unix_time, or
      SiemplifyUtils.convert_string_to_datetime event["event_name"] = "Suspicious
      email" event["device_product"] = PRODUCT # ie: "device_product" is the field
      name that describes the product the event originated from. event["Subject"] =
      email_message_data["Subject"] event["SourceUserName"] =
      email_message_data["From"] event["DestinationUserName"] =
      email_message_data["To"] event["found_url"] =
      ",".join(all_found_url_in_emails_body_list)
    
      siemplify.LOGGER.info(f"--- Finished processing Event: alert_id: {alert_id} |
      event_id: {alert_id}") return event
    
  5. 各アラートには 1 つ以上のイベントが含まれます。この例では、1 つのメール メッセージである 1 つのイベントのみを含むアラートを示します。
    したがって、イベントを作成した後に、すべてのイベント情報を含むアラートを作成します。

アラート情報の作成とアラート情報の特性フィールドの初期化

この関数はアラートを作成する役割を担い、各アラートにはアラート内に 1 つ以上のイベントが含まれます。ここでは各アラートに 1 つのイベントが含まれ、基本的には 1 つのメール メッセージです。

  1. main クラスから、関数 create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event) を使用してアラートを作成します。
  2.  def create_alert(siemplify, alert_id, email_message_data,
    datetime_in_unix_time, created_event):
        """
    Returns an alert which is one event that contains one unread email message
    """
    siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}")
    create_event = None

    アラート情報インスタンスを作成し、アラート情報特性フィールドを初期化します。

    # Initializes the alert_info Characteristics Fields
    alert_info.display_id = f"{alert_id}"
    alert_info.ticket_id = f"{alert_id}"
    alert_info.name = email_message_data['Subject']
    alert_info.rule_generator = RULE_GENERATOR_EXAMPLE
    alert_info.start_time = datetime_in_unix_time
    alert_info.end_time = datetime_in_unix_time
    alert_info.device_vendor = VENDOR
    alert_info.device_product = PRODUCT
    # ----------------------------- Alert Fields initialization END -----------------------------#
    
  3. アラート情報を作成したら、作成されたイベントを検証し、イベント情報をアラート情報の特性に追加します。
  4.     siemplify.LOGGER.info(f"---------- Events creating started for alert  {alert_id}") try:
         if created_event is not None:
            alert_info.events.append(created_event)
            siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")
         # Raise an exception if failed to process the event    except Exception as e:
          siemplify.LOGGER.error(f"Failed to process event {alert_id}")
          siemplify.LOGGER.exception(e)
    
          return alert_info
    

    メール本文の URL の検索

    この関数は、メールの本文に 1 つ以上の URL が含まれているかどうかをチェックします。
    メール メッセージごとに、テキストまたはプレーン タイプの情報を含むメール メッセージの部分を検索して、メール本文にアクセスする必要があります。

     def find_url_in_email_message_body(siemplify,
    email_messages_data_list):
        """
        Search for a url in the email body,
        """
        all_found_url_in_emails_body_list = []
        for message in email_messages_data_list:
            for part in message.walk():
                if part.get_content_maintype() == 'text\plain':
                    continue
    

    本文に目的のタイプ情報が含まれている場合は、email_message_body = part.get_payload() によってこの情報を読み込みます。

    すべての情報を読み込んだ後、正規表現の形式を使用して URL を検索できます。

    
    URLSREGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-@.&+]|[!*(),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    

    これにより、メール本文から URL が抽出されます。

     email_message_body = part.get_payload()
    all_found_urls = re.findall(URLS_REGEX, str(email_message_body))
    for url in all_found_urls:
    if url not in all_found_url_in_emails_body_list: all_found_url_in_emails_body_list.append(url) siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}")

    return all_found_url_in_emails_body_list

    コネクタコードの確認が完了しました。次に、Gmail で選択したメールの受信トレイからプラットフォームにケースを取り込むコネクタを構成します。