spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Boris Litvak <>
Subject RE: Small file problem
Date Thu, 17 Jun 2021 07:16:23 GMT
Compact them and remove the small files.
One messy way of doing this, (including some cleanup) looks like the following, based on rdd.mapPartitions()
on the file urls rdd:

import gzip
import json
import logging
import math
import re
from typing import List

import boto3
from mypy_boto3_s3.client import S3Client
from pyspark.sql import SparkSession

import configuration
import data_types.time_series
from data_types.shared import series
from s3_batcher import get_s3_objects_batches
from functools import partial

# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)

# One boto3 session/resource shared by the module; created at import time so
# every merge_json_files() call on an executor reuses the same S3 resource.
# NOTE(review): boto3 resources are not fork/thread-safe in general — confirm
# this is re-created per executor process when shipped to Spark workers.
session = boto3.session.Session()
s3 = session.resource('s3')

def merge_json_files(file_names, config):
    """Download, parse and schema-fix a batch of small S3 JSON files.

    Intended for rdd.mapPartitions(): ``file_names`` is an iterator of S3
    object keys within the bucket named by
    ``config.get_non_concatenated_bucket_name()``. Each object is read,
    gunzipped when the key ends in ``.gz``, parsed as JSON, and passed to the
    schema class from ``config.get_schema_class()`` together with the
    ``company_id`` extracted from the key path. Files that fail for any
    reason are logged and skipped (best-effort merge).

    Returns an iterator over the fixed records, as mapPartitions expects.

    NOTE(review): the original archive text dropped the ``try:`` line, the
    ``re.search(`` call and the ``total.append(...)`` /
    ``exceptions.append(...)`` statements; they are reconstructed here from
    the surrounding indentation and the unused accumulators — confirm against
    the author's original.
    """
    total = []
    exceptions = []
    for name in file_names:
        try:
            logger.debug(f'Loading {name} ...')
            obj = s3.Object(config.get_non_concatenated_bucket_name(), name)
            body = obj.get()['Body'].read()
            if name.endswith('.gz'):
                body = gzip.decompress(body)

            # Key layout: .../company_id=<uuid>/<file>.json.gz
            company_id = re.search(r'company_id=(\S+)/', name).group(1)
            clazz = config.get_schema_class()
            loaded = json.loads(body)
            obj: series.SchemaFixer = clazz(company_id=company_id, **loaded)
            jason = obj.fix_value()
            total.append(jason)
        except Exception as ex:
            # Best-effort: record the bad key and keep merging the rest.
            exceptions.append(name)
            logger.error(f'{name}: {ex}')

    if exceptions:
        logger.warning(f'Exceptions: {exceptions}')
    return iter(total)

def get_json_files_list(s3client: S3Client, config: configuration.Config) -> List[str]:
    """Return the S3 keys of all partitioned source files.

    Pages through the bucket with ``get_s3_objects_batches`` (each batch is a
    list of ``{'Key': '<s3 key>'}`` dicts) and keeps only keys that contain
    ``=``, i.e. Hive-style partitioned paths such as
    ``TIME_SERIES/company_id=<uuid>/<file>.json.gz`` — this filters out
    directory markers and non-partitioned objects.

    NOTE(review): the arguments to ``get_s3_objects_batches`` were truncated
    in the archived message; the bucket name from ``config`` is assumed here
    — confirm against ``s3_batcher.get_s3_objects_batches``'s signature.
    """
    logger.info('Loading file list')
    files = []
    # Equivalent manual listing for debugging:
    # aws s3 ls --summarize --human-readable --recursive \
    #   s3://ingest-measurements-20200610130929653000000001/TIME_SERIES/ \
    #   --profile hierarchy_playground > list.txt
    for batch in get_s3_objects_batches(s3client,
                                        config.get_non_concatenated_bucket_name()):
        files_batch = [b['Key'] for b in batch if '=' in b['Key']]
        files.extend(files_batch)
    logger.info('Finished listing files')
    return files

def run(spark: SparkSession, config: configuration.Config):
    """Compact many small S3 JSON files by merging them per Spark partition.

    Lists the source keys, parallelizes the *file names* (not the file
    contents) into roughly ``files_num / config.small_files_in_partition``
    partitions, then reads and merges each partition's files on the executors
    via ``merge_json_files``.

    NOTE(review): the archived message dropped the ``logger.info(f`` prefixes
    on three lines; they are reconstructed here from the orphaned f-string
    fragments — confirm against the author's original. Writing the result out
    is left commented, as in the original message.
    """
    files = get_json_files_list(boto3.client('s3'), config)
    files_num = len(files)
    logger.info(f'Loaded file list with {files_num} files')

    # logger.info(f'Traversing {files}')

    spark.sparkContext.setJobDescription('Parallelize filenames and read/merge files')

    # Partition count chosen so each partition handles a bounded number of
    # small files; ceil() guarantees at least one partition.
    rdd = spark.sparkContext.parallelize(
        files, math.ceil(files_num / config.small_files_in_partition))
    logger.info(f'Got an rdd with {rdd.getNumPartitions()} partitions')
    func = partial(merge_json_files, config=config)
    loaded_rdd = rdd.mapPartitions(func)

    # Example of writing the merged output (left disabled in the original):
    # note: these records are not sorted by time, which makes the result
    # hard to ETL/read downstream.
    # result.write.json(config.get_concatenated_path())
    return loaded_rdd


From: Sachit Murarka <>
Sent: Wednesday, 16 June 2021 21:25
To: spark users <>
Subject: Small file problem

Hello Spark Users,

We are receiving too many small files — about 3 million. Reading them is itself
taking a long time and the job is not proceeding further.

Is there any way to speed this up so the job can proceed?

Sachit Murarka
View raw message