Client

Client represents a higher-level interface to the Datalake API.

class abeja.datalake.Client(organization_id: typing.Union[str, NoneType] = None, credential: typing.Union[typing.Dict[str, str], NoneType] = None, timeout: typing.Union[int, NoneType] = None, max_retry_count: typing.Union[int, NoneType] = None) → None

A high-level client for the Datalake API

from abeja.datalake import Client

client = Client()
channels

Get channel objects

Request syntax:
channels = client.channels
Returns:
Channels object
get_channel(channel_id) → abeja.datalake.channel.Channel

Get the channel for a specific channel_id

Request syntax:
channel = client.get_channel(channel_id='1111111111111')
Params:
  • channel_id (str): channel id
Return type:
Channel object

Channel

class abeja.datalake.channel.Channel(api: abeja.datalake.api.client.APIClient, organization_id: str, channel_id: str, name: str = None, description: str = None, display_name: str = None, storage_type: str = None, created_at: str = None, updated_at: str = None, archived: bool = False) → None

a model class for a channel

Properties:
  • organization_id (str)
  • channel_id (str)
  • name (str)
  • display_name (str)
  • description (str)
  • archived (bool)
  • created_at (datetime)
  • updated_at (datetime)
add_datasource()
files

Get datalake Files object

Request syntax:
channel = client.get_channel(channel_id='1230000000000')
channel.files
Returns:
Files object
get_file(file_id: str) → abeja.datalake.file.DatalakeFile

get a datalake file in the channel

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
Params:
  • file_id (str): FILE_ID
Return type:
DatalakeFile object
list_datasources()
list_files(start: str = None, end: str = None, timezone: str = None, sort: str = None, next_page_token: str = None, limit: int = None, prefetch: bool = False, query: str = None) → abeja.datalake.file.FileIterator

get datalake files in the channel

Request syntax:
for f in channel.list_files():
    pass
Params:
  • start (str): start date of target uploaded files
  • end (str): end date of target uploaded files
  • timezone (str): timezone of specified start and end date
  • query (str):
    query to search. Files can be filtered on a specific metadata value, e.g. “x-abeja-meta-filename:filename”.
  • sort (str):
    the order of the file list. Multiple fields can be specified, separated by commas (,). Prefix a field with a hyphen (-) to sort in descending order. By default, files are sorted by uploaded_at in ascending order.
Return type:
FileIterator object
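Since the sort expression is just a comma-separated string, it can be built programmatically. A minimal sketch (the field names below are illustrative):

```python
# Build a sort expression for list_files(): comma-separated field names,
# with a '-' prefix for fields sorted in descending order.
fields = [('uploaded_at', 'desc'), ('file_id', 'asc')]
sort = ','.join(('-' if order == 'desc' else '') + name
                for name, order in fields)
# sort == '-uploaded_at,file_id'
```

The resulting string can then be passed as channel.list_files(sort=sort).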
remove_datasource()
upload(file_obj: _io.BytesIO, content_type: str, metadata: dict = None, lifetime: str = None, conflict_target: str = None) → abeja.datalake.file.DatalakeFile

upload content to a channel from a file-like object.

Request syntax:
content_type = 'text/csv'
metadata = {
    'label': 'example'
}
with open('example.csv', 'rb') as f:
    response = channel.upload(f, content_type, metadata=metadata)
Params:
  • file_obj (a file-like object) : a file-like object to upload. It must implement the read method and must return bytes.
  • content_type (str): MIME type of the content.
  • metadata (dict): [optional] metadata to be added to the uploaded file. Both keys and values must be strings; objects are not allowed.
  • lifetime (str): [optional] one of 1day / 1week / 1month / 6months. The file will be deleted after the specified period.
  • conflict_target (str): [optional] the upload fails with 409 Conflict when a file with the same value for the specified key already exists in the channel.
Return type:
DatalakeFile object
Returns:
a file uploaded to a channel
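Because metadata keys and values must be strings, it can be convenient to normalize a dict before calling upload(). A minimal helper (not part of the SDK):

```python
def to_datalake_metadata(raw):
    """Coerce all keys and values to str, as the metadata dict requires."""
    return {str(k): str(v) for k, v in raw.items()}

metadata = to_datalake_metadata({'label': 'example', 'epoch': 3})
# metadata == {'label': 'example', 'epoch': '3'}
```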
upload_dir(dir_path: str, metadata: dict = None, content_type: str = None, lifetime: str = None, conflict_target: str = None, recursive: bool = False, use_thread: bool = True) → typing.Iterable[abeja.datalake.file.DatalakeFile]

upload the files in a directory to a channel. This method infers the content_type of each file if content_type is not specified, and sets the filename as x-abeja-meta-filename in the metadata.

Note: this method returns a list (not a generator) to ensure that the upload has completed by the time it returns.

Request syntax:
metadata = {
    'label': 'example'
}
response = channel.upload_dir('./source_dir', metadata)
Params:
  • dir_path (str) : path to the directory containing the files to upload
  • metadata (dict): [optional] metadata to be added to each uploaded file.
  • content_type (str): [optional] MIME type of the content. Content-Type is inferred from the file extension if not specified.
  • lifetime (str): [optional] one of 1day / 1week / 1month / 6months. The file will be deleted after the specified period.
  • conflict_target (str): [optional] the upload fails with 409 Conflict when a file with the same value for the specified key already exists in the channel.
  • recursive (bool): [optional] if True, files in subdirectories are uploaded as well. False by default.
  • use_thread (bool): [optional] if True, files are uploaded concurrently using threads. True by default.
Return type:
list of DatalakeFile object
Returns:
A list of DatalakeFile successfully uploaded.
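The recursive flag controls whether files in subdirectories are included. The file selection can be sketched with the standard library (this mirrors the documented behaviour, not the SDK's actual implementation):

```python
import os

def collect_files(dir_path, recursive=False):
    """File selection equivalent to upload_dir()'s recursive flag (a sketch)."""
    if recursive:
        # walk the whole tree, including subdirectories
        return [os.path.join(root, name)
                for root, _dirs, names in os.walk(dir_path)
                for name in names]
    # top level only: skip subdirectories
    return [os.path.join(dir_path, name)
            for name in os.listdir(dir_path)
            if os.path.isfile(os.path.join(dir_path, name))]
```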
upload_file(file_path: str, metadata: dict = None, content_type: str = None, lifetime: str = None, conflict_target: str = None) → abeja.datalake.file.DatalakeFile

upload a file to a channel. This method infers the content_type of the given file if content_type is not specified, and sets the filename as x-abeja-meta-filename in the metadata.

Request syntax:
metadata = {
    'label': 'example'
}
response = channel.upload_file('~/example.txt', metadata=metadata)
Params:
  • file_path (str) : path to a file
  • metadata (dict): [optional] metadata to be added to the uploaded file.
  • content_type (str): [optional] MIME type of the content. Content-Type is inferred from the file extension if not specified.
  • lifetime (str): [optional] one of 1day / 1week / 1month / 6months. The file will be deleted after the specified period.
  • conflict_target (str): [optional] the upload fails with 409 Conflict when a file with the same value for the specified key already exists in the channel.
Return type:
DatalakeFile object
Returns:
a file uploaded to a channel

Channels

class abeja.datalake.channel.Channels(api: abeja.datalake.api.client.APIClient, organization_id: str) → None

a class for handling channels

create(name: str, description: str, storage_type: str) → abeja.datalake.channel.Channel

create a channel

API reference: POST /organizations/<organization_id>/channels/

Request Syntax:
params = {
    "name": "test-channel",
    "description": "test channel",
    "storage_type": "datalake"
}
channel = channels.create(**params)
Params:
  • name (str): channel name
  • description (str): channel description
  • storage_type (str): type of storage, datalake or file
Return type:
Channel object
get(channel_id: str) → abeja.datalake.channel.Channel

get a channel

API reference: GET /organizations/<organization_id>/channels/<channel_id>

Request Syntax:
channel_id = '1234567890123'
channel = channels.get(channel_id=channel_id)
Params:
  • channel_id (str): identifier of channel
Return type:
Channel object
list(limit: int = None, offset: int = None) → typing.Iterable[abeja.datalake.channel.Channel]

list channels

API reference: GET /organizations/<organization_id>/channels/

Request Syntax:
channel = channels.list()
Return type:
generator of Channel objects
patch(channel_id: str, name: str = None, description: str = None) → abeja.datalake.channel.Channel

patch a channel

API reference: PATCH /organizations/<organization_id>/channels/<channel_id>

Request Syntax:
params = {
    "channel_id": "1234567890123",
    "name": "updated_name",
    "description": "updated description"
}
channel = channels.patch(**params)
Params:
  • channel_id (str): identifier of channel
  • name (str): channel name
  • description (str): channel description
Return type:
Channel object

File

class abeja.datalake.file.DatalakeFile(api: abeja.datalake.api.client.APIClient, organization_id: str = None, channel_id: str = None, file_id: str = None, uri: str = None, type: str = None, upload_url: str = None, download_uri: str = None, content_type: str = None, url_expires_on: str = None, metadata: dict = None, uploaded_at: str = None, lifetime: str = None, **kwargs) → None

a model class for a datalake channel file

if the file exists locally, data is read from the local file; otherwise it is fetched from the remote storage and saved locally.

the file is saved under ./{channel_id}/{file_id} by default.

you can change the location by setting the ABEJA_STORAGE_DIR_PATH environment variable; the file is then saved under ${ABEJA_STORAGE_DIR_PATH}/{channel_id}/{file_id}.
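The cache location described above can be resolved with a few lines of standard-library code (a sketch of the documented path scheme, not the SDK implementation):

```python
import os

def cache_path(channel_id, file_id):
    """Local cache location used by DatalakeFile (per the documented scheme)."""
    base = os.environ.get('ABEJA_STORAGE_DIR_PATH', '.')
    return os.path.join(base, channel_id, file_id)
```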

Properties:
  • organization_id (str)
  • channel_id (str)
  • file_id (str)
  • uri (str)
  • type (str)
  • upload_url (str)
  • download_uri (str)
  • content_type (str)
  • metadata (dict)
  • url_expires_on (str)
  • uploaded_at (datetime)
LIFETIME = ('1day', '1week', '1month', '6months')
commit() → bool

reflect the instance's local state to the remote file. Only metadata and lifetime are editable for now.

Return Type:
Optional[bool]: True if the update succeeded
get_content(cache: bool = True) → bytes

Get content from a binary file

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_content()
Params:
  • cache (bool):
    if True, read the file cached at [ABEJA_STORAGE_DIR_PATH]/[channel_id]/[file_id] if it exists; otherwise save the downloaded content to that path. True by default.
Return type:
bytes
get_file_info() → dict

Get information of a file

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_file_info()
Return type:
dict
Returns:

Response Syntax:

{
    "url_expires_on": "2017-12-20T17:08:26+00:00",
    "uploaded_at": "2017-12-18T05:39:47+00:00",
    "metadata": {
        "x-abeja-meta-filename": "test.jpg"
    },
    "file_id": "20171218T053947-821bd0a3-3992-4320-bc1c-1ee8d0a0ad6b",
    "download_uri": "...",
    "content_type": "image/jpeg"
}
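The timestamps in the response are ISO 8601 strings and can be parsed with the standard library, e.g. to check how long a download_uri remains valid:

```python
from datetime import datetime

# Timestamps copied from the response syntax above.
info = {
    "uploaded_at": "2017-12-18T05:39:47+00:00",
    "url_expires_on": "2017-12-20T17:08:26+00:00",
}
uploaded = datetime.fromisoformat(info["uploaded_at"])
expires = datetime.fromisoformat(info["url_expires_on"])
validity = expires - uploaded  # a timedelta of a bit over two days
```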
get_iter_content(cache: bool = True, chunk_size: int = 1048576) → typing.Generator[bytes, None, None]

Get content iteratively from a binary file

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_iter_content()
Params:
  • cache (bool):
    if True, read the file cached at [ABEJA_STORAGE_DIR_PATH]/[channel_id]/[file_id] if it exists; otherwise save the downloaded content to that path. True by default.
  • chunk_size (int):
    the number of bytes to read into memory at a time. 1,048,576 (= 1 MB) by default.
Return type:
generator
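get_iter_content() yields fixed-size chunks, which lets you stream a large file to disk without holding it all in memory. The chunking behaviour can be sketched with a plain generator (simulated here with an in-memory stream):

```python
import io

def iter_chunks(stream, chunk_size=1024 * 1024):
    """Yield fixed-size chunks, the way get_iter_content() streams a file."""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            return
        yield chunk

# Simulate a 3 KB download and stream it in 1 KB chunks.
source = io.BytesIO(b'x' * (3 * 1024))
chunks = list(iter_chunks(source, chunk_size=1024))
# len(chunks) == 3
```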
get_iter_lines(cache: bool = True) → typing.Generator[str, None, None]

Get lines iteratively from a text file

if the text file exists locally, content is read from the local file; otherwise it is fetched from the remote storage and saved locally.

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_iter_lines()
Params:
  • cache (bool):
    if True, read the file cached at [ABEJA_STORAGE_DIR_PATH]/[channel_id]/[file_id] if it exists; otherwise save the downloaded content to that path. True by default.
Return type:
generator
get_json() → dict

Get json from a file

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_json()
Return type:
dict
Raises:
json.decoder.JSONDecodeError
get_text(cache: bool = True, encoding: typing.Union[str, NoneType] = None) → str

Get content from a text file

Request syntax:
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_text()
Params:
  • cache (bool):
    if True, read the file cached at [ABEJA_STORAGE_DIR_PATH]/[channel_id]/[file_id] if it exists; otherwise save the downloaded content to that path. True by default.
  • encoding (str):
    specify the encoding when the text is encoded in something other than ISO-8859-1.
Return type:
str
lifetime
to_source_data() → typing.Dict[str, str]

Convert to source data format

Return type:
dict