Change data capture from Neo4j to Amazon Neptune using Amazon Managed Streaming for Apache Kafka

After you perform a point-in-time data migration from Neo4j to Amazon Neptune, you may want to capture and replicate ongoing updates in real time. For more information about automating point-in-time graph data migration from Neo4j to Neptune, see Migrating a Neo4j graph database to Amazon Neptune with a fully automated utility. This post walks you through automating the capture and replication of ongoing changes from Neo4j to Neptune, using the example solution in the cdc-neo4j-msk-neptune GitHub repo.

Continuous replication of databases using the change data capture (CDC) pattern allows you to stream your data and make it available to other systems. This post focuses on modernizing your graph database by streaming data from Neo4j using CDC so that you have the latest changes copied into Neptune. By using the Event Interception strategy of the Strangler pattern to modernize Neo4j, you can incrementally push all your changes to Neptune and modify your applications to use Neptune. Neptune is a fast, reliable, fully managed graph database service that makes it easier to build and run applications that work with highly connected datasets. The core of Neptune is a purpose-built, high-performance graph database engine optimized for storing billions of relationships and querying the graph with millisecond latency.

Architecture overview

The solution in this post automates the deployment of the following architecture in your AWS account. This architecture shows the AWS resources the solution provisions to build a loosely coupled system for the replication.

The architecture contains the following elements:

  1. An AWS Cloud Development Kit (AWS CDK) app that an end-user triggers, which bootstraps all the required AWS resources inside an Amazon VPC
  2. An Amazon Elastic Compute Cloud (Amazon EC2) instance that runs the purpose-built replication services in Docker containers
  3. A single-node Neptune DB cluster with one graph database instance that serves as the target of this replication
  4. An Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster with two nodes that serves as the publish-subscribe broker for this replication

Launching the example solution

With this solution, you should expect to see changes made to the nodes and relationships in the Neo4j graph database reflected in Neptune in real time.

To get started, clone the AWS CDK app from the GitHub repo. After making sure you meet the prerequisites, follow the instructions on GitHub to run the solution.

Deep dive into the solution

CDC is an architecture pattern that identifies changes in data in a source system and acts on those changes. In this solution, you determine data changes in a Neo4j graph and act on them by transforming them and updating a target Neptune graph in a simple three-step process:

  1. Provision AWS resources
  2. Process and replicate the changed data
  3. Test the end-to-end solution

Provisioning AWS resources

For a fully automated experience, it’s important to provision the required resources and configure their plumbing, such as applying the right AWS Identity and Access Management (IAM) roles and policies. This enables you to run and test the solution in your AWS account. The automation provides isolation by creating a separate VPC and launching all resources inside it, which makes the solution easy to set up and tear down without worrying about dependencies on your existing environment. After following the steps to run the solution, you see output listing the endpoints of the resources that were provisioned.

As a result, you create the following resources in your AWS account:

  • Amazon VPC – The VPC creates an isolated network that makes sure the solution is created and destroyed without affecting the rest of your AWS development account. Inside the VPC, the app creates one public and one private subnet in two Availability Zones.
  • Amazon EC2 – A single EC2 instance is used to run the purpose-built services in Docker containers.
  • Security groups and IAM policies – The EC2 instance needs to talk to Neptune and Amazon MSK for the replication to work. The setup app creates security groups, IAM roles, and policies to ensure that the services can securely connect and talk to each other.
  • Amazon MSK – The Neo4j Streams plugin emits changes from a source database to Kafka in real time. Amazon MSK is the fully managed Kafka service that you use in this solution to integrate Neo4j with Neptune.
  • Neptune – You use this fully managed AWS graph database service as the modernization target.

Processing and replicating the changed data

The EC2 instance you provision runs the following services:

  • startup-service – This Docker container determines Neptune and Amazon MSK endpoints.
  • neo4j-service – This Docker container runs Neo4j version 4.0.0 with the apoc 4.0.0.6 and neo4j-streams 4.0.0 plugins installed. The service is configured to publish all changes using the following default values; follow the instructions in the GitHub repo to change them:
    Nodes: Person{*}, Movie{*}
    Relationships: ACTED_IN{*}
    Amazon MSK topic name: movie-topic
  • kafka-topic-service – This Docker container creates a new Amazon MSK topic. The neo4j-service publishes changed data to this topic, and the transformation-service subscribes to it to receive the changed data. You can also configure Amazon MSK to create new topics automatically by enabling auto.create.topics.enable in a custom configuration.
  • transformation-service – The Neptune property graph is very similar to Neo4j’s, including support for multiple labels on vertices and for multi-valued properties (sets, but not lists). Neo4j allows homogeneous lists of simple types, which can contain duplicate values, to be stored as properties on both nodes and relationships. Neptune, on the other hand, provides set and single cardinality for vertex properties, and single cardinality for edge properties. The transformation-service accepts changed data from Neo4j and transforms it into Neptune’s graph data model.
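To make the cardinality difference concrete, the following is a minimal Python sketch of the kind of transformation such a service performs: it converts a simplified Neo4j Streams-style node event into a Gremlin upsert string, collapsing Neo4j list properties to a single value to respect Neptune’s property cardinality. The event shape and the generated query are illustrative only; the actual transformation-service in the repo is implemented differently.

```python
import json

def neo4j_event_to_gremlin(event: dict) -> str:
    """Convert a simplified Neo4j Streams CDC node event into a Gremlin
    upsert query string. Handles node create/update events only."""
    payload = event["payload"]
    after = payload["after"]
    labels = "::".join(after.get("labels", []))  # Neptune joins multiple labels with '::'
    node_id = str(payload["id"])
    steps = [f"g.V('{node_id}').fold()"
             f".coalesce(unfold(), addV('{labels}').property(id, '{node_id}'))"]
    for key, value in after.get("properties", {}).items():
        # Neo4j allows list properties; Neptune vertex properties are
        # set/single cardinality, so keep only the first element here.
        if isinstance(value, list):
            value = value[0]
        if isinstance(value, str):
            steps.append(f".property(single, '{key}', '{value}')")
        else:
            steps.append(f".property(single, '{key}', {value})")
    return "".join(steps)

# A simplified node-creation event, loosely based on the Neo4j Streams format
event = json.loads("""
{
  "meta": {"operation": "created"},
  "payload": {
    "id": "42",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {"name": "Keanu Reeves", "born": 1964}
    }
  }
}
""")
query = neo4j_event_to_gremlin(event)
print(query)
```

A real implementation also has to handle relationship events, deletes, and property removals, but the cardinality-flattening shown here is the essential model difference to bridge.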

Data flow architecture

The following diagram illustrates the data flow architecture and how these services work with each other.

The data flow contains the following steps:

  • The user-data shell script of the instance uses docker-compose to launch the four Docker containers. Using user data scripts is a common pattern to run startup scripts when an instance is launched. For this solution, you use it to launch and configure the services.
  • The first service to start is startup-service. This service queries AWS CloudFormation describe-stacks for the MSK cluster endpoint address; this has to be a separate step because the cluster endpoint isn’t available until the cluster has been created. After getting the endpoint address, the service queries it to retrieve the Kafka bootstrap and ZooKeeper addresses and ports. You use these addresses to configure the Neo4j Streams plugin so that it can send changes to Amazon MSK.
  • startup-service also queries the CloudFormation stack for the Neptune endpoint. Although the AWS CDK stack defines the Neptune cluster endpoint as an output, the value is only produced at deployment time, so the service retrieves it at runtime.
  • kafka-topic-service creates a new topic in Amazon MSK.
  • When the Neo4j graph database running in neo4j-service receives a Cypher script to run, it publishes changed data to the Amazon MSK topic. An interactive user or any other service writing to the Neo4j graph can perform the operation.
  • The transformation-service, subscribed to the Amazon MSK topic, receives the data and processes it by transforming it from Neo4j’s data model to Neptune’s.
  • transformation-service pushes transformed data to Neptune.
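The endpoint discovery performed by startup-service amounts to reading stack outputs from CloudFormation. The sketch below shows the idea in Python; the output keys and the sample response are hypothetical (the actual service may use different names), and the commented boto3 call marks where the real response would come from.

```python
def get_stack_output(describe_stacks_response: dict, output_key: str) -> str:
    """Extract a named output value from a CloudFormation
    DescribeStacks-style response."""
    outputs = describe_stacks_response["Stacks"][0].get("Outputs", [])
    for output in outputs:
        if output["OutputKey"] == output_key:
            return output["OutputValue"]
    raise KeyError(f"No output named {output_key!r} in stack")

# In the running container, the response would come from boto3, for example:
#   import boto3
#   response = boto3.client("cloudformation").describe_stacks(StackName="...")
# Here we use a sample response with hypothetical output names.
sample_response = {
    "Stacks": [{
        "StackName": "cdc-stack",
        "Outputs": [
            {"OutputKey": "NeptuneEndpoint",
             "OutputValue": "my-cluster.cluster-abc.us-west-2.neptune.amazonaws.com"},
            {"OutputKey": "MSKClusterArn",
             "OutputValue": "arn:aws:kafka:us-west-2:123456789012:cluster/demo/uuid"},
        ],
    }]
}

neptune_endpoint = get_stack_output(sample_response, "NeptuneEndpoint")
print(neptune_endpoint)
```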

Testing the end-to-end solution

The following diagram illustrates the steps to perform an end-to-end testing of the solution.

You complete the following steps:

  1. SSH into your EC2 instance.
  2. Run the following shell script to enter the neo4j-service Docker container:
    docker container exec -it neo4j-service cypher-shell

  3. At the neo4j prompt, run the following Cypher statements:
    CREATE (TheMatrix:Movie {title:'The Matrix', released:1999, tagline:'Welcome to the Real World'});
    CREATE (Keanu:Person {name:'Keanu Reeves', born:1964});
    // Match the existing nodes first: because each statement runs separately,
    // unbound variables in CREATE would otherwise create two new blank nodes.
    MATCH (TheMatrix:Movie {title:'The Matrix'}), (Keanu:Person {name:'Keanu Reeves'})
    CREATE (Keanu)-[:ACTED_IN {roles:['Neo']}]->(TheMatrix);

    This service saves all the debug information in a local file.

  4. As an optional step, to see the logs, run the following shell script:
    docker container logs transformation-service

  5. Run the following shell script to launch an Apache TinkerPop Gremlin console configured to send all queries to Neptune (this step verifies that the Neptune graph is in sync with changes in the source):
    docker run -it -e NEPTUNE_HOST --entrypoint /replace-host.sh sanjeets/neptune-gremlinc-345

  6. At the Gremlin prompt, run the following commands in order:
    :remote console
    g.V().count()

    With an initially empty Neptune database, the vertex count should match the number of nodes created in Neo4j.

Extending the solution

This solution has a loosely coupled architecture. If you want to replace the transformation-service with your own implementation, you can easily do so by providing a new Docker container and updating the Docker Compose file 02-docker-compose.yml accordingly.

Similarly, you can replace other services in the solution. For example, you could replace the Neo4j Docker container. Instead of using the Gremlin console in a Docker container, if you prefer, you can quickly and easily query your Neptune databases with Jupyter notebooks, which are fully managed, interactive development environments with live code and narrative text. Notebooks are hosted and billed through Amazon SageMaker.

Scaling the solution

The modular architecture of this solution allows you to scale the transformation-service independently to meet high-throughput change data capture requirements. Also, by monitoring Neptune, you can scale the cluster up or down as needed. The following patterns help you run this solution at scale in real-world scenarios.

Scaling the transformation-service with Amazon MSK

For simplicity, this solution uses a single Kafka consumer and a single partition. To scale, you can create multiple partitions and multiple consumers in a consumer group, as shown in the following diagram. This handles a large volume of change data from the source database by allowing you to launch multiple instances of the transformation-service container.
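One caveat when moving to multiple partitions: Kafka only guarantees ordering within a partition, so all changes to the same node or relationship should be routed to the same partition; otherwise an update could be applied before the create it depends on. Kafka producers do this key-based routing automatically when you supply a message key; the Python helper below is just an illustrative sketch of the idea.

```python
import hashlib

def partition_for(entity_id: str, num_partitions: int) -> int:
    """Route all CDC events for the same graph entity to the same
    partition so they are consumed in order. (Kafka producers do the
    equivalent automatically when a message key is provided.)"""
    digest = hashlib.md5(entity_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Events for the same node always land on the same partition,
# so per-entity ordering survives the fan-out to multiple consumers.
p1 = partition_for("node-42", 8)
p2 = partition_for("node-42", 8)
print(p1, p2)
```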

How Neptune scales with load

Neptune DB clusters and instances scale at three levels: storage, instance size, and read replicas. After closely monitoring your Neptune cluster, you can fine-tune each of these levels independently.

Monitoring Neptune

The following screenshot shows various metrics available by default as a dashboard view on the Neptune console.

To monitor the CDC performance (for example, to inspect the raw request and the payload containing the Gremlin or SPARQL query), you can enable audit logs on your Neptune cluster; the audit log records each request along with its query payload.

Cost of running the solution

The following tables outline an hourly estimate of running this solution with on-demand pricing in us-west-2. Changing the instance type defaults in the cdk.json file changes your cost. The storage, I/O, and data transfer rates are assumptions made to simplify calculation. All the prices are as of this writing and might change over time. For more information and to perform a thorough calculation, see the pricing page of each service.

Amazon EC2
  • Instance type (A): t3a.xlarge (see Amazon EC2 pricing)
  • EBS storage (B): 100 GB
  • Data transfer (C): free within the same Availability Zone
  • Estimated hourly cost (A + B + C): $0.1504 + $0.01 + $0 = $0.1604

Neptune
  • Instance type (A): db.r5.large (see Amazon Neptune pricing)
  • Data stored (B): 100 GB
  • Data transfer (C): free within the same Availability Zone
  • I/O (D): fewer than 1 million requests
  • Estimated hourly cost (A + B + C + D): $0.348 + $0.01 + $0 + $0.20 = $0.558

Amazon MSK
  • Instance type (A): kafka.m5.large (see Amazon MSK pricing)
  • Data stored (B): 100 GB
  • Data transfer (C): free within the same Availability Zone
  • Estimated hourly cost (A + B + C): $0.21 + $0.01 + $0 = $0.22

Storage calculations for Amazon Elastic Block Store (Amazon EBS), Neptune, and Amazon MSK are based on GB-months. The following calculation breaks it down to per hour pricing:

Calculation for 100 GB of storage at $0.10 per GB-month, for 1 hour
= $0.10 * 100 * 1 / (24 * 30) ≈ $0.01

The approximate total hourly cost, rounded to 2 decimal points, is $0.16+$0.56+$0.22 = $0.94.
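The per-hour arithmetic above can be checked with a few lines of Python (the prices are the on-demand us-west-2 rates quoted in the tables and may have changed since this writing):

```python
HOURS_PER_MONTH = 24 * 30  # the simplification used in the calculation above

def storage_cost_per_hour(gb: float, price_per_gb_month: float) -> float:
    """Convert a GB-month storage price into a per-hour cost."""
    return price_per_gb_month * gb / HOURS_PER_MONTH

storage = storage_cost_per_hour(100, 0.10)   # ~0.0139, rounded to $0.01 above
ec2     = 0.1504 + 0.01 + 0                  # t3a.xlarge + EBS + data transfer
neptune = 0.348 + 0.01 + 0 + 0.20            # db.r5.large + storage + transfer + I/O
msk     = 0.21 + 0.01 + 0                    # kafka.m5.large + storage + transfer
total   = round(ec2, 2) + round(neptune, 2) + round(msk, 2)
print(round(storage, 4), total)
```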

Conclusion

This post provides a solution for existing users of Neo4j to easily automate streaming CDC to Neptune using Amazon MSK in a few simple steps. The post explains how to scale, extend, and monitor it. Finally, it also explains how to calculate the hourly cost of running such a solution. So, what are you waiting for? Run this solution in your own AWS account or read more about Amazon Neptune to see how it can help serve your graph database needs.

About the Author

Sanjeet Sahay is a Sr. Partner Solutions Architect with Amazon Web Services.