  • Forked from Platform / Development / swh-deposit
    tasks.py
    # Copyright (C) 2015-2017  The Software Heritage developers
    # See the AUTHORS file at the top-level directory of this distribution
    # License: GNU General Public License version 3, or any later version
    # See top-level LICENSE file for more information
    
    from swh.scheduler.task import Task
    
    from swh.loader.tar.loader import TarLoader
    
    
    def fetch_archive_locally(archive_url):
        """Fetch the archive at archive_url and store it locally.

        Not implemented yet.

        """
        pass
    
    
    class LoadDepositArchive(Task):
        task_queue = 'swh_deposit_archive'
    
        def run_task(self, *, deposit_archive_url, origin, visit_date,
                     revision):
            """Import a deposit tarball into swh.
    
            Args: see :func:`TarLoader.load`.
    
            """
            loader = TarLoader()
            loader.log = self.log
    
            # 1. Retrieve tarball from deposit's private api
            # 2. Store locally in a temporary directory
            # 3. Trigger the ingestion
            # 4. clean up the temporary directory
            # 5. Update the deposit's status according to result using the
            #    deposit's private update status api
    
            # Placeholder path: steps 1 and 2 above are not implemented yet
            tar_path = 'foobar'
    
            import os
            occurrence = os.path.basename(tar_path)
    
            # Pass the values as logging arguments so formatting is deferred
            self.log.info('%s %s %s %s %s', deposit_archive_url, origin,
                          visit_date, revision, [occurrence])
            # loader.load(tar_path=tar_path, origin=origin, visit_date=visit_date,
            #             revision=revision, occurrences=[occurrence])
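
Steps 1, 2 and 4 listed in `run_task` (retrieve the tarball, store it in a temporary directory, clean up afterwards) could be sketched as below. This is only an illustration using the standard library, not the project's implementation. Turning `fetch_archive_locally` into a context manager, the `fallback_name` parameter, and the `swh.deposit.` directory prefix are all assumptions. Any authentication the deposit's private API may require is not handled.

```python
import os
import shutil
import tempfile
import urllib.request
from contextlib import contextmanager
from urllib.parse import urlparse


@contextmanager
def fetch_archive_locally(archive_url, fallback_name='archive.tar'):
    """Download archive_url into a fresh temporary directory.

    Yields the local path of the downloaded file and removes the
    temporary directory when the context exits.

    Assumption: this context-manager form and the fallback_name
    parameter are hypothetical, they are not part of the original stub.
    """
    temp_dir = tempfile.mkdtemp(prefix='swh.deposit.')
    try:
        # Name the local file after the last component of the URL path
        name = os.path.basename(urlparse(archive_url).path) or fallback_name
        tar_path = os.path.join(temp_dir, name)
        # Steps 1 and 2: stream the remote content to disk
        with urllib.request.urlopen(archive_url) as response, \
                open(tar_path, 'wb') as f:
            shutil.copyfileobj(response, f)
        yield tar_path
    finally:
        # Step 4: clean up whether or not the ingestion succeeded
        shutil.rmtree(temp_dir, ignore_errors=True)
```

Inside `run_task`, the placeholder `tar_path` would then come from `with fetch_archive_locally(deposit_archive_url) as tar_path:`, with the loader call placed in the body of the `with` block so the cleanup always runs.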