Python源码示例:requests.OAuth1()
示例1
def dologin():
"""Attempt to login."""
if not (
'access_token_key' in session and
'access_token_secret' in session
):
raise NameError("No access keys")
access_token = AccessToken(
session['access_token_key'],
session['access_token_secret']
)
session['username'] = handshaker.identify(access_token)['username']
auth = OAuth1(
client_key=consumer_token.key,
client_secret=consumer_token.secret,
resource_owner_key=access_token.key,
resource_owner_secret=access_token.secret
)
return auth
示例2
def _create_oauth_session(self, oauth, timeout):
verify = self._options["verify"]
from oauthlib.oauth1 import SIGNATURE_RSA
from requests_oauthlib import OAuth1
oauth = OAuth1(
oauth["consumer_key"],
rsa_key=oauth["key_cert"],
signature_method=SIGNATURE_RSA,
resource_owner_key=oauth["access_token"],
resource_owner_secret=oauth["access_token_secret"],
)
self._session = ResilientSession(timeout)
self._session.verify = verify
self._session.auth = oauth
示例3
def request(self, method, url, **kwargs):
"""Build remote url request. Constructs necessary auth."""
user_token = kwargs.pop('token', self.token)
token, secret, _ = self.parse_raw_token(user_token)
callback = kwargs.pop('oauth_callback', None)
verifier = kwargs.get('data', {}).pop('oauth_verifier', None)
oauth = OAuth1(
resource_owner_key=token,
resource_owner_secret=secret,
client_key=self.consumer_key,
client_secret=self.consumer_secret,
verifier=verifier,
callback_uri=callback,
)
kwargs['auth'] = oauth
return super(OAuthProvider, self).request(method, url, **kwargs)
示例4
def init_auth(file):
(CONSUMER_KEY,
CONSUMER_SECRET,
OAUTH_TOKEN,
OAUTH_TOKEN_SECRET) = open(file, 'r').read().splitlines()
auth_obj = requests_oauthlib.OAuth1(
CONSUMER_KEY, CONSUMER_SECRET,
OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
if verify_credentials(auth_obj):
print('Validated credentials OK')
return auth_obj
else:
print('Credentials validation failed')
sys.exit(1)
示例5
def init_auth(file):
(CONSUMER_KEY,
CONSUMER_SECRET,
OAUTH_TOKEN,
OAUTH_TOKEN_SECRET) = open(file, 'r').read().splitlines()
auth_obj = requests_oauthlib.OAuth1(
CONSUMER_KEY, CONSUMER_SECRET,
OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
if verify_credentials(auth_obj):
print('Validated credentials OK')
return auth_obj
else:
print('Credentials validation failed')
sys.exit(1)
示例6
def init_auth(file):
(CONSUMER_KEY,
CONSUMER_SECRET,
OAUTH_TOKEN,
OAUTH_TOKEN_SECRET) = open(file, 'r').read().splitlines()
auth_obj = requests_oauthlib.OAuth1(
CONSUMER_KEY, CONSUMER_SECRET,
OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
if verify_credentials(auth_obj):
print('Validated credentials OK')
return auth_obj
else:
print('Credentials validation failed')
sys.exit(1)
示例7
def get_oauth():
filepath = "twitter.oauth"
with open(filepath, 'r'):
pass
config = configparser.SafeConfigParser()
config.read(filepath)
params = ['consumer_key', 'consumer_secret', 'oauth_token', 'oauth_token_secret']
if 'oauth' not in config.sections():
raise RuntimeError("Bad oauth file format %s, section missing: %s" % (filepath, 'oauth'))
oauth_config = dict(config.items('oauth'))
for param in params:
if param not in oauth_config:
raise RuntimeError("Bad oauth file format %s, not found param: %s" % (filepath, param))
oauth = OAuth1(oauth_config['consumer_key'],
client_secret=oauth_config['consumer_secret'],
resource_owner_key=oauth_config['oauth_token'],
resource_owner_secret=oauth_config['oauth_token_secret'])
return oauth
示例8
def get_profile_data(resource_owner_key, resource_owner_secret):
oauth = OAuth1(
settings.SOCIAL_AUTH_LINKEDIN_KEY,
client_secret=settings.SOCIAL_AUTH_LINKEDIN_SECRET,
resource_owner_key=resource_owner_key,
resource_owner_secret=resource_owner_secret
)
linkedin_api_endpoint = LINKEDIN_PROFILE_API_BASE_URL % ','.join(LINKEDIN_PROFILE_FIELDS)
response = requests.get(
url=linkedin_api_endpoint,
auth=oauth
)
# TODO: uncomment to purposely raise Exception and see format
#raise Exception(response.text)
linkedin_profile_data = json.loads(response.text)
return linkedin_profile_data
示例9
def send(self, message, fail_silently=False, options=None):
payload = {
"status": message
}
self._set_payload_from_options(payload, options, "twitter", [
"in_reply_to_status_id", "possibly_sensitive", "lat", "long",
"place_id", "display_coordinates", "trim_user", "media_ids"])
auth = OAuth1(self.api_key, self.api_secret, self.access_token,
self.access_token_secret)
try:
response = requests.post(self.url, auth=auth, data=payload)
if response.status_code != requests.codes.ok:
raise HttpError(response.status_code, response.text)
except Exception:
if not fail_silently:
raise
示例10
def get_twitter_request_token():
request_token_url = 'https://api.twitter.com/oauth/request_token'
oauth = OAuth1(
os.getenv('TWITTER_CONSUMER_KEY'),
client_secret=os.getenv('TWITTER_CONSUMER_SECRET'),
callback_uri=request.args.get('redirectUri')
)
r = requests.post(request_token_url, auth=oauth)
oauth_token_dict = dict(parse_qsl(r.text))
return jsonify(oauth_token_dict)
##########
# admin
示例11
def unauthorized_token_request(self):
"""Return request for unauthorized token (first stage)"""
params = self.request_token_extra_arguments()
params.update(self.get_scope_argument())
key, secret = self.get_key_and_secret()
# decoding='utf-8' produces errors with python-requests on Python3
# since the final URL will be of type bytes
decoding = None if six.PY3 else 'utf-8'
state = self.get_or_create_state()
auth = OAuth1(
key,
secret,
callback_uri=self.get_redirect_uri(state),
decoding=decoding,
signature_method=SIGNATURE_HMAC,
signature_type=SIGNATURE_TYPE_QUERY
)
url = self.REQUEST_TOKEN_URL + '?' + urlencode(params)
url, _, _ = auth.client.sign(url)
return url
示例12
def oauth_auth(self, token=None, oauth_verifier=None):
key, secret = self.get_key_and_secret()
oauth_verifier = oauth_verifier or self.data.get('oauth_verifier')
token = token or {}
# decoding='utf-8' produces errors with python-requests on Python3
# since the final URL will be of type bytes
decoding = None if six.PY3 else 'utf-8'
state = self.get_or_create_state()
return OAuth1(key, secret,
resource_owner_key=token.get('oauth_token'),
resource_owner_secret=token.get('oauth_token_secret'),
callback_uri=self.get_redirect_uri(state),
verifier=oauth_verifier,
signature_method=SIGNATURE_HMAC,
signature_type=SIGNATURE_TYPE_QUERY,
decoding=decoding)
示例13
def clean_oauth_auth(self, access_token):
"""Override of oauth_auth since Xing doesn't like callback_uri
and oauth_verifier on authenticated API calls"""
key, secret = self.get_key_and_secret()
resource_owner_key = access_token.get('oauth_token')
resource_owner_secret = access_token.get('oauth_token_secret')
if not resource_owner_key:
raise AuthTokenError(self, 'Missing oauth_token')
if not resource_owner_secret:
raise AuthTokenError(self, 'Missing oauth_token_secret')
# decoding='utf-8' produces errors with python-requests on Python3
# since the final URL will be of type bytes
decoding = None if six.PY3 else 'utf-8'
return OAuth1(key, secret,
resource_owner_key=resource_owner_key,
resource_owner_secret=resource_owner_secret,
signature_type=SIGNATURE_TYPE_AUTH_HEADER,
decoding=decoding)
示例14
def unauthorized_token(self):
"""Return request for unauthorized token (first stage)"""
params = self.request_token_extra_arguments()
params.update(self.get_scope_argument())
key, secret = self.get_key_and_secret()
# decoding='utf-8' produces errors with python-requests on Python3
# since the final URL will be of type bytes
decoding = None if six.PY3 else 'utf-8'
state = self.get_or_create_state()
response = self.request(
self.REQUEST_TOKEN_URL,
params=params,
auth=OAuth1(key, secret, callback_uri=self.get_redirect_uri(state),
decoding=decoding),
method=self.REQUEST_TOKEN_METHOD
)
content = response.content
if response.encoding or response.apparent_encoding:
content = content.decode(response.encoding or
response.apparent_encoding)
else:
content = response.content.decode()
return content
示例15
def oauth_auth(self, token=None, oauth_verifier=None,
signature_type=SIGNATURE_TYPE_AUTH_HEADER):
key, secret = self.get_key_and_secret()
oauth_verifier = oauth_verifier or self.data.get('oauth_verifier')
token = token or {}
# decoding='utf-8' produces errors with python-requests on Python3
# since the final URL will be of type bytes
decoding = None if six.PY3 else 'utf-8'
state = self.get_or_create_state()
return OAuth1(key, secret,
resource_owner_key=None,
resource_owner_secret=None,
callback_uri=self.get_redirect_uri(state),
verifier=oauth_verifier,
signature_type=signature_type,
decoding=decoding)
示例16
def make_request(self, method, url, data=None, params=None, headers=None,
timeout=60):
if headers is None:
headers = {'x-li-format': 'json', 'Content-Type': 'application/json'}
else:
headers.update({'x-li-format': 'json', 'Content-Type': 'application/json'})
if params is None:
params = {}
kw = dict(data=data, params=params,
headers=headers, timeout=timeout)
if isinstance(self.authentication, LinkedInDeveloperAuthentication):
# Let requests_oauthlib.OAuth1 do *all* of the work here
auth = OAuth1(self.authentication.consumer_key, self.authentication.consumer_secret,
self.authentication.user_token, self.authentication.user_secret)
kw.update({'auth': auth})
else:
params.update({'oauth2_access_token': self.authentication.token.access_token})
return requests.request(method.upper(), url, **kw)
示例17
def get_xauth_access_token(self, username, password):
"""
Get an access token from an username and password combination.
In order to get this working you need to create an app at
http://twitter.com/apps, after that send a mail to api@twitter.com
and request activation of xAuth for it.
"""
try:
url = self._get_oauth_url('access_token')
oauth = OAuth1(self.consumer_key,
client_secret=self.consumer_secret)
r = requests.post(url=url,
auth=oauth,
headers={'x_auth_mode': 'client_auth',
'x_auth_username': username,
'x_auth_password': password})
credentials = parse_qs(r.content)
return credentials.get('oauth_token')[0], credentials.get('oauth_token_secret')[0]
except Exception as e:
raise TweepError(e)
示例18
def edit_wiki_page(page_name, content, access_token, summary=None, bot=False):
auth = OAuth1(
app.config['CONSUMER_KEY'],
app.config['CONSUMER_SECRET'],
access_token['key'],
access_token['secret'])
# Get token
r = requests.get('https://en.wikipedia.org/w/api.php', params={
'action':'query',
'meta':'tokens',
'format': 'json',
}, auth=auth)
r.raise_for_status()
token = r.json()['query']['tokens']['csrftoken']
data = {
'action':'edit',
'title': page_name,
'text': content,
'summary': summary,
'format': 'json',
'token': token,
'watchlist': 'nochange',
}
if bot:
data['bot'] = '1'
r = requests.post('https://en.wikipedia.org/w/api.php', data=data,
auth=auth)
r.raise_for_status()
示例19
def SetCredentials(self,
consumer_key,
consumer_secret,
access_token_key=None,
access_token_secret=None,
application_only_auth=False):
"""Set the consumer_key and consumer_secret for this instance
Args:
consumer_key:
The consumer_key of the twitter account.
consumer_secret:
The consumer_secret for the twitter account.
access_token_key:
The oAuth access token key value you retrieved
from running get_access_token.py.
access_token_secret:
The oAuth access token's secret, also retrieved
from the get_access_token.py run.
application_only_auth:
Whether to generate a bearer token and use Application-Only Auth
"""
self._consumer_key = consumer_key
self._consumer_secret = consumer_secret
self._access_token_key = access_token_key
self._access_token_secret = access_token_secret
if application_only_auth:
self._bearer_token = self.GetAppOnlyAuthToken(consumer_key, consumer_secret)
self.__auth = OAuth2(token=self._bearer_token)
else:
auth_list = [consumer_key, consumer_secret,
access_token_key, access_token_secret]
if all(auth_list):
self.__auth = OAuth1(consumer_key, consumer_secret,
access_token_key, access_token_secret)
self._config = None
示例20
def __init__(self, consumer_key, consumer_secret, token, token_secret):
self.auth = OAuth1(consumer_key, consumer_secret, token, token_secret)
示例21
def get_city_trends(request, city_id):
"""
Returns a list of top trending tweets in the given city
:param request:
:param city_id:
:return: 404 if invalid city id is sent
:return: 503 if Twitter API request fails
:return: 200 successful
"""
try:
city = City.objects.get(pk=city_id)
except City.DoesNotExist:
error_message = "Invalid City ID"
return Response(error_message, status=status.HTTP_404_NOT_FOUND)
twitter_auth = OAuth1(TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_OAUTH_TOKEN,
TWITTER_OAUTH_TOKEN_SECRET)
# check if city WOEID is in database or not
if not city.woeid:
try:
url = TWITTER_TRENDS_URL + "closest.json?lat={0}&long={1}".format(city.latitude, city.longitude)
woeid_response = requests.get(url, auth=twitter_auth)
city.woeid = woeid_response.json()[0]['woeid']
city.save()
except Exception as e:
return Response(str(e), status=status.HTTP_503_SERVICE_UNAVAILABLE)
try:
url = TWITTER_TRENDS_URL + "place.json?id={0}".format(city.woeid)
api_response = requests.get(url, auth=twitter_auth)
response = api_response.json()[0]['trends']
except Exception:
return DOWNSTREAM_ERROR_RESPONSE
return Response(response)
示例22
def get_search_tweets(request, query):
"""
Returns a list of related tweets for given search query
:param request:
:param queryf:
:return: 503 if Twitter API request fails
:return: 200 successful
"""
twitter_auth = OAuth1(TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_OAUTH_TOKEN,
TWITTER_OAUTH_TOKEN_SECRET)
try:
url = TWITTER_SEARCH_URL + "tweets.json?q={0}".format(query)
api_response = requests.get(url, auth=twitter_auth)
api_response_json = api_response.json()
tweets = api_response_json['statuses']
response = []
for tweet in tweets:
result = SearchTweetResponse(
created_at=tweet['created_at'],
text=tweet['text'],
username=tweet['user']['screen_name'],
user_screen_name=tweet['user']['name'],
user_profile_image=tweet['user']['profile_image_url'],
retweet_count=tweet['retweet_count'],
favorite_count=tweet['favorite_count'],
tweet_url="https://www.twitter.com/" + tweet['user']['screen_name'] + "/status/" + tweet['id_str'],
)
result_as_json = result.to_json()
response.append(result_as_json)
except Exception:
return DOWNSTREAM_ERROR_RESPONSE
return Response(response)
示例23
def migrate(consumer_key, consumer_secret, access_token, access_secret, auth_client, scopes):
"""Migrates OAuth1 tokens to OAuth2 tokens
:param consumer_key: OAuth1 Consumer Key
:param consumer_secret: OAuth1 Consumer Secret
:param access_token: OAuth1 Access Token
:param access_secret: OAuth1 Access Secret
:param auth_client: AuthClient for OAuth2 specs
:type auth_client: `intuitlib.client.AuthClient`
:param scopes: list of `intuitlib.enum.Scopes`
:raises AuthClientError: if response status != 200
"""
if auth_client.environment.lower() == 'production':
migration_url = MIGRATION_URL['production']
else:
migration_url = MIGRATION_URL['sandbox']
auth_header = OAuth1(consumer_key, consumer_secret, access_token, access_secret)
headers = {
'Content-Type': 'application/json',
}
body = {
'scope': scopes_to_string(scopes),
'redirect_uri': auth_client.redirect_uri,
'client_id': auth_client.client_id,
'client_secret': auth_client.client_secret
}
send_request('POST', migration_url, headers, auth_client, body=json.dumps(body), oauth1_header=auth_header)
示例24
def send_request(method, url, header, obj, body=None, session=None, oauth1_header=None):
"""Makes API request using requests library, raises `intuitlib.exceptions.AuthClientError` if request not successful and sets specified object attributes from API response if request successful
:param method: HTTP method type
:param url: request URL
:param header: request headers
:param obj: object to set the attributes to
:param body: request body, defaults to None
:param session: requests session, defaults to None
:param oauth1_header: OAuth1 auth header, defaults to None
:raises AuthClientError: In case response != 200
:return: requests object
"""
headers = ACCEPT_HEADER
header.update(headers)
if session is not None and isinstance(session, Session):
response = session.request(method, url, headers=header, data=body, auth=oauth1_header)
else:
response = requests.request(method, url, headers=header, data=body, auth=oauth1_header)
if response.status_code != 200:
raise AuthClientError(response)
if response.content:
set_attributes(obj, response.json())
return response
示例25
def __init__(self, api_key, api_secret, access_token, access_token_secret):
super().__init__()
self.app_key = api_key
self.app_secret = api_secret
self.oauth_token = access_token
self.oauth_token_secret = access_token_secret
self.url = "https://api.twitter.com/1.1"
self.request_rate = 5
self.auth = OAuth1(self.app_key, self.app_secret, self.oauth_token,
self.oauth_token_secret)
self.last_request = time()
示例26
def _create_oauth_session(self, oauth_dict):
oauth = OAuth1(oauth_dict['consumer_key'],
rsa_key=oauth_dict['key_cert'], signature_method=SIGNATURE_RSA,
resource_owner_key=oauth_dict['access_token'],
resource_owner_secret=oauth_dict['access_token_secret'])
self._session.auth = oauth
示例27
def test_oauth_is_received():
"""Verifies if OAuth1 is received."""
url = "https://sandbox.cardmarket.com/ws/v1.1/output.json/games"
auth = MKMOAuth1(
os.environ.get("MKM_APP_TOKEN"),
client_secret=os.environ.get("MKM_APP_SECRET"),
resource_owner_key=os.environ.get("MKM_ACCESS_TOKEN"),
resource_owner_secret=os.environ.get("MKM_ACCESS_TOKEN_SECRET"),
realm=url,
)
assert isinstance(auth, OAuth1)
示例28
def __init__(self,
consumer_key: str = None,
consumer_secret: str = None,
access_token: str = None,
access_secret: str = None,
access_token_expiry: Optional[str] = None,
url: str = "https://mdmenrollment.apple.com") -> None:
self._session_token: Optional[str] = None
self._oauth = OAuth1(
consumer_key,
client_secret=consumer_secret,
resource_owner_key=access_token,
resource_owner_secret=access_secret,
)
if access_token_expiry is not None:
access_token_expiry_date = dateparser.parse(access_token_expiry)
self._access_token_expiry = access_token_expiry_date
else:
self._access_token_expiry = None
self._url = url
self._session = requests.session()
self._session.headers.update({
"X-Server-Protocol-Version": "3",
"Content-Type": "application/json;charset=UTF8",
"User-Agent": DEP.UserAgent,
})
self._retry_after: Optional[datetime] = None
示例29
def get_twitter_oauth():
# oauth = OAuth1(settings.TWITTER_CONSUMER_KEY,
# client_secret=settings.TWITTER_CONSUMER_SECRET,
# resource_owner_key=OAUTH_TOKEN,
# resource_owner_secret=OAUTH_TOKEN_SECRET)
oauth = OAuth2Session()
return oauth
示例30
def auth():
return OAuth1(settings.API_500PX_KEY, client_secret=settings.API_500PX_SECRET)