Skip to content
This repository was archived by the owner on Sep 19, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions mongodisco/bsonfile_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2012 10gen, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
File: bsonfile_input.py
Description: Implementation of :class:`disco.worker.classic.func.InputStream`
An input stream for reading bson-formatted input files.

To use, call MongoJob.run(bson_input=True, input=urls, create_input_splits=False)
Note that option input_uri will pull from a mongodb uri and override a bson file input.
'''
from bson import decode_all

class BsonInputStream(object):
"""Input stream of bson-formatted files"""

def __init__(self, stream):
self.docs = decode_all(stream.read())

def __len__(self):
return len(self.docs)

def __iter__(self):
#most important method
return self.docs.__iter__()

def read(self, size=-1):
#raise a non-implemented error to see if this ever pops up
raise Exception("read is not implemented- investigate why this was called")

def input_stream(stream, size, url, params):
# This looks like a mistake, but it is intentional.
# Due to the way that Disco imports and uses this
# function, we must re-import the module here.
from mongodisco.bsonfile_input import BsonInputStream
return BsonInputStream(stream)
21 changes: 21 additions & 0 deletions mongodisco/bsonfile_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright 2012 10gen, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from mongodisco.bsonfile_output import output_stream
from mongodisco.bsonfile_input import input_stream
from disco.worker.classic.func import task_output_stream, task_input_stream


bsonfile_output_stream = (task_output_stream,output_stream)
bsonfile_input_stream = (task_input_stream, input_stream)
58 changes: 58 additions & 0 deletions mongodisco/bsonfile_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright 2012 10gen, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
File: bsonfile_output.py
Description: Implementation of :class:`disco.worker.classic.func.OutputStream`
An output stream for writing bson-formatted output files.

To use, call MongoJob.run(bson_output=True)
Note that option output_uri will push to a mongodb uri and override a bson file output.
'''

from bson import BSON

class BsonFileOutput(object):
'''
Output stream for bson files
'''

def __init__(self, stream, params):
self.stream = stream

config = {}
for key, value in params.__dict__.iteritems():
config[key] = value

self.key_name = config.get('job_output_key','_id')
self.value_name = config.get('job_output_value', 'value')

def add(self, key, val):
result_dict = {}
result_dict[self.key_name] = key
result_dict[self.value_name] = val
bytes = BSON.encode(result_dict)

self.stream.write(bytes)

def close(self):
pass


def output_stream(stream,partition,url,params):
# This looks like a mistake, but it is intentional.
# Due to the way that Disco imports and uses this
# function, we must re-import the module here.
from mongodisco.bsonfile_output import BsonFileOutput
return BsonFileOutput(stream, params)
25 changes: 20 additions & 5 deletions mongodisco/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from disco.job import Job
from disco.core import classic_iterator
from mongodisco.mongodb_io import mongodb_output_stream, mongodb_input_stream
from mongodisco.bsonfile_io import bsonfile_output_stream, bsonfile_input_stream
from mongodisco.splitter import calculate_splits
import logging

Expand All @@ -32,6 +33,8 @@ class MongoJob(Job):
# "job_output_value" : "value",
# # "input_uri" : "mongodb://localhost/test.in",
# # "output_uri" : "mongodb://localhost/test.out",
# "bson_input" : False, # Format input is bson files (i.e. mongodump not mongodb)
# "bson_output" : False, # Format output as bson files (i.e. mongodump not mongodb)
# "print_to_stdout": False,
# "job_wait": True,
# "split_size" : 8,
Expand All @@ -49,7 +52,7 @@ class MongoJob(Job):
# "query" : {}
# }

def run(self, map, reduce, **jobargs):
def run(self, map=None, reduce=None, **jobargs):
"""Run a map-reduce job with either ``input_uri`` or ``output_uri``
as a "mongodb://..." URI.

Expand All @@ -59,24 +62,36 @@ def run(self, map, reduce, **jobargs):
consider "input" and "output" (sans _uri)
"""

if not any(uri in jobargs for uri in ('input_uri', 'output_uri')):
if not any(uri in jobargs for uri in ('input_uri', 'output_uri', 'bson_input', 'bson_output')):
logging.info('You did not specify "input_uri" or "output_uri" '
'with MongoJob. This may be in error.')

if 'mongodb://' in jobargs.get('input_uri', ''):
jobargs['map_input_stream'] = mongodb_input_stream
elif jobargs.get('bson_input', False):
jobargs['map_input_stream'] = bsonfile_input_stream

if 'mongodb://' in jobargs.get('output_uri', ''):
jobargs['reduce_output_stream'] = mongodb_output_stream
elif jobargs.get('bson_output', False):
jobargs['reduce_output_stream'] = bsonfile_output_stream

jobargs['map'] = map
jobargs['reduce'] = reduce
jobargs.setdefault('input', calculate_splits(jobargs))
if map:
jobargs['map'] = map
if reduce:
jobargs['reduce'] = reduce

if 'input' not in jobargs:
jobargs.setdefault('input', calculate_splits(jobargs))

jobargs.setdefault('required_modules', []).extend([
'mongodisco.mongodb_io',
'mongodisco.mongodb_input',
'mongodisco.mongodb_output',
'mongodisco.mongo_util',
'mongodisco.bsonfile_io',
'mongodisco.bsonfile_input',
'mongodisco.bsonfile_output'
])

super(MongoJob, self).run(**jobargs)
Expand Down