Monday, February 20, 2012

Scaling using mongodb : Map-Reduce on sharded collection (part 3)

The idea here is to create a sharded collection and then run map-reduce on it to process the data, loosely following the basic tag-counting example from the mongodb cookbook (see references). I created a sharded collection "posts" with the following structure. The goal is to find the count of each tag across the complete sharded collection. I am using two machines named "241" and "243" as shards for mongodb, and the mongos service is running on a third machine, "249".

Input collection structure :

mongos> db.posts.find()
{ "_id" : ObjectId("4f4221149a6777895a000000"), "id" : "0", "content" : "data for content 0", "tags" : [ "tag1", "tag2", "tag3", "tag4" ] }
{ "_id" : ObjectId("4f4221149a6777895a000001"), "id" : "1", "content" : "data for content 1", "tags" : [ "tag2", "tag4", "tag5", "tag7", "tag9" ] }

Output collection structure :

mongos> db.tags.find()
{ "_id" : "tag1", "value" : 14705 }
{ "_id" : "tag3", "value" : 14418 }

Let's walk through the step-by-step process of creating the collection, populating it with test data, and running map-reduce.

Let's create the posts collection by inserting a few records. If you print the collection stats, you will see that it is not sharded.
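
For example, the two documents shown above can be seeded from the mongos shell (a minimal sketch - the ObjectIds are generated automatically and will differ):

mongos> use test
switched to db test
mongos> db.posts.insert({ id : "0", content : "data for content 0", tags : [ "tag1", "tag2", "tag3", "tag4" ] })
mongos> db.posts.insert({ id : "1", content : "data for content 1", tags : [ "tag2", "tag4", "tag5", "tag7", "tag9" ] })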

mongos> db.printCollectionStats()
---
posts
{
        "sharded" : false,
        "primary" : "shard241",
        "ns" : "test.posts",
        "count" : 2,
        "size" : 256,
        "avgObjSize" : 128,
        "storageSize" : 8192,
        "numExtents" : 1,
        "nindexes" : 1,
        "lastExtentSize" : 8192,
        "paddingFactor" : 1,
        "flags" : 1,
        "totalIndexSize" : 8176,
        "indexSizes" : {
                "_id_" : 8176
        },
        "ok" : 1
}
---

To shard the collection, you first need to create an index on "id", and then shard the collection using "id" as the key.

mongos> db.posts.ensureIndex({id:1})
mongos> db.posts.getIndexes()
[
        {
                "v" : 1,
                "key" : {
                        "_id" : 1
                },
                "ns" : "test.posts",
                "name" : "_id_"
        },
        {
                "v" : 1,
                "key" : {
                        "id" : 1
                },
                "ns" : "test.posts",
                "name" : "id_1"
        }
]

mongos> use admin
switched to db admin
mongos> db.runCommand( { shardcollection : "test.posts" , key : { id : 1 } } )
{ "collectionsharded" : "test.posts", "ok" : 1 }





The collection "posts" is now sharded. Lets populate some test data into the collection. Here is the php script that I used to populate data into the collection.

<?php
// connect to mongodb through the mongos router running on 249
$m = new Mongo( "mongodb://192.168.1.249:10003", array("persist" => "x") );
$db = $m->test;
$table = $db->posts;
$start = 0;
$end = 200000;
for($i=$start; $i<$end; $i++)
{
        $tags = getTag();
        $obj = array("id"=>"$i", "content"=>"data for content $i", "tags"=>$tags);
        $table->insert($obj);
        echo "$i:".implode(',',$tags)."\n";
}
$found = $table->count();
echo "Found : $found\n";

// return between 2 and 5 randomly picked tags
function getTag()
{
        $tagArray = array('tag1','tag2','tag3','tag4','tag5','tag6','tag7','tag8','tag9','tag10','tag11','tag12','tag13','tag14','tag15','tag16','tag17','tag18','tag19','tag20','tag21','tag22','tag23','tag24','tag25','tag26','tag27','tag28','tag29','tag30','tag31','tag32','tag33','tag34','tag35','tag36','tag37','tag38','tag39','tag40','tag41','tag43');

        $tags = array();
        $tagcount = rand(2,5);

        $count = sizeof($tagArray);
        for($x=0; $x<$tagcount; $x++)
        {
                // rand() is inclusive at both ends, so cap at $count-1 to stay within the array
                $tid = rand(0,$count-1);

                $tags[] = $tagArray[$tid];
        }
        return $tags;
}
?>



I pushed 200,000 records into the collection. Here is how the data was sharded between "241" and "243":

mongos> db.printCollectionStats()
---
posts
{
        "sharded" : true,
        "flags" : 1,
        "ns" : "test.posts",
        "count" : 200000,
        "numExtents" : 10,
        "size" : 24430872,
        "storageSize" : 32743424,
        "totalIndexSize" : 15534400,
        "indexSizes" : {
                "_id_" : 6508096,
                "id_1" : 9026304
        },
        "avgObjSize" : 122.15436,
        "nindexes" : 2,
        "nchunks" : 4,
        "shards" : {
                "shard241" : {
                        "ns" : "test.posts",
                        "count" : 109889,
                        "size" : 13423484,
                        "avgObjSize" : 122.15493598947415,
                        "storageSize" : 17978183,
                        "numExtents" : 8,
                        "nindexes" : 2,
                        "lastExtentSize" : 12083200,
                        "paddingFactor" : 1,
                        "flags" : 1,
                        "totalIndexSize" : 8531049,
                        "indexSizes" : {
                                "_id_" : 3573332,
                                "id_1" : 4957718
                        },
                        "ok" : 1
                },
                "shard243" : {
                        "ns" : "test.posts",
                        "count" : 90111,
                        "size" : 10913985,
                        "avgObjSize" : 121.11711711711712,
                        "storageSize" : 33251771,
                        "numExtents" : 8,
                        "nindexes" : 2,
                        "lastExtentSize" : 12083200,
                        "paddingFactor" : 1,
                        "flags" : 1,
                        "totalIndexSize" : 13274730,
                        "indexSizes" : {
                                "_id_" : 6617370,
                                "id_1" : 6657360
                        },
                        "ok" : 1
                }
        },
        "ok" : 1
}
---




Now we will create the map and reduce functions. The map function checks the tags array of each record in the posts collection and, for each element of the array, emits the tag with a count of 1. The reduce function then counts the occurrences of each tag and returns the final count. The map function can call emit(key, value) any number of times to feed data to the reducer. The reduce function receives a key and an array of values emitted for that key, and reduces them to a single value. The structure of the object returned by the reduce function must be identical to the structure of the map function's emitted value.

mongos> map = function() {
... if(!this.tags) {
... return;
... }
... for ( index in this.tags) {
... emit(this.tags[index],1);
... }
... }
function () {
    if (!this.tags) {
        return;
    }
    for (index in this.tags) {
        emit(this.tags[index], 1);
    }
}
mongos> reduce = function(key, values) {
... var count = 0;
... for(index in values) {
... count += values[index];
... }
... return count;
... }
function (key, values) {
    var count = 0;
    for (index in values) {
        count += values[index];
    }
    return count;
}
To understand how this works, let's say that after some iterations map emits the value { "tag1", 1 }, and suppose that at that point "tag1" has an accumulated count of 50. That is, the document can be represented as:

{ "tag1", 50 }

If map again emits { "tag1", 1 }, reduce will be called as follows:

reduce( "tag1", [50,1] )

The result will be a simple combination of the counts for tag1:

{ "tag1", 51 }

To invoke map-reduce, run the following command. It tells mongodb to run map-reduce on the "posts" collection, using the map function "map" and the reduce function "reduce", and to write the output to a collection named "tags".

mongos> result =  db.runCommand( {
... "mapreduce" : "posts",
... "map" : map, //name of map function
... "reduce" : reduce,  //name of reduce function
... "out" : "tags" } )
{
        "result" : "tags",
        "shardCounts" : {
                "192.168.1.241:10000" : {
                        "input" : 109889,
                        "emit" : 499098,
                        "reduce" : 6400,
                        "output" : 43
                },
                "192.168.1.243:10000" : {
                        "input" : 90111,
                        "emit" : 200395,
                        "reduce" : 3094,
                        "output" : 43
                }
        },
        "counts" : {
                "emit" : NumberLong(699493),
                "input" : NumberLong(200000),
                "output" : NumberLong(43),
                "reduce" : NumberLong(9494)
        },
        "ok" : 1,
        "timeMillis" : 9199,
        "timing" : {
                "shards" : 9171,
                "final" : 28
        }
}

Let's look at the output documents. The output collection contains one document per tag - in our case, 43.

mongos> db.tags.find()
{ "_id" : "tag1", "value" : 14643 }
{ "_id" : "tag2", "value" : 14705 }
{ "_id" : "tag3", "value" : 14418 }
{ "_id" : "tag4", "value" : 14577 }
{ "_id" : "tag5", "value" : 14642 }
{ "_id" : "tag6", "value" : 14505 }
{ "_id" : "tag7", "value" : 14623 }
{ "_id" : "tag8", "value" : 14529 }
{ "_id" : "tag9", "value" : 14767 }
{ "_id" : "tag10", "value" : 14489 }
has more
 

mongos> db.tags.count()
43
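
The count for a specific tag can also be looked up directly:

mongos> db.tags.find({ _id : "tag1" })
{ "_id" : "tag1", "value" : 14643 }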


References

http://cookbook.mongodb.org/patterns/count_tags/
http://www.mongodb.org/display/DOCS/MapReduce/

Thursday, February 16, 2012

Scaling using mongodb : HA and Automatic sharding (part 2)

We discussed the creation of HA using replica sets in the earlier post, part 1. In this post we will create a sharded cluster of replica sets consisting of multiple mongodb servers. If you noticed the parameters passed while starting mongodb on servers 241 & 242 earlier, you would have seen that in addition to the db path, log path and replica set parameters, I also added the parameter '--shardsvr'. This enables sharding on that instance of mongodb.

To create a cluster of database servers using mongodb you need data nodes, config servers and router servers. Data nodes store the data. Config servers store the sharding metadata, which is used to figure out where the data resides on the data nodes. Router servers are what the client talks to; they communicate with the config servers and data nodes on the client's behalf, so clients interface with the database through the router servers.

On each of the config servers, start mongodb with the parameter '--configsvr'.

230:/home/mongodb } ./bin/mongod --configsvr --logappend --logpath ./data/log --pidfilepath ./data/mongodb.pid --fork --dbpath ./data/config --directoryperdb --bind_ip 192.168.1.230 --port 10002
231:/home/mongodb } ./bin/mongod --configsvr --logappend --logpath ./data/log --pidfilepath ./data/mongodb.pid --fork --dbpath ./data/config --directoryperdb --bind_ip 192.168.1.231 --port 10002
And start the routing service (mongos) on the routing servers.

233:/home/mongodb } ./bin/mongos --port 10003 --configdb 192.168.1.230:10002,192.168.1.231:10002 --logappend --logpath ./data/route_log --fork --bind_ip 192.168.1.233 &
The same service can be started on routing server 234. To shard a database and add shards to the cluster, you need to connect to a routing server.

233:/home/mongodb } ./bin/mongo 192.168.1.233:10003
233:/home/mongodb } mongos> use admin
switched to db admin

To add the first replica set (set_a) to the cluster, run the following command.

233:/home/mongodb } mongos> db.runCommand( { addshard : "set_a/192.168.1.241:10000,192.168.1.242:10000,192.168.1.243:10000", name : "shard1" } )
{ "shardAdded" : "shard1", "ok" : 1 }

Similarly, the replica sets of the other 2 shards can be added to the mongodb cluster, as sketched below. Eventually you can fire a listShards command to see the shards added to the cluster.
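
Assuming the other two replica sets are named set_b and set_c (the set names here are illustrative; the hosts match the listShards output below), the commands would look like this:

233:/home/mongodb } mongos> db.runCommand( { addshard : "set_b/192.168.1.244:10000,192.168.1.245:10000,192.168.1.246:10000", name : "shard2" } )
{ "shardAdded" : "shard2", "ok" : 1 }
233:/home/mongodb } mongos> db.runCommand( { addshard : "set_c/192.168.1.247:10000,192.168.1.248:10000,192.168.1.249:10000", name : "shard3" } )
{ "shardAdded" : "shard3", "ok" : 1 }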

233:/home/mongodb } mongos > db.runCommand( {listshards:1} )
{
        "shards" : [
                {
                        "_id" : "shard1",
                        "host" : "192.168.1.241:10000,192.168.1.242:10000,192.168.1.243:10000"
                },
                {
                        "_id" : "shard2",
                        "host" : "192.168.1.244:10000,192.168.1.245:10000,192.168.1.246:10000"
                },
                {
                        "_id" : "shard3",
                        "host" : "192.168.1.247:10000,192.168.1.248:10000,192.168.1.249:10000"
                }
        ],
        "ok" : 1
}

To enable sharding on a database, say "test", run the following command.

233:/home/mongodb } mongos > db.runCommand( { enablesharding : "test" } )
{ "ok" : 1 }

Next, shard the collection on a particular key. The test collection consists of the fields id, name and email. Let's shard on id.

233:/home/mongodb } > db.runCommand( { shardcollection : "test.test", key : { id : 1 } } )
{ "collectionsharded" : "test.test", "ok" : 1 }

I used a PHP script to push more than 20,000 records into the sharded cluster; a rough mongo shell equivalent is sketched below. You can check the status of the sharded cluster using the following commands.
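
A minimal way to generate similar data straight from the mongos shell (a sketch - the name and email values are made up to match the fields mentioned above):

mongos> use test
switched to db test
mongos> for (var i = 0; i < 20000; i++) {
...     db.test.insert({ id : "" + i, name : "name" + i, email : "user" + i + "@example.com" });
... }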

233:/home/mongodb } mongos > db.printShardingStatus()
--- Sharding Status ---
  sharding version: { "_id" : 1, "version" : 3 }
  shards:
        {  "_id" : "shard1",  "host" : "192.168.1.241:10000,192.168.1.242:10000,192.168.1.243:10000" }
        {  "_id" : "shard2",  "host" : "192.168.1.244:10000,192.168.1.245:10000,192.168.1.246:10000" }
        {  "_id" : "shard3",  "host" : "192.168.1.247:10000,192.168.1.248:10000,192.168.1.249:10000" }
  databases:
        {  "_id" : "admin",  "partitioned" : false,  "primary" : "config" }
        {  "_id" : "test",  "partitioned" : true,  "primary" : "shard1" }
                test.test chunks:
                                shard1        2
                                shard2        1
                        { "id" : { $minKey : 1 } } -->> { "id" : "1" } on : shard1 { "t" : 2000, "i" : 1 }
                        { "id" : "1" } -->> { "id" : "999" } on : shard2 { "t" : 1000, "i" : 3 }
                        { "id" : "999" } -->> { "id" : { $maxKey : 1 } } on : shard2 { "t" : 2000, "i" : 0 }

233:/home/mongodb } mongos > db.printCollectionStats()
---
test
{
        "sharded" : true,
        "flags" : 1,
        "ns" : "test.test",
        "count" : 19998,
        "numExtents" : 6,
        "size" : 1719236,
        "storageSize" : 2801664,
        "totalIndexSize" : 1537088,
        "indexSizes" : {
                "_id_" : 662256,
                "id_1" : 874832
        },
        "avgObjSize" : 85.97039703970397,
        "nindexes" : 2,
        "nchunks" : 3,
        "shards" : {
                "shard1" : {
                        "ns" : "test.test",
                        "count" : 19987,
                        "size" : 1718312,
                        "avgObjSize" : 85.97148146295092,
                        "storageSize" : 2793472,
                        "numExtents" : 5,
                        "nindexes" : 2,
                        "lastExtentSize" : 2097152,
                        "paddingFactor" : 1,
                        "flags" : 1,
                        "totalIndexSize" : 1520736,
                        "indexSizes" : {
                                "_id_" : 654080,
                                "id_1" : 866656
                        },
                        "ok" : 1
                },
                "shard2" : {
                        "ns" : "test.test",
                        "count" : 11,
                        "size" : 924,
                        "avgObjSize" : 84,
                        "storageSize" : 8192,
                        "numExtents" : 1,
                        "nindexes" : 2,
                        "lastExtentSize" : 8192,
                        "paddingFactor" : 1,
                        "flags" : 1,
                        "totalIndexSize" : 16352,
                        "indexSizes" : {
                                "_id_" : 8176,
                                "id_1" : 8176
                        },
                        "ok" : 1
                }
        },
        "ok" : 1
}
---

To use multiple routing servers in a database connection string - for example, when connecting to mongodb from a programming language such as PHP - the following syntax can be used:

mongodb://[username:password@]host1[:port1][,host2[:port2],...]/db_name

This will connect to at least one host from the list of hosts provided. If none of the hosts are available, an exception will be thrown.
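
With the PHP driver used earlier, that looks like the following (a sketch; it assumes a second mongos has been started on 234 the same way as on 233):

<?php
try {
        // either of the two mongos routers can serve the connection
        $m = new Mongo( "mongodb://192.168.1.233:10003,192.168.1.234:10003", array("persist" => "x") );
        $db = $m->test;
} catch (MongoConnectionException $e) {
        // thrown only if none of the listed hosts are reachable
        echo "Connection failed : ".$e->getMessage()."\n";
}
?>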

Wednesday, February 15, 2012

Scaling using mongodb : HA and Automatic sharding (part 1)

Mongodb introduces the concepts of automatic sharding and replica sets. Let's see how both can be used to create a highly available database cluster that is horizontally scalable.

First let's take a look at the HA capability of mongodb. HA is handled in mongodb by what are known as replica sets. A replica set is a group of two or more mongodb nodes that are copies/replicas of each other. The set automatically elects a primary and provides automated failover and disaster recovery.

Let's create a replica set of 2 nodes first and proceed from there.

I have taken 2 machines, 192.168.1.241 and 192.168.1.242, for creating the replica set. Let's download the tar of mongodb 2.0.2 and put it on 241 and 242. We will create separate directories for the database and the replica db. Here is how we go about it.

241:/home/mongodb } mkdir -p data/db
241:/home/mongodb } mkdir -p repl/db
242:/home/mongodb } mkdir -p data/db
242:/home/mongodb } mkdir -p repl/db

Create the primary mongodb replica set member on 241.

241:/home/mongodb } ./bin/mongod --shardsvr --replSet set_a --logappend --logpath ./data/log --pidfilepath ./data/mongodb.pid --fork --dbpath ./data/db --directoryperdb --bind_ip 192.168.1.241 --port 10000 &
Create the secondary replica set member on 242, running on port 10001.

242:/home/mongodb } ./bin/mongod --shardsvr --replSet set_a --logappend --logpath ./data/log --pidfilepath ./data/mongodb.pid --fork --dbpath ./data/db --directoryperdb --bind_ip 192.168.1.242 --port 10001 &
Check whether the replica set is working fine.

241:/home/mongodb } ./bin/mongo 192.168.1.241:10000
241: } > rs.status()
{
        "startupStatus" : 3,
        "info" : "run rs.initiate(...) if not yet done for the set",
        "errmsg" : "can't get local.system.replset config from self or any seed (EMPTYCONFIG)",
        "ok" : 0
}
241: } > rs.initiate()
{
        "info2" : "no configuration explicitly specified -- making one",
        "me" : "192.168.1.243:10000",
        "info" : "Config now saved locally.  Should come online in about a minute.",
        "ok" : 1
}
241: } > rs.isMaster()
{
        "setName" : "set_a",
        "ismaster" : true,
        "secondary" : false,
        "hosts" : [
                "192.168.1.241:10000"
        ],
        "primary" : "192.168.1.241:10000",
        "me" : "192.168.1.241:10000",
        "maxBsonObjectSize" : 16777216,
        "ok" : 1
}

We can see that 241 is the master, but 242 has not yet been added to the replica set. Let's add 242 as a secondary server of the replica set.

241: } >  rs.add("192.168.1.242:10001")
{ "ok" : 1 }

241: } > rs.conf()
{
        "_id" : "set_a",
        "version" : 2,
        "members" : [
                {
                        "_id" : 0,
                        "host" : "192.168.1.241:10000"
                },
                {
                        "_id" : 1,
                        "host" : "192.168.1.242:10001"
                }
        ]
}
241: } > rs.status()
{
        "set" : "set_a",
        "date" : ISODate("2012-02-15T09:42:07Z"),
        "myState" : 1,
        "members" : [
                {
                        "_id" : 0,
                        "name" : "192.168.1.241:10000",
                        "health" : 1,
                        "state" : 1,
                        "stateStr" : "PRIMARY",
                        "optime" : {
                                "t" : 1329298881000,
                                "i" : 1
                        },
                        "optimeDate" : ISODate("2012-02-15T09:41:21Z"),
                        "self" : true
                },
                {
                        "_id" : 1,
                        "name" : "192.168.1.242:10001",
                        "health" : 1,
                        "state" : 3,
                        "stateStr" : "RECOVERING",
                        "uptime" : 46,
                        "optime" : {
                                "t" : 0,
                                "i" : 0
                        },
                        "optimeDate" : ISODate("1970-01-01T00:00:00Z"),
                        "lastHeartbeat" : ISODate("2012-02-15T09:42:07Z"),
                        "pingMs" : 5339686
                }
        ],
        "ok" : 1
}
242 is in recovery mode, replicating the data that is already on 241. Once the data is completely replicated, the second node starts acting as a secondary.

241: } > rs.status()
{
        "set" : "set_a",
        "date" : ISODate("2012-02-15T10:33:32Z"),
        "myState" : 1,
        "members" : [
                {
                        "_id" : 0,
                        "name" : "192.168.1.241:10000",
                        "health" : 1,
                        "state" : 1,
                        "stateStr" : "PRIMARY",
                        "optime" : {
                                "t" : 1329298881000,
                                "i" : 1
                        },
                        "optimeDate" : ISODate("2012-02-15T09:41:21Z"),
                        "self" : true
                },
                {
                        "_id" : 1,
                        "name" : "192.168.1.242:10001",
                        "health" : 1,
                        "state" : 2,
                        "stateStr" : "SECONDARY",
                        "uptime" : 3131,
                        "optime" : {
                                "t" : 1329298881000,
                                "i" : 1
                        },
                        "optimeDate" : ISODate("2012-02-15T09:41:21Z"),
                        "lastHeartbeat" : ISODate("2012-02-15T10:33:31Z"),
                        "pingMs" : 0
                }
        ],
        "ok" : 1
}

Now the replica set is working fine, and the secondary node will take over as primary if the primary goes out of service. Ideally there should be three or more nodes in any replica set, so that a majority of members is always available to elect a primary; a third member can be added the same way, as sketched below.
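
For example, assuming a third mongod has been started on 243 with the same --replSet set_a parameters (as is done for the sharded cluster in part 2), it can be added from the primary:

241: } > rs.add("192.168.1.243:10000")
{ "ok" : 1 }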