# ============================================================================
#
# Copyright (C) 2007-2013 Conceptive Engineering bvba. All rights reserved.
# www.conceptive.be / info@conceptive.be
#
# This file is part of the Camelot Library.
#
# This file may be used under the terms of the GNU General Public
# License version 2.0 as published by the Free Software Foundation
# and appearing in the file license.txt included in the packaging of
# this file. Please review this information to ensure GNU
# General Public Licensing requirements will be met.
#
# If you are unsure which license is appropriate for your use, please
# visit www.python-camelot.com or contact info@conceptive.be
#
# This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
# WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# For use of this library in commercial applications, please contact
# info@conceptive.be
#
# ============================================================================
import os
import sys
import logging
import pkgutil
# from collections import defaultdict
from sqlalchemy import create_engine
from PyQt4 import QtCore
from PyQt4 import QtNetwork
from PyQt4.QtCore import Qt
from PyQt4.QtCore import QFileInfo
from PyQt4.QtGui import QBoxLayout, QDialog, QFont, QGridLayout, QHBoxLayout, \
QLabel, QLineEdit, QPushButton, QFileDialog, QComboBox, QWidget, QVBoxLayout
from camelot.view import art
from camelot.view.controls.progress_dialog import ProgressDialog
from camelot.view.controls.editors import ChoicesEditor, TextLineEditor, LanguageEditor
from camelot.view.controls.standalone_wizard_page import HSeparator, StandaloneWizardPage
from camelot.view.controls.combobox_input_dialog import ComboBoxInputDialog
from camelot.core.exception import UserException
from camelot.core.utils import ugettext as _
from camelot.view.model_thread.signal_slot_model_thread import SignalSlotModelThread
from camelot.view.model_thread import object_thread
from camelot.core.dbprofiles import fetch_profiles, use_chosen_profile, \
store_profiles, get_network_proxy, last_used_profile, connection_string_from_profile
logger = logging.getLogger('camelot.view.database_selection')
NEW_PROFILE_LABEL = _('new/edit profile')
def select_database(app_admin):
profiles_dict = fetch_profiles()
if not profiles_dict:
create_new_profile(app_admin, profiles_dict)
selected = select_profile(profiles_dict)
if selected in profiles_dict:
use_chosen_profile(selected)
elif selected == NEW_PROFILE_LABEL:
create_new_profile(app_admin, profiles_dict)
else:
sys.exit(0)
def select_profile(profiles_dict):
title = _('Profile Selection')
input_label = _('Select a stored profile:')
ok_label = _('OK')
cancel_label = _('Quit')
input_dialog = ComboBoxInputDialog(autoaccept=True)
input_dialog.set_window_title(title)
input_dialog.set_label_text(input_label)
input_dialog.set_ok_button_text(ok_label)
input_dialog.set_cancel_button_text(cancel_label)
input_dialog.set_items(sorted(profiles_dict.keys()) + [NEW_PROFILE_LABEL])
_last_used_profile = last_used_profile()
if _last_used_profile:
input_dialog.set_choice_by_text(_last_used_profile)
input_dialog.set_ok_button_default()
last_index = input_dialog.count()-1
custom_font = QFont()
custom_font.setItalic(True)
icon = art.Icon('tango/16x16/actions/document-new.png').getQIcon()
input_dialog.set_data(last_index, custom_font, Qt.FontRole)
input_dialog.set_data(last_index, icon, Qt.DecorationRole)
dialog_code = input_dialog.exec_()
if dialog_code == QDialog.Accepted:
return unicode(input_dialog.get_text())
return None
def new_profile_item_selected(input_dialog):
input_dialog.accept()
def create_new_profile(app_admin, profiles):
wizard = app_admin.database_profile_wizard(profiles)
dialog_code = wizard.exec_()
if dialog_code == QDialog.Rejected:
# no profiles? exit
if not profiles:
sys.exit(0)
# one more time
select_database(app_admin)
[docs]class ProfileWizard(StandaloneWizardPage):
"""Wizard for the creation of a new database
profile.
.. attribute:: languages
.. attribute:: dialects
A list of languages allowed in the profile selection, an empty list will
allow all languages
"""
languages = []
dialects = []
def __init__(self, profiles, parent=None):
super(ProfileWizard, self).__init__(parent)
self._connection_valid = False
self.network_reply = None
self.profiles = profiles
self.setWindowTitle(_('Profile Wizard'))
self.set_banner_logo_pixmap(art.Icon('tango/22x22/categories/preferences-system.png').getQPixmap())
self.set_banner_title(_('Create New/Edit Profile'))
self.set_banner_subtitle(_('Please enter the database settings'))
self.banner_widget().setStyleSheet('background-color: white;')
self.manager = QtNetwork.QNetworkAccessManager( self )
self.manager.finished.connect( self.update_network_status )
#self.manager.networkAccessibleChanged.connect( self.network_accessible_changed )
self.manager.proxyAuthenticationRequired.connect( self.proxy_authentication_required )
self.create_labels_and_widgets()
self.create_buttons()
self.set_tab_order()
self.set_widgets_values()
# note: connections come after labels and widgets are created
# and have default values
self.connect_widgets()
self.connect_buttons()
timer = QtCore.QTimer(self)
timer.timeout.connect( self.new_network_request )
timer.setInterval( 3000 )
timer.start()
self.new_network_request()
def create_labels_and_widgets(self):
assert object_thread( self )
self.profile_label = QLabel(_('Profile Name:'))
self.dialect_label = QLabel(_('Driver:'))
self.host_label = QLabel(_('Server Host:'))
self.port_label = QLabel(_('Port:'))
self.database_name_label = QLabel(_('Database Name:'))
self.username_label = QLabel(_('Username:'))
self.password_label = QLabel(_('Password:'))
self.media_location_label = QLabel(_('Media Location:'))
self.language_label = QLabel(_('Language:'))
self.proxy_host_label = QLabel(_('Proxy Host:'))
self.proxy_port_label = QLabel(_('Port:'))
self.proxy_username_label = QLabel(_('Proxy Username:'))
self.proxy_password_label = QLabel(_('Proxy Password:'))
self.network_status_label = QLabel()
self.not_accessible_media_path_label = QLabel(_('Media location path '\
'is not accessible.'))
self.not_accessible_media_path_label.setStyleSheet('color: red')
self.not_writable_media_path_label = QLabel(_('Media location path '\
'is not writable.'))
self.not_writable_media_path_label.setStyleSheet('color: red')
layout = QGridLayout()
layout.addWidget(self.profile_label, 0, 0, Qt.AlignRight)
layout.addWidget(self.dialect_label, 1, 0, Qt.AlignRight)
layout.addWidget(self.host_label, 2, 0, Qt.AlignRight)
layout.addWidget(self.port_label, 2, 3, Qt.AlignRight)
layout.addWidget(self.database_name_label, 3, 0, Qt.AlignRight)
layout.addWidget(self.username_label, 4, 0, Qt.AlignRight)
layout.addWidget(self.password_label, 5, 0, Qt.AlignRight)
layout.addWidget(self.media_location_label, 7, 0, Qt.AlignRight)
layout.addWidget(self.language_label, 8, 0, Qt.AlignRight)
layout.addWidget(self.proxy_host_label, 10, 0, Qt.AlignRight)
layout.addWidget(self.proxy_port_label, 10, 3, Qt.AlignRight)
layout.addWidget(self.proxy_username_label, 11, 0, Qt.AlignRight)
layout.addWidget(self.proxy_password_label, 12, 0, Qt.AlignRight)
self.profile_editor = QComboBox(self)
self.profile_editor.setEditable(True)
# 32767 is Qt max length for string
# should be more than enough for folders
# http://doc.qt.nokia.com/latest/qlineedit.html#maxLength-prop
self.dialect_editor = ChoicesEditor(parent=self)
self.host_editor = TextLineEditor(self, length=32767)
self.host_editor.set_value('')
self.port_editor = TextLineEditor(self)
self.port_editor.setFixedWidth(60)
self.port_editor.set_value('')
self.database_name_editor = TextLineEditor(self, length=32767)
self.database_name_editor.set_value('')
self.username_editor = TextLineEditor(self)
self.username_editor.set_value('')
self.password_editor = TextLineEditor(self)
self.password_editor.setEchoMode(QLineEdit.Password)
self.password_editor.set_value('')
self.media_location_editor = TextLineEditor(self, length=32767)
self.media_location_editor.set_value('')
self.language_editor = LanguageEditor(languages=self.languages,
parent=self)
#
# try to find a default language
#
system_language = QtCore.QLocale.system().name()
if self.languages:
if system_language in self.languages:
self.language_editor.set_value( system_language )
else:
self.language_editor.set_value( self.languages[0] )
else:
self.language_editor.set_value( system_language )
self.proxy_host_editor = TextLineEditor(self, length=32767)
self.proxy_host_editor.set_value('')
self.proxy_port_editor = TextLineEditor(self)
self.proxy_port_editor.setFixedWidth(60)
self.proxy_port_editor.set_value('')
self.proxy_username_editor = TextLineEditor(self)
self.proxy_username_editor.set_value('')
self.proxy_password_editor = TextLineEditor(self)
self.proxy_password_editor.set_value('')
self.proxy_password_editor.setEchoMode(QLineEdit.Password)
layout.addWidget(self.profile_editor, 0, 1, 1, 1)
layout.addWidget(self.dialect_editor, 1, 1, 1, 1)
layout.addWidget(self.host_editor, 2, 1, 1, 1)
layout.addWidget(self.port_editor, 2, 4, 1, 1)
layout.addWidget(self.database_name_editor, 3, 1, 1, 1)
layout.addWidget(self.username_editor, 4, 1, 1, 1)
layout.addWidget(self.password_editor, 5, 1, 1, 1)
layout.addWidget(HSeparator(), 6, 0, 1, 5)
layout.addWidget(self.media_location_editor, 7, 1, 1, 1)
layout.addWidget(self.language_editor, 8, 1, 1, 1)
layout.addWidget(HSeparator(), 9, 0, 1, 5)
layout.addWidget(self.proxy_host_editor, 10, 1, 1, 1)
layout.addWidget(self.proxy_port_editor, 10, 4, 1, 1)
layout.addWidget(self.proxy_username_editor, 11, 1, 1, 1)
layout.addWidget(self.proxy_password_editor, 12, 1, 1, 1)
layout.addWidget(self.network_status_label, 13, 1, 1, 4)
self.main_widget().setLayout(layout)
def set_widgets_values(self):
self.dialect_editor.clear()
self.profile_editor.clear()
if self.dialects:
dialects = self.dialects
else:
import sqlalchemy.dialects
dialects = [name for _importer, name, is_package in \
pkgutil.iter_modules(sqlalchemy.dialects.__path__) \
if is_package]
self.dialect_editor.set_choices([(dialect, dialect.capitalize()) \
for dialect in dialects])
self.profile_editor.insertItems(1, [''] + \
[item for item in fetch_profiles()])
self.profile_editor.setFocus()
self.update_wizard_values()
def connect_widgets(self):
self.profile_editor.editTextChanged.connect(self.update_wizard_values)
# self.dialect_editor.currentIndexChanged.connect(self.update_wizard_values)
def create_buttons(self):
self.more_button = QPushButton(_('More'))
self.more_button.setCheckable(True)
self.more_button.setAutoDefault(False)
self.cancel_button = QPushButton(_('Cancel'))
self.ok_button = QPushButton(_('OK'))
layout = QHBoxLayout()
layout.setDirection(QBoxLayout.RightToLeft)
layout.addWidget(self.cancel_button)
layout.addWidget(self.ok_button)
layout.addStretch()
layout.addWidget(self.more_button)
self.buttons_widget().setLayout(layout)
self.browse_button = QPushButton(_('Browse'))
self.main_widget().layout().addWidget(self.browse_button, 7, 2, 1, 3)
self.setup_extension()
def setup_extension(self):
self.extension = QWidget()
self.load_button = QPushButton(_('Load profiles'))
self.save_button = QPushButton(_('Save profiles'))
extension_buttons_layout = QHBoxLayout()
extension_buttons_layout.setContentsMargins(0, 0, 0, 0)
extension_buttons_layout.addWidget(self.load_button)
extension_buttons_layout.addWidget(self.save_button)
extension_buttons_layout.addStretch()
extension_layout = QVBoxLayout()
extension_layout.setContentsMargins(0, 0, 0, 0)
extension_layout.addWidget(HSeparator())
extension_layout.addLayout(extension_buttons_layout)
self.extension.setLayout(extension_layout)
self.main_widget().layout().addWidget(self.extension, 15, 0, 1, 5)
self.extension.hide()
def set_tab_order(self):
all_widgets = [self.profile_editor, self.dialect_editor,
self.host_editor, self.port_editor, self.database_name_editor,
self.username_editor, self.password_editor,
self.media_location_editor, self.browse_button,
self.language_editor,
self.proxy_host_editor, self.proxy_port_editor,
self.proxy_username_editor, self.proxy_password_editor,
self.ok_button, self.cancel_button]
i = 1
while i != len(all_widgets):
self.setTabOrder(all_widgets[i-1], all_widgets[i])
i += 1
def connect_buttons(self):
self.cancel_button.pressed.connect(self.reject)
self.ok_button.pressed.connect(self.proceed)
self.browse_button.pressed.connect(self.fill_media_location)
self.more_button.toggled.connect(self.extension.setVisible)
self.save_button.pressed.connect(self.save_profiles_to_file)
self.load_button.pressed.connect(self.load_profiles_from_file)
def proceed(self):
if self.is_connection_valid():
profilename, info = self.collect_info()
if profilename in self.profiles:
self.profiles[profilename].update(info)
else:
self.profiles[profilename] = info
store_profiles(self.profiles)
use_chosen_profile(profilename)
self.accept()
def is_connection_valid(self):
profilename, info = self.collect_info()
mt = SignalSlotModelThread(lambda:None)
mt.start()
progress = ProgressDialog(_('Verifying database settings'))
mt.post(lambda:self.test_connection( info ),
progress.finished, progress.exception)
progress.exec_()
return self._connection_valid
def test_connection(self, profile):
self._connection_valid = False
connection_string = connection_string_from_profile( profile )
engine = create_engine(connection_string, pool_recycle=True)
try:
connection = engine.raw_connection()
cursor = connection.cursor()
cursor.close()
connection.close()
self._connection_valid = True
except Exception, e:
self._connection_valid = False
raise UserException( _('Could not connect to database, please check host and port'),
resolution = _('Verify driver, host and port or contact your system administrator'),
detail = unicode(e) )
def toggle_ok_button(self, enabled):
self.ok_button.setEnabled(enabled)
def current_profile(self):
text = unicode(self.profile_editor.currentText())
self.toggle_ok_button(bool(text))
return text
# FIXME can't get this to work properly (call in update_wizard_values(): port_editor )
# def _related_default_port(self, dialect_editor):
# class Missing(defaultdict):
# def __missing__(self, key):
# return ''
# ports = Missing(mysql='3306', postgresql='5432')
# return ports[dialect_editor.get_value()]
def update_wizard_values(self):
network_proxy = get_network_proxy()
# self.dialect_editor.set_value(self.get_profile_value('dialect') or 'mysql')
# self.host_editor.setText(self.get_profile_value('host') or '127.0.0.1')
# self.port_editor.setText(self.get_profile_value('port') or '3306')
self.dialect_editor.set_value(self.get_profile_value('dialect') or 'postgresql')
self.host_editor.setText(self.get_profile_value('host') or self.host_editor.text())
self.port_editor.setText(self.get_profile_value('port') or self.port_editor.text())
# self.port_editor.setText(self.get_profile_value('port') or self._related_default_port(self.dialect_editor))
self.database_name_editor.setText(self.get_profile_value('database') or self.database_name_editor.text())
self.username_editor.setText(self.get_profile_value('user') or self.username_editor.text())
self.password_editor.setText(self.get_profile_value('pass') or self.password_editor.text())
self.media_location_editor.setText(self.get_profile_value('media_location') or self.media_location_editor.text())
self.language_editor.set_value(self.get_profile_value('locale_language') or self.language_editor.get_value())
self.proxy_host_editor.setText(self.get_profile_value('proxy_host') or str(network_proxy.hostName()))
self.proxy_port_editor.setText(self.get_profile_value('proxy_port') or str(network_proxy.port()))
self.proxy_username_editor.setText(self.get_profile_value('proxy_username') or str(network_proxy.user()))
self.proxy_password_editor.setText(self.get_profile_value('proxy_password') or str(network_proxy.password()))
self.network_status_label.setText('')
self.network_status_label.setStyleSheet('')
@QtCore.pyqtSlot(QtNetwork.QNetworkProxy, QtNetwork.QAuthenticator)
def proxy_authentication_required(self, proxy, authenticator):
pass
@QtCore.pyqtSlot(QtNetwork.QNetworkReply)
def update_network_status(self, reply):
if reply.isFinished():
error = reply.error()
if error == QtNetwork.QNetworkReply.NoError:
self.network_status_label.setText(_('Internet available.'))
self.network_status_label.setStyleSheet('color: green')
return
self.network_status_label.setText(_('Internet not available.\n%s.'%reply.errorString()))
self.network_status_label.setStyleSheet('color: red')
@QtCore.pyqtSlot()
def new_network_request(self):
if self.network_reply and not self.network_reply.isFinished():
self.network_reply.abort()
if self.proxy_host_editor.text():
proxy = QtNetwork.QNetworkProxy( QtNetwork.QNetworkProxy.HttpProxy,
self.proxy_host_editor.text(),
int( str( self.proxy_port_editor.text() ) ) )#,
#self.proxy_username_editor.text(),
#self.proxy_password_editor.text() )
self.manager.setProxy( proxy )
else:
self.manager.setProxy( QtNetwork.QNetworkProxy() )
self.network_reply = self.manager.get( QtNetwork.QNetworkRequest( QtCore.QUrl('http://aws.amazon.com') ) )
def get_profile_value(self, key):
current = self.current_profile()
if current in self.profiles:
return self.profiles[current][key]
return ''
def collect_info(self):
logger.info('collecting new database profile info')
info = {}
profilename = self.current_profile()
info['dialect'] = self.dialect_editor.get_value()
info['host'] = self.host_editor.get_value()
info['port'] = self.port_editor.text()
info['database'] = self.database_name_editor.get_value()
info['user'] = self.username_editor.get_value()
info['pass'] = self.password_editor.get_value()
info['media_location'] = self.media_location_editor.get_value()
info['locale_language'] = self.language_editor.get_value()
info['proxy_host'] = self.proxy_host_editor.get_value()
info['proxy_port'] = self.proxy_port_editor.get_value()
info['proxy_username'] = self.proxy_username_editor.get_value()
info['proxy_password'] = self.proxy_password_editor.get_value()
return profilename, info
def fill_media_location(self):
caption = _('Select media location')
selected = unicode(QFileDialog.getExistingDirectory(self, caption))
if not selected:
return
info = QFileInfo(selected)
if not info.isReadable():
self.main_widget().layout().addWidget(
self.not_accessible_media_path_label, 13, 1, 1, 4)
return
if not info.isWritable():
self.main_widget().layout().addWidget(
self.not_writable_media_path_label, 13, 1, 1, 4)
return
self.media_location_editor.setText(selected)
def save_profiles_to_file(self):
caption = _('Save Profiles To a File')
filters = _('Profiles file (*.ini)')
path = QFileDialog.getSaveFileName(self, caption, 'profiles', filters)
if not path:
logger.debug('Could not save profiles to file; no path.')
return
store_profiles(self.profiles, to_file=path)
def load_profiles_from_file(self):
caption = _('Load Profiles From a File')
filters = _('Profiles file (*.ini)')
path = QFileDialog.getOpenFileName(self, caption, 'profiles', filters)
if not path:
logger.debug('Could not load profiles from file; no path.')
return
self.profiles = fetch_profiles(from_file=path)
if self.profiles:
store_profiles(self.profiles)
os.execv(sys.executable, [sys.executable] + sys.argv)