Schema Validation In MongoDB

  • In MongoDB, schema validation is a way to ensure that documents written to a collection (by insert, update, or replace) adhere to a specific structure or pattern. This is done by defining rules or constraints, most commonly with a JSON Schema. Schema validation is not mandatory in MongoDB because of its flexible schema model, but it is helpful when you want to enforce data integrity or avoid errors caused by improperly formatted data.
  • Here is a detailed breakdown of schema validation in MongoDB:
What is Schema Validation?
  • Schema validation defines rules that must be followed when inserting, updating, or replacing documents in a collection. It ensures that the documents conform to a specific structure, type, or other constraints, which helps in maintaining data quality.
Schema Validation Concepts
  • JSON Schema: MongoDB's schema validation uses the $jsonSchema operator, which is based on the JSON Schema standard (draft 4) and adds MongoDB-specific keywords such as bsonType. JSON Schema provides a way to describe the structure of JSON data.
  • Validation Level: Defines how strictly the rules will be enforced.
    • 'strict' (the default): Validation is applied to all inserts and updates.
    • 'moderate': Validation is applied to inserts and to updates of existing documents that already satisfy the rules. Updates to existing documents that do not satisfy the rules are not checked.
  • Validation Action: Defines the behavior when documents don't meet the validation rules.
    • 'error': Rejects documents that don’t follow the validation rules.
    • 'warn': Allows invalid documents but logs a warning.
  • Validation Rules: You define rules (constraints) that documents must adhere to. These could include:
    • Data types (e.g., string, number, object)
    • Required fields
    • Field value constraints (e.g., regex, minimum/maximum values)
    • Nested object validation
Creating Schema Validation
  • Schema validation is usually defined when a collection is created, but it can also be added to an existing collection.
Example of a Simple Schema Validation
  • Let’s say we are creating a 'users' collection where each document must have the following structure:
    • 'name': a required string field.
    • 'age': an optional integer field that must be between 18 and 60.
    • 'email': a required string field that must match a valid email format.
    • 'address': an object containing:
      • 'city': a required string field.
      • 'zipcode': a required string field with a 5-digit format.
  • Here’s how we can define schema validation for this collection:

  db.createCollection("users", {
    validator: {
      $jsonSchema: {
        bsonType: "object",
        required: ["name", "email", "address"],
        properties: {
          name: {
            bsonType: "string",
            description: "must be a string and is required"
          },
          age: {
            bsonType: "int",
            minimum: 18,
            maximum: 60,
            description: "must be an integer in the range 18 to 60"
          },
          email: {
            bsonType: "string",
            pattern: "^.+@.+$",
            description: "must be a valid email and is required"
          },
          address: {
            bsonType: "object",
            required: ["city", "zipcode"],
            properties: {
              city: {
                bsonType: "string",
                description: "must be a string and is required"
              },
              zipcode: {
                bsonType: "string",
                pattern: "^[0-9]{5}$",
                description: "must be a 5-digit string and is required"
              }
            }
          }
        }
      }
    },
    validationLevel: "strict",
    validationAction: "error"
  });


Breakdown of the Example

  • 'bsonType: "object"': This indicates that each document in the 'users' collection must be a BSON object (i.e., a document).
  • 'required: ["name", "email", "address"]': This specifies that the fields 'name', 'email', and 'address' must be present in every document.
  • Field Properties:
    • 'name': Must be a string and is required.
    • 'age': Must be an integer, and its value must fall between 18 and 60, but it's optional.
    • 'email': Must be a string, and it must match a basic email pattern ('^.+@.+$').
    • 'address': Must be an object containing the fields 'city' and 'zipcode'. Each of these fields must be a string, and 'zipcode' must follow a 5-digit format.
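  • To make the rules above concrete, here is a minimal sketch in plain JavaScript that mirrors them on the client side. The function name validateUser and its error messages are made up for illustration; this is not how MongoDB evaluates $jsonSchema internally, and the server-side validator remains the source of truth.

```javascript
// Hypothetical client-side mirror of the 'users' rules (illustration only).
function validateUser(doc) {
  const errors = [];
  const isString = (v) => typeof v === "string";

  // name: required string
  if (!isString(doc.name)) errors.push("name must be a string and is required");

  // age: optional, but when present must be an integer from 18 to 60
  if (doc.age !== undefined) {
    if (!Number.isInteger(doc.age) || doc.age < 18 || doc.age > 60) {
      errors.push("age must be an integer in the range 18 to 60");
    }
  }

  // email: required string matching the same basic pattern as the schema
  if (!isString(doc.email) || !/^.+@.+$/.test(doc.email)) {
    errors.push("email must match ^.+@.+$ and is required");
  }

  // address: required object with city and a 5-digit zipcode
  const address = doc.address;
  if (address === null || typeof address !== "object" || Array.isArray(address)) {
    errors.push("address must be an object and is required");
  } else {
    if (!isString(address.city)) {
      errors.push("address.city must be a string and is required");
    }
    if (!isString(address.zipcode) || !/^[0-9]{5}$/.test(address.zipcode)) {
      errors.push("address.zipcode must be a 5-digit string and is required");
    }
  }
  return errors;
}

const ok = validateUser({
  name: "John Doe", age: 30, email: "john@example.com",
  address: { city: "New York", zipcode: "10001" },
});
const bad = validateUser({
  name: "Jane Doe", age: 17, email: "janeexample.com",
  address: { city: "Los Angeles", zipcode: "9001" },
});
console.log(ok.length, bad.length); // 0 3
```

  • A check like this can give users early feedback in an application, but it should complement the collection validator rather than replace it.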
Validation Levels
  • Strict: All insert and update operations will be checked against the validation rules. For example, inserting a document like:

  db.users.insertOne({
    name: "John Doe",
    age: 30,
    email: "john@example.com",
    address: { city: "New York", zipcode: "10001" }
  });

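  • would succeed because it meets the validation rules. (This assumes mongosh, which stores a whole-number literal such as 30 as a 32-bit integer. The legacy mongo shell stores numeric literals as doubles, so there 'age' would need NumberInt(30) to satisfy bsonType "int".)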
  • But, inserting this:

  db.users.insertOne({
    name: "Jane Doe",
    age: 17,  // invalid age
    email: "janeexample.com", // invalid email format
    address: { city: "Los Angeles", zipcode: "9001" }  // invalid zipcode
  });

  • would throw an error because 'age' is less than 18, the email is not valid, and the zipcode is not 5 digits.
  • Moderate: Inserts are validated, and so are updates to existing documents that already satisfy the rules. Updates to existing documents that do not satisfy the rules are not checked, so legacy documents that predate the validator can still be modified without being rejected.
Validation Actions
  • Error: Invalid documents will be rejected, and an error message will be thrown. This is helpful when you want to enforce strict data integrity.
  • Warn: The operation will still succeed, but MongoDB will log a warning that the document does not conform to the validation rules. This is useful for transitioning to schema validation without immediately enforcing strict validation.
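  • The way validation level and validation action combine can be sketched as a small decision function. This is an illustrative model only, following the behavior described in the MongoDB manual (moderate skips updates to existing documents that already fail the rules). The function name decideWrite and the returned strings are assumptions made for this example.

```javascript
// Illustrative model, not MongoDB source code.
// level:    "strict" | "moderate"
// action:   "error" | "warn"
// op:       "insert" | "update"
// wasValid: did the stored document satisfy the rules before the update?
//           (ignored for inserts)
// isValid:  does the new version of the document satisfy the rules?
function decideWrite({ level, action, op, wasValid, isValid }) {
  // strict checks every write; moderate checks inserts and updates to
  // documents that were already valid.
  const checked = level === "strict" || op === "insert" || wasValid === true;
  if (!checked || isValid) return "accept";
  return action === "warn" ? "accept-and-log-warning" : "reject";
}

console.log(decideWrite({ level: "strict", action: "error", op: "insert", isValid: false }));
// reject
console.log(decideWrite({ level: "moderate", action: "error", op: "update", wasValid: false, isValid: false }));
// accept
console.log(decideWrite({ level: "strict", action: "warn", op: "update", wasValid: true, isValid: false }));
// accept-and-log-warning
```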
Updating an Existing Collection's Validation
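  • If you already have a collection, you can add or change schema validation using the 'collMod' command. The validator you pass replaces the previous one as a whole (the example below keeps only the 'name' and 'email' rules), and existing documents are not checked until they are next modified: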

  db.runCommand({
    collMod: "users",
    validator: {
      $jsonSchema: {
        bsonType: "object",
        required: ["name", "email", "address"],
        properties: {
          name: {
            bsonType: "string",
            description: "must be a string and is required"
          },
          email: {
            bsonType: "string",
            pattern: "^.+@.+$",
            description: "must be a valid email and is required"
          }
        }
      }
    },
    validationLevel: "strict",
    validationAction: "error"
  });


Common Use Cases of Schema Validation

  • Data Quality: Ensure that all documents conform to a set of rules, preventing incorrect data entry.
  • Type Enforcement: Enforce types like 'string', 'number', 'date', etc., so that data can be relied upon during processing.
  • Range or Format Constraints: Ensure fields like 'age' fall within a specific range or that fields like 'email' or 'zipcode' follow a specific format.
  • Required Fields: Make sure certain fields are always present in a document.
Best Practices
  • Define schema validation during collection creation to enforce clean data from the beginning.
  • Use moderate validation during data migration or when transitioning to a more structured schema.
  • Use the 'warn' action for testing new validation rules without disrupting operations.
  • Schema validation in MongoDB is a powerful feature that lets you enforce structure in a flexible, NoSQL database. It helps ensure data integrity while still maintaining the flexibility that MongoDB is known for.

Change Streams in MongoDB

  • Change Streams in MongoDB allow applications to listen for real-time changes in collections, databases, or even the entire cluster. This capability is especially useful for applications that require immediate responses to data changes, such as real-time notifications, auditing, data synchronization, and event-driven architectures.
  • Introduced in MongoDB 3.6, change streams allow developers to track operations like insertions, updates, deletions, and more. They provide an event-driven way to handle data changes without the need for periodic polling, making them both efficient and scalable.
What are Change Streams?
  • Change streams are a MongoDB feature that provides a way to track and react to changes happening within a collection or database. When a change occurs (like inserting a document or updating an existing one), MongoDB emits an event through the change stream that can be consumed by the application in real-time.
Change streams can be used on:
  • Collections: Listen to changes in a specific collection.
  • Databases: Listen to changes in all collections within a database.
  • Clusters: Listen to changes across the entire cluster.
  • MongoDB uses the oplog (operation log) to track these changes and deliver them to the change stream consumers.
Key Features of Change Streams
  • Real-Time Updates: React to data changes in real time as they happen in the database.
  • Scalability: Change streams read from MongoDB's oplog, so applications receive notifications without the cost of repeatedly polling collections.
  • Resumability: If your application goes down, you can resume watching the change stream from the last event received using a resume token.
  • Filtering Changes: You can filter specific types of changes (e.g., only inserts, or only changes in specific fields) using MongoDB's aggregation framework.
  • Cluster, Database, and Collection Level: You can listen to changes across an entire cluster, a single database, or just a single collection, depending on the level of granularity required.
  • Event-Driven Architecture: Change streams can be used to trigger actions in response to specific changes in the database, enabling an event-driven system.
Types of Events in Change Streams
  • When a change happens, the change stream emits a change document. Each change document contains detailed information about the change event, including:
    • operationType: The type of operation that triggered the event, such as insert, update, delete, etc.
    • documentKey: The unique _id of the document that was modified.
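    • fullDocument: The complete document. It is included for insert and replace operations; for update operations it is included only when the change stream is opened with the fullDocument: "updateLookup" option.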
    • updateDescription: For update operations, this field provides the changes made, showing the fields that were modified.
  • The common operationType values are:
    • insert: A new document has been inserted into the collection.
    • update: An existing document has been updated.
    • replace: A document has been replaced.
    • delete: A document has been deleted.
    • invalidate: The change stream is no longer valid (e.g., if the collection is dropped).
How Change Streams Work
  • Change streams utilize the oplog (operation log) of MongoDB’s replication system. The oplog is a special capped collection that stores a log of every operation that modifies the data in the database (e.g., insert, update, delete).
When a change happens:
  1. MongoDB records the change in the oplog.
  2. If there is an active change stream watching for changes, MongoDB pushes the event to the change stream client.
  3. The client can then process the event and take appropriate action.
Change Streams Require a Replica Set
  • Change streams in MongoDB are only supported on replica sets and sharded clusters. A standalone MongoDB server does not support change streams, so calling watch() on one fails.
  • Change streams rely on the oplog (operation log) used in replication. The oplog is a special capped collection in MongoDB that records all changes made to the data in the database. The oplog is only maintained in replica sets, as it's primarily used to replicate changes to other members of the replica set.
  • Standalone MongoDB instances don't maintain an oplog because there are no other nodes to replicate data to, hence change streams can't be used on standalone servers.
How to Enable Change Streams: Set Up a Replica Set Locally
  • If you want to experiment with change streams on your local machine, you can convert your MongoDB instance into a replica set. This is a common way to test features like change streams during development.
Steps to Set Up a Replica Set Locally
  • Stop your running MongoDB instance (if applicable).
Start MongoDB with Replica Set Configuration:
  • You need to start MongoDB with the --replSet option to enable replica set functionality.
  • From a terminal (the operating system shell, not the MongoDB shell), run this command:

    mongod --replSet rs0

  • This command starts MongoDB with the replica set name rs0.
Initialize the Replica Set:
  • Once MongoDB is running, open a new MongoDB shell and run the following command to initiate the replica set:

    rs.initiate()

  • This command initializes the replica set configuration for the MongoDB instance. You should see output like this:

    {
        "ok": 1,
        "operationTime": Timestamp(1627647186, 1),
        "$clusterTime": {
            "clusterTime": Timestamp(1627647186, 1),
            "signature": {
                "hash": BinData(0, ""),
                "keyId": NumberLong("0")
            }
        }
    }

  • Verify the Replica Set: To ensure your MongoDB instance is now part of a replica set, you can run the following command in the shell:

    rs.status()

  • This will display the status of your replica set, and if everything is correct, you should see that your instance is now a primary node in the replica set.
Now, Running Change Streams
  • Once your MongoDB instance is running as part of a replica set, you can open a change stream.
  • Conclusion: If you want to use change streams, you must run MongoDB as part of a replica set. For local development, it's quite easy to set up a single-node replica set using the --replSet option. After setting up the replica set, you’ll be able to use features like change streams without encountering errors.
Basic Example of a Change Stream
  • Let’s see how we can use a change stream in the MongoDB shell to listen to changes in a collection called orders.
  • Step 1: Setup the orders Collection

    use ecommerce

    db.orders.insertMany([
        { order_id: 101, customer_name: "John Doe", total: 1000 },
        { order_id: 102, customer_name: "Jane Smith", total: 500 }
    ])

  • This creates a collection named orders with two initial documents.
  • Step 2: Open a Change Stream to Watch for Changes
  • To listen for changes in the orders collection, you can use the watch() method:

    const changeStream = db.orders.watch();

  • This opens a change stream that listens for changes in the orders collection.
  • Step 3: Simulating Some Changes
  • In a different MongoDB shell tab or process, you can modify the orders collection:

    // Insert a new order
    db.orders.insertOne({
        order_id: 103, customer_name: "Alice Cooper", total: 750
    });

    // Update an existing order
    db.orders.updateOne({ order_id: 101 }, { $set: { total: 1200 } });

    // Delete an order
    db.orders.deleteOne({ order_id: 102 });

  • Step 4: Receiving and Handling Changes
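  • The change stream will now capture the changes and output them in the first shell where you started the change stream. Iteration helpers differ between shells: the current mongosh documentation iterates a change stream cursor with hasNext()/next() or tryNext() rather than forEach, so adapt the loop below if your shell rejects forEach.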

    changeStream.forEach(change => printjson(change));

  • You might see the following output:

    {
        "operationType": "insert",
        "fullDocument": {
            "_id": ObjectId("64c23ef349123abf12abcd45"),
            "order_id": 103,
            "customer_name": "Alice Cooper",
            "total": 750
        },
        "ns": {
            "db": "ecommerce",
            "coll": "orders"
        },
        "documentKey": {
            "_id": ObjectId("64c23ef349123abf12abcd45")
        }
    }
   
    {
        "operationType": "update",
        "documentKey": {
            "_id": ObjectId("64c23ef349123abf12abcd34")
        },
        "updateDescription": {
            "updatedFields": {
                "total": 1200
            },
            "removedFields": []
        }
    }
   
    {
        "operationType": "delete",
        "documentKey": {
            "_id": ObjectId("64c23ef349123abf12abcd36")
        }
    }


Explanation of Output:

  • Insert Operation:
    • A new order for Alice Cooper was inserted with a total of 750.
    • operationType: "insert" indicates that a new document was added.
    • fullDocument contains the new document inserted into the orders collection.
  • Update Operation:
    • The order for John Doe (order ID: 101) was updated, and the total field was changed to 1200.
    • operationType: "update" indicates that an existing document was updated.
    • updateDescription.updatedFields shows which fields were modified (total in this case).
  • Delete Operation:
    • The order for Jane Smith (order ID: 102) was deleted.
    • operationType: "delete" indicates that a document was deleted.
    • documentKey provides the _id of the deleted document.
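  • An application usually branches on operationType when it handles these events. The following is a minimal sketch in plain JavaScript; describeChange is a hypothetical helper, and the sample events use simplified string _id values instead of ObjectId so the snippet runs without a database.

```javascript
// Hypothetical handler that turns a change event into a log line.
function describeChange(change) {
  switch (change.operationType) {
    case "insert":
      return `insert ${change.documentKey._id}: ${JSON.stringify(change.fullDocument)}`;
    case "update": {
      const fields = Object.keys(change.updateDescription.updatedFields);
      return `update ${change.documentKey._id}: changed ${fields.join(", ")}`;
    }
    case "delete":
      return `delete ${change.documentKey._id}`;
    default:
      // e.g. replace, invalidate, drop: not handled in this sketch
      return `unhandled operationType: ${change.operationType}`;
  }
}

// Simplified copies of the three events shown above.
const events = [
  { operationType: "insert", documentKey: { _id: "id-103" },
    fullDocument: { _id: "id-103", order_id: 103, total: 750 } },
  { operationType: "update", documentKey: { _id: "id-101" },
    updateDescription: { updatedFields: { total: 1200 }, removedFields: [] } },
  { operationType: "delete", documentKey: { _id: "id-102" } },
];

for (const e of events) console.log(describeChange(e));
```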
Change Streams with Filters and Aggregation
  • You can apply filters and use aggregation pipelines with change streams to monitor specific types of changes or perform more complex operations on the incoming data.
  • For example, if you only want to listen for insert operations, you can apply a filter like this:

    const pipeline = [
        { $match: { operationType: "insert" } }
    ];

    const changeStream = db.orders.watch(pipeline);

    changeStream.forEach(change => printjson(change));


In this example:

  • We use the $match stage to filter the stream and only receive documents where the operationType is insert.
  • Now, the change stream will only output changes related to insert operations, ignoring updates and deletions.
More Complex Aggregation Example
  • You can also use more advanced aggregation stages in your change stream. For instance, let’s say you want to monitor changes but only care about orders where the total exceeds 1000 after the update:

    const pipeline = [
        {
            $match: {
                $or: [
                    { "fullDocument.total": { $gt: 1000 } },
                    { "updateDescription.updatedFields.total": { $gt: 1000 } }
                ]
            }
        }
    ];

    const changeStream = db.orders.watch(pipeline);

    changeStream.forEach(change => printjson(change));


Here:

  • The change stream will listen for any inserts or updates where the total is greater than 1000.
  • For insert operations, the fullDocument.total is checked.
  • For update operations, the updateDescription.updatedFields.total is checked.
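  • To see which events this $match stage lets through, the same condition can be written as an ordinary predicate. This is a sketch in plain JavaScript; matchesTotalFilter is a hypothetical name, and the sample events are simplified versions of the ones shown earlier.

```javascript
// Plain-JavaScript equivalent of the $or condition in the pipeline above.
function matchesTotalFilter(event, threshold = 1000) {
  const inserted = event.fullDocument?.total;
  const updated = event.updateDescription?.updatedFields?.total;
  return (
    (typeof inserted === "number" && inserted > threshold) ||
    (typeof updated === "number" && updated > threshold)
  );
}

const sampleEvents = [
  { operationType: "insert", fullDocument: { order_id: 103, total: 750 } },
  { operationType: "update", updateDescription: { updatedFields: { total: 1200 } } },
  { operationType: "delete", documentKey: { _id: "id-102" } },
];

console.log(sampleEvents.filter((e) => matchesTotalFilter(e)).map((e) => e.operationType));
// [ 'update' ]
```

  • Note that an update that leaves total untouched never matches, even if the stored total is already above 1000, because updatedFields only lists the fields that changed.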
Resuming Change Streams
  • If your application loses its connection to MongoDB or needs to restart, you can resume the change stream from the last received event using a resume token. As long as the oplog still contains the entry that the token points to, you don't miss any changes that occurred while the connection was down.
How to Use Resume Tokens
  • Every change event carries its resume token in the _id field. You can store this token and use it later to resume the change stream from where it left off.

    let resumeToken = null;

    const changeStream = db.orders.watch();

    changeStream.forEach(change => {
        printjson(change);
        resumeToken = change._id;  // Store the resume token
    });

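  • Later, if your application needs to restart (assuming the token was saved somewhere that survives the restart, such as a file or another collection):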

    const changeStream = db.orders.watch([], { resumeAfter: resumeToken });

    changeStream.forEach(change => printjson(change));

  • This ensures that you don’t miss any events between the initial disconnection and the reconnection.
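  • The idea behind resuming can be modeled without a database. In the sketch below, EventLog is a hypothetical in-memory stand-in for the oplog, and its tokens are plain increasing integers. Real resume tokens are opaque documents that should be stored and passed back unchanged.

```javascript
// Hypothetical in-memory model of "resume after token" (not MongoDB code).
class EventLog {
  constructor() {
    this.events = [];
    this.nextToken = 1;
  }

  // Record an event and stamp it with the next token in its _id field.
  append(operationType, documentKey) {
    this.events.push({ _id: this.nextToken++, operationType, documentKey });
  }

  // Return events strictly after the given token (all events when null).
  readAfter(resumeToken = null) {
    if (resumeToken === null) return [...this.events];
    return this.events.filter((e) => e._id > resumeToken);
  }
}

const log = new EventLog();
log.append("insert", 103);
log.append("update", 101);

// The consumer processes two events and remembers the last token.
let resumeToken = null;
for (const e of log.readAfter()) resumeToken = e._id;

// This change happens while the consumer is offline.
log.append("delete", 102);

// On restart, only the missed event is delivered.
const missed = log.readAfter(resumeToken);
console.log(missed.map((e) => e.operationType)); // [ 'delete' ]
```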
Use Cases for Change Streams
  • Real-Time Notifications: You can notify users when a document (e.g., an order or a product) changes, triggering emails or notifications.
  • Audit Trails: Use change streams to track modifications to critical documents, creating audit logs for compliance or security purposes.
  • Cache Synchronization: Keep an in-memory cache synchronized with changes happening in the MongoDB database.
  • Data Replication: Synchronize data across different databases or systems by listening for changes and replicating them in real time.
  • Analytics & Monitoring: Monitor data in real time for anomalies, trends, or other events requiring immediate attention.
Conclusion
  • Change streams are a powerful feature of MongoDB that allow you to watch and react to changes in your collections in real-time. They provide an efficient way to build event-driven systems, ensuring that you can keep data in sync across different systems, notify users of important changes, or even maintain audit trails with minimal performance impact.
  • By applying aggregation pipelines and resume tokens, you can customize change streams to meet specific business needs and handle temporary disconnections gracefully.

Operation log or oplog in MongoDB

  • The operation log or oplog is a crucial component in MongoDB's replication architecture. It is a special capped collection (local.oplog.rs) that keeps a chronological record of all write operations that modify the data in the database. Writes are recorded in the primary's oplog, and every member of the replica set maintains its own copy.
Purpose of Oplog
  • The oplog allows MongoDB to implement replication, where changes made to the primary node are propagated to all secondary nodes in the replica set. The secondary nodes read from the oplog and apply these changes to stay synchronized with the primary node. This ensures data consistency and high availability across the replica set.
Key Concepts of Oplog
  • Oplog as a Capped Collection
    • The oplog is a capped collection, meaning it has a fixed size and uses a circular buffer. When it reaches its allocated space limit, it overwrites the oldest entries.
    • This size can be configured based on the system's needs, and MongoDB automatically manages the capped nature of the oplog.
  • Chronological Record of Writes
    • The oplog maintains a time-ordered record of all write operations on the primary node. These include:
      • Insert operations
      • Update operations
      • Delete operations
    • Each entry in the oplog corresponds to one of these operations, recording enough information to replay the operation on the secondary nodes.
  • Replication in Replica Sets
    • In a replica set, the oplog ensures that secondary nodes replicate all changes made to the primary node.
    • Secondary nodes continuously query the oplog to retrieve any new changes made on the primary node and then apply these changes to their local dataset.
  • Oplog Entries: Each entry in the oplog represents a write operation and consists of:
    • ts (Timestamp): The time when the operation occurred.
    • h (Hash): A hash that identifies the operation in older server versions; newer releases may omit this field.
    • op (Operation Type): The type of operation (i for insert, u for update, d for delete, etc.).
    • ns (Namespace): The name of the collection where the operation occurred, formatted as db.collection.
    • o (Object): The actual content of the operation (for example, the document inserted or the updates applied).
    • o2 (Object2): For update operations, this field identifies the target document (typically by its _id).
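  • The capped-collection behavior described above (oldest entries are overwritten once the log is full) can be sketched with a small bounded log. CappedLog is a hypothetical class for illustration, and it limits the number of entries, whereas the real oplog is limited by size on disk.

```javascript
// Hypothetical bounded log: keeps only the newest `capacity` entries.
class CappedLog {
  constructor(capacity) {
    this.capacity = capacity;
    this.entries = [];
  }

  append(entry) {
    this.entries.push(entry);
    // Once full, the oldest entry is discarded to make room.
    if (this.entries.length > this.capacity) this.entries.shift();
  }

  oldest() {
    return this.entries[0];
  }
}

const capped = new CappedLog(3);
for (let ts = 1; ts <= 5; ts++) capped.append({ ts, op: "i" });

console.log(capped.entries.map((e) => e.ts)); // [ 3, 4, 5 ]
console.log(capped.oldest().ts);              // 3
```

  • A reader that had only applied entries up to ts 1 could no longer catch up from this log, because entry 2 is gone. That is the situation in which a secondary needs a full resynchronization.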
Structure of an Oplog Entry
  • Here’s an example of a simple oplog entry:

    {
        "ts": Timestamp(1627821406, 1),
        "t": NumberLong(1),
        "h": NumberLong("105208253870888999"),
        "v": 2,
        "op": "i",
        "ns": "ecommerce.orders",
        "o": {
            "_id": ObjectId("64c7a45700123f3b2e9f34ad"),
            "order_id": 101,
            "customer_name": "John Doe",
            "total": 1200
        }
    }

  • Explanation of Fields:
    • ts (Timestamp): Timestamp(1627821406, 1) – This records the timestamp of the operation.
    • h (Hash): 105208253870888999 – A unique identifier for this operation.
    • op (Operation Type): i – This specifies the type of operation, in this case, insert.
    • ns (Namespace): ecommerce.orders – The collection where the operation occurred (ecommerce is the database, and orders is the collection).
    • o (Object): The document that was inserted into the orders collection.
Oplog Size and Retention
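  • Configurable Size: The oplog size is set when a member first starts (the --oplogSize option or the replication.oplogSizeMB setting) and can be changed later with the replSetResizeOplog command. It determines how many operations can be logged before older entries start being overwritten.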
  • Retention Period: The retention period of oplog entries depends on the amount of write traffic and the oplog size. The more frequent the changes, the faster the oplog fills up and overwrites older operations.
  • Monitoring the Oplog: Administrators can monitor the oplog size and usage to ensure it’s large enough to handle the replication lag. If secondary nodes are unable to keep up and older oplog entries are overwritten, the secondary may need to perform a full resynchronization.
  • To check the current size of the oplog in a MongoDB instance, you can use this command in the MongoDB shell:

    db.printReplicationInfo()

  • This outputs the oplog size and the time window it covers based on the current write load.
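  • The relationship between oplog size, write volume, and retention can be estimated with simple arithmetic. The helper below is a back-of-the-envelope sketch; estimateOplogWindowHours is a hypothetical name, and it assumes a steady write rate, which real workloads rarely have.

```javascript
// Rough estimate: how many hours of writes fit in the oplog?
function estimateOplogWindowHours(oplogSizeMB, writeRateMBPerHour) {
  if (writeRateMBPerHour <= 0) return Infinity; // no writes: nothing is overwritten
  return oplogSizeMB / writeRateMBPerHour;
}

console.log(estimateOplogWindowHours(1024, 512));  // 2
console.log(estimateOplogWindowHours(1024, 2048)); // 0.5
```

  • If the estimated window is shorter than the longest outage or maintenance period you expect a secondary to survive, the oplog is probably too small.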
Types of Operations in the Oplog
  • Insert Operation (op: "i"): When a new document is inserted into a collection, an insert operation is recorded in the oplog.
    • The o field will contain the full document that was inserted.
  • Example:

    {
        "op": "i",
        "ns": "ecommerce.orders",
        "o": {
            "_id": ObjectId("64c7a45700123f3b2e9f34ad"),
            "order_id": 101,
            "customer_name": "John Doe",
            "total": 1200
        }
    }

  • Update Operation (op: "u"): When a document is updated, the oplog entry for the update operation contains:
    • The change to apply, in the o field (shown here as a $set; the exact encoding varies by server version).
    • The _id of the updated document, in the o2 field. The original query filter is not stored; the entry targets the document by _id so that it can be replayed safely on secondaries.
  • Example:

    {
        "op": "u",
        "ns": "ecommerce.orders",
        "o": { "$set": { "total": 1300 } },
        "o2": { "_id": ObjectId("64c7a45700123f3b2e9f34ad") }
    }

  • Delete Operation (op: "d"): For delete operations, the oplog records the unique identifier of the document that was deleted.
  • Example:

    {
        "op": "d",
        "ns": "ecommerce.orders",
        "o": { "_id": ObjectId("64c7a45700123f3b2e9f34ad") }
    }

  • No-Op (op: "n"): A no-op operation indicates an operation that doesn't affect any documents, such as a heartbeat or an internal process.
  • Commands (op: "c"): Commands are operations like creating or dropping collections, indexes, or performing transactions.
  • Example (for a drop collection command):

    {
        "op": "c",
        "ns": "ecommerce.$cmd",
        "o": { "drop": "orders" }
    }

How the Oplog Supports Replication
  • Primary Node: The primary node of a replica set writes all changes (inserts, updates, deletes) to the oplog.
  • Secondary Nodes: Secondary nodes continuously pull changes from the oplog of their sync source (usually the primary, or another secondary when chained replication is in use) by reading the entries and applying those operations to their own datasets.
  • The secondary queries its sync source's oplog with a timestamp to ensure it gets only the changes that occurred after the last applied operation.
  • Replication Lag: If a secondary falls behind the primary (due to network issues or resource constraints), there is a replication lag. The oplog's size needs to be large enough to allow the secondary to catch up without missing operations. If the oplog entries are overwritten before the secondary can replicate them, the secondary will need a full data resync.
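  • The replay process described above can be sketched as follows. This is a simplified model, not MongoDB's implementation: applyOplogEntry and catchUp are hypothetical names, ts values are consecutive integers instead of BSON timestamps, _id values are plain strings, and updates are assumed to use $set only.

```javascript
// Apply a single simplified oplog entry to an in-memory "collection" (a Map).
function applyOplogEntry(collection, entry) {
  switch (entry.op) {
    case "i":
      collection.set(entry.o._id, { ...entry.o });
      break;
    case "u": {
      const doc = collection.get(entry.o2._id);
      if (doc) Object.assign(doc, entry.o.$set);
      break;
    }
    case "d":
      collection.delete(entry.o._id);
      break;
    case "n":
      break; // no-op entries change nothing
    default:
      throw new Error(`unsupported op: ${entry.op}`);
  }
}

// Replay every entry newer than lastAppliedTs; return the new position.
function catchUp(collection, oplog, lastAppliedTs) {
  const oldestTs = oplog.length > 0 ? oplog[0].ts : lastAppliedTs + 1;
  // If the entry right after lastAppliedTs was already overwritten,
  // replaying the log would skip operations.
  if (oldestTs > lastAppliedTs + 1) {
    throw new Error("stale: full resync required");
  }
  for (const entry of oplog) {
    if (entry.ts > lastAppliedTs) {
      applyOplogEntry(collection, entry);
      lastAppliedTs = entry.ts;
    }
  }
  return lastAppliedTs;
}

const oplog = [
  { ts: 1, op: "i", o: { _id: "a", order_id: 101, total: 1200 } },
  { ts: 2, op: "u", o: { $set: { total: 1300 } }, o2: { _id: "a" } },
  { ts: 3, op: "i", o: { _id: "b", order_id: 102, total: 500 } },
  { ts: 4, op: "d", o: { _id: "b" } },
];

const secondary = new Map();
const applied = catchUp(secondary, oplog, 0);
console.log(applied, secondary.get("a").total, secondary.has("b")); // 4 1300 false
```

  • Calling catchUp with a log whose oldest entry is newer than the next expected ts (for example oplog.slice(2) with lastAppliedTs 0) throws, which models the case where overwritten entries force a full resync.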
How the Oplog Relates to Change Streams
  • Change Streams: Change streams in MongoDB are powered by the oplog. When a client opens a change stream, MongoDB watches the oplog for new entries that match the client’s subscription (e.g., a new document inserted or updated).
  • Resume Tokens: In the context of change streams, MongoDB emits a resume token with each event, which is tied to the oplog’s timestamp. If a change stream disconnects, the application can use this resume token to pick up where it left off, using the timestamp stored in the oplog.
Monitoring the Oplog
  • Checking Oplog Status: MongoDB provides utilities to monitor the oplog’s status and ensure that it has sufficient capacity to handle the replication load.
  • To check the oplog’s status in the shell:

    rs.printReplicationInfo()

  • Output Example:

    configured oplog size:   1024MB
    log length start to end: 7501 secs (2.08hrs)
    oplog first event time:  Wed Sep 22 2021 13:34:40 GMT+0000 (UTC)
    oplog last event time:   Wed Sep 22 2021 15:39:41 GMT+0000 (UTC)
    now:                     Wed Sep 22 2021 15:39:45 GMT+0000 (UTC)

  • This gives details like:
    • The configured size of the oplog.
    • The time range of operations stored in the oplog.
    • The timestamp of the oldest and newest oplog entries.
Conclusion
  • The oplog is a fundamental part of MongoDB's replication mechanism, ensuring that all data changes on the primary node are reliably propagated to secondary nodes. It enables features like replication, automatic failover, and change streams. Understanding how the oplog works is key for building highly available, scalable MongoDB architectures.

Primitive Types in TypeScript

In TypeScript, primitive types are the most basic data types, and they are the building blocks for handling data. They correspond to simple ...