cypher

Writes a batch of messages to any graph database that supports the Neo4j or Bolt URI schemes. For each incoming message, the connector runs a Cypher expression that can store or delete data in the database.

Introduced in version 4.37.0.

# Common configuration fields, showing default values
output:
  label: ""
  cypher:
    uri: neo4j://demo.neo4jlabs.com # No default (required)
    cypher: 'MERGE (p:Person {name: $name})' # No default (required)
    database_name: ""
    args_mapping: root.name = this.displayName # No default (optional)
    basic_auth:
      enabled: false
      username: ""
      password: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
    max_in_flight: 64

# All configuration fields, showing default values
output:
  label: ""
  cypher:
    uri: neo4j://demo.neo4jlabs.com # No default (required)
    cypher: 'MERGE (p:Person {name: $name})' # No default (required)
    database_name: ""
    args_mapping: root.name = this.displayName # No default (optional)
    basic_auth:
      enabled: false
      username: ""
      password: ""
      realm: ""
    tls:
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs: []
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 64

Examples

Write to Neo4j Aura

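This example writes product records to a Neo4j Aura instance. The MERGE clause creates a Product node only when no node with the given id exists, and ON CREATE SET populates its properties from the mapped parameters.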

output:
  cypher:
    uri: neo4j+s://example.databases.neo4j.io
    cypher: |
      MERGE (product:Product {id: $id})
        ON CREATE SET product.name = $product,
                       product.title = $title,
                       product.description = $description
    args_mapping: |
      root = {}
      root.id = this.product.id
      root.product = this.product.summary.name
      root.title = this.product.summary.displayName
      root.description = this.product.fullDescription
    basic_auth:
      enabled: true
      username: "${NEO4J_USER}"
      password: "${NEO4J_PASSWORD}"

Fields

args_mapping

A Bloblang mapping that converts each incoming message into the parameters passed to the cypher expression. The mapping must produce an object. By default, the entire payload is passed as the parameters.

Type: string

# Examples:
args_mapping: root.name = this.displayName
args_mapping: root = {"orgId": this.org.id, "name": this.user.name}
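
As a sketch of how the two fields fit together, each key of the object produced by args_mapping becomes a $-prefixed parameter in the cypher expression. The fragment below pairs the second example above with a matching query; the message fields (org.id, user.name) and the local URI are assumed for illustration.

```yaml
output:
  cypher:
    uri: bolt://127.0.0.1:7687
    # $orgId and $name are filled from the object built by args_mapping.
    cypher: |
      MATCH (o:Organization {id: $orgId})
      MATCH (p:Person {name: $name})
      MERGE (p)-[:WORKS_FOR]->(o)
    # Assumes messages shaped like {"org": {"id": ...}, "user": {"name": ...}}.
    args_mapping: |
      root = {"orgId": this.org.id, "name": this.user.name}
```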

basic_auth

Configure basic authentication for requests to your graph database.

Type: object

basic_auth.enabled

Whether to use basic authentication in requests.

Type: bool

Default: false

basic_auth.password

The password to use for authentication. Used together with username for basic authentication or with encrypted private keys for secure access.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

basic_auth.realm

The realm or process for authentication challenges.

Type: string

Default: ""

basic_auth.username

The username of the account credentials to authenticate as. Used together with password for basic authentication.

Type: string

Default: ""

batching

Configure a batching policy.

Type: object

# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s
batching:
  count: 10
  period: 1s
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.byte_size

The number of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples:
check: this.type == "end_of_transaction"

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.period

The period of time after which an incomplete batch is flushed regardless of its size. This field accepts Go duration format strings such as 100ms, 1s, or 5s.

Type: string

Default: ""

# Examples:
period: 1s
period: 1m
period: 500ms

batching.processors[]

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.

Type: processor

# Examples:
processors:
  - archive:
      format: concatenate
processors:
  - archive:
      format: lines
processors:
  - archive:
      format: json_array

cypher

The cypher expression to execute against the graph database.

Type: string

# Examples:
cypher: MERGE (p:Person {name: $name})
cypher: |-
  MATCH (o:Organization {id: $orgId})
  MATCH (p:Person {name: $name})
  MERGE (p)-[:WORKS_FOR]->(o)
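
Because the expression is ordinary Cypher, the same field can also remove data. A minimal sketch, assuming each message carries a name field that identifies the node to delete (the Person label, the name field, and the local URI are illustrative):

```yaml
output:
  cypher:
    uri: bolt://127.0.0.1:7687
    # DETACH DELETE removes the node together with any relationships attached to it.
    cypher: 'MATCH (p:Person {name: $name}) DETACH DELETE p'
    args_mapping: root.name = this.name
```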

database_name

Set the target database against which expressions are evaluated.

Type: string

Default: ""
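
For example, to evaluate the expression against a specific database, set database_name alongside the connection details. The database name movies is a hypothetical placeholder:

```yaml
output:
  cypher:
    uri: neo4j://demo.neo4jlabs.com
    cypher: 'MERGE (p:Person {name: $name})'
    args_mapping: root.name = this.displayName
    database_name: movies # Hypothetical name; the field defaults to ""
```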

max_in_flight

The maximum number of message batches to have in flight at a given time. Increase this value to improve throughput.

Type: int

Default: 64
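
This field is usually tuned together with batching. The values below are arbitrary starting points for illustration, not recommendations: a batch is flushed at 100 messages or after one second, whichever comes first, and up to 16 batches may be in flight at once.

```yaml
output:
  cypher:
    uri: bolt://127.0.0.1:7687
    cypher: 'MERGE (p:Person {name: $name})'
    args_mapping: root.name = this.displayName
    batching:
      count: 100 # Flush once 100 messages have accumulated...
      period: 1s # ...or after one second, even if the batch is incomplete
    max_in_flight: 16 # At most 16 batches in flight at a given time
```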

tls

Configure Transport Layer Security (TLS) settings to secure network connections. This includes options for standard TLS as well as mutual TLS (mTLS) authentication, where both client and server authenticate each other using certificates. Key configuration options include client_certs for mTLS authentication, root_cas or root_cas_file for custom certificate authorities, and skip_cert_verify for development environments.

Type: object

tls.client_certs[]

A list of client certificates for mutual TLS (mTLS) authentication. Configure this field to enable mTLS, authenticating the client to the server with these certificates.

Certificate pairing rules: For each certificate item, provide either:

  • Inline PEM data using both cert and key or

  • File paths using both cert_file and key_file.

Mixing inline and file-based values within the same item is not supported.

Type: object

Default: []

# Examples:
client_certs:
  - cert: foo
    key: bar
client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key

tls.client_certs[].cert

A plain text certificate to use.

Type: string

Default: ""

tls.client_certs[].cert_file

The path of a certificate to use.

Type: string

Default: ""

tls.client_certs[].key

A plain text certificate key to use.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

tls.client_certs[].key_file

The path of a certificate key to use.

Type: string

Default: ""

tls.client_certs[].password

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.

Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples:
password: foo
password: ${KEY_PASSWORD}
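
Putting the client_certs fields together, a file-based certificate with a password-protected key might be configured as follows. The file paths and the KEY_PASSWORD environment variable are placeholders:

```yaml
tls:
  client_certs:
    - cert_file: ./client.pem
      key_file: ./client.key
      # Read the key password from the environment rather than storing it inline.
      password: ${KEY_PASSWORD}
```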

tls.enable_renegotiation

Whether to allow the remote server to request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.

Requires version 3.45.0 or later.

Type: bool

Default: false

tls.root_cas

Specify a root certificate authority to use (optional). This is a string that represents a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for inline certificate data or root_cas_file for file-based certificate loading.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

tls.root_cas_file

Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent-trusted root certificate, through possible intermediate signing certificates, to the host certificate. Use either this field for file-based certificate loading or root_cas for inline certificate data.

Type: string

Default: ""

# Examples:
root_cas_file: ./root_cas.pem

tls.skip_cert_verify

Whether to skip server-side certificate verification. Set to true only for testing environments as this reduces security by disabling certificate validation. When using self-signed certificates or in development, this may be necessary, but should never be used in production. Consider using root_cas or root_cas_file to specify trusted certificates instead of disabling verification entirely.

Type: bool

Default: false
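
As a sketch of the safer alternative, a server whose certificate is signed by a private CA can be trusted by pointing root_cas_file at that CA instead of disabling verification. The path is a placeholder:

```yaml
tls:
  skip_cert_verify: false # Keep certificate verification enabled
  root_cas_file: ./root_cas.pem # CA chain used to verify the server certificate
```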

uri

The connection URI for your graph database. For more information, see Neo4j’s documentation.

Type: string

# Examples:
uri: neo4j://demo.neo4jlabs.com
uri: neo4j+s://aura.databases.neo4j.io
uri: neo4j+ssc://self-signed.demo.neo4jlabs.com
uri: bolt://127.0.0.1:7687
uri: bolt+s://core.db.server:7687
uri: bolt+ssc://10.0.0.43
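
The scheme determines both how the driver connects and whether the connection is encrypted. The annotated fragment below summarizes the scheme conventions from Neo4j's driver documentation; treat it as a quick reference and confirm the details there.

```yaml
# neo4j://      routed (cluster-aware) connection, unencrypted
# neo4j+s://    routed, TLS with a CA-signed certificate
# neo4j+ssc://  routed, TLS accepting a self-signed certificate
# bolt://       direct connection to a single server, unencrypted
# bolt+s://     direct, TLS with a CA-signed certificate
# bolt+ssc://   direct, TLS accepting a self-signed certificate
uri: bolt+s://core.db.server:7687
```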