Sync PostgreSQL Data With Elasticsearch

Better Stack Team
Updated on October 26, 2024

Syncing PostgreSQL data with Elasticsearch involves setting up a system that regularly updates Elasticsearch with changes from a PostgreSQL database. This can be achieved through several methods, including using data synchronization tools, writing custom scripts, or employing dedicated ETL (Extract, Transform, Load) tools.

Here’s a general approach to syncing PostgreSQL data with Elasticsearch:

1. Using Logstash

Logstash can read from PostgreSQL through its JDBC input and write to Elasticsearch through its built-in Elasticsearch output. This method is effective for periodic, near-real-time syncing.

Logstash Configuration

  1. Install Logstash Plugins: Ensure you have the necessary plugins installed. For PostgreSQL you need the JDBC input, provided by the logstash-input-jdbc plugin (recent Logstash releases bundle it as part of logstash-integration-jdbc).

     
    bin/logstash-plugin install logstash-input-jdbc
    
  2. Create a Logstash Configuration File: Define a configuration file (logstash.conf) to pull data from PostgreSQL and send it to Elasticsearch.

     
    input {
      jdbc {
        jdbc_driver_library => "/path/to/postgresql-42.2.5.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/your_database"
        jdbc_user => "your_user"
        jdbc_password => "your_password"
        statement => "SELECT * FROM your_table"
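        # Note: this query re-imports the whole table on each run; for
        # incremental syncs the jdbc input also supports use_column_value,
        # tracking_column, and the :sql_last_value placeholder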
        schedule => "* * * * *"  # cron-style schedule; this runs the query every minute
      }
    }
    
    output {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "your_index"
        document_id => "%{id}"  # Ensure each document in Elasticsearch has a unique ID
      }
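      # also print each event to the console, useful for debugging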
      stdout { codec => rubydebug }
    }
    
    
 
- **`jdbc_driver_library`**: Path to the PostgreSQL JDBC driver.
- **`jdbc_connection_string`**: JDBC connection string to your PostgreSQL database.
- **`statement`**: SQL query to fetch data from PostgreSQL.
- **`schedule`**: Cron syntax for how often to run the query.
- **`document_id`**: Ensures updates to existing documents are handled properly.
  3. Run Logstash: Start Logstash with your configuration file.

     
    bin/logstash -f /path/to/logstash.conf
    

2. Using Custom Scripts

If you prefer more control or need custom logic, you can write scripts to sync PostgreSQL data with Elasticsearch. Here’s a basic example using Python.

Python Script Example

  1. Install Required Libraries:

     
    pip install psycopg2-binary elasticsearch
    
  2. Python Script:

     
    import psycopg2
    from elasticsearch import Elasticsearch, helpers
    
    # PostgreSQL connection
    conn = psycopg2.connect(
        dbname="your_database",
        user="your_user",
        password="your_password",
        host="localhost"
    )
    cursor = conn.cursor()
    
    # Elasticsearch connection
    es = Elasticsearch(["http://localhost:9200"])
    
    # Fetch data from PostgreSQL
    cursor.execute("SELECT * FROM your_table")
    rows = cursor.fetchall()
    
    # Prepare data for Elasticsearch
    actions = []
    for row in rows:
        doc = {
            "_index": "your_index",
            "_id": row[0],  # Use a unique identifier
            "_source": {
                "field1": row[1],
                "field2": row[2],
                # Add other fields as needed
            }
        }
        actions.append(doc)
    
    # Bulk insert into Elasticsearch
    helpers.bulk(es, actions)
    
    cursor.close()
    conn.close()
    
    
 
- **`psycopg2`**: PostgreSQL adapter for Python.
- **`elasticsearch`**: Official Elasticsearch client for Python.
- **`helpers.bulk`**: Efficiently inserts multiple documents into Elasticsearch.
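
A full SELECT like the one above reloads every row on each run. If the table carries a last-modified timestamp, the same approach can sync incrementally. The sketch below assumes a hypothetical `updated_at` column and an `id` primary key on `your_table`; adjust the names to your schema and persist the sync cursor somewhere durable.

    import psycopg2
    from elasticsearch import Elasticsearch, helpers
    
    conn = psycopg2.connect(
        dbname="your_database",
        user="your_user",
        password="your_password",
        host="localhost"
    )
    cursor = conn.cursor()
    es = Elasticsearch(["http://localhost:9200"])
    
    # Last successful sync time; in real use, load/store this in a file or
    # a bookkeeping table rather than hard-coding it
    last_sync = "1970-01-01T00:00:00"
    
    # Only fetch rows modified since the last run (assumes an updated_at column)
    cursor.execute(
        "SELECT id, field1, field2 FROM your_table WHERE updated_at > %s",
        (last_sync,),
    )
    
    actions = [
        {
            "_index": "your_index",
            "_id": row[0],  # reusing the row id keeps updates idempotent
            "_source": {"field1": row[1], "field2": row[2]},
        }
        for row in cursor.fetchall()
    ]
    
    if actions:
        helpers.bulk(es, actions)
    
    cursor.close()
    conn.close()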

3. Using ETL Tools

ETL tools like Apache NiFi, Talend, or StreamSets can be used for more complex data integration scenarios, including real-time and batch processing.

Example with Apache NiFi

  1. Set Up NiFi: Download and set up Apache NiFi from the official website.
  2. Create Data Flows:
    • Use the ExecuteSQL processor to query data from PostgreSQL.
    • Use the ConvertRecord processor to transform data into JSON.
    • Use the PutElasticsearchHttp processor to send data to Elasticsearch.
  3. Configure Processors:
    • ExecuteSQL: Configure JDBC driver and SQL query.
    • ConvertRecord: Convert SQL results to JSON format.
    • PutElasticsearchHttp: Set Elasticsearch connection details and index settings.

4. Using Change Data Capture (CDC)

For real-time synchronization, consider using CDC tools like Debezium or Qlik Replicate (formerly Attunity) that capture changes in PostgreSQL and stream them to Elasticsearch.
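
Debezium reads PostgreSQL's logical replication stream, so the database must run with wal_level = logical (set in postgresql.conf, followed by a restart). A quick sanity check from Python, reusing the placeholder connection details from the earlier examples:

    import psycopg2
    
    # Placeholder connection details, as in the earlier examples
    conn = psycopg2.connect(
        dbname="your_database",
        user="your_user",
        password="your_password",
        host="localhost"
    )
    cursor = conn.cursor()
    
    # Debezium requires logical decoding; this should print 'logical'
    cursor.execute("SHOW wal_level;")
    print("wal_level:", cursor.fetchone()[0])
    
    # List existing replication slots (Debezium creates one when it connects)
    cursor.execute("SELECT slot_name, plugin, active FROM pg_replication_slots;")
    for slot in cursor.fetchall():
        print(slot)
    
    cursor.close()
    conn.close()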

Debezium Example

  1. Set Up Debezium: Use Debezium with Apache Kafka to capture changes from PostgreSQL and forward them to Elasticsearch.
  2. Configure PostgreSQL Connector: Configure the Debezium PostgreSQL connector to monitor changes.
  3. Configure Elasticsearch Sink Connector: Use a Kafka Connect Elasticsearch sink connector to write data to Elasticsearch.
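
Connectors are typically registered with Kafka Connect over its REST API (port 8083 by default). The sketch below posts a minimal pair of configurations; the connector names, hosts, and table are placeholders, and the exact config keys vary between connector versions (these follow Debezium 2.x and Confluent's Elasticsearch sink).

    import json
    import requests
    
    # Kafka Connect REST endpoint (default port 8083); host is a placeholder
    connect_url = "http://localhost:8083/connectors"
    headers = {"Content-Type": "application/json"}
    
    # Debezium PostgreSQL source: streams changes into Kafka topics named
    # <topic.prefix>.<schema>.<table>
    source = {
        "name": "postgres-source",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": "localhost",
            "database.port": "5432",
            "database.user": "your_user",
            "database.password": "your_password",
            "database.dbname": "your_database",
            "topic.prefix": "pgserver",
            "table.include.list": "public.your_table",
        },
    }
    
    # Elasticsearch sink: consumes the change topic and writes documents
    sink = {
        "name": "elasticsearch-sink",
        "config": {
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "connection.url": "http://localhost:9200",
            "topics": "pgserver.public.your_table",
            "key.ignore": "false",  # use the record key as the document ID
        },
    }
    
    for connector in (source, sink):
        resp = requests.post(connect_url, headers=headers, data=json.dumps(connector))
        resp.raise_for_status()
        print("Registered:", resp.json()["name"])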

Summary

  • Logstash: Use the logstash-input-jdbc plugin for periodic synchronization of PostgreSQL data to Elasticsearch.
  • Custom Scripts: Write custom scripts in languages like Python for more control.
  • ETL Tools: Use tools like Apache NiFi for complex or real-time data integration.
  • CDC Tools: Employ change data capture tools for continuous data syncing.

Choose the method that best fits your requirements for data volume, update frequency, and complexity.
