Source code for camelot.view.search
# ============================================================================
#
# Copyright (C) 2007-2013 Conceptive Engineering bvba. All rights reserved.
# www.conceptive.be / info@conceptive.be
#
# This file is part of the Camelot Library.
#
# This file may be used under the terms of the GNU General Public
# License version 2.0 as published by the Free Software Foundation
# and appearing in the file license.txt included in the packaging of
# this file. Please review this information to ensure GNU
# General Public Licensing requirements will be met.
#
# If you are unsure which license is appropriate for your use, please
# visit www.python-camelot.com or contact info@conceptive.be
#
# This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
# WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# For use of this library in commercial applications, please contact
# info@conceptive.be
#
# ============================================================================
"""
Helper functions to search through a collection of entities
"""
import logging
LOGGER = logging.getLogger('camelot.view.search')
import sqlalchemy.types
from sqlalchemy import sql, orm, schema
import camelot.types
[docs]def create_entity_search_query_decorator( admin, text ):
"""create a query decorator to search through a collection of entities
:param admin: the admin interface of the entity
:param text: the text to search for
:return: a function that can be applied to a query to make the query filter
only the objects related to the requested text or None if no such decorator
could be build
"""
from camelot.view import utils
if len(text.strip()):
# arguments for the where clause
args = []
# join conditions : list of join entities
joins = []
def append_column( c, text, args ):
"""add column c to the where clause using a clause that
is relevant for that type of column"""
arg = None
if issubclass(c.type.__class__, camelot.types.Color):
pass
elif issubclass(c.type.__class__, camelot.types.File):
pass
elif issubclass(c.type.__class__, camelot.types.Code):
codes = [u'%%%s%%'%s for s in text.split(c.type.separator)]
codes = codes + ['%']*(len(c.type.parts) - len(codes))
arg = c.like( codes )
elif issubclass(c.type.__class__, camelot.types.VirtualAddress):
arg = c.like(('%', '%'+text+'%'))
elif issubclass(c.type.__class__, camelot.types.Image):
pass
elif issubclass(c.type.__class__, sqlalchemy.types.Integer):
try:
arg = (c==utils.int_from_string(text))
except ( Exception, utils.ParsingError ):
pass
elif issubclass(c.type.__class__, sqlalchemy.types.Date):
try:
arg = (c==utils.date_from_string(text))
except ( Exception, utils.ParsingError ):
pass
elif issubclass(c.type.__class__, sqlalchemy.types.Float):
try:
float_value = utils.float_from_string(text)
precision = c.type.precision
if isinstance(precision, (tuple)):
precision = precision[1]
delta = 0.1**( precision or 0 )
arg = sql.and_(c>=float_value-delta, c<=float_value+delta)
except ( Exception, utils.ParsingError ):
pass
elif issubclass(c.type.__class__, (sqlalchemy.types.String, )) or \
(hasattr(c.type, 'impl') and \
issubclass(c.type.impl.__class__, (sqlalchemy.types.String, ))):
LOGGER.debug('look in column : %s'%c.name)
arg = sql.operators.ilike_op(c, '%'+text+'%')
if arg is not None:
arg = sql.and_(c != None, arg)
args.append(arg)
for t in text.split(' '):
subexp = []
if admin.search_all_fields:
mapper = orm.class_mapper( admin.entity )
for property in mapper.iterate_properties:
if isinstance( property, orm.properties.ColumnProperty ):
for column in property.columns:
if isinstance( column, schema.Column ):
append_column( column, t, subexp )
for column_name in admin.list_search:
path = column_name.split('.')
target = admin.entity
for path_segment in path:
mapper = orm.class_mapper( target )
property = mapper.get_property( path_segment )
if isinstance(property, orm.properties.PropertyLoader):
joins.append(getattr(target, path_segment))
target = property.mapper.class_
else:
append_column(property.columns[0], t, subexp)
args.append(subexp)
def create_query_decorator(joins, args):
"""Bind the join and args to a query decorator function"""
def query_decorator(query):
"""The actual query decorator, call this function with a query
as its first argument and it will return a query with a where
clause for searching the resultset of the original query"""
for join in joins:
query = query.outerjoin(join)
subqueries = (sql.or_(*arg) for arg in args)
query = query.filter(sql.and_(*subqueries))
return query
return query_decorator
return create_query_decorator(joins, args)