Python源码示例:requests.OAuth2Session()
示例1
def google_login():
# to avoid flask-login displaying the login error message
session.pop("_flashes", None)
next_url = request.args.get("next")
# Google does not allow to append param to redirect_url
# we need to pass the next url by session
if next_url:
session["google_next_url"] = next_url
google = OAuth2Session(GOOGLE_CLIENT_ID, scope=_scope, redirect_uri=_redirect_uri)
authorization_url, state = google.authorization_url(_authorization_base_url)
# State is used to prevent CSRF, keep this for later.
session["oauth_state"] = state
return redirect(authorization_url)
示例2
def facebook_login():
# to avoid flask-login displaying the login error message
session.pop("_flashes", None)
next_url = request.args.get("next")
# Facebook does not allow to append param to redirect_uri
# we need to pass the next url by session
if next_url:
session["facebook_next_url"] = next_url
facebook = OAuth2Session(
FACEBOOK_CLIENT_ID, scope=_scope, redirect_uri=_redirect_uri
)
facebook = facebook_compliance_fix(facebook)
authorization_url, state = facebook.authorization_url(_authorization_base_url)
# State is used to prevent CSRF, keep this for later.
session["oauth_state"] = state
return redirect(authorization_url)
示例3
def demo():
"""Step 1: User Authorization.
Redirect the user/resource owner to the OAuth provider (i.e. SimpleLogin)
using an URL with a few key OAuth parameters.
"""
simplelogin = OAuth2Session(
client_id, redirect_uri="http://127.0.0.1:5000/callback"
)
authorization_url, state = simplelogin.authorization_url(authorization_base_url)
# State is used to prevent CSRF, keep this for later.
session["oauth_state"] = state
return redirect(authorization_url)
# Step 2: User authorization, this happens on the provider.
示例4
def callback():
""" Step 3: Retrieving an access token.
The user has been redirected back from the provider to your registered
callback URL. With this redirection comes an authorization code included
in the redirect URL. We will use that to obtain an access token.
"""
simplelogin = OAuth2Session(client_id, state=session["oauth_state"])
token = simplelogin.fetch_token(
token_url, client_secret=client_secret, authorization_response=request.url
)
# At this point you can fetch protected resources but lets save
# the token and show how this is done from a persisted token
# in /profile.
session["oauth_token"] = token
return redirect(url_for(".profile"))
示例5
def _get_oauth_token(self):
"""
Get Monzo access token via OAuth2 `authorization code` grant type.
Official docs:
https://monzo.com/docs/#acquire-an-access-token
:returns: OAuth 2 access token
:rtype: dict
"""
url = urljoin(self.api_url, '/oauth2/token')
oauth = OAuth2Session(
client_id=self._client_id,
redirect_uri=config.REDIRECT_URI,
)
token = oauth.fetch_token(
token_url=url,
code=self._auth_code,
client_secret=self._client_secret,
)
return token
示例6
def session_from_client_secrets_file(client_secrets_file, scopes, **kwargs):
"""Creates a :class:`requests_oauthlib.OAuth2Session` instance from a
Google-format client secrets file.
Args:
client_secrets_file (str): The path to the `client secrets`_ .json
file.
scopes (Sequence[str]): The list of scopes to request during the
flow.
kwargs: Any additional parameters passed to
:class:`requests_oauthlib.OAuth2Session`
Returns:
Tuple[requests_oauthlib.OAuth2Session, Mapping[str, Any]]: The new
oauthlib session and the validated client configuration.
.. _client secrets:
https://developers.google.com/api-client-library/python/guide
/aaa_client_secrets
"""
with open(client_secrets_file, "r") as json_file:
client_config = json.load(json_file)
return session_from_client_config(client_config, scopes, **kwargs)
示例7
def start_session(self, session):
"""
Starts a oauth session with the information stored in the browser session.
The OAuth2Session will take care of most of the work. Just have to
set a token_updater to update a token for BitBucket.
Input:
session: django.HttpRequest.session
"""
if self._token_key in session:
extra = {
'client_id' : self._client_id,
'client_secret' : self._secret_id,
'auth' : (self._client_id, self._secret_id),
}
def token_updater(token):
update_session_token(session, self, token)
return OAuth2Session(
self._client_id,
token=session[self._token_key],
auto_refresh_url=self._token_url,
auto_refresh_kwargs=extra,
token_updater=token_updater,
)
return None
示例8
def start_session_for_user(self, user):
"""
Grabs the token for the user in the DB, then
starts a oauth session as that user.
Input:
user: models.GitUser: user to start the session for
Return:
OAuth2Session
"""
token = self.user_token_to_oauth_token(user)
extra = {
'client_id' : self._client_id,
'client_secret' : self._secret_id,
'auth' : (self._client_id, self._secret_id),
}
def token_updater(token):
update_user_token(user, token)
return OAuth2Session(
self._client_id,
token=token,
auto_refresh_url=self._token_url,
auto_refresh_kwargs=extra,
token_updater=token_updater,
)
示例9
def sign_in(self, request):
"""
Endpoint for the user signing in. Will start
the OAuth2 authentication process.
After this step the user will be redirected
to the server sign in page. After that, the server
will automatically call our registered callback,
which is "callback" above.
That will get access to the token that will be
used in all future communications.
"""
token = request.session.get(self._token_key)
request.session['source_url'] = request.GET.get('next', None)
if token:
messages.info(request, "Already signed in on %s" % self._hostname)
return self.do_redirect(request)
oauth_session = OAuth2Session(self._client_id, scope=self._scope, redirect_uri=self._redirect_uri)
authorization_url, state = oauth_session.authorization_url(self._auth_url)
request.session[self._state_key] = state
# Get rid of these keys on login
for key in self._addition_keys:
request.session.pop(key, None)
return redirect(authorization_url)
示例10
def _get_oauth_session(self):
"""Creates a new OAuth session
:return:
- OAuth2Session object
"""
return self._get_session(
OAuth2Session(
client_id=self.client_id,
token=self.token,
token_updater=self.token_updater,
auto_refresh_url=self.token_url,
auto_refresh_kwargs={
"client_id": self.client_id,
"client_secret": self.client_secret,
},
)
)
示例11
def __init__(
self,
client_id: str,
client_secret: str,
redirect_uri: Optional[str] = None,
token: Optional[Dict[str, str]] = None,
token_updater: Optional[Callable[[str], None]] = None,
):
self.client_id = client_id
self.client_secret = client_secret
self.token_updater = token_updater
extra = {"client_id": self.client_id, "client_secret": self.client_secret}
self._oauth = OAuth2Session(
client_id=client_id,
token=token,
redirect_uri=redirect_uri,
auto_refresh_kwargs=extra,
token_updater=token_updater,
)
示例12
def __init__(self, credentials):
self.credentials = credentials
self.token = {
'access_token': credentials.access_token,
'refresh_token': credentials.refresh_token,
'token_type': credentials.token_type,
'expires_in': str(int(credentials.token_expiry) - ts()),
}
oauth_client = WebApplicationClient(credentials.client_id,
token=self.token, default_token_placement='query')
self.client = OAuth2Session(
credentials.client_id,
token=self.token,
client=oauth_client,
auto_refresh_url='{}/oauth2/token'.format(NokiaAuth.URL),
auto_refresh_kwargs={
'client_id': credentials.client_id,
'client_secret': credentials.consumer_secret,
},
token_updater=self.set_token
)
示例13
def callback(self, auth_storage):
linkedin = OAuth2Session(self._client_id, state=auth_storage["oauth_state"], redirect_uri=web.ctx.home + self._callback_page)
try:
linkedin.fetch_token(token_url, include_client_id=True, client_secret=self._client_secret,
authorization_response=web.ctx.home + web.ctx.fullpath)
r = linkedin.get('https://api.linkedin.com/v2/me?projection=(id,localizedFirstName,localizedLastName)')
profile = json.loads(r.content.decode('utf-8'))
r = linkedin.get('https://api.linkedin.com/v2/clientAwareMemberHandles?q=members&projection=(elements*(primary,type,handle~))')
result = json.loads(r.content.decode('utf-8'))
for contact in result["elements"]:
if contact["type"] == "EMAIL":
profile["emailAddress"] = contact["handle~"]["emailAddress"]
break
return str(profile["id"]), profile["localizedFirstName"] + " " + profile["localizedLastName"], profile["emailAddress"], {}
except Exception as e:
return None
示例14
def share(self, auth_storage, course, task, submission, language):
facebook = OAuth2Session(self._client_id, state=auth_storage["oauth_state"], redirect_uri=web.ctx.home + self._callback_page)
if facebook:
r = facebook.post("https://graph.facebook.com/me/objects/website",
{
"object": json.dumps({
"og:title": _("INGInious | {course} - {task}").format(
course=course.get_name(language),
task=task.get_name(language)
),
"og:description": _("Check out INGInious course {course} and beat my score of {score}% on task {task} !").format(
course=course.get_name(language),
task=task.get_name(language),
score=submission["grade"]
),
"og:url": web.ctx.home + "/course/" + course.get_id() + "/" + task.get_id(),
"og:image": "http://www.inginious.org/assets/img/header.png"})
})
result = json.loads(r.content.decode('utf-8'))
return "id" in result
示例15
def session(self):
if not hasattr(self, '_oauth_session'):
if not current_user.is_anonymous:
kwargs = {
'token': current_user.token,
}
if self.refresh_url is not None:
kwargs['auto_refresh_url'] = self.refresh_url
kwargs['token_updater'] = token_saver
kwargs['auto_refresh_kwargs'] = {
'client_id': self.client_id,
'client_secret': self.client_secret,
}
current_app.logger.debug(u"Creating a new OAuth2Session for "
u"'{}'".format(current_user))
self._oauth_session = OAuth2Session(self.client_id, **kwargs)
return self._oauth_session
示例16
def get_token(parameters: dict, offline=False):
oauth = OAuth2Session(AuthParameters.CLIENT_ID, redirect_uri=get_redirect_uri(offline))
try:
if 'code' in parameters:
token = oauth.fetch_token(urljoin(DEX_URL, AuthParameters.TOKEN_PATH),
code=parameters['code'],
client_secret=AuthParameters.CLIENT_SECRET)
elif 'refresh_token' in parameters:
extra = {'client_id': AuthParameters.CLIENT_ID,
'client_secret': AuthParameters.CLIENT_SECRET}
token = oauth.refresh_token(urljoin(DEX_URL, AuthParameters.TOKEN_PATH),
refresh_token=parameters['refresh_token'],
**extra)
except Exception as e:
raise MissingTokenException(e)
return token
示例17
def get_nebula_client(client_id, client_secret, account_id, use_ssl):
client_scope = ["read", "write", "execute"]
headers = {"x-mwb-clientid": client_id,
"x-mwb-accountid": account_id}
client = BackendApplicationClient(client_id, scope=client_scope)
nebula = OAuth2Session(client=client, scope=client_scope)
nebula.headers.update(headers)
token = nebula.fetch_token(
token_url=nebula_url('/oauth2/token'),
client_secret=client_secret, scope=client_scope,
verify=use_ssl
)
return "Bearer " + token.get('access_token')
# Test connectivity to the Nebula cloud
示例18
def refresh():
token = __get_session_token__()
refresh_token_arg = request.args.get('refresh_token')
if refresh_token_arg != None:
refresh_token = refresh_token_arg
else:
try:
refresh_token = token['refresh_token']
except KeyError:
refresh_token = ''
questradeAPI = OAuth2Session(client_id, token=token)
token = questradeAPI.refresh_token(token_url, refresh_token=refresh_token)
__set_session_token__(token)
return redirect(url_for('.token'))
示例19
def login(self, **kwargs):
try:
oauth2_session = requests_oauthlib.OAuth2Session(client_id=self.get_config_value('CLIENT_ID'),
redirect_uri=self.callback_url(**kwargs),
scope=self.get_config_value('SCOPES'),
state=flask.session[self.state_session_key])
token = oauth2_session.fetch_token(self.get_config_value('TOKEN_URL'),
code=flask.request.args['code'], client_secret=self.get_config_value('CLIENT_SECRET'))
except Exception as exc:
flask.current_app.logger.exception("exception while retrieving token")
self.login_failure_func(exc)
try:
profile = self.get_profile(oauth2_session)
except Exception as exc:
flask.current_app.logger.exception("exception while retrieving profile")
self.login_failure_func(exc)
# flask.current_app.logger.debug("profile: {!s}".format(pprint.pformat(profile)))
return self.login_success_func(token, profile, **kwargs)
示例20
def get_mock_gbdx_session(token='dummytoken'):
''' make a mock session so tests can make requests to VCRpy
without having to authorize
Kwargs:
token(str): pass a real token to get a real session '''
s = OAuth2Session(client=LegacyApplicationClient('asdf'),
auto_refresh_url='fdsa',
auto_refresh_kwargs={'client_id':'asdf',
'client_secret':'fdsa'})
s.token = token
s.access_token = token
return s
# Set up a GBDX session for tests to use.
# By default this object will be a mocked connection since we're testing against cassettes.
# To get a real connection to regenerate VCRpy cassettes set the envvar "WRITE_CASSETTE"
# before you run the tests. See /tests/readme.md for more information
示例21
def __init__(
self,
client_id: str,
consumer_secret: str,
callback_uri: str,
scope: Iterable[AuthScope] = tuple(),
mode: Optional[str] = None,
):
"""Initialize new object."""
self._client_id: Final = client_id
self._consumer_secret: Final = consumer_secret
self._callback_uri: Final = callback_uri
self._scope: Final = scope
self._mode: Final = mode
self._session: Final = OAuth2Session(
self._client_id,
redirect_uri=self._callback_uri,
scope=",".join((scope.value for scope in self._scope)),
)
示例22
def initialize_api_client(self):
"""
Initialize OAuth2Session client depending on client credentials flow or authorization grant flow
"""
if self.code is None:
# client credentials flow
self.api_client = OAuth2Session(self.client_id, client=BackendApplicationClient(self.client_id),
token=self.token)
else:
# authorization grant flow
refresh_url = self.token_url if self.auto_refresh else None
self.api_client = OAuth2Session(self.client_id, redirect_uri=self.redirect_uri, scope=self.scope,
auto_refresh_url=refresh_url, token_updater=self.token_refresh_callback,
auto_refresh_kwargs={
'client_id': self.client_id,
'client_secret': self.client_secret})
示例23
def step1(request):
# initialization
constants = settings.CONSTANTS
CLIENT_ID = constants['CLIENT_ID']
STEP_1_TEMPLATE_NAME = constants['STEP_1_TEMPLATE_NAME']
REDIRECT_URI = constants['REDIRECT_URI']
AUTHORIZATION_BASE_URL = constants['AUTHORIZATION_BASE_URL']
context = {'initialize':''}
# GET as a result of initial invocation
if request.method == 'GET':
# create a 'requests' Oauth2Session
azure_session = OAuth2Session(CLIENT_ID, redirect_uri=REDIRECT_URI)
# do the outreach to https://login.windows.net/common/oauth2/authorize
authorization_url, state = azure_session.authorization_url(AUTHORIZATION_BASE_URL % 'common')
resp = requests.get(authorization_url)
# go to the login page of AAD & authenticate
return redirect(resp.url)
示例24
def __init__(self, client_id, client_secret):
"""
Initialize the client for oauth flow.
:param client_id: The client id from oura portal.
:type client_id: str
:param client_secret: The client secret from oura portal.
:type client_secret: str
"""
self.client_id = client_id
self.client_secret = client_secret
self.session = OAuth2Session(
client_id,
auto_refresh_url=self.TOKEN_BASE_URL,
)
示例25
def authorize(self):
token_url = self._BASE_URL + "oauth2/token"
client = LegacyApplicationClient(client_id=self.client_id)
self.oauth = OAuth2Session(client=client, scope=['read_station'])
self.oauth.fetch_token(token_url=token_url,
username=self.username, password=self.password,
client_id=self.client_id, client_secret=self.client_secret)
示例26
def github_login():
next_url = request.args.get("next")
if next_url:
redirect_uri = _redirect_uri + "?next=" + encode_url(next_url)
else:
redirect_uri = _redirect_uri
github = OAuth2Session(
GITHUB_CLIENT_ID, scope=["user:email"], redirect_uri=redirect_uri
)
authorization_url, state = github.authorization_url(_authorization_base_url)
# State is used to prevent CSRF, keep this for later.
session["oauth_state"] = state
return redirect(authorization_url)
示例27
def make_session(token: str = None,
state: str = None,
scope: List[str] = None) -> OAuth2Session:
return OAuth2Session(
client_id=OAUTH2_CLIENT_ID,
token=token,
state=state,
scope=scope,
redirect_uri=redirect_uri(),
auto_refresh_kwargs={
'client_id': OAUTH2_CLIENT_ID,
'client_secret': OAUTH2_CLIENT_SECRET,
},
auto_refresh_url=TOKEN_URL,
token_updater=token_updater)
示例28
def _make_session(self, token: str = None, state: str = None, scope: list = None) -> OAuth2Session:
"""A low level method used for creating OAuth2 session.
Parameters
----------
token : str, optional
The authorization token to use which was previously received from authorization code grant.
state : str, optional
The state to use for OAuth2 session.
scope : list, optional
List of valid `Discord OAuth2 Scopes
<https://discordapp.com/developers/docs/topics/oauth2#shared-resources-oauth2-scopes>`_.
Returns
-------
OAuth2Session
An instance of OAuth2Session class.
"""
return OAuth2Session(
client_id=self.client_id,
token=token or session.get("DISCORD_OAUTH2_TOKEN"),
state=state or session.get("DISCORD_OAUTH2_STATE"),
scope=scope,
redirect_uri=self.redirect_uri,
auto_refresh_kwargs={
'client_id': self.client_id,
'client_secret': self.client_secret,
},
auto_refresh_url=configs.DISCORD_TOKEN_URL,
token_updater=self._token_updater)
示例29
def _connect(self):
"""
Retrieve token from USMA API and create an authenticated session
:returns OAuth2Session: authenticated client session
"""
oauth_client = BackendApplicationClient(client_id=self.client_id)
oauth_session = OAuth2Session(client=oauth_client)
token = oauth_session.fetch_token(token_url=self.url + "oauth/token",
client_id=self.client_id,
client_secret=self.client_secret)
#verify=False)
return OAuth2Session(client_id=self.client_id,
token=token)
示例30
def _get_response(self, method, endpoint, params=None):
"""
Helper method to handle HTTP requests and catch API errors
:param method: valid HTTP method
:type method: str
:param endpoint: API endpoint
:type endpoint: str
:param params: extra parameters passed with the request
:type params: dict
:returns: API response
:rtype: Response
"""
url = urljoin(self.api_url, endpoint)
try:
response = getattr(self._session, method)(url, params=params)
# Check if Monzo API returned HTTP 401, which could mean that the
# token is expired
if response.status_code == 401:
raise TokenExpiredError
except TokenExpiredError:
# For some reason 'requests-oauthlib' automatic token refreshing
# doesn't work so we do it here semi-manually
self._refresh_oath_token()
self._session = OAuth2Session(
client_id=self._client_id,
token=self._token,
)
response = getattr(self._session, method)(url, params=params)
if response.status_code != requests.codes.ok:
raise MonzoAPIError(
"Something went wrong: {}".format(response.json())
)
return response