Change Streams in MongoDB

  • Change Streams in MongoDB allow applications to listen for real-time changes in collections, databases, or even the entire cluster. This capability is especially useful for applications that require immediate responses to data changes, such as real-time notifications, auditing, data synchronization, and event-driven architectures.
  • Introduced in MongoDB 3.6, change streams allow developers to track operations like insertions, updates, deletions, and more. They provide an event-driven way to handle data changes without the need for periodic polling, making them both efficient and scalable.
What are Change Streams?
  • Change streams are a MongoDB feature that provides a way to track and react to changes happening within a collection or database. When a change occurs (like inserting a document or updating an existing one), MongoDB emits an event through the change stream that can be consumed by the application in real-time.
Change streams can be used on:
  • Collections: Listen to changes in a specific collection.
  • Databases: Listen to changes in all collections within a database.
  • Clusters: Listen to changes across the entire cluster.
  • MongoDB uses the oplog (operation log) to track these changes and deliver them to the change stream consumers.
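  • In mongosh, each level of granularity has its own watch() entry point. A minimal sketch (the collection name orders is illustrative):

```javascript
// Collection level: changes in one collection
const collStream = db.orders.watch();

// Database level: changes in any collection of the current database
const dbStream = db.watch();

// Cluster level: changes across all databases in the deployment
const clusterStream = db.getMongo().watch();
```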
Key Features of Change Streams
  • Real-Time Updates: React to data changes in real time as they happen in the database.
  • Scalability: Change streams are efficient because they read from MongoDB's oplog, delivering real-time notifications with minimal performance overhead.
  • Resumability: If your application goes down, you can resume watching the change stream from the last event received using a resume token.
  • Filtering Changes: You can filter specific types of changes (e.g., only inserts, or only changes in specific fields) using MongoDB's aggregation framework.
  • Cluster, Database, and Collection Level: You can listen to changes across an entire cluster, a single database, or just a single collection, depending on the level of granularity required.
  • Event-Driven Architecture: Change streams can be used to trigger actions in response to specific changes in the database, enabling an event-driven system.
Types of Events in Change Streams
  • When a change happens, the change stream emits a change document. Each change document contains detailed information about the change event, including:
    • operationType: The type of operation that triggered the event, such as insert, update, delete, etc.
    • documentKey: The unique _id of the document that was modified.
    • fullDocument: The complete document as it exists after the change. It is present for insert and replace operations; for update operations it is included only if the change stream was opened with the fullDocument: "updateLookup" option.
    • updateDescription: For update operations, this field provides the changes made, showing the fields that were modified.
  • The common operationType values are:
    • insert: A new document has been inserted into the collection.
    • update: An existing document has been updated.
    • replace: A document has been replaced.
    • delete: A document has been deleted.
    • invalidate: The change stream is no longer valid (e.g., if the collection is dropped).
How Change Streams Work
  • Change streams utilize the oplog (operation log) of MongoDB’s replication system. The oplog is a special capped collection that stores a log of every operation that modifies the data in the database (e.g., insert, update, delete).
When a change happens:
  1. MongoDB records the change in the oplog.
  2. If there is an active change stream watching for changes, MongoDB pushes the event to the change stream client.
  3. The client can then process the event and take appropriate action.
Change Streams Require a Replica Set
  • Change streams in MongoDB are only supported on replica sets or sharded clusters. A standalone MongoDB server does not support change streams or the watch() method.
  • Change streams rely on the oplog (operation log) used in replication. The oplog is a special capped collection in MongoDB that records all changes made to the data in the database. The oplog is only maintained in replica sets, as it's primarily used to replicate changes to other members of the replica set.
  • Standalone MongoDB instances don't maintain an oplog because there are no other nodes to replicate data to, hence change streams can't be used on standalone servers.
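  • If you try to open a change stream on a standalone server, the call fails immediately. The exact wording varies by server version, but the error looks roughly like this:

```javascript
db.orders.watch();
// MongoServerError: The $changeStream stage is only supported on replica sets
```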
How to Enable Change Streams: Set Up a Replica Set Locally
  • If you want to experiment with change streams on your local machine, you can convert your MongoDB instance into a replica set. This is a common way to test features like change streams during development.
Steps to Set Up a Replica Set Locally
  • Stop your running MongoDB instance (if applicable).
Start MongoDB with Replica Set Configuration:
  • You need to start mongod with the --replSet option to enable replica set functionality.
  • From your terminal, run this command:

    mongod --replSet rs0

  • This command starts MongoDB with the replica set name rs0.
Initialize the Replica Set:
  • Once MongoDB is running, open a new MongoDB shell and run the following command to initiate the replica set:

    rs.initiate()

  • This command initializes the replica set configuration for the MongoDB instance. You should see output like this:

    {
        "ok": 1,
        "operationTime": Timestamp(1627647186, 1),
        "$clusterTime": {
            "clusterTime": Timestamp(1627647186, 1),
            "signature": {
                "hash": BinData(0, ""),
                "keyId": NumberLong("0")
            }
        }
    }

  • Verify the Replica Set: To ensure your MongoDB instance is now part of a replica set, you can run the following command in the shell:

    rs.status()

  • This will display the status of your replica set, and if everything is correct, you should see that your instance is now a primary node in the replica set.
Now, Running Change Streams
  • Once your MongoDB instance is running as part of a replica set, you can open a change stream.
  • Conclusion: If you want to use change streams, you must run MongoDB as part of a replica set. For local development, it's quite easy to set up a single-node replica set using the --replSet option. After setting up the replica set, you’ll be able to use features like change streams without encountering errors.
Basic Example of a Change Stream
  • Let’s see how we can use a change stream in the MongoDB shell to listen to changes in a collection called orders.
  • Step 1: Setup the orders Collection

    use ecommerce

    db.orders.insertMany([
        { order_id: 101, customer_name: "John Doe", total: 1000 },
        { order_id: 102, customer_name: "Jane Smith", total: 500 }
    ])

  • This creates a collection named orders with two initial documents.
  • Step 2: Open a Change Stream to Watch for Changes
  • To listen for changes in the orders collection, you can use the watch() method:

    const changeStream = db.orders.watch();

  • This opens a change stream that listens for changes in the orders collection.
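  • watch() also accepts an options document. One commonly useful option is fullDocument: "updateLookup", which asks MongoDB to look up and attach the current state of the document to update events (by default, update events carry only the changed fields):

```javascript
// Include the full post-update document in update events
const changeStream = db.orders.watch([], { fullDocument: "updateLookup" });
```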
  • Step 3: Simulating Some Changes
  • In a different MongoDB shell tab or process, you can modify the orders collection:

    // Insert a new order
    db.orders.insertOne({
        order_id: 103, customer_name: "Alice Cooper", total: 750
    });

    // Update an existing order
    db.orders.updateOne({ order_id: 101 }, { $set: { total: 1200 } });

    // Delete an order
    db.orders.deleteOne({ order_id: 102 });

  • Step 4: Receiving and Handling Changes
  • The change stream will now capture the changes and output them in the first shell where you started the change stream.

    while (!changeStream.isClosed()) {
        if (changeStream.hasNext()) {
            printjson(changeStream.next());
        }
    }

  • You might see the following output:

    {
        "operationType": "insert",
        "fullDocument": {
            "_id": ObjectId("64c23ef349123abf12abcd45"),
            "order_id": 103,
            "customer_name": "Alice Cooper",
            "total": 750
        },
        "ns": {
            "db": "ecommerce",
            "coll": "orders"
        },
        "documentKey": {
            "_id": ObjectId("64c23ef349123abf12abcd45")
        }
    }
   
    {
        "operationType": "update",
        "documentKey": {
            "_id": ObjectId("64c23ef349123abf12abcd34")
        },
        "updateDescription": {
            "updatedFields": {
                "total": 1200
            },
            "removedFields": []
        }
    }
   
    {
        "operationType": "delete",
        "documentKey": {
            "_id": ObjectId("64c23ef349123abf12abcd36")
        }
    }


Explanation of Output:

  • Insert Operation:
    • A new order for Alice Cooper was inserted with a total of 750.
    • operationType: "insert" indicates that a new document was added.
    • fullDocument contains the new document inserted into the orders collection.
  • Update Operation:
    • The order for John Doe (order ID: 101) was updated, and the total field was changed to 1200.
    • operationType: "update" indicates that an existing document was updated.
    • updateDescription.updatedFields shows which fields were modified (total in this case).
  • Delete Operation:
    • The order for Jane Smith (order ID: 102) was deleted.
    • operationType: "delete" indicates that a document was deleted.
    • documentKey provides the _id of the deleted document.
Change Streams with Filters and Aggregation
  • You can apply filters and use aggregation pipelines with change streams to monitor specific types of changes or perform more complex operations on the incoming data.
  • For example, if you only want to listen for insert operations, you can apply a filter like this:

    const pipeline = [
        { $match: { operationType: "insert" } }
    ];

    const changeStream = db.orders.watch(pipeline);

    while (!changeStream.isClosed()) {
        if (changeStream.hasNext()) {
            printjson(changeStream.next());
        }
    }


In this example:

  • We use the $match stage to filter the stream and only receive documents where the operationType is insert.
  • Now, the change stream will only output changes related to insert operations, ignoring updates and deletions.
More Complex Aggregation Example
  • You can also use more advanced aggregation stages in your change stream. For instance, let’s say you want to monitor changes but only care about orders where the total exceeds 1000 after the update:

    const pipeline = [
        {
            $match: {
                $or: [
                    { "fullDocument.total": { $gt: 1000 } },
                    { "updateDescription.updatedFields.total": { $gt: 1000 } }
                ]
            }
        }
    ];

    const changeStream = db.orders.watch(pipeline);

    while (!changeStream.isClosed()) {
        if (changeStream.hasNext()) {
            printjson(changeStream.next());
        }
    }


Here:

  • The change stream will listen for any inserts or updates where the total is greater than 1000.
  • For insert operations, the fullDocument.total is checked.
  • For update operations, the updateDescription.updatedFields.total is checked.
Resuming Change Streams
  • If your application loses its connection to MongoDB or needs to restart, you can resume the change stream from the last received event using a resume token. This ensures that you don’t miss any changes while the connection was down.
How to Use Resume Tokens
  • Every change event MongoDB emits carries a resume token in its _id field. You can store this token and use it later to resume the change stream from where it left off.

    let resumeToken = null;

    const changeStream = db.orders.watch();

    while (!changeStream.isClosed()) {
        if (changeStream.hasNext()) {
            const change = changeStream.next();
            printjson(change);
            resumeToken = change._id;  // Store the resume token
        }
    }

  • Later, if your application needs to restart:

    const changeStream = db.orders.watch([], { resumeAfter: resumeToken });

    while (!changeStream.isClosed()) {
        if (changeStream.hasNext()) {
            printjson(changeStream.next());
        }
    }

  • This ensures that you don’t miss any events between the initial disconnection and the reconnection.
Use Cases for Change Streams
  • Real-Time Notifications: You can notify users when a document (e.g., an order or a product) changes, triggering emails or notifications.
  • Audit Trails: Use change streams to track modifications to critical documents, creating audit logs for compliance or security purposes.
  • Cache Synchronization: Keep an in-memory cache synchronized with changes happening in the MongoDB database.
  • Data Replication: Synchronize data across different databases or systems by listening for changes and replicating them in real time.
  • Analytics & Monitoring: Monitor data in real time for anomalies, trends, or other events requiring immediate attention.
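  • The use cases above all follow the same pattern: dispatch on operationType and act on the event. A minimal mongosh sketch, where sendNotification and removeFromCache are hypothetical application functions:

```javascript
const changeStream = db.orders.watch();

while (!changeStream.isClosed()) {
    if (changeStream.hasNext()) {
        const change = changeStream.next();
        switch (change.operationType) {
            case "insert":
                // e.g. notify a user that a new order was placed
                sendNotification(change.fullDocument);
                break;
            case "delete":
                // e.g. keep an in-memory cache in sync
                removeFromCache(change.documentKey._id);
                break;
        }
    }
}
```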
Conclusion
  • Change streams are a powerful feature of MongoDB that allow you to watch and react to changes in your collections in real-time. They provide an efficient way to build event-driven systems, ensuring that you can keep data in sync across different systems, notify users of important changes, or even maintain audit trails with minimal performance impact.
  • By applying aggregation pipelines and resume tokens, you can customize change streams to meet specific business needs and handle temporary disconnections gracefully.
