Python source examples: requests_cache.install_cache()
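The examples below are collected from open-source projects and show how requests_cache.install_cache() is used in practice. Calling it monkey-patches the requests library so that every HTTP response is transparently stored in, and later served from, a local cache. Before the project examples, here is a minimal sketch of the basic pattern; the cache name demo_cache and the httpbin.org URL are illustrative only:

import datetime

import requests
import requests_cache

# Install a process-wide cache: all requests calls are now routed
# through a SQLite file named demo_cache.sqlite.
requests_cache.install_cache(
    'demo_cache',
    backend='sqlite',
    expire_after=datetime.timedelta(hours=1),
)

requests.get('https://httpbin.org/get')      # first call hits the network
r = requests.get('https://httpbin.org/get')  # repeat call is answered from the cache
print(getattr(r, 'from_cache', False))       # True when the cached copy was used

# Remove the monkey-patch when transparent caching is no longer wanted.
requests_cache.uninstall_cache()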
Example 1
def import_places(self):
    if self.options['cached']:
        requests_cache.install_cache('tprek')

    queryset = Place.objects.filter(data_source=self.data_source)
    if self.options.get('single', None):
        obj_id = self.options['single']
        obj_list = [self.pk_get('unit', obj_id)]
        queryset = queryset.filter(id=obj_id)
    else:
        logger.info("Loading units...")
        obj_list = self.pk_get('unit')
        logger.info("%s units loaded" % len(obj_list))

    syncher = ModelSyncher(queryset, lambda obj: obj.origin_id, delete_func=self.mark_deleted,
                           check_deleted_func=self.check_deleted)
    for idx, info in enumerate(obj_list):
        if idx and (idx % 1000) == 0:
            logger.info("%s units processed" % idx)
        self._import_unit(syncher, info)

    syncher.finish(self.options.get('remap', False))
Example 2
def setup(self):
    self.tprek_data_source = DataSource.objects.get(id='tprek')

    ds_args = dict(id=self.name)
    ds_defaults = dict(name='City of Espoo')
    self.data_source, _ = DataSource.objects.get_or_create(defaults=ds_defaults, **ds_args)

    org_args = dict(origin_id='kaupunki', data_source=self.data_source)
    org_defaults = dict(name='Espoon kaupunki')
    self.organization, _ = Organization.objects.get_or_create(defaults=org_defaults, **org_args)

    self._build_cache_places()
    self._cache_yso_keywords()

    if self.options['cached']:
        requests_cache.install_cache('espoo')
        self.cache = requests_cache.get_cache()
    else:
        self.cache = None
Example 3
def setup_requests_cachedir():
    """Sets up local caching for faster remote HTTP requests.

    The cache directory is created in the user's home directory under
    .nfcore/cache_<python-version>.
    """
    # Only import it if we need it
    import requests_cache

    pyversion = '.'.join(str(v) for v in sys.version_info[0:3])
    cachedir = os.path.join(os.getenv("HOME"), os.path.join('.nfcore', 'cache_' + pyversion))
    if not os.path.exists(cachedir):
        os.makedirs(cachedir)
    requests_cache.install_cache(
        os.path.join(cachedir, 'github_info'),
        expire_after=datetime.timedelta(hours=1),
        backend='sqlite',
    )
Example 4
def init_requests_cache(refresh_cache=False):
    """
    Initializes a cache which the ``requests`` library will consult for
    responses, before making network requests.

    :param refresh_cache: Whether the cache should be cleared out
    """
    # Cache data from external sources; used in some checks
    dirs = AppDirs("stix2-validator", "OASIS")
    # Create the cache dir if it doesn't exist
    try:
        os.makedirs(dirs.user_cache_dir)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
    requests_cache.install_cache(
        cache_name=os.path.join(dirs.user_cache_dir, 'py{}cache'.format(
            sys.version_info[0])),
        expire_after=datetime.timedelta(weeks=1))

    if refresh_cache:
        clear_requests_cache()
Example 5
def shodan_query(query, api_key, cache=True, verbose=False):
    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    if cache:
        homedir = Path(os.path.expanduser('~'))
        requests_cache.install_cache(str(homedir / '.habu_requests_cache'), expire_after=3600)

    url = 'https://api.shodan.io/shodan/host/search?key={}&query={}'.format(api_key, query)
    r = requests.get(url)

    if r.status_code not in [200, 404]:
        logging.error(str(r))
        return {}

    if r.status_code == 404:
        return {}

    data = r.json()
    return data
Example 6
def main():
    if os.path.isfile(BASEDIR):
        sys.exit('Please remove your old configuration file at {}'.format(BASEDIR))
    os.makedirs(BASEDIR, exist_ok=True)

    global CONFIG
    CONFIG = read_configuration(CONFFILE)
    locale.setlocale(locale.LC_MONETARY, CONFIG['locale'].get('monetary', ''))

    requests_cache.install_cache(cache_name='api_cache', backend='memory',
                                 expire_after=int(CONFIG['api'].get('cache', 10)))

    curses.wrapper(mainc)
Example 7
def setup(self):
    defaults = dict(name='Matkailu- ja kongressitoimisto')
    self.data_source, _ = DataSource.objects.get_or_create(id=self.name, defaults=defaults)
    self.tprek_data_source = DataSource.objects.get(id='tprek')

    ytj_ds, _ = DataSource.objects.get_or_create(defaults={'name': 'YTJ'}, id='ytj')

    org_args = dict(origin_id='0586977-6', data_source=ytj_ds)
    defaults = dict(name='Helsingin Markkinointi Oy')
    self.organization, _ = Organization.objects.get_or_create(
        defaults=defaults, **org_args)

    place_list = Place.objects.filter(data_source=self.tprek_data_source, deleted=False)
    deleted_place_list = Place.objects.filter(data_source=self.tprek_data_source,
                                              deleted=True)
    # Get only places that have unique names
    place_list = place_list.annotate(count=Count('name_fi')).filter(count=1).values('id', 'origin_id', 'name_fi')
    deleted_place_list = deleted_place_list.annotate(count=Count('name_fi')).\
        filter(count=1).values('id', 'origin_id', 'name_fi', 'replaced_by_id')
    self.tprek_by_name = {p['name_fi'].lower(): (p['id'], p['origin_id']) for p in place_list}
    self.deleted_tprek_by_name = {
        p['name_fi'].lower(): (p['id'], p['origin_id'], p['replaced_by_id'])
        for p in deleted_place_list}

    if self.options['cached']:
        requests_cache.install_cache('matko')
Example 8
def __init__(self, username, password, verify=True, debug=False):
    assert username is not None
    assert password is not None
    assert verify is not None
    assert debug is not None

    self._username = username
    self._password = password
    self._verify = verify
    self._debug = debug

    if self._request_caching_enabled:
        self.request_count = self.get_cached_request_count()

    if not self._verify:
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    if self._debug:
        self.enable_request_debugging()
    else:
        self.enable_error_logging()

    # cache requests for 24 hours
    if self._request_caching_enabled:
        requests_cache.install_cache('capiq_cache', backend='sqlite', expire_after=86400, allowable_methods=('POST',))
Example 9
def enable_cache():
    """Enable requests library cache."""
    try:
        import requests_cache
    except ImportError as err:
        sys.stderr.write("Failed to enable cache: {0}\n".format(str(err)))
        return
    if not os.path.exists(CACHE_DIR):
        os.makedirs(CACHE_DIR)
    requests_cache.install_cache(CACHE_FILE)
Example 10
def setup_paasta_api():
    if os.environ.get("PAASTA_API_DEBUG"):
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.WARNING)

    # pyinotify is a better solution than turning off file caching completely
    service_configuration_lib.disable_yaml_cache()

    settings.system_paasta_config = load_system_paasta_config()
    if os.environ.get("PAASTA_API_CLUSTER"):
        settings.cluster = os.environ.get("PAASTA_API_CLUSTER")
    else:
        settings.cluster = settings.system_paasta_config.get_cluster()

    settings.marathon_clients = marathon_tools.get_marathon_clients(
        marathon_tools.get_marathon_servers(settings.system_paasta_config)
    )

    settings.marathon_servers = marathon_tools.get_marathon_servers(
        system_paasta_config=settings.system_paasta_config
    )
    settings.marathon_clients = marathon_tools.get_marathon_clients(
        marathon_servers=settings.marathon_servers, cached=False
    )

    try:
        settings.kubernetes_client = kubernetes_tools.KubeClient()
    except FileNotFoundError:
        log.info("Kubernetes not found")
        settings.kubernetes_client = None
    except Exception:
        log.exception("Error while initializing KubeClient")
        settings.kubernetes_client = None

    # Set up transparent cache for http API calls. With expire_after, responses
    # are removed only when the same request is made. Expired storage is not a
    # concern here. Thus remove_expired_responses is not needed.
    requests_cache.install_cache("paasta-api", backend="memory", expire_after=5)
Example 11
def use_requests_cache(
    cache_name: str, backend: str = "memory", **kwargs: Any
) -> Callable[[_UseRequestsCacheFuncT], _UseRequestsCacheFuncT]:
    def wrap(fun: _UseRequestsCacheFuncT) -> _UseRequestsCacheFuncT:
        def fun_with_cache(*args: Any, **kwargs: Any) -> Any:
            requests_cache.install_cache(cache_name, backend=backend, **kwargs)
            result = fun(*args, **kwargs)
            requests_cache.uninstall_cache()
            return result

        return cast(_UseRequestsCacheFuncT, fun_with_cache)

    return wrap
Example 12
def fetch(outfile):
    """The main function for downloading all scripts from github."""
    if not os.path.exists(REQUESTS_CACHE):
        os.makedirs(REQUESTS_CACHE)
    requests_cache.install_cache(REQUESTS_CACHE)

    result = []
    label_counts = defaultdict(int)
    print('Fetching scripts')
    for label, url in DATA_URLS.items():
        print(url)
        scripts = fetch_scripts(url)
        for script in scripts:
            try:
                result.append({
                    'tree': build_tree(script), 'metadata': {'label': label}
                })
                label_counts[label] += 1
            except Exception as err:
                print(err)
    print('Label counts: ', label_counts)

    print('Dumping scripts')
    with open(outfile, 'wb') as file_handler:
        pickle.dump(result, file_handler)
Example 13
def set_cache(refresh=False):
    """Install the static requests cache."""
    if refresh:
        expire_after = datetime.timedelta(seconds=0)
    else:
        expire_after = datetime.timedelta(days=30)
    requests_cache.install_cache(
        cache_name=os.path.join(os.path.dirname(__file__), "cache"),
        allowable_methods=('GET', 'POST'), expire_after=expire_after)
    requests_cache.core.remove_expired_responses()
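Note that requests_cache.core is the legacy module layout: the helper was promoted to the top level in requests-cache 0.6 and the core module was later removed, so the final line fails on current releases. A hedged sketch of the equivalent cleanup, worth verifying against the installed version:

# requests-cache 0.6/0.7:
requests_cache.remove_expired_responses()

# requests-cache 1.0+ dropped the helper; expired entries are deleted
# through the backend instead (here via the installed cache):
requests_cache.get_cache().delete(expired=True)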
Example 14
def pytest_configure(config):
    if config.getoption('--use-cache'):
        import requests_cache
        requests_cache.install_cache('test_cache')
    api = Api()
    pytest.game_ids = api.GetSeasonGameIDs('2009-10', 'Regular Season')[:2]  # Hack to carry the gameids to tests
    pytest.game_ids = ['0020900292']
Example 15
def use_requests_cache():
    import requests_cache
    requests_cache.install_cache('test_cache')
Example 16
def pytest_runtest_setup(item):
    # called for running each test in 'a' directory
    import requests_cache
    requests_cache.install_cache('test_cache')
Example 17
def __init__(self, cache=False,
             cache_filename="requests.cache"):
    self._cache = cache
    if cache:
        requests_cache.install_cache(cache_filename)
    self._transform_json = True
Example 18
def cache_requests():
    testdir = os.path.dirname(os.path.abspath(__file__))
    location = os.path.join(testdir, 'data/requests_cache')
    requests_cache.install_cache(cache_name=location)
    yield
    requests_cache.uninstall_cache()
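This generator is the body of a pytest yield fixture: the cache is installed before the test runs and uninstalled afterwards. A hedged sketch of how such a fixture is registered and consumed (the fixture name cached_http and the test are illustrative, not from the source):

import pytest
import requests
import requests_cache

@pytest.fixture
def cached_http():
    # Hypothetical fixture mirroring cache_requests above.
    requests_cache.install_cache('test_cache')
    yield
    requests_cache.uninstall_cache()

def test_fetch(cached_http):
    # Every request made during the test is served through the cache.
    assert requests.get('https://httpbin.org/get').ok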
Example 19
def get_generic_session() -> requests.Session:
    """Get or create a requests session for gatherer."""
    if mtgjson4.USE_CACHE.get():
        requests_cache.install_cache(
            str(mtgjson4.PROJECT_CACHE_PATH.joinpath("general_cache")),
            expire_after=mtgjson4.SESSION_CACHE_EXPIRE_GENERAL,
        )

    session: Optional[requests.Session] = SESSION.get(None)
    if not session:
        session = requests.Session()
        session = retryable_session(session)
        SESSION.set(session)
    return session
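Worth noting about this pattern: install_cache() works by globally replacing requests.Session with a cache-enabled subclass, so when USE_CACHE is set, the requests.Session() created above is itself cache-backed; retryable_session() then layers retry behaviour on top of the same session.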
Example 20
def __get_session() -> requests.Session:
    """Get or create a requests session for TCGPlayer."""
    global GH_DB_URL, GH_DB_KEY, GH_API_KEY, GH_API_USER, GH_DB_FILE

    if mtgjson4.USE_CACHE.get(False):
        requests_cache.install_cache(
            str(mtgjson4.PROJECT_CACHE_PATH.joinpath("tcgplayer_cache")),
            expire_after=mtgjson4.SESSION_CACHE_EXPIRE_TCG,
        )

    session: Optional[requests.Session] = SESSION.get(None)
    if session is None:
        session = requests.Session()
        header_auth = {"Authorization": "Bearer " + _request_tcgplayer_bearer()}

        # Open and read MTGJSON secret properties
        config = configparser.RawConfigParser()
        config.read(mtgjson4.CONFIG_PATH)

        GH_API_USER = config.get("CardHoarder", "gh_api_user")
        GH_API_KEY = config.get("CardHoarder", "gh_api_key")
        GH_DB_KEY = config.get("CardHoarder", "gh_db_key")
        GH_DB_FILE = config.get("CardHoarder", "gh_db_file")
        GH_DB_URL = f"https://gist.github.com/{GH_DB_KEY}"

        session.headers.update(header_auth)
        session = util.retryable_session(session)
        SESSION.set(session)
    return session
Example 21
def __get_session() -> requests.Session:
    """Get or create a requests session for scryfall."""
    if mtgjson4.USE_CACHE.get():
        requests_cache.install_cache(
            str(mtgjson4.PROJECT_CACHE_PATH.joinpath("scryfall_cache")),
            expire_after=mtgjson4.SESSION_CACHE_EXPIRE_SCRYFALL,
        )

    session: Optional[requests.Session] = SESSION.get(None)
    if session is None:
        session = requests.Session()
        if mtgjson4.CONFIG_PATH.is_file():
            # Open and read MTGJSON secret properties
            config = configparser.RawConfigParser()
            config.read(mtgjson4.CONFIG_PATH)
            if config.get("Scryfall", "client_secret"):
                header_auth = {
                    "Authorization": "Bearer " + config.get("Scryfall", "client_secret")
                }
                session.headers.update(header_auth)
                LOGGER.info("Fetching from Scryfall with authentication")
            else:
                LOGGER.warning("Fetching from Scryfall WITHOUT authentication")
        else:
            LOGGER.warning("Fetching from Scryfall WITHOUT authentication")

        session = util.retryable_session(session)
        SESSION.set(session)
    return session
Example 22
def fqdns_from_ct_log(domain, cache=True, verbose=False):
    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    if cache:
        homedir = Path(os.path.expanduser('~'))
        requests_cache.install_cache(str((homedir / '.habu_requests_cache')), expire_after=3600)

    fqdns = set()

    if verbose:
        print("Downloading subdomain list from https://crt.sh ...", file=sys.stderr)

    req = requests.get("https://crt.sh/?q=%.{d}&output=json".format(d=domain))

    if req.status_code != 200:
        print("[X] Information not available!")
        return False

    json_data = json.loads(req.text)

    for data in json_data:
        name = data['name_value'].lower()
        if '*' not in name:
            fqdns.add(name)

    return fqdns
Example 23
def shodan_get_result(ip, api_key=None, cache=True, verbose=False):
    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    if cache:
        homedir = Path(os.path.expanduser('~'))
        requests_cache.install_cache(str(homedir / '.habu_requests_cache'), expire_after=3600)

    if not api_key:
        api_key = config['SHODAN_APIKEY']

    url = 'https://api.shodan.io/shodan/host/{}?key={}'.format(ip, api_key)
    r = requests.get(url)

    if r.status_code not in [200, 404]:
        logging.error(str(r))
        return {}

    if r.status_code == 404:
        return {}

    data = r.json()
    return data
Example 24
def get_vhosts(ip, first=1, no_cache=False):
    """Returns a list of webs hosted on IP (checks bing.com)

    >>> 'www.bing.com' in get_vhosts('204.79.197.200')
    True
    """
    if not no_cache:
        homedir = Path(os.path.expanduser('~'))
        requests_cache.install_cache(str(homedir / '.habu_requests_cache'), expire_after=3600)

    url = "http://www.bing.com/search?q=ip:{ip} &first={first}".format(ip=ip, first=first)
    response = requests.get(url)
    soup = BeautifulSoup(response.text, "html.parser")

    vhosts = set()
    for h2 in soup.find_all('h2'):
        for link in h2.find_all('a'):
            href = link.get('href')
            if href.startswith('http://') or href.startswith('https://'):
                vhost = href.split('/')[2]
                vhosts.add(vhost)
    return list(vhosts)
Example 25
def scrape(folder=None, min_fx_version=None, max_fx_version=None, channels=None):
    """
    Returns data in the format:
    {
      <channel>: {
        <revision>: {
          "channel": <channel>,
          "version": <major-version>,
          "registries": {
            "event": [<path>, ...],
            "histogram": [<path>, ...],
            "scalar": [<path>, ...]
          }
        },
        ...
      },
      ...
    }
    """
    if min_fx_version is None:
        min_fx_version = MIN_FIREFOX_VERSION

    if folder is None:
        folder = tempfile.mkdtemp()

    error_cache = load_error_cache(folder)
    requests_cache.install_cache('probe_scraper_cache')
    results = defaultdict(dict)

    if channels is None:
        channels = CHANNELS.keys()

    for channel in channels:
        tags = load_tags(channel)
        versions = extract_tag_data(tags, channel, min_fx_version, max_fx_version)
        save_error_cache(folder, error_cache)

        print("\n" + channel + " - extracted version data:")
        for v in versions:
            print("  " + str(v))

        print("\n" + channel + " - loading files:")
        for v in versions:
            print("  from: " + str(v))
            files = download_files(channel, v['node'], folder, error_cache, v['version'])
            results[channel][v['node']] = {
                'channel': channel,
                'version': v['version'],
                'registries': files,
            }
            save_error_cache(folder, error_cache)

    return results
Example 26
def __init__(self):
    # Configure logging
    logging.getLogger("requests").setLevel(logging.WARNING)
    self.logger = logging.getLogger('tenma')

    # Setup requests caching
    requests_cache.install_cache('./media/CACHE/comicvine-cache', expire_after=1800)
    requests_cache.core.remove_expired_responses()

    # Set basic reusable strings
    self.api_key = Settings.get_solo().api_key
    self.directory_path = 'files'

    # API Strings
    self.baseurl = 'https://comicvine.gamespot.com/api/'
    self.imageurl = 'https://comicvine.gamespot.com/api/image/'
    self.base_params = {'format': 'json', 'api_key': self.api_key}
    self.headers = {'user-agent': 'tenma'}

    # API field strings
    self.arc_fields = 'deck,description,id,image,name,site_detail_url'
    self.character_fields = 'deck,description,id,image,name,site_detail_url'
    self.creator_fields = 'deck,description,id,image,name,site_detail_url'
    self.issue_fields = 'api_detail_url,character_credits,cover_date,deck,description,id,image,issue_number,name,person_credits,site_detail_url,story_arc_credits,team_credits,volume'
    self.publisher_fields = 'deck,description,id,image,name,site_detail_url'
    self.query_issue_fields = 'cover_date,id,issue_number,name,volume'
    self.query_issue_limit = '100'
    self.series_fields = 'api_detail_url,deck,description,id,name,publisher,site_detail_url,start_year'
    self.team_fields = 'characters,deck,description,id,image,name,site_detail_url'

    # International reprint publishers
    # Ordered by # of issues (est.) for quick matching.
    self.int_pubs = [
        2350,  # Panini (21.5k)
        2812,  # Marvel UK (4.2k)
        2094,  # Abril (2.1k)
        2319,  # Planeta DeAgostini (2.1k)
        2903,  # Ediciones Zinco (0.7k)
        1133,  # Semic As (0.3k)
        2961,  # Marvel Italia (0.04k)
    ]
Example 27
def setup(self):
    ds_args = dict(id=self.name)
    defaults = dict(name='HelMet-kirjastot')
    self.data_source, _ = DataSource.objects.get_or_create(
        defaults=defaults, **ds_args)

    self.tprek_data_source = DataSource.objects.get(id='tprek')
    self.ahjo_data_source = DataSource.objects.get(id='ahjo')

    system_data_source_defaults = {'user_editable': True}
    self.system_data_source, _ = DataSource.objects.get_or_create(id=settings.SYSTEM_DATA_SOURCE_ID,
                                                                  defaults=system_data_source_defaults)

    org_args = dict(origin_id='u4804001010', data_source=self.ahjo_data_source)
    defaults = dict(name='Helsingin kaupunginkirjasto')
    self.organization, _ = Organization.objects.get_or_create(defaults=defaults, **org_args)

    org_args = dict(origin_id='00001', data_source=self.ahjo_data_source)
    defaults = dict(name='Helsingin kaupunki')
    self.city, _ = Organization.objects.get_or_create(defaults=defaults, **org_args)

    # Build a cached list of Places
    loc_id_list = [l[1] for l in LOCATIONS.values()]
    place_list = Place.objects.filter(
        data_source=self.tprek_data_source
    ).filter(origin_id__in=loc_id_list)
    self.tprek_by_id = {p.origin_id: p.id for p in place_list}

    # Create the "Tapahtuma vain internetissä" location if not present
    defaults = dict(data_source=self.system_data_source,
                    publisher=self.city,
                    name='Internet',
                    description='Tapahtuma vain internetissä.')
    self.internet_location, _ = Place.objects.get_or_create(id=INTERNET_LOCATION_ID, defaults=defaults)

    try:
        yso_data_source = DataSource.objects.get(id='yso')
    except DataSource.DoesNotExist:
        yso_data_source = None

    if yso_data_source:
        # Build a cached list of YSO keywords
        cat_id_set = set()
        for yso_val in YSO_KEYWORD_MAPS.values():
            if isinstance(yso_val, tuple):
                for t_v in yso_val:
                    cat_id_set.add('yso:' + t_v)
            else:
                cat_id_set.add('yso:' + yso_val)
        keyword_list = Keyword.objects.filter(data_source=yso_data_source).\
            filter(id__in=cat_id_set)
        self.yso_by_id = {p.id: p for p in keyword_list}
    else:
        self.yso_by_id = {}

    if self.options['cached']:
        requests_cache.install_cache('helmet')
        self.cache = requests_cache.get_cache()
    else:
        self.cache = None
Example 28
def main() -> None:
    """Attempt to set up a list of marathon service instances given.
    Exits 1 if any service.instance deployment failed.
    This is done in the following order:
    - Load the marathon configuration
    - Connect to marathon
    - Do the following for each service.instance:
        - Load the service instance's configuration
        - Create the complete marathon job configuration
        - Deploy/bounce the service
        - Emit an event about the deployment to sensu"""
    args = parse_args()
    soa_dir = args.soa_dir
    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.WARNING)

    # Setting up transparent cache for http API calls
    requests_cache.install_cache("setup_marathon_jobs", backend="memory")

    system_paasta_config = load_system_paasta_config()
    clients = marathon_tools.get_marathon_clients(
        marathon_tools.get_marathon_servers(system_paasta_config)
    )
    unique_clients = clients.get_all_clients()
    marathon_apps_with_clients = marathon_tools.get_marathon_apps_with_clients(
        unique_clients, embed_tasks=True
    )
    num_failed_deployments = 0
    for service_instance in args.service_instance_list:
        try:
            service, instance, _, __ = decompose_job_id(service_instance)
        except InvalidJobNameError:
            log.error(
                f"Invalid service instance specified ({service_instance}). Format is service{SPACER}instance."
            )
            num_failed_deployments = num_failed_deployments + 1
        else:
            if deploy_marathon_service(
                service, instance, clients, soa_dir, marathon_apps_with_clients
            )[0]:
                num_failed_deployments = num_failed_deployments + 1

    requests_cache.uninstall_cache()

    log.debug(
        "%d out of %d service.instances failed to deploy."
        % (num_failed_deployments, len(args.service_instance_list))
    )

    sys.exit(1 if num_failed_deployments else 0)