Client

Client represents a high-level interface to the Datalake API.

class abeja.datalake.Client(organization_id: str | None = None, credential: Dict[str, str] | None = None, timeout: int | None = None, max_retry_count: int | None = None)

A high-level client for the Datalake API

from abeja.datalake import Client

client = Client()
property channels: Channels

Get a Channels object

Request syntax:
channels = client.channels
Returns:

Channels object

get_channel(channel_id) Channel

Get the channel for a specific channel_id

Request syntax:
channel = client.get_channel(channel_id='1111111111111')
Params:
  • channel_id (str): channel id

Return type:

Channel object

Channel

class abeja.datalake.channel.Channel(api: APIClient, organization_id: str, channel_id: str, name: str | None = None, description: str | None = None, display_name: str | None = None, storage_type: str | None = None, created_at: str | None = None, updated_at: str | None = None, archived: bool = False)

a model class for a channel

Properties:
  • organization_id (str)

  • channel_id (str)

  • name (str)

  • display_name (str)

  • description (str)

  • archived (bool)

  • created_at (datetime)

  • updated_at (datetime)

add_datasource()
property files: Files

Get datalake Files object

Request syntax:
channel = client.get_channel(channel_id='1230000000000')
channel.files
Returns:

Files object

get_file(file_id: str) DatalakeFile

get a datalake file in the channel

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
Params:
  • file_id (str): FILE_ID

Return type:

DatalakeFile object

list_datasources()
list_files(start: str | None = None, end: str | None = None, timezone: str | None = None, sort: str | None = None, next_page_token: str | None = None, limit: int | None = None, prefetch: bool = False, query: str | None = None) FileIterator

get datalake files in the channel

Request syntax:
for f in channel.list_files():
    pass
Params:
  • start (str): start date of the target uploaded files

  • end (str): end date of the target uploaded files

  • timezone (str): timezone of the specified start and end dates

  • query (str):

    query to search. Files can be filtered on a specific metadata value with a query such as "x-abeja-meta-filename:filename".

  • sort (str):

    the order of the file list. Multiple items can be specified by separating them with commas (,). Prefix an item with a hyphen (-) to sort it in descending order. By default, the list is sorted by uploaded_at in ascending order.

Return type:

FileIterator object
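As a concrete illustration of the sort format described above, a small helper (hypothetical, not part of the SDK) could build the comma-separated string:

```python
# Build a sort string in the format list_files() accepts:
# comma-separated field names, each optionally prefixed with "-"
# for descending order. Illustrative only; not part of the abeja SDK.
def build_sort(*fields):
    """fields are (name, descending) pairs, e.g. ("uploaded_at", True)."""
    return ",".join(("-" + name) if desc else name for name, desc in fields)

# Descending by upload time, then ascending by file id:
sort = build_sort(("uploaded_at", True), ("file_id", False))
print(sort)  # -uploaded_at,file_id
```

The resulting string would then be passed as `channel.list_files(sort=sort)`.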

remove_datasource()
upload(file_obj: BytesIO, content_type: str, metadata: dict | None = None, lifetime: str | None = None, conflict_target: str | None = None) DatalakeFile

upload content to a channel from a file-like object.

Request syntax:
content_type = 'text/csv'
metadata = {
    'label': 'example'
}
with open('example.csv', 'rb') as f:
    response = channel.upload(f, content_type, metadata=metadata)
Params:
  • file_obj (a file-like object): a file-like object to upload. It must implement a read method that returns bytes, so open files in binary mode.

  • content_type (str): MIME type of content.

  • metadata (dict): [optional] metadata to be added to the uploaded file. Keys and values of the dict must be strings, not objects.

  • lifetime (str): [optional] one of 1day / 1week / 1month / 6months / 1year.

    The file will be deleted after the specified period.

  • conflict_target (str): [optional] return 409 Conflict when the same value of the specified metadata key already exists in the channel.

Return type:

DatalakeFile object

Returns:

a file uploaded to a channel

upload_dir(dir_path: str, metadata: dict | None = None, content_type: str | None = None, lifetime: str | None = None, conflict_target: str | None = None, recursive: bool = False, use_thread: bool = True) Iterable[DatalakeFile]

upload the files in a directory to a channel. This method infers the content_type of each file if content_type is not specified, and sets the filename as x-abeja-meta-filename in metadata.

Note: this method returns a list (not a generator) to make sure the upload process completes before returning.

Request syntax:
metadata = {
    'label': 'example'
}
response = channel.upload_dir('./source_dir', metadata)
Params:
  • dir_path (str): path to the directory containing the files to upload

  • metadata (dict): [optional] metadata to be added to each uploaded file.

  • content_type (str): [optional] MIME type of the contents. If not specified, the Content-Type is inferred from each file's extension.

  • recursive (bool): [optional] if True, files in subdirectories are uploaded as well. False by default.

  • use_thread (bool): [optional] if True, files are uploaded concurrently using threads. True by default.

  • lifetime (str): [optional] one of 1day / 1week / 1month / 6months / 1year.

    The file will be deleted after the specified period.

  • conflict_target (str): [optional] return 409 Conflict when the same value of the specified metadata key already exists in the channel.

Return type:

list of DatalakeFile object

Returns:

A list of DatalakeFile successfully uploaded.
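The content-type inference described above presumably behaves like Python's standard mimetypes module (an assumption; the SDK's exact mechanism is not shown here):

```python
import mimetypes

# Guess a MIME type from a file extension, the way upload_dir and
# upload_file presumably infer content_type when it is not specified
# (an assumption, not the SDK's actual code).
def guess_content_type(path, default="application/octet-stream"):
    content_type, _encoding = mimetypes.guess_type(path)
    return content_type or default

print(guess_content_type("photo.jpg"))   # image/jpeg
print(guess_content_type("notes.txt"))   # text/plain
```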

upload_file(file_path: str, metadata: dict | None = None, content_type: str | None = None, lifetime: str | None = None, conflict_target: str | None = None) DatalakeFile

upload a file to a channel. This method infers the content_type of the given file if content_type is not specified, and sets the filename as x-abeja-meta-filename in metadata.

Request syntax:
metadata = {
    'label': 'example'
}
response = channel.upload_file('~/example.txt', metadata=metadata)
Params:
  • file_path (str): path to the file

  • metadata (dict): [optional] metadata to be added to the uploaded file.

  • content_type (str): [optional] MIME type of content. If not specified, the Content-Type is inferred from the file extension.

  • lifetime (str): [optional] one of 1day / 1week / 1month / 6months / 1year.

    The file will be deleted after the specified period.

  • conflict_target (str): [optional] return 409 Conflict when the same value of the specified metadata key already exists in the channel.

Return type:

DatalakeFile object

Returns:

a file uploaded to a channel

Channels

class abeja.datalake.channel.Channels(api: APIClient, organization_id: str)

a class for handling channels

create(name: str, description: str, storage_type: str) Channel

create a channel

API reference: POST /organizations/<organization_id>/channels/

Request Syntax:
params = {
    "name": "test-channel",
    "description": "test channel",
    "storage_type": "datalake"
}
channel = channels.create(**params)
Params:
  • name (str): channel name

  • description (str): channel description

  • storage_type (str): type of storage, datalake or file

Return type:

Channel object

get(channel_id: str) Channel

get a channel

API reference: GET /organizations/<organization_id>/channels/<channel_id>

Request Syntax:
channel_id = '1234567890123'
channel = channels.get(channel_id=channel_id)
Params:
  • channel_id (str): identifier of channel

Return type:

Channel object

list(limit: int | None = None, offset: int | None = None) Iterable[Channel]

list channels

API reference: GET /organizations/<organization_id>/channels/

Request Syntax:
channel = channels.list()
Return type:

generator of Channel objects

patch(channel_id: str, name: str | None = None, description: str | None = None) Channel

patch a channel

API reference: PATCH /organizations/<organization_id>/channels/<channel_id>

Request Syntax:
params = {
    "channel_id": "1234567890123",
    "name": "updated_name",
    "description": "updated description"
}
channel = channels.patch(**params)
Params:
  • channel_id (str): identifier of channel

  • name (str): channel name

  • description (str): channel description

Return type:

Channel object

File

class abeja.datalake.file.DatalakeFile(api: APIClient, organization_id: str | None = None, channel_id: str | None = None, file_id: str | None = None, uri: str | None = None, type: str | None = None, upload_url: str | None = None, download_uri: str | None = None, content_type: str | None = None, url_expires_on: str | None = None, metadata: dict | None = None, uploaded_at: str | None = None, lifetime: str | None = None, **kwargs)

a model class for a datalake channel file

if the file exists locally, data is read from that file. Otherwise, data is fetched from the remote storage and saved locally.

the file is saved in ./{channel_id}/{file_id} by default.

you can change the location by setting the ABEJA_STORAGE_DIR_PATH environment variable; the file will then be saved in ${ABEJA_STORAGE_DIR_PATH}/{channel_id}/{file_id}.
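Based on the description above, the local cache path can be computed like this (a sketch that reproduces the documented layout; it does not call SDK internals):

```python
import os

# Compute the documented local cache path:
# ${ABEJA_STORAGE_DIR_PATH}/{channel_id}/{file_id}, falling back to
# the current directory when the variable is unset.
def cache_path(channel_id, file_id):
    base = os.environ.get("ABEJA_STORAGE_DIR_PATH", ".")
    return os.path.join(base, channel_id, file_id)

os.environ["ABEJA_STORAGE_DIR_PATH"] = "/tmp/datalake-cache"
print(cache_path("1230000000000",
                 "20180101T000000-00000000-1111-2222-3333-999999999999"))
```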

Properties:
  • organization_id (str)

  • channel_id (str)

  • file_id (str)

  • uri (str)

  • type (str)

  • upload_url (str)

  • download_uri (str)

  • content_type (str)

  • metadata (dict)

  • url_expires_on (str)

  • uploaded_at (datetime)

LIFETIME = ('1day', '1week', '1month', '6months', '1year')
commit() bool

reflect the instance's local state to the remote. Only metadata and lifetime are editable for now.

Return Type:

Optional[bool]: True if the update succeeded

get_content(cache: bool = True) bytes

Get content from a binary file

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_content()
Params:
  • cache (bool):

    if True, read the file saved in [ABEJA_STORAGE_DIR_PATH]/[channel_id]/[file_id] if it exists; otherwise, the downloaded content is saved to that path. True by default.

Return type:

bytes

get_file_info() dict

Get information of a file

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_file_info()
Return type:

dict

Returns:

Response Syntax:

{
    "url_expires_on": "2017-12-20T17:08:26+00:00",
    "uploaded_at": "2017-12-18T05:39:47+00:00",
    "metadata": {
        "x-abeja-meta-filename": "test.jpg"
    },
    "file_id": "20171218T053947-821bd0a3-3992-4320-bc1c-1ee8d0a0ad6b",
    "download_uri": "...",
    "content_type": "image/jpeg"
}
get_iter_content(cache: bool = True, chunk_size: int = 1048576) Generator[bytes, None, None]

Get content iteratively from a binary file

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_iter_content()
Params:
  • cache (bool):

    if True, read the file saved in [ABEJA_STORAGE_DIR_PATH]/[channel_id]/[file_id] if it exists; otherwise, the downloaded content is saved to that path. True by default.

  • chunk_size (int):

    the number of bytes to read into memory at a time. Default value: 1,048,576 (= 1MB).

Return type:

generator
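The chunked iteration can be pictured as reading fixed-size blocks from a stream; a minimal stand-alone sketch (not the SDK's actual implementation):

```python
import io

# Yield successive chunks of at most chunk_size bytes from a file-like
# object, mirroring how get_iter_content pages through a large file.
def iter_chunks(file_obj, chunk_size=1048576):
    while True:
        chunk = file_obj.read(chunk_size)
        if not chunk:
            break
        yield chunk

data = b"x" * 2500
chunks = list(iter_chunks(io.BytesIO(data), chunk_size=1024))
print([len(c) for c in chunks])  # [1024, 1024, 452]
```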

get_iter_lines(cache: bool = True) Generator[str, None, None]

Get lines iteratively from a text file

if the text file exists locally, content is read from that file. Otherwise, content is fetched from the remote storage and saved locally.

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_iter_lines()
Params:
  • cache (bool):

    if True, read the file saved in [ABEJA_STORAGE_DIR_PATH]/[channel_id]/[file_id] if it exists; otherwise, the downloaded content is saved to that path. True by default.

Return type:

generator
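Building on the chunked download, line iteration can be sketched as buffering chunks and splitting on newlines (an illustration only, not the SDK's implementation):

```python
# Yield complete lines from an iterable of text chunks, buffering any
# partial line until its newline arrives, similar in spirit to
# get_iter_lines.
def iter_lines(chunks):
    pending = ""
    for chunk in chunks:
        pending += chunk
        lines = pending.split("\n")
        pending = lines.pop()  # last element may be an incomplete line
        for line in lines:
            yield line
    if pending:
        yield pending

chunks = ["alpha\nbe", "ta\ngam", "ma"]
print(list(iter_lines(chunks)))  # ['alpha', 'beta', 'gamma']
```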

get_json() dict

Get JSON content from a file

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_json()
Return type:

dict

Raises:

json.decoder.JSONDecodeError

get_text(cache: bool = True, encoding: str | None = None) str

Get content from a text file

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_text()
Params:
  • cache (bool):

    if True, read the file saved in [ABEJA_STORAGE_DIR_PATH]/[channel_id]/[file_id] if it exists; otherwise, the downloaded content is saved to that path. True by default.

  • encoding (str):

    specify an encoding if the text is encoded in something other than ISO-8859-1.

Return type:

str

property lifetime: str
to_source_data() Dict[str, str]

Convert to source data format

Return type:

dict