Skip to content

Commit e410a59

Browse files
prabhath004prabhath004
andauthored
feat: add max_file_size support to AzureBlob source (#1259)
* feat: add max_file_size support to AzureBlob source Add optional max_file_size parameter to filter files by size in both list() and get_value() APIs. Files exceeding the limit are treated as non-existent. Closes #1251 * fix: correct type comparison for blob content_length Azure blob content_length is u64, not Option<u64>, so compare directly with max_size cast to u64 instead of unwrapping Option. --------- Co-authored-by: prabhath004 <ppalakur@gmu.edu>
1 parent 100ea78 commit e410a59

File tree

3 files changed

+33
-2
lines changed

3 files changed

+33
-2
lines changed

docs/docs/sources/azureblob.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,19 @@ The spec takes the following fields:
6363
* `excluded_patterns` (`list[str]`, optional): a list of glob patterns to exclude files, e.g. `["*.tmp", "**/*.log"]`.
6464
Any file or directory matching these patterns will be excluded even if they match `included_patterns`.
6565
If not specified, no files will be excluded.
66-
* `sas_token` (`cocoindex.TransientAuthEntryReference[str]`, optional): a SAS token for authentication.
67-
* `account_access_key` (`cocoindex.TransientAuthEntryReference[str]`, optional): an account access key for authentication.
6866

6967
:::info
7068

7169
`included_patterns` and `excluded_patterns` are using Unix-style glob syntax. See [globset syntax](https://docs.rs/globset/latest/globset/index.html#syntax) for the details.
7270

7371
:::
7472

73+
* `max_file_size` (`int`, optional): if provided, files exceeding this size in bytes will be treated as non-existent and skipped during processing.
74+
This is useful to avoid processing large files that are not relevant to your use case, such as videos or backups.
75+
If not specified, no size limit is applied.
76+
* `sas_token` (`cocoindex.TransientAuthEntryReference[str]`, optional): a SAS token for authentication.
77+
* `account_access_key` (`cocoindex.TransientAuthEntryReference[str]`, optional): an account access key for authentication.
78+
7579
### Schema
7680

7781
The output is a [*KTable*](/docs/core/data_types#ktable) with the following sub fields:

python/cocoindex/sources/_engine_builtin_specs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ class AzureBlob(op.SourceSpec):
9090
binary: bool = False
9191
included_patterns: list[str] | None = None
9292
excluded_patterns: list[str] | None = None
93+
max_file_size: int | None = None
9394

9495
sas_token: TransientAuthEntryReference[str] | None = None
9596
account_access_key: TransientAuthEntryReference[str] | None = None

src/ops/sources/azure_blob.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub struct Spec {
1919
binary: bool,
2020
included_patterns: Option<Vec<String>>,
2121
excluded_patterns: Option<Vec<String>>,
22+
max_file_size: Option<i64>,
2223

2324
/// SAS token for authentication. Takes precedence over account_access_key.
2425
sas_token: Option<AuthEntryReference<String>>,
@@ -32,6 +33,7 @@ struct Executor {
3233
prefix: Option<String>,
3334
binary: bool,
3435
pattern_matcher: PatternMatcher,
36+
max_file_size: Option<i64>,
3537
}
3638

3739
fn datetime_to_ordinal(dt: &time::OffsetDateTime) -> Ordinal {
@@ -73,6 +75,13 @@ impl SourceExecutor for Executor {
7375
// Only include files (not directories)
7476
if key.ends_with('/') { continue; }
7577

78+
// Check file size limit
79+
if let Some(max_size) = self.max_file_size {
80+
if blob.properties.content_length > max_size as u64 {
81+
continue;
82+
}
83+
}
84+
7685
if self.pattern_matcher.is_file_included(key) {
7786
let ordinal = Some(datetime_to_ordinal(&blob.properties.last_modified));
7887
batch.push(PartialSourceRow {
@@ -115,6 +124,22 @@ impl SourceExecutor for Executor {
115124
});
116125
}
117126

127+
// Check file size limit
128+
if let Some(max_size) = self.max_file_size {
129+
let blob_client = self
130+
.client
131+
.container_client(&self.container_name)
132+
.blob_client(key_str.as_ref());
133+
let properties = blob_client.get_properties().await?;
134+
if properties.blob.properties.content_length > max_size as u64 {
135+
return Ok(PartialSourceRowData {
136+
value: Some(SourceValue::NonExistence),
137+
ordinal: Some(Ordinal::unavailable()),
138+
content_version_fp: None,
139+
});
140+
}
141+
}
142+
118143
let blob_client = self
119144
.client
120145
.container_client(&self.container_name)
@@ -238,6 +263,7 @@ impl SourceFactoryBase for Factory {
238263
prefix: spec.prefix,
239264
binary: spec.binary,
240265
pattern_matcher: PatternMatcher::new(spec.included_patterns, spec.excluded_patterns)?,
266+
max_file_size: spec.max_file_size,
241267
}))
242268
}
243269
}

0 commit comments

Comments
 (0)