Python源码示例:requests_cache.install_cache()

示例1
def import_places(self):
        """Synchronise the Place rows of this importer's data source with its units.

        Behaviour is driven by ``self.options``:

        * ``cached`` -- when truthy, a requests_cache cache named ``'tprek'``
          is installed before anything is fetched.
        * ``single`` -- when set, only the unit with that id is fetched via
          ``self.pk_get`` and the queryset is narrowed to the same id;
          otherwise every unit returned by ``self.pk_get('unit')`` is handled.
        * ``remap`` -- forwarded to ``syncher.finish`` (defaults to False).
        """
        if self.options['cached']:
            requests_cache.install_cache('tprek')

        places = Place.objects.filter(data_source=self.data_source)
        single_id = self.options.get('single', None)
        if single_id:
            # Restrict both the fetched data and the local queryset to one unit.
            units = [self.pk_get('unit', single_id)]
            places = places.filter(id=single_id)
        else:
            logger.info("Loading units...")
            units = self.pk_get('unit')
            logger.info("%s units loaded" % len(units))

        syncher = ModelSyncher(
            places,
            lambda obj: obj.origin_id,
            delete_func=self.mark_deleted,
            check_deleted_func=self.check_deleted,
        )
        for count, unit in enumerate(units):
            # Progress report every 1000 units, skipping the very first one.
            if count and count % 1000 == 0:
                logger.info("%s units processed" % count)
            self._import_unit(syncher, unit)

        syncher.finish(self.options.get('remap', False))
示例2
def setup(self):
        """Prepare data sources, the organization and caches for the importer.

        Looks up the existing ``tprek`` data source, gets or creates this
        importer's own data source and the organization with origin id
        ``kaupunki``, builds the place and keyword caches through the
        importer's own helper methods and, when ``self.options['cached']`` is
        truthy, installs a requests_cache cache named ``'espoo'`` and keeps a
        handle to it in ``self.cache`` (``None`` otherwise).
        """
        self.tprek_data_source = DataSource.objects.get(id='tprek')

        self.data_source, _ = DataSource.objects.get_or_create(
            id=self.name,
            defaults={'name': 'City of Espoo'},
        )

        self.organization, _ = Organization.objects.get_or_create(
            origin_id='kaupunki',
            data_source=self.data_source,
            defaults={'name': 'Espoon kaupunki'},
        )
        self._build_cache_places()
        self._cache_yso_keywords()

        if not self.options['cached']:
            self.cache = None
        else:
            requests_cache.install_cache('espoo')
            self.cache = requests_cache.get_cache()
示例3
def setup_requests_cachedir():
    """Set up local caching for faster remote HTTP requests.

    The cache directory is created in the user's home directory as
    ``.nfcore/cache_<python version>`` (full ``major.minor.micro`` version).
    A requests_cache cache called ``github_info`` is installed inside it,
    using the ``sqlite`` backend and a one hour expiry.
    """
    # Only import it if we need it
    import requests_cache

    pyversion = '.'.join(str(v) for v in sys.version_info[0:3])
    # expanduser() honours $HOME when it is set but still resolves a home
    # directory when it is not; os.getenv("HOME") would return None there and
    # make os.path.join raise TypeError.
    cachedir = os.path.join(os.path.expanduser('~'), '.nfcore', 'cache_' + pyversion)
    # exist_ok avoids the race between checking for the directory and creating it.
    os.makedirs(cachedir, exist_ok=True)
    requests_cache.install_cache(
        os.path.join(cachedir, 'github_info'),
        expire_after=datetime.timedelta(hours=1),
        backend='sqlite',
    )
示例4
def init_requests_cache(refresh_cache=False):
    """
    Install the cache that the ``requests`` library consults for responses
    before making network requests.

    The cache file lives in the per-user cache directory reported by
    ``AppDirs("stix2-validator", "OASIS")``; its name carries the major Python
    version and entries expire after one week.

    :param refresh_cache: Whether the cache should be cleared out
    """
    # Cache data from external sources; used in some checks
    cache_dir = AppDirs("stix2-validator", "OASIS").user_cache_dir
    try:
        os.makedirs(cache_dir)
    except OSError as exc:
        # An already existing directory is fine; anything else is re-raised.
        if exc.errno != errno.EEXIST:
            raise

    cache_file = os.path.join(
        cache_dir, 'py{}cache'.format(sys.version_info[0]))
    requests_cache.install_cache(
        cache_name=cache_file,
        expire_after=datetime.timedelta(weeks=1),
    )

    if refresh_cache:
        clear_requests_cache()
示例5
def shodan_query(query, api_key, cache=True, verbose=False):
    """Run a Shodan host search and return the decoded JSON response.

    Args:
        query: Search expression sent as the ``query`` URL parameter.
        api_key: API key sent as the ``key`` URL parameter.
        cache: When truthy, install a requests_cache cache stored at
            ``~/.habu_requests_cache`` with a 3600 second expiry.
        verbose: When truthy, configure logging at INFO level.

    Returns:
        The parsed JSON body on HTTP 200. An empty dict on HTTP 404 or on any
        other status code (the latter is also logged as an error).
    """
    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    if cache:
        homedir = Path(os.path.expanduser('~'))
        requests_cache.install_cache(str(homedir / '.habu_requests_cache'), expire_after=3600)

    # Let requests build the query string: a search expression containing
    # characters such as '&', '#' or '+' would otherwise corrupt the URL.
    r = requests.get(
        'https://api.shodan.io/shodan/host/search',
        params={'key': api_key, 'query': query},
    )

    if r.status_code not in [200, 404]:
        logging.error(str(r))
        return {}

    if r.status_code == 404:
        return {}

    data = r.json()

    return data
示例6
def main():
    """Program entry point: load configuration, set up caching, start curses.

    Exits with a message when ``BASEDIR`` is a regular file, otherwise makes
    sure the directory exists. The parsed configuration is stored in the
    module-level ``CONFIG``; its ``locale``/``monetary`` value is applied to
    ``LC_MONETARY`` and its ``api``/``cache`` value (default 10) becomes the
    expiry of an in-memory requests_cache cache. Finally ``mainc`` is run
    under ``curses.wrapper``.
    """
    global CONFIG

    if os.path.isfile(BASEDIR):
        sys.exit('Please remove your old configuration file at {}'.format(BASEDIR))
    os.makedirs(BASEDIR, exist_ok=True)

    CONFIG = read_configuration(CONFFILE)
    monetary_locale = CONFIG['locale'].get('monetary', '')
    locale.setlocale(locale.LC_MONETARY, monetary_locale)

    cache_lifetime = int(CONFIG['api'].get('cache', 10))
    requests_cache.install_cache(
        cache_name='api_cache',
        backend='memory',
        expire_after=cache_lifetime,
    )

    curses.wrapper(mainc)
示例7
def setup(self):
        """Prepare data sources, the organization and name lookups for the importer.

        Gets or creates this importer's data source, the ``ytj`` data source
        and the organization with origin id ``0586977-6``; looks up the
        existing ``tprek`` data source. Two dictionaries keyed by lower-cased
        ``name_fi`` are built from tprek places whose name count is exactly
        one: ``self.tprek_by_name`` for non-deleted places and
        ``self.deleted_tprek_by_name`` for deleted ones (the latter also
        carries ``replaced_by_id``). When ``self.options['cached']`` is truthy
        a requests_cache cache named ``'matko'`` is installed.
        """
        self.data_source, _ = DataSource.objects.get_or_create(
            id=self.name,
            defaults={'name': 'Matkailu- ja kongressitoimisto'},
        )
        self.tprek_data_source = DataSource.objects.get(id='tprek')

        ytj_ds, _ = DataSource.objects.get_or_create(defaults={'name': 'YTJ'}, id='ytj')

        self.organization, _ = Organization.objects.get_or_create(
            origin_id='0586977-6',
            data_source=ytj_ds,
            defaults={'name': 'Helsingin Markkinointi Oy'},
        )

        active_places = Place.objects.filter(
            data_source=self.tprek_data_source, deleted=False)
        removed_places = Place.objects.filter(
            data_source=self.tprek_data_source, deleted=True)

        # Get only places that have unique names
        active_rows = (
            active_places
            .annotate(count=Count('name_fi'))
            .filter(count=1)
            .values('id', 'origin_id', 'name_fi')
        )
        removed_rows = (
            removed_places
            .annotate(count=Count('name_fi'))
            .filter(count=1)
            .values('id', 'origin_id', 'name_fi', 'replaced_by_id')
        )

        self.tprek_by_name = {
            row['name_fi'].lower(): (row['id'], row['origin_id'])
            for row in active_rows
        }
        self.deleted_tprek_by_name = {
            row['name_fi'].lower(): (row['id'], row['origin_id'], row['replaced_by_id'])
            for row in removed_rows
        }

        if self.options['cached']:
            requests_cache.install_cache('matko')
示例8
def __init__(self, username, password, verify=True, debug=False):
        """Store credentials and configure warnings, logging and request caching.

        Args:
            username: Account user name; must not be None.
            password: Account password; must not be None.
            verify: Stored as ``self._verify``; when falsy, urllib3's
                ``InsecureRequestWarning`` is disabled. Must not be None.
            debug: When truthy request debugging is enabled, otherwise error
                logging is enabled. Must not be None.

        Raises:
            AssertionError: If any argument is None.
        """
        for required in (username, password, verify, debug):
            assert required is not None

        self._username, self._password = username, password
        self._verify, self._debug = verify, debug

        # self._request_caching_enabled is not set here; presumably a class
        # attribute -- confirm against the class definition.
        if self._request_caching_enabled:
            self.request_count = self.get_cached_request_count()

        if not self._verify:
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

        if self._debug:
            self.enable_request_debugging()
        else:
            self.enable_error_logging()

        # cache requests for 24 hours
        if self._request_caching_enabled:
            requests_cache.install_cache(
                'capiq_cache',
                backend='sqlite',
                expire_after=86400,
                allowable_methods=('POST',),
            )

    # This function retrieves a single data point for a point in time value for a mnemonic either current or
    # historical. Default inputs include a Mnemonic and a Security/Entity Identifier
    #
    # Returns a nested dictionary, where the primary key is the identifier and the secondary key is the mnemonic.
    # In case of an error, a None value is returned for that mnemonic and Cap IQ's error is logged 
示例9
def enable_cache():
    """Enable the requests library cache.

    If ``requests_cache`` cannot be imported, a message is written to stderr
    and the function returns without doing anything else. Otherwise
    ``CACHE_DIR`` is created when it does not exist and a cache is installed
    at ``CACHE_FILE``.
    """
    try:
        import requests_cache
    except ImportError as exc:
        message = "Failed to enable cache: {0}\n".format(str(exc))
        sys.stderr.write(message)
        return

    cache_dir_missing = not os.path.exists(CACHE_DIR)
    if cache_dir_missing:
        os.makedirs(CACHE_DIR)
    requests_cache.install_cache(CACHE_FILE)
示例10
def setup_paasta_api():
    """Initialise logging, global ``settings`` and the HTTP cache for the API.

    Environment variables read:

    * ``PAASTA_API_DEBUG`` -- when set (non-empty), logging is configured at
      DEBUG level, otherwise at WARNING level.
    * ``PAASTA_API_CLUSTER`` -- when set (non-empty), used as
      ``settings.cluster``; otherwise the cluster comes from the system
      paasta config.

    ``settings.kubernetes_client`` is set to ``None`` when the Kubernetes
    client cannot be constructed.
    """
    debug_requested = os.environ.get("PAASTA_API_DEBUG")
    logging.basicConfig(
        level=logging.DEBUG if debug_requested else logging.WARNING
    )

    # pyinotify is a better solution than turning off file caching completely
    service_configuration_lib.disable_yaml_cache()

    settings.system_paasta_config = load_system_paasta_config()
    settings.cluster = (
        os.environ.get("PAASTA_API_CLUSTER")
        or settings.system_paasta_config.get_cluster()
    )

    # NOTE(review): this first marathon_clients value is overwritten a few
    # lines below; it looks redundant -- confirm the call has no needed
    # side effect before removing it.
    initial_servers = marathon_tools.get_marathon_servers(settings.system_paasta_config)
    settings.marathon_clients = marathon_tools.get_marathon_clients(initial_servers)

    settings.marathon_servers = marathon_tools.get_marathon_servers(
        system_paasta_config=settings.system_paasta_config
    )
    settings.marathon_clients = marathon_tools.get_marathon_clients(
        marathon_servers=settings.marathon_servers, cached=False
    )

    try:
        kube_client = kubernetes_tools.KubeClient()
    except FileNotFoundError:
        log.info("Kubernetes not found")
        kube_client = None
    except Exception:
        log.exception("Error while initializing KubeClient")
        kube_client = None
    settings.kubernetes_client = kube_client

    # Set up transparent cache for http API calls. With expire_after, responses
    # are removed only when the same request is made. Expired storage is not a
    # concern here. Thus remove_expired_responses is not needed.
    requests_cache.install_cache("paasta-api", backend="memory", expire_after=5)
示例11
def use_requests_cache(
    cache_name: str, backend: str = "memory", **kwargs: Any
) -> Callable[[_UseRequestsCacheFuncT], _UseRequestsCacheFuncT]:
    """Build a decorator that runs the wrapped function with a requests cache installed.

    The cache is installed right before each call of the decorated function
    and uninstalled afterwards, also when the function raises.

    :param cache_name: name passed to ``requests_cache.install_cache``
    :param backend: cache backend passed to ``requests_cache.install_cache``
    :param kwargs: extra keyword arguments forwarded to
        ``requests_cache.install_cache``
    :returns: a decorator preserving the wrapped function's metadata
    """
    import functools

    def wrap(fun: _UseRequestsCacheFuncT) -> _UseRequestsCacheFuncT:
        # The call-time arguments get their own names: reusing ``kwargs`` here
        # would shadow the decorator's cache options and hand the wrapped
        # function's keyword arguments to install_cache instead.
        @functools.wraps(fun)
        def fun_with_cache(*fun_args: Any, **fun_kwargs: Any) -> Any:
            requests_cache.install_cache(cache_name, backend=backend, **kwargs)
            try:
                return fun(*fun_args, **fun_kwargs)
            finally:
                # Never leave the global cache installed after a failure.
                requests_cache.uninstall_cache()

        return cast("_UseRequestsCacheFuncT", fun_with_cache)

    return wrap
示例12
def fetch(outfile):
    """Download all scripts listed in ``DATA_URLS`` and pickle their trees.

    For every ``label -> url`` pair the scripts returned by ``fetch_scripts``
    are converted with ``build_tree``; scripts whose conversion raises are
    reported with ``print`` and skipped. The resulting list of
    ``{'tree': ..., 'metadata': {'label': ...}}`` dictionaries is written to
    ``outfile`` with ``pickle``.
    """
    if not os.path.exists(REQUESTS_CACHE):
        os.makedirs(REQUESTS_CACHE)
    requests_cache.install_cache(REQUESTS_CACHE)

    entries = []
    label_counts = defaultdict(int)

    print('Fetching scripts')
    for label, url in DATA_URLS.items():
        print(url)
        for script in fetch_scripts(url):
            try:
                entry = {'tree': build_tree(script), 'metadata': {'label': label}}
            except Exception as err:
                # Best effort: report the failure and move on to the next script.
                print(err)
            else:
                entries.append(entry)
                label_counts[label] += 1

    print('Label counts: ', label_counts)

    print('Dumping scripts')
    with open(outfile, 'wb') as sink:
        pickle.dump(entries, sink)
示例13
def set_cache(refresh=False):
    """Install the static Requests cache.

    The cache is stored next to this module under the name ``cache`` and
    covers GET and POST requests. Entries expire after 30 days, or after zero
    seconds when ``refresh`` is truthy; expired responses are removed right
    after installation.
    """
    lifetime = datetime.timedelta(seconds=0) if refresh else datetime.timedelta(days=30)
    cache_path = os.path.join(os.path.dirname(__file__), "cache")
    requests_cache.install_cache(
        cache_name=cache_path,
        allowable_methods=('GET', 'POST'),
        expire_after=lifetime,
    )
    requests_cache.core.remove_expired_responses()
示例14
def set_cache(refresh=False):
    """Install the static Requests cache.

    The cache is stored next to this module under the name ``cache`` and
    covers GET and POST requests. Entries expire after 30 days, or after zero
    seconds when ``refresh`` is truthy; expired responses are removed right
    after installation.
    """
    lifetime = datetime.timedelta(seconds=0) if refresh else datetime.timedelta(days=30)
    cache_path = os.path.join(os.path.dirname(__file__), "cache")
    requests_cache.install_cache(
        cache_name=cache_path,
        allowable_methods=('GET', 'POST'),
        expire_after=lifetime,
    )
    requests_cache.core.remove_expired_responses()
示例15
def set_cache(refresh=False):
    """Install the static Requests cache.

    The cache is stored next to this module under the name ``cache`` and
    covers GET and POST requests. Entries expire after 30 days, or after zero
    seconds when ``refresh`` is truthy; expired responses are removed right
    after installation.
    """
    lifetime = datetime.timedelta(seconds=0) if refresh else datetime.timedelta(days=30)
    cache_path = os.path.join(os.path.dirname(__file__), "cache")
    requests_cache.install_cache(
        cache_name=cache_path,
        allowable_methods=('GET', 'POST'),
        expire_after=lifetime,
    )
    requests_cache.core.remove_expired_responses()
示例16
def pytest_configure(config):
    """Configure the test session: optional request caching and shared game ids.

    When the ``--use-cache`` option is set, a requests_cache cache named
    ``test_cache`` is installed. The game ids used by the tests are attached
    to the ``pytest`` module as ``pytest.game_ids``.
    """
    use_cache = config.getoption('--use-cache')
    if use_cache:
        import requests_cache
        requests_cache.install_cache('test_cache')

    api = Api()
    season_game_ids = api.GetSeasonGameIDs('2009-10', 'Regular Season')
    # Hack to carry the gameids to tests
    pytest.game_ids = season_game_ids[:2]
    # NOTE(review): the ids fetched above are replaced by this fixed list
    # straight away -- confirm whether the API call is still wanted.
    pytest.game_ids = ['0020900292']
示例17
def use_requests_cache():
    """Install a requests_cache cache named ``test_cache``.

    ``requests_cache`` is imported lazily, only when this function is called.
    """
    from requests_cache import install_cache

    install_cache('test_cache')
示例18
def pytest_runtest_setup(item):
    """Install a requests_cache cache named ``test_cache`` before a test runs.

    ``item`` is accepted to match the hook signature but is not used.
    """
    # called for running each test in 'a' directory
    from requests_cache import install_cache

    install_cache('test_cache')
示例19
def __init__(self, cache=False, cache_filename="requests.cache"):
        """Remember the caching preference and optionally install the cache.

        Args:
            cache: When truthy, a requests_cache cache is installed.
            cache_filename: Cache name handed to ``requests_cache.install_cache``.
        """
        self._cache = cache
        if self._cache:
            requests_cache.install_cache(cache_filename)
        self._transform_json = True
示例20
def cache_requests():
    """Install a requests cache for the duration of the surrounding block.

    Generator: everything before the ``yield`` is set-up, everything after it
    is tear-down. The cache is stored under ``data/requests_cache`` next to
    this file. The decorator that drives this generator is not visible here
    (presumably a pytest fixture or a context manager).
    """
    testdir = os.path.dirname(os.path.abspath(__file__))
    location = os.path.join(testdir, 'data/requests_cache')
    requests_cache.install_cache(cache_name=location)
    try:
        yield
    finally:
        # Uninstall even when an exception is thrown into the generator or it
        # is closed early, so the cache never stays installed by accident.
        requests_cache.uninstall_cache()
示例21
def get_generic_session() -> requests.Session:
    """Get or create a requests session for gatherer.

    When ``mtgjson4.USE_CACHE`` is enabled, a requests_cache cache named
    ``general_cache`` is installed under the project cache path. The session
    stored in ``SESSION`` is reused when present; otherwise a new retryable
    session is created and stored.
    """
    if mtgjson4.USE_CACHE.get():
        cache_path = mtgjson4.PROJECT_CACHE_PATH.joinpath("general_cache")
        requests_cache.install_cache(
            str(cache_path),
            expire_after=mtgjson4.SESSION_CACHE_EXPIRE_GENERAL,
        )

    existing: Optional[requests.Session] = SESSION.get(None)
    if existing:
        return existing

    fresh = retryable_session(requests.Session())
    SESSION.set(fresh)
    return fresh
示例22
def __get_session() -> requests.Session:
    """Get or create a requests session for TCGPlayer.

    When ``mtgjson4.USE_CACHE`` is enabled, a requests_cache cache named
    ``tcgplayer_cache`` is installed under the project cache path. A session
    already stored in ``SESSION`` is returned unchanged; otherwise a new one
    is created with a bearer ``Authorization`` header, the module-level
    ``GH_*`` globals are populated from the ``CardHoarder`` config section,
    and the retryable session is stored and returned.
    """
    global GH_DB_URL, GH_DB_KEY, GH_API_KEY, GH_API_USER, GH_DB_FILE

    if mtgjson4.USE_CACHE.get(False):
        cache_path = mtgjson4.PROJECT_CACHE_PATH.joinpath("tcgplayer_cache")
        requests_cache.install_cache(
            str(cache_path),
            expire_after=mtgjson4.SESSION_CACHE_EXPIRE_TCG,
        )

    existing: Optional[requests.Session] = SESSION.get(None)
    if existing is not None:
        return existing

    new_session = requests.Session()
    bearer_header = {"Authorization": "Bearer " + _request_tcgplayer_bearer()}

    # Open and read MTGJSON secret properties
    parser = configparser.RawConfigParser()
    parser.read(mtgjson4.CONFIG_PATH)
    GH_API_USER = parser.get("CardHoarder", "gh_api_user")
    GH_API_KEY = parser.get("CardHoarder", "gh_api_key")
    GH_DB_KEY = parser.get("CardHoarder", "gh_db_key")
    GH_DB_FILE = parser.get("CardHoarder", "gh_db_file")
    GH_DB_URL = f"https://gist.github.com/{GH_DB_KEY}"

    new_session.headers.update(bearer_header)
    new_session = util.retryable_session(new_session)
    SESSION.set(new_session)
    return new_session
示例23
def __get_session() -> requests.Session:
    """Get or create a requests session for scryfall.

    When ``mtgjson4.USE_CACHE`` is enabled, a requests_cache cache named
    ``scryfall_cache`` is installed under the project cache path. A session
    already stored in ``SESSION`` is returned unchanged. A new session gets a
    bearer ``Authorization`` header when the config file exists and its
    ``Scryfall``/``client_secret`` value is non-empty; otherwise a warning
    about unauthenticated access is logged.
    """
    if mtgjson4.USE_CACHE.get():
        cache_path = mtgjson4.PROJECT_CACHE_PATH.joinpath("scryfall_cache")
        requests_cache.install_cache(
            str(cache_path),
            expire_after=mtgjson4.SESSION_CACHE_EXPIRE_SCRYFALL,
        )

    existing: Optional[requests.Session] = SESSION.get(None)
    if existing is not None:
        return existing

    new_session = requests.Session()

    client_secret = None
    if mtgjson4.CONFIG_PATH.is_file():
        # Open and read MTGJSON secret properties
        parser = configparser.RawConfigParser()
        parser.read(mtgjson4.CONFIG_PATH)
        client_secret = parser.get("Scryfall", "client_secret")

    if client_secret:
        new_session.headers.update({"Authorization": "Bearer " + client_secret})
        LOGGER.info("Fetching from Scryfall with authentication")
    else:
        LOGGER.warning("Fetching from Scryfall WITHOUT authentication")

    new_session = util.retryable_session(new_session)
    SESSION.set(new_session)
    return new_session
示例24
def fqdns_from_ct_log(domain, cache=True, verbose=False):
    """Collect host names for ``domain`` from the crt.sh JSON output.

    Args:
        domain: Domain whose sub-domains are requested (``%.<domain>`` query).
        cache: When truthy, install a requests_cache cache stored at
            ``~/.habu_requests_cache`` with a 3600 second expiry.
        verbose: When truthy, configure INFO logging and print a progress
            message to stderr.

    Returns:
        A set of lower-cased ``name_value`` entries that contain no ``*``, or
        ``False`` when the HTTP status is not 200.
    """
    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    if cache:
        cache_file = Path(os.path.expanduser('~')) / '.habu_requests_cache'
        requests_cache.install_cache(str(cache_file), expire_after=3600)

    if verbose:
        print("Downloading subdomain list from https://crt.sh ...", file=sys.stderr)

    req = requests.get("https://crt.sh/?q=%.{d}&output=json".format(d=domain))

    if req.status_code != 200:
        print("[X] Information not available!")
        return False

    names = (entry['name_value'].lower() for entry in json.loads(req.text))
    # Wildcard entries are not concrete host names, so they are left out.
    return {name for name in names if '*' not in name}
示例25
def shodan_get_result(ip, api_key=None, cache=True, verbose=False):
    """Fetch the Shodan host record for ``ip``.

    Args:
        ip: Address inserted into the Shodan host URL.
        api_key: API key; when falsy, ``config['SHODAN_APIKEY']`` is used.
        cache: When truthy, install a requests_cache cache stored at
            ``~/.habu_requests_cache`` with a 3600 second expiry.
        verbose: When truthy, configure logging at INFO level.

    Returns:
        The parsed JSON body on HTTP 200. An empty dict on HTTP 404 or on any
        other status code (the latter is also logged as an error).
    """
    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    if cache:
        cache_file = Path(os.path.expanduser('~')) / '.habu_requests_cache'
        requests_cache.install_cache(str(cache_file), expire_after=3600)

    if not api_key:
        api_key = config['SHODAN_APIKEY']

    response = requests.get(
        'https://api.shodan.io/shodan/host/{}?key={}'.format(ip, api_key))
    status = response.status_code

    if status == 200:
        return response.json()

    # 404 means "nothing found" and is not worth an error log entry.
    if status != 404:
        logging.error(str(response))
    return {}
示例26
def get_vhosts(ip, first=1, no_cache=False):
    """Return a list of web hosts served from ``ip`` (checks bing.com).

    Args:
        ip: IP address used in the ``ip:`` search expression.
        first: Value of the ``first`` URL parameter of the search.
        no_cache: When falsy, install a requests_cache cache stored at
            ``~/.habu_requests_cache`` with a 3600 second expiry.

    Returns:
        A list of distinct host names taken from the ``http(s)`` links found
        inside ``<h2>`` elements of the result page; the order is unspecified.

    Example (needs network access)::

        >>> 'www.bing.com' in get_vhosts('204.79.197.200')  # doctest: +SKIP
        True
    """

    if not no_cache:
        homedir = Path(os.path.expanduser('~'))
        requests_cache.install_cache(str(homedir / '.habu_requests_cache'), expire_after=3600)

    url = "http://www.bing.com/search?q=ip:{ip} &first={first}".format(ip=ip, first=first)

    response = requests.get(url)

    soup = BeautifulSoup(response.text, "html.parser")

    vhosts = set()

    for h2 in soup.find_all('h2'):
        for link in h2.find_all('a'):
            # Anchors without an href attribute yield None; skip them instead
            # of failing on None.startswith().
            href = link.get('href')

            if href and href.startswith(('http://', 'https://')):
                vhost = href.split('/')[2]
                vhosts.add(vhost)

    return list(vhosts)
示例27
def scrape(folder=None, min_fx_version=None, max_fx_version=None, channels=None):
    """
    Collect registry files per channel and revision.

    Defaults: ``min_fx_version`` falls back to ``MIN_FIREFOX_VERSION``,
    ``folder`` to a fresh temporary directory and ``channels`` to all keys of
    ``CHANNELS``. The error cache kept in ``folder`` is saved after the tag
    data of a channel was extracted and after every downloaded revision.

    Returns data in the format:
    {
      <channel>: {
        <revision>: {
          "channel": <channel>,
          "version": <major-version>,
          "registries": {
            "event": [<path>, ...],
            "histogram": [<path>, ...],
            "scalar": [<path>, ...]
          }
        },
        ...
      },
      ...
    }
    """
    if min_fx_version is None:
        min_fx_version = MIN_FIREFOX_VERSION
    if folder is None:
        folder = tempfile.mkdtemp()

    error_cache = load_error_cache(folder)
    requests_cache.install_cache('probe_scraper_cache')
    results = defaultdict(dict)

    if channels is None:
        channels = CHANNELS.keys()

    for channel in channels:
        versions = extract_tag_data(
            load_tags(channel), channel, min_fx_version, max_fx_version)
        save_error_cache(folder, error_cache)

        print("\n" + channel + " - extracted version data:")
        for version_info in versions:
            print("  " + str(version_info))

        print("\n" + channel + " - loading files:")
        for version_info in versions:
            print("  from: " + str(version_info))
            node = version_info['node']
            registries = download_files(
                channel, node, folder, error_cache, version_info['version'])
            results[channel][node] = dict(
                channel=channel,
                version=version_info['version'],
                registries=registries,
            )
            save_error_cache(folder, error_cache)

    return results
示例28
def __init__(self):
		"""Configure logging, request caching and the constant API strings.

		Installs a requests_cache cache at ``./media/CACHE/comicvine-cache``
		with an 1800 second expiry and removes expired responses. The API key
		is read from ``Settings.get_solo()``.
		"""
		# Configure logging
		logging.getLogger("requests").setLevel(logging.WARNING)
		self.logger = logging.getLogger('tenma')

		# Setup requests caching
		requests_cache.install_cache('./media/CACHE/comicvine-cache', expire_after=1800)
		requests_cache.core.remove_expired_responses()

		# Set basic reusable strings
		self.api_key = Settings.get_solo().api_key
		self.directory_path = 'files'

		# API Strings
		self.baseurl = 'https://comicvine.gamespot.com/api/'
		self.imageurl = 'https://comicvine.gamespot.com/api/image/'
		self.base_params = dict(format='json', api_key=self.api_key)
		self.headers = {'user-agent': 'tenma'}

		# API field strings
		# Several resources request exactly the same set of fields.
		common_fields = 'deck,description,id,image,name,site_detail_url'
		self.arc_fields = common_fields
		self.character_fields = common_fields
		self.creator_fields = common_fields
		self.issue_fields = (
			'api_detail_url,character_credits,cover_date,deck,description,id,image,'
			'issue_number,name,person_credits,site_detail_url,story_arc_credits,'
			'team_credits,volume'
		)
		self.publisher_fields = common_fields
		self.query_issue_fields = 'cover_date,id,issue_number,name,volume'
		self.query_issue_limit = '100'
		self.series_fields = 'api_detail_url,deck,description,id,name,publisher,site_detail_url,start_year'
		self.team_fields = 'characters,deck,description,id,image,name,site_detail_url'

		# International reprint publishers
		# Ordered by # of issues (est.) for quick matching.
		self.int_pubs = [
			2350,	# Panini (21.5k)
			2812,	# Marvel UK (4.2k)
			2094,	# Abril (2.1k)
			2319,	# Planeta DeAgostini (2.1k)
			2903,	# Ediciones Zinco (0.7k)
			1133,	# Semic As (0.3k)
			2961,	# Marvel Italia (0.04k)
		]

	#================================================================================================== 
示例29
def setup(self):
        """Prepare data sources, organizations, lookups and the request cache.

        Gets or creates this importer's data source, the system data source,
        two organizations (library and city) and the "Internet" place; looks
        up the existing ``tprek`` and ``ahjo`` data sources. Builds
        ``self.tprek_by_id`` (origin id -> place id) for the places listed in
        ``LOCATIONS`` and, when a ``yso`` data source exists,
        ``self.yso_by_id`` for the keywords referenced by
        ``YSO_KEYWORD_MAPS`` (an empty dict otherwise). When
        ``self.options['cached']`` is truthy a requests_cache cache named
        ``'helmet'`` is installed and kept in ``self.cache`` (``None``
        otherwise).
        """
        ds_args = dict(id=self.name)
        defaults = dict(name='HelMet-kirjastot')
        self.data_source, _ = DataSource.objects.get_or_create(
            defaults=defaults, **ds_args)
        self.tprek_data_source = DataSource.objects.get(id='tprek')
        self.ahjo_data_source = DataSource.objects.get(id='ahjo')
        system_data_source_defaults = {'user_editable': True}
        # get_or_create returns an (object, created) tuple; keep only the
        # object, because it is used as a data_source value further down.
        self.system_data_source, _ = DataSource.objects.get_or_create(
            id=settings.SYSTEM_DATA_SOURCE_ID,
            defaults=system_data_source_defaults)

        org_args = dict(origin_id='u4804001010', data_source=self.ahjo_data_source)
        defaults = dict(name='Helsingin kaupunginkirjasto')
        self.organization, _ = Organization.objects.get_or_create(defaults=defaults, **org_args)
        org_args = dict(origin_id='00001', data_source=self.ahjo_data_source)
        defaults = dict(name='Helsingin kaupunki')
        self.city, _ = Organization.objects.get_or_create(defaults=defaults, **org_args)

        # Build a cached list of Places
        loc_id_list = [loc[1] for loc in LOCATIONS.values()]
        place_list = Place.objects.filter(
            data_source=self.tprek_data_source
        ).filter(origin_id__in=loc_id_list)
        self.tprek_by_id = {p.origin_id: p.id for p in place_list}

        # Create the "Tapahtuma vain internetissä" (internet-only event)
        # location if not present
        defaults = dict(data_source=self.system_data_source,
                        publisher=self.city,
                        name='Internet',
                        description='Tapahtuma vain internetissä.',)
        self.internet_location, _ = Place.objects.get_or_create(id=INTERNET_LOCATION_ID, defaults=defaults)

        try:
            yso_data_source = DataSource.objects.get(id='yso')
        except DataSource.DoesNotExist:
            yso_data_source = None

        if yso_data_source:
            # Build a cached list of YSO keywords
            cat_id_set = set()
            for yso_val in YSO_KEYWORD_MAPS.values():
                # A mapping value is either a single id or a tuple of ids.
                if isinstance(yso_val, tuple):
                    for t_v in yso_val:
                        cat_id_set.add('yso:' + t_v)
                else:
                    cat_id_set.add('yso:' + yso_val)

            keyword_list = Keyword.objects.filter(data_source=yso_data_source).\
                filter(id__in=cat_id_set)
            self.yso_by_id = {p.id: p for p in keyword_list}
        else:
            self.yso_by_id = {}

        if self.options['cached']:
            requests_cache.install_cache('helmet')
            self.cache = requests_cache.get_cache()
        else:
            self.cache = None
示例30
def main() -> None:
    """Attempt to set up a list of marathon service instances given.
    Exits 1 if any service.instance deployment failed.
    This is done in the following order:

    - Load the marathon configuration
    - Connect to marathon
    - Do the following for each service.instance:
        - Load the service instance's configuration
        - Create the complete marathon job configuration
        - Deploy/bounce the service
        - Emit an event about the deployment to sensu

    An in-memory requests cache is installed for the duration of the
    deployments and uninstalled before the summary is logged."""

    args = parse_args()
    soa_dir = args.soa_dir
    log_level = logging.DEBUG if args.verbose else logging.WARNING
    logging.basicConfig(level=log_level)

    # Setting up transparent cache for http API calls
    requests_cache.install_cache("setup_marathon_jobs", backend="memory")

    system_paasta_config = load_system_paasta_config()
    marathon_servers = marathon_tools.get_marathon_servers(system_paasta_config)
    clients = marathon_tools.get_marathon_clients(marathon_servers)
    marathon_apps_with_clients = marathon_tools.get_marathon_apps_with_clients(
        clients.get_all_clients(), embed_tasks=True
    )

    num_failed_deployments = 0
    for service_instance in args.service_instance_list:
        try:
            service, instance, _, __ = decompose_job_id(service_instance)
        except InvalidJobNameError:
            log.error(
                f"Invalid service instance specified ({service_instance}). Format is service{SPACER}instance."
            )
            num_failed_deployments += 1
            continue

        # A truthy first element of the result is counted as a failed deployment.
        deploy_status = deploy_marathon_service(
            service, instance, clients, soa_dir, marathon_apps_with_clients
        )[0]
        if deploy_status:
            num_failed_deployments += 1

    requests_cache.uninstall_cache()

    log.debug(
        "%d out of %d service.instances failed to deploy."
        % (num_failed_deployments, len(args.service_instance_list))
    )

    sys.exit(1 if num_failed_deployments else 0)