Python源码示例:requests_oauthlib.OAuth1Session()
示例1
def __init__(self, owner_or_token):
    """Create an OAuth1-signed session from a user's or an explicit token.

    :param owner_or_token: a ``User`` (its first ``QuickbooksToken`` is
        looked up) or a ``QuickbooksToken`` instance.
    :raises ValueError: if ``owner_or_token`` is neither type.
    """
    if isinstance(owner_or_token, User):
        token = QuickbooksToken.objects.filter(user=owner_or_token).first()
    elif isinstance(owner_or_token, QuickbooksToken):
        token = owner_or_token
    else:
        raise ValueError("API must be initialized with either a QuickbooksToken or User")
    # NOTE(review): presumably .first() gives None when the user has no
    # stored token, in which case the attribute reads below fail -- confirm.
    self.token = token

    credentials = {
        'client_key': settings.QUICKBOOKS['CONSUMER_KEY'],
        'client_secret': settings.QUICKBOOKS['CONSUMER_SECRET'],
        'resource_owner_key': self.token.access_token,
        'resource_owner_secret': self.token.access_token_secret,
    }
    oauth_session = OAuth1Session(**credentials)
    # Every request sends and expects JSON.
    oauth_session.headers.update({'content-type': 'application/json', 'accept': 'application/json'})
    self.session = oauth_session

    self.realm_id = self.token.realm_id
    self.data_source = self.token.data_source
    # The API base URL depends on the token's data source (desktop vs online).
    url_bases = {
        'QBD': QUICKBOOKS_DESKTOP_V3_URL_BASE,
        'QBO': QUICKBOOKS_ONLINE_V3_URL_BASE,
    }
    self.url_base = url_bases[self.token.data_source]
示例2
def start_oauth():
    """Start the OpenStreetMap OAuth1 flow and redirect to the authorize page.

    Stores the optional ``next`` query argument and the fetched request
    token pair in ``session`` before redirecting.
    """
    destination = request.args.get('next')
    if destination:
        session['next'] = destination

    consumer_key = app.config['CLIENT_KEY']
    consumer_secret = app.config['CLIENT_SECRET']

    oauth = OAuth1Session(
        consumer_key,
        client_secret=consumer_secret,
        callback_uri=url_for('oauth_callback', _external=True),
    )
    token_response = oauth.fetch_request_token(
        'https://www.openstreetmap.org/oauth/request_token')

    # Keep the request token pair for the callback step.
    session['owner_key'] = token_response.get('oauth_token')
    session['owner_secret'] = token_response.get('oauth_token_secret')

    target = oauth.authorization_url(
        'https://www.openstreetmap.org/oauth/authorize',
        oauth_consumer_key=consumer_key,
    )
    return redirect(target)
示例3
def tweet(self, s, media=None):
    """Post status text ``s`` to ``self.url``, optionally attaching a media ID.

    :param s: status text.
    :param media: media ID to attach, or ``None`` for a text-only post.
    :return: the response object returned by the session's ``post`` call.
    """
    params = {"status": s}
    if media is not None:
        params["media_ids"] = [media]

    # Imported lazily so the module still loads without requests_oauthlib.
    from requests_oauthlib import OAuth1Session

    # The built-in key pair is used when the key type is 'ikalog'.
    if self.consumer_key_type == 'ikalog':
        consumer_key = self._preset_ck
    else:
        consumer_key = self.consumer_key
    if self.consumer_key_type == 'ikalog':
        consumer_secret = self._preset_cs
    else:
        consumer_secret = self.consumer_secret

    client = OAuth1Session(
        consumer_key, consumer_secret, self.access_token, self.access_token_secret)
    return client.post(self.url, params=params, verify=self._get_cert_path())
##
# Post a screenshot to Twitter
# @param self The object pointer.
# @param img The image to be posted.
# @return media The media ID
#
示例4
def check_import(self):
    """Probe for ``requests_oauthlib`` and print install help if it is missing.

    Prints two messages (in Japanese) when the import fails; returns ``None``
    in every case.
    """
    try:
        from requests_oauthlib import OAuth1Session  # noqa: F401 (import probe)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit. Any ordinary failure while importing is still reported.
        print("モジュール requests_oauthlib がロードできませんでした。 Twitter 投稿ができません。")
        print("インストールするには以下のコマンドを利用してください。\n pip install requests_oauthlib\n")
##
# Constructor
# @param self The Object Pointer.
# @param consumer_key Consumer key of the application.
# @param consumer_secret Consumer secret.
# @param auth_token Authentication token of the user account.
# @param auth_token_secret Authentication token secret.
# @param attach_image If true, post screenshots.
# @param footer Footer text.
# @param tweet_my_score If true, post score.
# @param tweet_kd If true, post killed/death.
# @param tweet_udemae If true, post udemae(rank).
# @param use_reply If true, post the tweet as a reply to @_ikalog_
#
示例5
def get_authorization_url(self, callback_uri):
    """Fetch a request token and return the URL used to authorize it.

    Stores the request token and secret on ``self.token`` / ``self.secret``.

    :param callback_uri: callback URI passed to the OAuth1 session.
    :return: the authorization URL built by the session.
    :raises AuthenticatorError: wrapping a ``ValueError``,
        ``TokenRequestDenied`` or ``ConnectionError`` from the token request.
    """
    oauth = OAuth1Session(
        settings.OAUTH_CONSUMER_KEY,
        client_secret=settings.OAUTH_CONSUMER_SECRET,
        callback_uri=callback_uri,
    )
    try:
        token_url = settings.API_HOST + settings.OAUTH_TOKEN_PATH
        token_response = oauth.fetch_request_token(token_url, verify=settings.VERIFY)
    except (ValueError, TokenRequestDenied, ConnectionError) as err:
        raise AuthenticatorError(err)

    self.token = token_response.get('oauth_token')
    self.secret = token_response.get('oauth_token_secret')

    authorize_endpoint = settings.API_HOST + settings.OAUTH_AUTHORIZATION_PATH
    authorization_url = oauth.authorization_url(authorize_endpoint)
    # NOTE(review): this writes the token secret to the log at INFO level.
    message = 'Initial token {}, secret {}'.format(self.token, self.secret)
    LOGGER.log(logging.INFO, message)
    return authorization_url
示例6
def set_access_token(self, authorization_url):
    """Exchange the stored request token for an access token.

    Replaces ``self.token`` / ``self.secret`` with the values from the
    access-token response.

    :param authorization_url: the redirect URL, parsed by the session's
        ``parse_authorization_response``.
    :raises AuthenticatorError: wrapping a ``TokenRequestDenied`` or
        ``ConnectionError`` from the access-token request.
    """
    oauth = OAuth1Session(
        settings.OAUTH_CONSUMER_KEY,
        settings.OAUTH_CONSUMER_SECRET,
        resource_owner_key=self.token,
        resource_owner_secret=self.secret,
    )
    oauth.parse_authorization_response(authorization_url)
    access_token_url = settings.API_HOST + settings.OAUTH_ACCESS_TOKEN_PATH
    try:
        token_response = oauth.fetch_access_token(access_token_url)
    except (TokenRequestDenied, ConnectionError) as err:
        raise AuthenticatorError(err)

    self.token = token_response.get('oauth_token')
    self.secret = token_response.get('oauth_token_secret')
    # NOTE(review): this writes the token secret to the log at INFO level.
    message = 'Updated token {}, secret {}'.format(self.token, self.secret)
    LOGGER.log(logging.INFO, message)
示例7
def __init__(
    self, client_key, client_secret, resource_owner_key, resource_owner_secret
):
    """Store the OAuth1 credentials and open a signed session.

    :param client_key: OAuth client key.
    :param client_secret: OAuth client secret.
    :param resource_owner_key: OAuth resource-owner key.
    :param resource_owner_secret: OAuth resource-owner secret.
    """
    # Account endpoints: production and the ``apisb`` host.
    self.base_url_prod = r"https://api.etrade.com/v1/accounts"
    self.base_url_dev = r"https://apisb.etrade.com/v1/accounts"

    self.client_key = client_key
    self.client_secret = client_secret
    self.resource_owner_key = resource_owner_key
    self.resource_owner_secret = resource_owner_secret

    credentials = (
        self.client_key,
        self.client_secret,
        self.resource_owner_key,
        self.resource_owner_secret,
    )
    self.session = OAuth1Session(*credentials, signature_type="AUTH_HEADER")
示例8
def get_auth_token(self):
    """
    Retrieve an auth_session_token using the DEP server token data prepared
    as an OAuth1Session() instance earlier on (``self.oauth``).

    :return: ``(token, timestamp)`` where ``timestamp`` is the first ten
        characters of the token.
    """
    # Retrieve session auth token
    get_session = self.dep_prep('session', 'get', authsession=self.oauth)
    response = self.oauth.send(get_session)

    # Extract the auth session token from the JSON reply
    token = response.json()['auth_session_token']

    # The token happens to contain the UNIX timestamp of when it was generated
    # so we save it for later reference.
    timestamp = token[:10]

    # Roll a human-readable timestamp as well.
    ts_readable = datetime.datetime.fromtimestamp(
        int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')

    # Call form prints the same text on Python 2 and is valid on Python 3.
    print("Token generated at %s" % ts_readable)

    return token, timestamp
示例9
def get_trends(self, local):
    """
    Method to get the trending hashtags.

    Returns an empty list when the API response status is not 200.

    :type local: str
    :rtype: list of str
    """
    local_id = self.get_local_identifier()[local]
    url = "https://api.twitter.com/1.1/trends/place.json?id=" + local_id

    session = OAuth1Session(ConsumerKey,
                            ConsumerSecret,
                            AccessToken,
                            AccessTokenSecret)
    response = session.get(url)

    # Use the public attribute rather than reaching into __dict__.
    if response.status_code != 200:
        return []

    local_trends = json.loads(response.text)[0]["trends"]
    # startswith() avoids an IndexError on an empty trend name.
    return [trend["name"]
            for trend in local_trends if trend["name"].startswith('#')]
示例10
def update_twitter_profile_data(self):
    """Refresh the stored profile image URLs from Twitter's user lookup.

    :return: the updated ``self.twitter_creds``, or ``None`` when
        ``self.twitter`` or ``self.twitter_creds`` is falsy.
    """
    if not self.twitter or not self.twitter_creds:
        # Call form: same output on Python 2, valid syntax on Python 3.
        print(u"Can't update twitter, doesn't have twitter username or twitter_creds")
        return None

    oauth = OAuth1Session(
        os.getenv('TWITTER_CONSUMER_KEY'),
        client_secret=os.getenv('TWITTER_CONSUMER_SECRET')
    )
    url = "https://api.twitter.com/1.1/users/lookup.json?screen_name={}".format(self.twitter)
    r = oauth.get(url)
    response_data = r.json()

    # Only the first returned profile is used.
    first_profile = response_data[0]
    keys_to_update = ["profile_image_url", "profile_image_url_https"]
    for k in keys_to_update:
        self.twitter_creds[k] = first_profile[k]

    print(u"Updated twitter creds for @{}".format(self.twitter))
    return self.twitter_creds
示例11
def __setup_service(self, url, provided_oauth):
    """Return the OAuth session to use for ``url``.

    :param url: passed as the ``realm`` of a newly created session.
    :param provided_oauth: an existing session; returned as-is when not None.
    :return: ``provided_oauth`` or a new session built from ``self.config``.
    :raises ConnectionError: when no session could be established.
    """
    if provided_oauth is not None:
        return provided_oauth

    session = None
    if self.config is not None:
        session = OAuth1Session(
            self.config["app_token"],
            client_secret=self.config["app_secret"],
            resource_owner_key=self.config["access_token"],
            resource_owner_secret=self.config["access_token_secret"],
            realm=url,
        )

    if session is None:
        raise ConnectionError("Failed to establish OAuth session.")
    return session
示例12
def test_get_articles(self):
    """get_articles returns the fixture data for a 200 and a 206 response."""
    mock_oauth = Mock(spec=OAuth1Session)
    product_id = 1

    # Same fixture, once as a complete and once as a partial-content reply.
    for status_code, reason in ((200, "testing ok"), (206, "partial content")):
        mock_oauth.get = MagicMock(
            return_value=self.MockResponse(
                TestCommon.cardmarket_find_user_articles_result, status_code, reason
            )
        )
        articles = self.api.get_articles(product_id, 0, mock_oauth)
        self.assertEqual(articles[0]["comments"], "x")
示例13
def test_find_user_articles(self):
    """find_user_articles returns the fixture data for a 200 and a 206 response."""
    mock_oauth = Mock(spec=OAuth1Session)
    user_id = 1
    game_id = 1

    # Same fixture, once as a complete and once as a partial-content reply.
    for status_code, reason in ((200, "testing ok"), (206, "partial content")):
        mock_oauth.get = MagicMock(
            return_value=self.MockResponse(
                TestCommon.cardmarket_find_user_articles_result, status_code, reason
            )
        )
        articles = self.api.find_user_articles(user_id, game_id, 0, mock_oauth)
        self.assertEqual(articles[0]["comments"], "x")
示例14
def __init__(self, consumer_key, consumer_secret, callback=None):
    """Store the consumer credentials and create the OAuth1 session.

    Text (unicode) credentials are encoded to ASCII bytes first.

    :param consumer_key: consumer key.
    :param consumer_secret: consumer secret.
    :param callback: optional callback URI passed to the session.
    """
    # isinstance() also covers subclasses of the text type, which the
    # previous ``type(x) == six.text_type`` comparison silently skipped.
    if isinstance(consumer_key, six.text_type):
        consumer_key = consumer_key.encode('ascii')

    if isinstance(consumer_secret, six.text_type):
        consumer_secret = consumer_secret.encode('ascii')

    self.consumer_key = consumer_key
    self.consumer_secret = consumer_secret
    self.access_token = None
    self.access_token_secret = None
    self.callback = callback
    self.username = None
    self.oauth = OAuth1Session(consumer_key,
                               client_secret=consumer_secret,
                               callback_uri=self.callback)
示例15
def get_access_token(self, verifier=None):
    """
    Exchange the authorized request token for an access token.

    :param verifier: verifier supplied by the user after authorizing.
    :return: ``(access_token, access_token_secret)``.
    :raises TweepError: wrapping any exception raised during the exchange.
    """
    try:
        endpoint = self._get_oauth_url('access_token')
        # Rebuild the session around the request token and verifier.
        self.oauth = OAuth1Session(
            self.consumer_key,
            client_secret=self.consumer_secret,
            resource_owner_key=self.request_token['oauth_token'],
            resource_owner_secret=self.request_token['oauth_token_secret'],
            verifier=verifier,
            callback_uri=self.callback,
        )
        granted = self.oauth.fetch_access_token(endpoint)
        self.access_token = granted['oauth_token']
        self.access_token_secret = granted['oauth_token_secret']
        return self.access_token, self.access_token_secret
    except Exception as e:
        raise TweepError(e)
示例16
def Main(args):
    """Search users for the given queries and write out their screen names.

    :param args: parsed arguments; reads ``config``, ``queries``, ``count``,
        ``dump`` and ``output``.
    """
    # get access keys from a config file
    config = ConfigParser()
    config.read(args.config)
    consumer_key = config.get('AccessKeys', 'ConsumerKey')
    consumer_secret = config.get('AccessKeys', 'ConsumerSecret')
    access_token = config.get('AccessKeys', 'AccessToken')
    access_token_secret = config.get('AccessKeys', 'AccessTokenSecret')

    # open a session
    session = OAuth1Session(consumer_key, consumer_secret, access_token, access_token_secret)

    # collect users from the queries
    user_search = GETUsersSearch(session)
    user_search.setParams(' '.join(args.queries), target_count=args.count)
    user_search.waitReady()
    result = user_search.call()
    logger.info('obtained %d users', len(result))

    if args.dump:
        logger.info('writing raw data to file %s', args.dump)
        # Context manager so the dump file is flushed and closed
        # deterministically instead of leaking the handle.
        with open(args.dump, 'w') as dump_file:
            json.dump(result, dump_file, indent=2)

    if args.output:
        logger.info('writing screen names to file %s', args.output)
        with open(args.output, 'w') as f:
            for user in result:
                f.write(user['screen_name'] + '\n')
    else:
        # No output file given: print screen names to stdout.
        for user in result:
            sys.stdout.write(user['screen_name'] + '\n')
示例17
def request_oauth_token(request):
    """Fetch a QuickBooks request token and redirect to the authorization URL.

    The request token and secret are stored in ``request.session`` under
    ``qb_oauth_token`` / ``qb_oauth_token_secret``.

    :param request: the incoming request (its ``session`` is read and written).
    :return: an ``HttpResponseRedirect`` to ``AUTHORIZATION_URL``.
    """
    # We'll require a refresh in the blue dot cache
    if BLUE_DOT_CACHE_KEY in request.session:
        del request.session[BLUE_DOT_CACHE_KEY]

    # The configured callback may be a plain value or a callable taking the request.
    access_token_callback = settings.QUICKBOOKS['OAUTH_CALLBACK_URL']
    if callable(access_token_callback):
        access_token_callback = access_token_callback(request)

    session = OAuth1Session(client_key=settings.QUICKBOOKS['CONSUMER_KEY'],
                            client_secret=settings.QUICKBOOKS['CONSUMER_SECRET'],
                            callback_uri=access_token_callback)
    response = session.fetch_request_token(REQUEST_TOKEN_URL)

    try:
        request_token = response['oauth_token']
        request_token_secret = response['oauth_token_secret']

        request.session['qb_oauth_token'] = request_token
        request.session['qb_oauth_token_secret'] = request_token_secret
    except Exception:
        # Was a bare ``except:``; KeyboardInterrupt/SystemExit no longer pass
        # through the logging path. The error is still logged and re-raised.
        logger = logging.getLogger('quickbooks.views.request_oauth_token')
        logger.exception(("Couldn't extract oAuth parameters from token " +
                          "request response. Response was '%s'"), response)
        raise

    return HttpResponseRedirect("%s?oauth_token=%s" % (AUTHORIZATION_URL, request_token))
示例18
def get_access_token(request):
    """Complete the QuickBooks OAuth flow and persist the access token.

    Reads the request token pair from ``request.session``, exchanges it for
    an access token, replaces the user's stored ``QuickbooksToken`` and
    sends the ``qb_connected`` signal.

    :param request: the callback request.
    :return: the rendered ``oauth_callback.html`` response.
    :raises Exception: if refreshing the blue dot menu hits an AttributeError.
    """
    consumer = settings.QUICKBOOKS
    oauth = OAuth1Session(
        client_key=consumer['CONSUMER_KEY'],
        client_secret=consumer['CONSUMER_SECRET'],
        resource_owner_key=request.session['qb_oauth_token'],
        resource_owner_secret=request.session['qb_oauth_token_secret'],
    )

    query = '?{}'.format(request.META.get('QUERY_STRING'))
    callback_params = oauth.parse_authorization_response(query)
    realm_id = callback_params['realmId']
    data_source = callback_params['dataSource']
    oauth_verifier = callback_params['oauth_verifier']

    # [review] - Possible bug? This should be taken care of by session.parse_authorization_response
    oauth.auth.client.verifier = unicode(oauth_verifier)

    granted = oauth.fetch_access_token(ACCESS_TOKEN_URL)

    # Delete any existing access tokens
    request.user.quickbookstoken_set.all().delete()

    token = QuickbooksToken.objects.create(
        user=request.user,
        access_token=granted['oauth_token'],
        access_token_secret=granted['oauth_token_secret'],
        realm_id=realm_id,
        data_source=data_source,
    )

    # Cache blue dot menu
    try:
        request.session[BLUE_DOT_CACHE_KEY] = None
        blue_dot_menu(request)
    except AttributeError:
        raise Exception('The sessions framework must be installed for this ' +
                        'application to work.')

    # Let everyone else know we connected
    qb_connected.send(None, token=token)

    context = {'complete_url': settings.QUICKBOOKS['ACCESS_COMPLETE_URL']}
    return render_to_response('oauth_callback.html', context)
示例19
def api_put_request(path, **kwargs):
    """Send an OAuth1-signed PUT to ``osm_api_base + path`` as the current user.

    :param path: path appended to ``osm_api_base``.
    :param kwargs: forwarded to the session's ``request`` call.
    :return: the response from the session's ``request`` call.
    """
    user = g.user
    assert user.is_authenticated

    config = current_app.config
    signed = OAuth1Session(
        config['CLIENT_KEY'],
        client_secret=config['CLIENT_SECRET'],
        resource_owner_key=user.osm_oauth_token,
        resource_owner_secret=user.osm_oauth_token_secret,
    )
    target = osm_api_base + path
    return signed.request('PUT', target, headers=user_agent_headers(), **kwargs)
示例20
def api_request(path, **params):
    """Send an OAuth1-signed GET to ``osm_api_base + path`` as the current user.

    :param path: path appended to ``osm_api_base``.
    :param params: query parameters, URL-encoded onto the URL when present.
    :return: the response from the session's ``get`` call (4 second timeout).
    """
    user = g.user
    assert user.is_authenticated

    query = '?' + urlencode(params) if params else ''
    url = osm_api_base + path + query

    config = current_app.config
    signed = OAuth1Session(
        config['CLIENT_KEY'],
        client_secret=config['CLIENT_SECRET'],
        resource_owner_key=user.osm_oauth_token,
        resource_owner_secret=user.osm_oauth_token_secret,
    )
    return signed.get(url, timeout=4)
示例21
def post_media(self, img):
    """Encode ``img`` as PNG and upload it to ``self.url_media``.

    :param img: image passed to ``cv2.imencode``.
    :return: the ``media_id`` from the JSON response on HTTP 200, otherwise
        ``None`` (encoding failed or the upload was rejected).
    """
    result, img_png = cv2.imencode('.png', img)

    if not result:
        IkaUtils.dprint('%s: Failed to encode the image (%s)' %
                        (self, img.shape))
        return None

    # tobytes() replaces the deprecated ndarray.tostring() alias.
    files = {"media": img_png.tobytes()}

    # The built-in key pair is used when the key type is 'ikalog'.
    CK = self._preset_ck if self.consumer_key_type == 'ikalog' else self.consumer_key
    CS = self._preset_cs if self.consumer_key_type == 'ikalog' else self.consumer_secret

    # Imported lazily so the module still loads without requests_oauthlib.
    from requests_oauthlib import OAuth1Session
    twitter = OAuth1Session(
        CK, CS, self.access_token, self.access_token_secret
    )
    req = twitter.post(
        self.url_media,
        files=files,
        verify=self._get_cert_path()
    )

    if req.status_code == 200:
        return json.loads(req.text)['media_id']

    # Fixed the misspelled ``IkaUtis``, which raised NameError on this path.
    IkaUtils.dprint('%s: Failed to post media.' % self)
    return None
##
# get_text_game_individual_result
# Generate a record for on_game_individual_result.
# @param self The Object Pointer.
# @param context IkaLog context
#
示例22
def get_auth_link(self, auth_storage, share=False):
    """Fetch a request token and return the authorization URL.

    :param auth_storage: mapping that receives ``oauth_client_id``.
    :param share: when true, use the share client ID instead of the default.
    :return: the authorization URL built by the session.
    """
    if share:
        client_id = self._share_client_id
    else:
        client_id = self._client_id
    client_secret = self._clients[client_id]

    callback_uri = web.ctx.home + self._callback_page
    oauth = OAuth1Session(client_id, client_secret=client_secret,
                          callback_uri=callback_uri)
    oauth.fetch_request_token(request_token_url)
    link = oauth.authorization_url(authorization_base_url)

    # Remember which client started the flow so the callback can match it.
    auth_storage["oauth_client_id"] = client_id
    return link
示例23
def callback(self, auth_storage):
    """Finish the OAuth flow and return the authenticated user's profile.

    :param auth_storage: mapping read for ``oauth_client_id`` and updated
        with the live session under ``session``.
    :return: ``(id, name, email, {})`` on success, ``None`` on any failure.
    """
    client_id = auth_storage.get("oauth_client_id", self._client_id)
    client_secret = self._clients[client_id]
    twitter = OAuth1Session(client_id, client_secret=client_secret,
                            callback_uri=web.ctx.home + self._callback_page)
    try:
        twitter.parse_authorization_response(web.ctx.home + web.ctx.fullpath)
        twitter.fetch_access_token(access_token_url)
        r = twitter.get('https://api.twitter.com/1.1/account/verify_credentials.json?include_email=true')
        profile = json.loads(r.content.decode('utf-8'))
        auth_storage["session"] = twitter
        return str(profile["id"]), profile["name"], profile["email"], {}
    except Exception:
        # Best-effort: any ordinary failure means "not authenticated".
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt
        # and SystemExit.
        return None
示例24
def get_session(self):
    """Return a new OAuth1 session signed with the stored token and secret."""
    return OAuth1Session(
        settings.OAUTH_CONSUMER_KEY,
        client_secret=settings.OAUTH_CONSUMER_SECRET,
        resource_owner_key=self.token,
        resource_owner_secret=self.secret,
    )
示例25
def getClientKey():
    """Return an OAuth1 session built from a randomly chosen config section.

    The section is ``auth-hunterNN`` with ``NN`` drawn from 00 to 18.
    """
    section = 'auth-hunter%02d' % random.randint(0, 18)
    consumer_key = conf.get(section, 'CK')
    consumer_secret = conf.get(section, 'CS')
    access_token = conf.get(section, 'AT')
    access_secret = conf.get(section, 'AS')
    return OAuth1Session(consumer_key, consumer_secret, access_token, access_secret)
示例26
def getTimeline():
    """Fetch the timeline configured in the ``twitter`` config section.

    :return: the decoded JSON timeline on HTTP 200; ``None`` when the request
        raises or the status is not 200 (both cases are logged).
    """
    query = {}
    consumer_key = conf.get('twitter', 'CK')
    consumer_secret = conf.get('twitter', 'CS')
    access_token = conf.get('twitter', 'AT')
    access_secret = conf.get('twitter', 'AS')
    endpoint = conf.get('twitter', 'timeline')

    client = OAuth1Session(consumer_key, consumer_secret, access_token, access_secret)
    try:
        reply = client.get(endpoint, params=query)
    except Exception as e:
        logger.error(e)
        return

    if reply.status_code != 200:
        logger.error("HTTP status_code: %d", reply.status_code)
        return

    timeline = json.loads(reply.text)
    # Log the rate-limit headers returned with the response.
    remaining = reply.headers['x-rate-limit-remaining']
    reset_at = reply.headers['x-rate-limit-reset']
    logger.info("API remain: %s", remaining)
    logger.info("API reset: %s", reset_at)
    return timeline
示例27
def __init__(self, consumer_key, consumer_secret, access_token, access_secret, batch_request_limit=100):
    """Create a plain default session and an OAuth1-signed session.

    :param consumer_key: OAuth client key.
    :param consumer_secret: OAuth client secret.
    :param access_token: OAuth resource-owner key.
    :param access_secret: OAuth resource-owner secret.
    :param batch_request_limit: stored on the instance (default 100).
    """
    plain_session = Session()
    default_headers = {
        "X-Server-Protocol-Version": self.SERVER_PROTOCOL_VERSION,
        "Content-Type": "application/json;charset=UTF8"
    }
    plain_session.headers.update(default_headers)
    self.default_session = plain_session

    self.oauth_session = OAuth1Session(
        client_key=consumer_key,
        client_secret=consumer_secret,
        resource_owner_key=access_token,
        resource_owner_secret=access_secret,
        realm='ADM',
    )
    self.batch_request_limit = batch_request_limit
示例28
def __init__(
    self, client_key, client_secret, resource_owner_key, resource_owner_secret
):
    """Store the OAuth1 credentials and open a signed session.

    :param client_key: etrade client key (str).
    :param client_secret: etrade client secret (str).
    :param resource_owner_key: OAuth authentication token key (str).
    :param resource_owner_secret: OAuth authentication token secret (str).
    """
    # Endpoints for renewing and revoking the access token.
    self.renew_access_token_url = r"https://api.etrade.com/oauth/renew_access_token"
    self.revoke_access_token_url = (
        r"https://api.etrade.com/oauth/revoke_access_token"
    )

    self.client_key = client_key
    self.client_secret = client_secret
    self.resource_owner_key = resource_owner_key
    self.resource_owner_secret = resource_owner_secret

    credentials = (
        self.client_key,
        self.client_secret,
        self.resource_owner_key,
        self.resource_owner_secret,
    )
    self.session = OAuth1Session(*credentials, signature_type="AUTH_HEADER")
示例29
def _make_request_session(self):
    """Build an OAuth1 session from the credentials held in ``self.config``."""
    cfg = self.config
    session_kwargs = {
        'client_key': cfg.consumer_key,
        'client_secret': cfg.consumer_secret,
        'resource_owner_key': cfg.token_id,
        'resource_owner_secret': cfg.token_secret,
        'realm': cfg.account,
    }
    return requests_oauthlib.OAuth1Session(**session_kwargs)
示例30
def _do_oauth(self):
    """Run the interactive Flickr OAuth1 flow and pickle the resulting session.

    Prints the authorization URL, reads the pasted redirect URL from the
    console, fetches the access token and writes the session to
    ``self.oauth_file``.

    :return: the authorized ``OAuth1Session``.
    """
    request_token_url = self.host + "/oauth/request_token"
    authorization_url = self.host + "/oauth/authorize"
    access_token_url = self.host + "/oauth/access_token"

    oauth_session = OAuth1Session(self.api_key[self.cur_api_key], client_secret=self.api_secret[self.cur_api_key], callback_uri="paste_this")  # TODO: use 'oob'
    oauth_session.fetch_request_token(request_token_url)
    redirect_url = oauth_session.authorization_url(authorization_url)

    # Single-argument print(...) behaves the same on Python 2 and is valid
    # syntax on Python 3.
    print("Flickr needs user authentication")
    print("--------------------------------")
    print("Visit this site:")
    # Flickr permissions:
    # read - permission to read private information
    # write - permission to add, edit and delete photo metadata (includes 'read')
    # delete - permission to delete photos (includes 'write' and 'read')
    print(redirect_url + "&perms=write")

    # raw_input only exists on Python 2; fall back to input on Python 3.
    try:
        read_line = raw_input  # noqa: F821
    except NameError:
        read_line = input
    redirect_response = read_line('Paste the FULL URL here:')

    oauth_session.parse_authorization_response(redirect_response)
    oauth_session.fetch_access_token(access_token_url)

    # pickle needs a binary file object on Python 3.
    with open(self.oauth_file, 'wb') as f:
        pickle.dump(oauth_session, f)
    return oauth_session