# Copyright (C) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in project root for information.
import sys
# Python 3 removed the ``basestring`` builtin; define it as ``str`` there.
# The check uses the numeric ``sys.version_info`` rather than the
# ``sys.version`` string, because string comparison is lexicographic and
# would misorder a two-digit major version (e.g. '10...' < '3').
if sys.version_info[0] >= 3:
    basestring = str
import pyspark
from pyspark import SparkContext
from pyspark import sql
from pyspark.ml.param.shared import *
from pyspark.sql import DataFrame
from pyspark.sql.types import *
BinaryFileFields = [
    "path",   # name of the string-typed field of the schema
    "bytes",  # name of the binary-typed field of the schema
]
"""
Field names of the binary file schema, listed in schema order.
"""
BinaryFileSchema = StructType(
    [
        StructField(name=BinaryFileFields[0], dataType=StringType(), nullable=True),
        StructField(name=BinaryFileFields[1], dataType=BinaryType(), nullable=True),
    ]
)
"""
Schema for binary files.

Each record is a struct made of the two fields named in BinaryFileFields.
Every field is described by its name, its type and its nullable flag
(both fields are nullable):

    path  (StringType)
    bytes (BinaryType)
"""
def readBinaryFiles(self, path, recursive=False, sampleRatio=1.0, inspectZip=True, seed=0):
    """
    Read a directory of binary files from a local or remote (WASB) source.

    This function is attached to the SparkSession class as a classmethod
    (see the ``setattr`` call right after the definition).

    :Example:

    >>> spark.readBinaryFiles(path, recursive, sampleRatio = 1.0, inspectZip = True)

    Args:
        self: The class the function is bound to by ``classmethod``; the body
            does not use it.
        path (str): Path to the file directory
        recursive (bool): Passed through to the JVM reader; presumably
            enables descending into sub-directories (confirm against
            BinaryFileReader)
        sampleRatio (double): Fraction of the files loaded into the dataframe
        inspectZip (bool): Passed through to the JVM reader; presumably
            controls whether zip archives are opened and their entries read
            (confirm against BinaryFileReader)
        seed (int): Passed through to the JVM reader; presumably the random
            seed used when sampling files (confirm against BinaryFileReader)

    Returns:
        DataFrame: DataFrame with a single column "value"; see BinaryFileSchema for details
    """
    ctx = SparkContext.getOrCreate()
    # JVM-side reader object reached through the py4j gateway.
    reader = ctx._jvm.com.microsoft.ml.spark.io.binary.BinaryFileReader
    sql_ctx = pyspark.SQLContext.getOrCreate(ctx)
    # The JVM reader takes the Java SparkSession, not the Python wrapper.
    jsession = sql_ctx.sparkSession._jsparkSession
    # float() so that an integer ratio such as 1 is still handed over as a
    # floating-point value.
    jresult = reader.read(path, recursive, jsession, float(sampleRatio), inspectZip, seed)
    # Wrap the returned Java DataFrame in a Python DataFrame.
    return DataFrame(jresult, sql_ctx)


# Expose the function as SparkSession.readBinaryFiles.
setattr(sql.SparkSession, 'readBinaryFiles', classmethod(readBinaryFiles))
def streamBinaryFiles(self, path, sampleRatio=1.0, inspectZip=True, seed=0):
    """
    Stream a directory of binary files from a local or remote (WASB) source.

    This function is attached to the SparkSession class as a classmethod
    (see the ``setattr`` call right after the definition).

    :Example:

    >>> spark.streamBinaryFiles(path, sampleRatio = 1.0, inspectZip = True)

    Args:
        self: The class the function is bound to by ``classmethod``; the body
            does not use it.
        path (str): Path to the file directory
        sampleRatio (double): Converted with ``float()`` and passed through
            to the JVM reader; presumably the fraction of the files that is
            loaded (confirm against BinaryFileReader)
        inspectZip (bool): Passed through to the JVM reader; presumably
            controls whether zip archives are opened and their entries read
            (confirm against BinaryFileReader)
        seed (int): Passed through to the JVM reader; presumably the random
            seed used when sampling files (confirm against BinaryFileReader)

    Returns:
        DataFrame: DataFrame with a single column "value"; see BinaryFileSchema for details
    """
    ctx = SparkContext.getOrCreate()
    # JVM-side reader object reached through the py4j gateway.
    reader = ctx._jvm.com.microsoft.ml.spark.io.binary.BinaryFileReader
    sql_ctx = pyspark.SQLContext.getOrCreate(ctx)
    # The JVM reader takes the Java SparkSession, not the Python wrapper.
    jsession = sql_ctx.sparkSession._jsparkSession
    # float() so that an integer ratio such as 1 is still handed over as a
    # floating-point value.
    jresult = reader.stream(path, jsession, float(sampleRatio), inspectZip, seed)
    # Wrap the returned Java DataFrame in a Python DataFrame.
    return DataFrame(jresult, sql_ctx)


# Expose the function as SparkSession.streamBinaryFiles.
setattr(sql.SparkSession, 'streamBinaryFiles', classmethod(streamBinaryFiles))
def isBinaryFile(df, column):
    """
    Return True if the column contains binary files.

    The check itself is delegated to the JVM-side ``BinaryFileSchema``
    object, which receives the Java DataFrame behind ``df`` and the column
    name.

    Args:
        df (DataFrame): The DataFrame to be processed
        column (str): The name of the column being inspected

    Returns:
        bool: True if the column is a binary files column
    """
    ctx = SparkContext.getOrCreate()
    # JVM-side schema helper reached through the py4j gateway.
    schema = ctx._jvm.com.microsoft.ml.spark.core.schema.BinaryFileSchema
    # df._jdf is the Java DataFrame wrapped by the Python DataFrame.
    return schema.isBinaryFile(df._jdf, column)