Collecter les journaux Elastic Packet Beats
Ce document explique comment ingérer des journaux Elastic Packet Beats dans Google Security Operations à l'aide de Bindplane. Le parseur initialise d'abord les valeurs par défaut pour différents champs trouvés dans les journaux Elastic Packet Beats. Il extrait ensuite les données des messages de journaux à l'aide d'une combinaison de modèles grok
et de filtres json
, effectue des conversions de types de données et mappe les champs extraits aux champs correspondants dans le modèle de données unifié (UDM) en fonction du type d'ensemble de données d'événement (par exemple, flux, dns, http, tls, dhcpv4).
Avant de commencer
Assurez-vous de remplir les conditions suivantes :
- Une instance Google SecOps.
- Un hôte Windows 2016 ou version ultérieure, ou Linux avec
systemd
. - Si vous exécutez l'agent derrière un proxy, assurez-vous que les ports de pare-feu sont ouverts conformément aux exigences de l'agent Bindplane.
- Accès privilégié à la console de gestion ou à l'appliance Elastic PacketBeats.
- Logstash est installé et configuré.
Obtenir le fichier d'authentification d'ingestion Google SecOps
- Connectez-vous à la console Google SecOps.
- Accédez à Paramètres du SIEM > Agents de collecte.
- Téléchargez le fichier d'authentification d'ingestion.
- Enregistrez le fichier de manière sécurisée sur le système sur lequel Bindplane sera installé.
Obtenir l'ID client Google SecOps
- Connectez-vous à la console Google SecOps.
- Accédez à Paramètres SIEM> Profil.
- Copiez et enregistrez le numéro client de la section Informations sur l'organisation.
Installer l'agent Bindplane
Installez l'agent Bindplane sur votre système d'exploitation Windows ou Linux en suivant les instructions ci-dessous.
Installation de fenêtres
- Ouvrez l'invite de commandes ou PowerShell en tant qu'administrateur.
Exécutez la commande suivante :
msiexec /i "https://github.com/observIQ/bindplane-agent/releases/latest/download/observiq-otel-collector.msi" /quiet
Installation de Linux
- Ouvrez un terminal avec les droits root ou sudo.
Exécutez la commande suivante :
sudo sh -c "$(curl -fsSlL https://github.com/observiq/bindplane-agent/releases/latest/download/install_unix.sh)" install_unix.sh
Ressources d'installation supplémentaires
Pour plus d'options d'installation, consultez le guide d'installation.
Configurer l'agent Bindplane pour ingérer Syslog et l'envoyer à Google SecOps
Accédez au fichier de configuration :
- Trouvez le fichier
config.yaml
. En règle générale, il se trouve dans le répertoire/etc/bindplane-agent/
sous Linux ou dans le répertoire d'installation sous Windows. - Ouvrez le fichier à l'aide d'un éditeur de texte (par exemple,
nano
,vi
ou le Bloc-notes).
- Trouvez le fichier
Modifiez le fichier
config.yaml
comme suit :receivers: udplog: # Replace the port and IP address as required listen_address: "0.0.0.0:514" exporters: chronicle/chronicle_w_labels: compression: gzip # Adjust the path to the credentials file you downloaded in Step 1 creds_file_path: '/path/to/ingestion-authentication-file.json' # Replace with your actual customer ID from Step 2 customer_id: YOUR_CUSTOMER_ID endpoint: malachiteingestion-pa.googleapis.com # Add optional ingestion labels for better organization log_type: 'ELASTIC_PACKETBEATS' raw_log_field: body ingestion_labels: service: pipelines: logs/source0__chronicle_w_labels-0: receivers: - udplog exporters: - chronicle/chronicle_w_labels
- Remplacez le port et l'adresse IP selon les besoins de votre infrastructure.
- Remplacez
YOUR_CUSTOMER_ID
par le numéro client réel. - Remplacez
/path/to/ingestion-authentication-file.json
par le chemin d'accès où le fichier d'authentification a été enregistré dans la section Obtenir le fichier d'authentification pour l'ingestion Google SecOps.
Redémarrez l'agent Bindplane pour appliquer les modifications.
Pour redémarrer l'agent Bindplane sous Linux, exécutez la commande suivante :
sudo systemctl restart observiq-otel-collector
Pour redémarrer l'agent Bindplane sous Windows, vous pouvez utiliser la console Services ou saisir la commande suivante :
net stop observiq-otel-collector && net start observiq-otel-collector
Configurer le transfert Syslog sur Elastic Packet Beats
Étant donné que Packetbeat n'accepte pas la sortie syslog directe, vous devez utiliser Logstash comme intermédiaire.
Configurer Packetbeat pour envoyer des journaux à Logstash
- Connectez-vous à la console de gestion Elastic Packet Beats.
- Accédez à Paramètres > Transfert de journaux.
- Cliquez sur le bouton + Ajouter ou Activer.
- Fournissez les informations de configuration suivantes :
- Nom : saisissez un nom descriptif (par exemple,
Logstash Output
). - Hôte : saisissez l'adresse IP du serveur Logstash.
- Port : saisissez le port d'entrée Beats de Logstash (généralement 5044).
- Protocole : sélectionnez Protocole Beats.
- Format : sélectionnez JSON.
- Fuseau horaire : sélectionnez le fuseau horaire UTC pour une cohérence universelle entre les systèmes.
- Accédez à la section Événements, puis sélectionnez les types de journaux concernés ou tous.
- Nom : saisissez un nom descriptif (par exemple,
Enregistrez la configuration.
Autre option : modifiez directement packetbeat.yml :
# /etc/packetbeat/packetbeat.yml packetbeat.protocols: - type: dns ports: [53] - type: http ports: [80, 8080, 8000, 5000, 8002] send_headers: true send_all_headers: true - type: tls ports: [443, 993, 995, 5223, 8443, 8883, 9243] - type: dhcpv4 ports: [67, 68] # Enable processors for additional fields processors: - add_network_direction: source: private destination: private internal_networks: - private - community_id: # Send to Logstash using beats protocol output.logstash: hosts: ["LOGSTASH_IP:5044"]
Remplacez
LOGSTASH_IP
par l'adresse IP de votre serveur Logstash.
Configurer Logstash pour transférer les données vers BindPlane à l'aide de Syslog
Créez un fichier de configuration du pipeline Logstash :
sudo nano /etc/logstash/conf.d/packetbeat-to-bindplane.conf
Ajoutez la configuration suivante :
# Receive from Packetbeat input { beats { port => 5044 } } # Optional: Add filters for data enrichment filter { # Preserve original message structure mutate { copy => { "@metadata" => "[@metadata_backup]" } } } # Send to BindPlane via syslog output { syslog { host => "BINDPLANE_IP" port => 514 protocol => "udp" rfc => "rfc5424" facility => "local0" severity => "informational" sourcehost => "%{[agent][hostname]}" appname => "packetbeat" procid => "%{[agent][id]}" msgid => "ELASTIC_PACKETBEATS" structured_data => "packetbeat@32473" message => "%{message}" } }
Remplacez
BINDPLANE_IP
par l'adresse IP de votre agent BindPlane.
Redémarrez Logstash pour appliquer la configuration :
sudo systemctl restart logstash
Table de mappage UDM
Champ du journal | Mappage UDM | Logique |
---|---|---|
@timestamp | metadata.event_timestamp | Mappé directement à partir du champ de journal brut @timestamp . |
agent.hostname | observer.hostname | Mappé directement à partir du champ de journal brut agent.hostname . |
agent.id | observer.asset_id | Concaténé avec agent.type pour former le champ observer.asset_id . |
agent.type | observer.application | Mappé directement à partir du champ de journal brut agent.type . |
agent.version | observer.platform_version | Mappé directement à partir du champ de journal brut agent.version . |
audit_category | security_result.category_details | Mappé directement à partir du champ de journal brut audit_category . |
audit_cluster_name | additional.fields.audit_cluster_name.value.string_value | Mappé directement à partir du champ de journal brut audit_cluster_name . |
audit_node_host_address | observer.ip | Mappé directement à partir du champ de journal brut audit_node_host_address . |
audit_node_id | additional.fields.audit_node_id.value.string_value | Mappé directement à partir du champ de journal brut audit_node_id . |
audit_node_name | additional.fields.audit_node_name.value.string_value | Mappé directement à partir du champ de journal brut audit_node_name . |
audit_request_effective_user | observer.user.userid | Mappé directement à partir du champ de journal brut audit_request_effective_user . |
audit_request_initiating_user | additional.fields.audit_request_initiating_user.value.string_value | Mappé directement à partir du champ de journal brut audit_request_initiating_user . |
audit_request_remote_address | observer.ip | Mappé directement à partir du champ de journal brut audit_request_remote_address s'il est différent de audit_node_host_address . |
client.bytes | network.received_bytes (ENTRANT) / network.sent_bytes (SORTANT) | Mappé en fonction du champ network.direction . Si la valeur est INBOUND, elle est mappée sur network.received_bytes . Si la valeur est OUTBOUND, elle est mappée sur network.sent_bytes . |
client.ip | target.ip (ENTRANT) / principal.ip (SORTANT) | Mappé en fonction du champ network.direction . Si la valeur est INBOUND, elle est mappée sur target.ip . Si la valeur est OUTBOUND, elle est mappée sur principal.ip . |
client.port | target.port (ENTRANT) / principal.port (SORTANT) | Mappé en fonction du champ network.direction . Si la valeur est INBOUND, elle est mappée sur target.port . Si la valeur est OUTBOUND, elle est mappée sur principal.port . |
cluster.uuid | additional.fields.uuid.value.string_value | Mappé directement à partir du champ de journal brut cluster.uuid . |
composant | additional.fields.component.value.string_value | Mappé directement à partir du champ de journal brut component . |
destination.bytes | network.sent_bytes | Mappé directement à partir du champ de journal brut destination.bytes pour les événements FLOW. |
destination.ip | target.ip | Directement mappé à partir du champ de journal brut destination.ip si network.direction n'est pas INBOUND ou OUTBOUND. |
destination.mac | target.mac | Mappé directement à partir du champ de journal brut destination.mac pour les événements FLOW. |
destination.port | target.port | Mappé directement à partir du champ de journal brut destination.port pour les événements FLOW. |
dhcpv4.assigned_ip | network.dhcp.requested_address | Mappé directement à partir du champ de journal brut dhcpv4.assigned_ip . |
dhcpv4.client_ip | network.dhcp.yiaddr (ACK) / network.dhcp.ciaddr (REQUEST) / source.ip (REQUEST, si dhcpv4.client_ip est vide) | Mappé en fonction du champ network.dhcp.type . Si la réponse est ACK, elle est mappée sur network.dhcp.yiaddr . Si la valeur est REQUEST, elle est mappée sur network.dhcp.ciaddr . Si la valeur est REQUEST et que dhcpv4.client_ip est vide, elle est mappée sur source.ip . |
dhcpv4.client_mac | network.dhcp.client_identifier | Mappé directement à partir du champ de journal brut dhcpv4.client_mac après sa conversion en octets. |
dhcpv4.op_code | network.dhcp.opcode | Mappé sur network.dhcp.opcode en fonction de la valeur de dhcpv4.op_code . Si dhcpv4.op_code est BOOTREPLY ou BOOTREQUEST , la valeur est directement mappée. Sinon, il est mappé sur UNKNOWN_OPCODE . |
dhcpv4.option.hostname | network.dhcp.client_hostname | Mappé directement à partir du champ de journal brut dhcpv4.option.hostname . |
dhcpv4.option.ip_address_lease_time_sec | network.dhcp.lease_time_seconds | Mappé directement à partir du champ de journal brut dhcpv4.option.ip_address_lease_time_sec après sa conversion en entier non signé. |
dhcpv4.option.message_type | network.dhcp.type | Mappé sur network.dhcp.type en fonction de la valeur de dhcpv4.option.message_type . Voici le mappage : ack → ACK , nack → NAK , discover → DISCOVER , offer → OFFER , request → REQUEST , decline → DECLINE , release → RELEASE , info → INFORM . Si la valeur n'en fait pas partie, elle est mappée sur UNKNOWN_MESSAGE_TYPE . |
dhcpv4.option.server_identifier | network.dhcp.sname | Mappé directement à partir du champ de journal brut dhcpv4.option.server_identifier . |
dns.answers.data | network.dns.answers.data | Mappé directement à partir du champ de journal brut dns.answers.data . |
dns.answers.class | network.dns.answers.class | Mappé sur network.dns.answers.class en fonction de la valeur de dns.answers.class . Le mappage est le suivant : IN -> 1, NONE -> 254, ANY -> 255. |
dns.answers.name | network.dns.answers.name | Mappé directement à partir du champ de journal brut dns.answers.name . |
dns.answers.ttl | network.dns.answers.ttl | Mappé directement à partir du champ de journal brut dns.answers.ttl après sa conversion en entier non signé. |
dns.answers.type | network.dns.answers.type | Mappé sur network.dns.answers.type en fonction de la valeur de dns.answers.type . Le mappage est le suivant : A -> 1, NS -> 2, CNAME -> 5, SOA -> 6, PTR -> 12, MX -> 15, TXT -> 16, AAAA -> 28, SRV -> 33, NAPTR -> 35, DS -> 43, DNSKEY -> 48, IXFR -> 251, AXFR -> 252, TYPE99 -> 99, TKEY -> 249, ANY -> 255, ALL -> 255, URI -> 256, NULL -> 0. |
dns.flags.authoritative | network.dns.authoritative | Directement mappé à partir du champ de journal brut dns.flags.authoritative si la valeur est "true". |
dns.flags.recursion_available | network.dns.recursion_available | Directement mappé à partir du champ de journal brut dns.flags.recursion_available si la valeur est "true". |
dns.flags.recursion_desired | network.dns.recursion_desired | Directement mappé à partir du champ de journal brut dns.flags.recursion_desired si la valeur est "true". |
dns.flags.truncated_response | network.dns.truncated | Directement mappé à partir du champ de journal brut dns.flags.truncated_response si la valeur est "true". |
dns.id | network.dns.id | Mappé directement à partir du champ de journal brut dns.id après sa conversion en entier non signé. |
dns.question.class | network.dns.questions.class | Mappé sur network.dns.questions.class en fonction de la valeur de dns.question.class . Le mappage est le suivant : IN -> 1, NONE -> 254, ANY -> 255. |
dns.question.name | network.dns.questions.name | Mappé directement à partir du champ de journal brut dns.question.name . |
dns.question.type | network.dns.questions.type | Mappé sur network.dns.questions.type en fonction de la valeur de dns.question.type . Le mappage est le suivant : A -> 1, NS -> 2, CNAME -> 5, SOA -> 6, PTR -> 12, MX -> 15, TXT -> 16, AAAA -> 28, SRV -> 33, NAPTR -> 35, DS -> 43, DNSKEY -> 48, IXFR -> 251, AXFR -> 252, TYPE99 -> 99, TKEY -> 249, ANY -> 255, ALL -> 255, URI -> 256, NULL -> 0. |
dns.resolved_ip | network.dns.additional.data | Chaque élément du tableau dns.resolved_ip est traité et mappé au champ network.dns.additional.data . |
dns.response_code | network.dns.response_code | Mappé sur network.dns.response_code en fonction de la valeur de dns.response_code . La cartographie est la suivante : NOERROR -> 0, FORMERR -> 1, SERVFAIL -> 2, NXDOMAIN -> 3, NOTIMP -> 4, REFUSED -> 5, YXDOMAIN -> 6, YXRRSET -> 7, NXRRSET -> 8, NOTAUTH -> 9, NOTZONE -> 10. |
error.message | security_result.summary | Concaténé avec status pour former le champ security_result.summary pour les événements HTTP. |
event.dataset | metadata.product_event_type | Mappé directement à partir du champ de journal brut event.dataset . |
flow.final | Permet de déterminer si le flux est final. Dans le cas contraire, l'événement est supprimé. | |
flow.id | network.session_id | Mappé directement à partir du champ de journal brut flow.id pour les événements FLOW. |
headers.accept_encoding | security_result.about.labels.Accept-Encoding | Mappé directement à partir du champ de journal brut headers.accept_encoding . |
headers.content_length | additional.fields.content_length.value.string_value | Mappé directement à partir du champ de journal brut headers.content_length . |
headers.content_type | additional.fields.content_type.value.string_value | Mappé directement à partir du champ de journal brut headers.content_type . |
headers.http_accept | additional.fields.http_accept.value.string_value | Mappé directement à partir du champ de journal brut headers.http_accept . |
headers.http_host | principal.hostname, principal.asset.hostname | Mappé directement à partir du champ de journal brut headers.http_host . |
headers.http_user_agent | network.http.user_agent | Mappé directement à partir du champ de journal brut headers.http_user_agent . |
headers.request_method | network.http.method | Mappé directement à partir du champ de journal brut headers.request_method . |
headers.x_b3_parentspanid | additional.fields.x_b3_parentspanid.value.string_value | Mappé directement à partir du champ de journal brut headers.x_b3_parentspanid . |
headers.x_b3_sampled | additional.fields.x_b3_sampled.value.string_value | Mappé directement à partir du champ de journal brut headers.x_b3_sampled . |
headers.x_envoy_attempt_count | security_result.about.labels.x_envoy_attempt_count | Mappé directement à partir du champ de journal brut headers.x_envoy_attempt_count . |
headers.x_envoy_original_path | additional.fields.x_envoy_original_path.value.string_value | Mappé directement à partir du champ de journal brut headers.x_envoy_original_path . |
headers.x_forwarded_client_cert | additional.fields.client_cert.value.string_value | Mappé directement à partir du champ de journal brut headers.x_forwarded_client_cert . |
headers.x_forwarded_for | principal.ip, principal.asset.ip | Directement mappé à partir du champ de journal brut headers.x_forwarded_for après extraction de l'adresse IP à l'aide de grok. |
headers.x_forwarded_proto | additional.fields.x_forwarded_proto.value.string_value | Mappé directement à partir du champ de journal brut headers.x_forwarded_proto . |
headers.x_request_id | additional.fields.x_request_id.value.string_value | Mappé directement à partir du champ de journal brut headers.x_request_id . |
hôte | principal.ip, principal.asset.ip | Directement mappé à partir du champ de journal brut host après extraction de l'adresse IP à l'aide de grok. |
http.request.method | network.http.method | Mappé directement à partir du champ de journal brut http.request.method . |
level | security_result.severity | Mappé sur security_result.severity en fonction de la valeur de level . Voici le mappage : INFO → INFORMATIONAL , ERROR → ERROR , WARNING → LOW . |
logger | additional.fields.logger.value.string_value | Mappé directement à partir du champ de journal brut logger . |
méthode | Permet de déterminer si l'événement est un événement DNS. | |
Message | security_result.description | Directement mappé à partir du champ de journal brut msg après suppression des guillemets doubles. |
network.community_id | network.community_id | Mappé directement à partir du champ de journal brut network.community_id . |
network.direction | network.direction | Directement mappé à partir du champ de journal brut network.direction après conversion en majuscules. Si la valeur est INGRESS ou INBOUND , elle est mappée sur INBOUND . Si la valeur est EGRESS ou OUTBOUND , elle est mappée sur OUTBOUND . |
network.protocol | network.application_protocol | Mappé directement à partir du champ de journal brut network.protocol . |
network.transport | network.ip_protocol | Mappé directement à partir du champ de journal brut network.transport pour les événements TLS. |
server.bytes | network.sent_bytes (ENTRANT) / network.received_bytes (SORTANT) | Mappé en fonction du champ network.direction . Si la valeur est INBOUND, elle est mappée sur network.sent_bytes . Si la valeur est OUTBOUND, elle est mappée sur network.received_bytes . |
server.domain | principal.hostname, principal.asset.hostname (INBOUND) / target.hostname, target.asset.hostname (OUTBOUND) | Mappé en fonction du champ network.direction . Si la valeur est INBOUND, elle est mappée sur principal.hostname . Si la valeur est OUTBOUND, elle est mappée sur target.hostname . |
server.ip | principal.ip, principal.asset.ip (INBOUND) / target.ip, target.asset.ip (OUTBOUND) | Mappé en fonction du champ network.direction . Si la valeur est INBOUND, elle est mappée sur principal.ip . Si la valeur est OUTBOUND, elle est mappée sur target.ip . |
server.port | principal.port (ENTRANT) / target.port (SORTANT) | Mappé en fonction du champ network.direction . Si la valeur est INBOUND, elle est mappée sur principal.port . Si la valeur est OUTBOUND, elle est mappée sur target.port . |
source.bytes | network.received_bytes | Mappé directement à partir du champ de journal brut source.bytes pour les événements FLOW. |
source.ip | principal.ip, principal.asset.ip | Mappé directement à partir du champ de journal brut source.ip pour les événements FLOW. |
source.mac | principal.mac | Mappé directement à partir du champ de journal brut source.mac pour les événements FLOW. |
source.port | principal.port | Mappé directement à partir du champ de journal brut source.port pour les événements FLOW. |
état | metadata.description, security_result.summary | Mappé sur metadata.description si level est vide. Concaténé avec error.message pour former le champ security_result.summary pour les événements HTTP et TLS. |
tls.client.ja3 | network.tls.client.ja3 | Mappé directement à partir du champ de journal brut tls.client.ja3 . |
tls.client.server_name | network.tls.client.server_name | Mappé directement à partir du champ de journal brut tls.client.server_name . |
tls.client.supported_ciphers | network.tls.client.supported_ciphers | Chaque élément du tableau tls.client.supported_ciphers est traité et mappé au tableau network.tls.client.supported_ciphers . |
tls.cipher | network.tls.cipher | Mappé directement à partir du champ de journal brut tls.cipher . |
tls.detailed.server_certificate.not_after | network.tls.server.certificate.not_after | Directement mappé à partir du champ de journal brut tls.detailed.server_certificate.not_after après sa conversion en code temporel. |
tls.detailed.server_certificate.not_before | network.tls.server.certificate.not_before | Directement mappé à partir du champ de journal brut tls.detailed.server_certificate.not_before après sa conversion en code temporel. |
tls.detailed.server_certificate.serial_number | network.tls.server.certificate.serial | Mappé directement à partir du champ de journal brut tls.detailed.server_certificate.serial_number . |
tls.detailed.server_certificate.version | network.tls.server.certificate.version | Directement mappé à partir du champ de journal brut tls.detailed.server_certificate.version après sa conversion en chaîne. |
tls.established | network.tls.established | Mappé directement à partir du champ de journal brut tls.established . |
tls.next_protocol | network.tls.next_protocol | Mappé directement à partir du champ de journal brut tls.next_protocol . |
tls.resumed | network.tls.resumed | Mappé directement à partir du champ de journal brut tls.resumed . |
tls.server.hash.sha1 | network.tls.server.certificate.sha1 | Mappé directement à partir du champ de journal brut tls.server.hash.sha1 après conversion en minuscules. |
tls.server.issuer | network.tls.server.certificate.issuer | Mappé directement à partir du champ de journal brut tls.server.issuer . |
tls.server.subject | network.tls.server.certificate.subject | Mappé directement à partir du champ de journal brut tls.server.subject . |
tls.version | network.tls.version | Mappé directement à partir du champ de journal brut tls.version . |
tls.version_protocol | network.tls.version_protocol | Mappé directement à partir du champ de journal brut tls.version_protocol . |
type | Permet de déterminer si l'événement est un événement DNS. | |
url.full | principal.url (ENTRANT) / target.url (SORTANT) | Mappé en fonction du champ network.direction . Si la valeur est INBOUND, elle est mappée sur principal.url . Si la valeur est OUTBOUND, elle est mappée sur target.url . |
user_id | target.user.userid | Mappé directement à partir du champ de journal brut user_id . |
user_name | target.user.user_display_name | Mappé directement à partir du champ de journal brut user_name . |
metadata.event_type | Défini sur GENERIC_EVENT par défaut. Les types d'événements ont été remplacés par des types spécifiques en fonction de la source du journal et des données d'événement. |
|
metadata.vendor_name | Défini sur Elastic par défaut. |
|
metadata.product_name | Défini sur PacketBeat par défaut. |
|
security_result.action | Défini sur ALLOW par défaut. |
|
metadata.log_type | Défini sur ELASTIC_PACKETBEATS par défaut. |
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.