InterviewSolution
| 1. |
How sentry Architecture is helping Hadoop services to get secure? How Hive, Impala, HDFS and search activities are working with Sentry. |
|
Answer» $ hadoop fs -copyToLocal $ hadoop fs -copyFromLocal $ hadoop fs -put |
|
| 2. |
How LDAP, Active Directory and Kerberos will help the Hadoop environment to get secure? |
|
Answer» Sentry provides role-based authorization to both data and metadata stored on a Hadoop cluster for users. Before going deeper into Sentry, below are the components on which Sentry works.
The Sentry server only serves metadata information. The actual authorization decision is made by a policy engine that runs in data processing applications such as Hive or Impala. Each component loads the Sentry plug-in; that is, for each service such as Hive, HDFS, Impala or Solr, a Sentry plug-in has to be installed to work with the Sentry service and the policy engine that validates the authorization request. Sentry provides the following capabilities. 1. Fine-Grained Authorization: permissions are granted on object hierarchies, for example at the server, database, table and view level (row/column-level authorization) as well as on URIs, with privilege hierarchies such as SELECT/INSERT/ALL. 2. Role-Based Authorization (RBAC): Sentry supports sets of privileges and role templates that combine multiple access rules for a large set of users and data objects (databases, tables, etc.). For example, if Bibhu joins the finance department, all you need to do is add him to the finance-department group in Active Directory; this gives Bibhu access to data from the Sales and Customer tables. You can create a role called Analyst and grant SELECT on the Customer and Sales tables to this role.
Now Bibhu, who is a member of the finance-department group, gets the SELECT privilege on the Customer and Sales tables.
3. Multi-Tenant Administration (delegated admin responsibilities): Sentry can delegate or assign admin responsibilities for a subset of resources. The Delegated-Admin privilege is assigned on a specific set of resources for a specific set of users/groups by a person who already holds the Delegated-Admin privilege on that set of resources. 4. User Identity and Group Mapping: Sentry depends on Kerberos or LDAP to identify the user. It also uses the group mapping mechanism configured in Hadoop to ensure that Sentry sees the same group mapping as other components of the Hadoop ecosystem. For example, suppose users Bibhu and Sibb belong to an Active Directory (AD) group called finance-department, and Sibb also belongs to a group called finance-managers. In Sentry, you create the roles first and then grant the required privileges to those roles; for example, you can create a role called Analyst and grant SELECT on the Customer and Sales tables to it.
The next step is to join these authentication entities (users and groups) to authorization entities (roles). This is done by granting the Analyst role to the finance-department group. Now Bibhu and Sibb, who are members of the finance-department group, get the SELECT privilege on the Customer and Sales tables: GRANT ROLE Analyst TO GROUP finance-department; Below are some scenarios showing how Hive, Impala, HDFS and Search work with Sentry; considering a few examples, we will try to understand how it works. 1. Hive and Sentry: suppose Bibhu submits the query shown below.
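A minimal illustrative example of such a query (only the Status table name comes from the text; the statement itself is hypothetical):
SELECT * FROM Status;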
Here Hive identifies that user Bibhu is requesting SELECT access to the Status table. At this point, Hive asks the Sentry plugin to validate Bibhu's access request; the plugin retrieves Bibhu's privileges related to the Status table and the policy engine determines whether the request is valid. 2. Impala and Sentry: authorization processing in Impala is more or less the same as in Hive. The main difference is the caching of privileges: Impala's Catalog server manages caching of roles, privileges and metadata and spreads it to all Impala server nodes, so an Impala daemon can authorize queries much faster by referring to the metadata in cache memory. The only performance-related drawback is that it can take a few seconds for privilege changes to take effect. 3. Sentry-HDFS Synchronization: Sentry-HDFS authorization is focused on Hive warehouse data, that is, data belonging to Hive or Impala tables. The objective is that when other components like Pig, MapReduce or Spark try to access a Hive table, a similar authorization check occurs. This feature does not replace HDFS ACLs; tables that are not associated with Sentry retain their old ACLs. Sentry privileges are mapped to the corresponding HDFS ACL permissions; for example, the SELECT privilege maps to read access and the INSERT privilege maps to write access on the underlying files.
The NameNode loads a Sentry plugin that caches Sentry privileges as well as Hive metadata, which helps HDFS keep file permissions and Hive table privileges in sync. The Sentry plugin periodically communicates with the Sentry service and the Metastore to keep the metadata changes in sync. For example, if Bibhu runs a Pig job that reads from the Sales table's data files, those data files are stored in HDFS; the Sentry plugin on the NameNode figures out that the data files are part of Hive data and overlays the Sentry privileges on top of the file ACLs, meaning HDFS gives this Pig client the same privileges that Hive would have applied to a SQL query. For HDFS-Sentry synchronization to work, you must use the Sentry service, not policy-file authorization. 4. Search and Sentry: Sentry can apply restrictions to search tasks coming from a browser, the command line or the admin console. With Search, Sentry stores its privilege policies in a policy file (for example, sentry-provider.ini) which is stored in an HDFS location such as hdfs://ha-nn-uri/user/solr/sentry/sentry-provider.ini. 5. Disabling Hive CLI: the Hive CLI is not supported with Sentry, so to execute Hive queries you have to use Beeline, and direct access to the Hive Metastore should also be restricted. This is especially necessary if the Hive Metastore contains sensitive metadata. To do so, modify hadoop.proxyuser.hive.groups in core-site.xml on the Hive Metastore host. For example, to give the hive user permission to members of the hive and hue groups, set the property to: <property> <name>hadoop.proxyuser.hive.groups</name> <value>hive,hue</value> </property> More user groups that require access to the Hive Metastore can be added to the comma-separated list as needed. |
|
| 3. |
Brief about the few optimizing techniques for the Hive performance. |
|
Answer» LDAP and Active Directory provide a centralized security system for managing both servers and users; they manage all user accounts and the associated privileges for your employees. Kerberos handles authentication: when a user tries to connect to any Hadoop service, Kerberos authenticates the user first and then authenticates the service as well. When you consider AD, LDAP and Kerberos together, Kerberos only provides authentication; all identity management is handled outside of Kerberos, in AD and LDAP. At a high level, when a new employee joins, his/her ID has to be added to Active Directory first and then to LDAP and Kerberos, because AD is a directory service owned by Microsoft and AD supports several standard protocols such as LDAP and Kerberos. LDAP and AD communicate with each other about which user ID belongs to which group, for example which groups user Bibhu is a member of and what kind of access permission he has on different directories or files. This information is managed differently in AD and in a Linux system: in Windows we have a concept called SID (Windows security identifier) and in Linux we have a user ID (UID) or group ID (GID). SSSD can use the SID of an AD user to algorithmically generate POSIX IDs in a process called ID mapping; ID mapping creates a map between SIDs in AD and UIDs/GIDs on Linux. AD can also create and store POSIX attributes such as uidNumber, gidNumber, unixHomeDirectory or loginShell. There are two ways of mapping these SIDs and UIDs/GIDs using SSSD.
ldap_id_mapping = true
ldap_id_mapping = false Below are a few concepts you need to know to understand the integration of AD/LDAP/Kerberos.
PAM: PAM stands for Pluggable Authentication Module, which allows the integration of authentication technologies such as Unix, Linux, LDAP, etc. into system services such as password, login, ssh, etc. In other words, when you're prompted for a password, that's usually PAM's doing. PAM provides an API through which authentication requests are mapped into technology-specific actions; this mapping is done by PAM configuration files, and the authentication mechanism is provided for each service. NSS: NSS uses a common API and a configuration file (/etc/nsswitch.conf) in which the name service providers for every supported database are specified. Here names include hostnames, usernames and group names, stored in files such as /etc/passwd, /etc/group, and /etc/hosts.
Below are 3 ways of integrating Linux with AD for Authentication
Let’s understand them clearly: 1. Using LDAP/Kerberos PAM and NSS modules: PAM is configured to use Kerberos for authentication and NSS is configured to use the LDAP protocol for querying UID or GID information; the nss_ldap, pam_ldap, and pam_krb5 modules are available to support this. The problem here is that there is no caching of credentials and no offline support. 2. Using Winbind: Samba Winbind was the traditional way of connecting Linux systems to AD. Basically, Winbind emulates a Windows client on a Linux system and is able to communicate with AD servers; the winbind daemon receives calls from PAM and NSS and translates them into the corresponding Active Directory calls using either LDAP, Kerberos or remote procedure calls (RPC), depending on the requirement. Current versions of the System Security Services Daemon (SSSD) have closed the feature gap between Samba Winbind and SSSD, so Samba Winbind is no longer the first choice in general. 3. Using SSSD (System Security Services Daemon) for integrating with Active Directory: SSSD is an intermediary between local clients and any remote directory or authentication mechanism. The local clients connect to SSSD, and SSSD contacts the external providers, that is, the AD or LDAP server; so SSSD works as a bridge that helps you access AD or LDAP. System authentication is configured locally, which means services initially check with a local user store to determine users and credentials; SSSD allows a local service to check the local cache in SSSD, where the cached information may have been taken from an LDAP directory, AD or a Kerberos realm. Below are a few advantages of SSSD.
The sssd daemon provides different services for different purposes. A configuration file called sssd.conf determines what tasks sssd performs. The file has two main parts, as we can see here: [sssd] domains = WIN.EXAMPLE.COM [domain/WINDOWS] id_provider = ad In the first part, we specify which services on the system must use sssd (in this example nss and pam). The second part, [domain/WINDOWS], defines the directory service, also called the identity provider, for example an AD or LDAP server. SSSD connects to AD/LDAP for querying information, authentication, password changes, etc. A fuller sketch of such a configuration is shown below.
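For reference, a minimal sssd.conf sketch along the lines described above; the services line, section names and domain name are illustrative assumptions rather than a copy of any real configuration:
[sssd]
services = nss, pam
domains = WIN.EXAMPLE.COM

[nss]
# user and group lookups from NSS are answered from the SSSD cache

[pam]
# authentication requests from PAM are forwarded to the configured domain

[domain/WIN.EXAMPLE.COM]
id_provider = ad
access_provider = ad
# map Windows SIDs to POSIX UIDs/GIDs algorithmically
ldap_id_mapping = true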
|
|
| 4. |
How HIVE Database and IMPALA are working together in CLOUDERA? |
|
Answer» As we know, most Hive tables contain millions or billions of records, and every Hive query that computes over them is processed with the help of mappers and reducers, which consumes considerable time and memory. A few optimization techniques will always help a Hive query perform better; please find some of them below. 1. Use Tez to speed up execution: Apache Tez is an execution engine used for faster query execution. Tez allows you to launch a single Application Master per session for multiple jobs, provided the jobs are comparatively small so that the Tez memory can be reused for them. You need to set the processing engine to Tez instead of the default MapReduce execution engine using the parameter below: set hive.execution.engine=tez; If you are using Cloudera/Hortonworks, you will find the Tez option in the Hive query editor as well. 2. Enable compression in Hive: compression techniques reduce the amount of data being transferred, so they reduce the data transfer between mappers and reducers. Compression is not advisable if your data is already compressed, because the output file size might become larger than the original. For better results, you need to enable compression at both the mapper and reducer sides separately. Many compression formats are available; gzip takes more CPU resources than Snappy or LZO but provides a higher compression ratio, although it is not splittable. Other formats are Snappy, LZO, bzip2, etc. You can set compression at the mapper and reducer side using: set mapred.compress.map.output = true; Users can also set the following properties in hive-site.xml and mapred-site.xml for a permanent effect: <property> <name>mapred.compress.map.output</name> <value>true</value> </property> <property> <name>mapred.map.output.compression.codec</name> <value>org.apache.hadoop.io.compress.SnappyCodec</value> </property> (on YARN/MRv2 the equivalent properties are mapreduce.map.output.compress and mapreduce.map.output.compress.codec). 3. Use the ORC file format: ORC (Optimized Row Columnar) is an appropriate format for Hive performance tuning; query performance can easily be improved by using the ORC file format. We can use the ORC file format for any kind of table, whether partitioned or not, and in return you get faster computation and a compressed file size.
4. Optimize your joins: if your table holds large data then it is not advisable to just use the normal joins we use in SQL. There are many other joins, like map joins and bucket joins, which help to improve Hive query performance. 5. Use a map join: a map join is beneficial when one table is small compared to the other table taking part in the join, so that it can fit into memory. Hive has a property which can do an auto map join when enabled; set hive.auto.convert.join to true to enable the auto map join. We can set this from the command line as well as from the hive-site.xml file: <property> <name>hive.auto.convert.join</name> <value>true</value> <description>Whether Hive enables the optimization about converting common join into mapjoin based on the input file size</description> </property> 6. Bucketed map join: if tables are bucketed by a particular column, you can use a bucketed map join to improve Hive query performance. You can set the two properties below to enable the bucketed map join in Hive: <property> <name>hive.optimize.bucketmapjoin</name> <value>true</value> <description>Whether to try bucket mapjoin</description> </property> <property> <name>hive.optimize.bucketmapjoin.sortedmerge</name> <value>true</value> <description>Whether to try sorted bucket merge map join</description> </property> 7. Use partitions: partitioning is always helpful for huge data. It is used to segregate a large table based on certain columns so that the whole data set is divided into small chunks. Partitioning a table basically allows you to store the data in sub-directories inside the table directory. Selecting the partition column is always a critical decision, and you need to take care of future data and data volume as well. For example, if you have data for particular locations then you can partition the table by state; you can also partition the data month-wise. You can define the partition column based on your requirement. Here is the syntax to create a partitioned table: CREATE TABLE countrydata_partition (id int, countryname string, population int, description string) PARTITIONED BY (country VARCHAR(64), state VARCHAR(64)) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS textfile; There are two types of partition in Hive.
By default, partitioning is static in Hive. With a static partition we usually specify the partition explicitly, as in "PARTITIONED BY (department String)", and provide the partition value at load time; when loading big files into Hive, static partitioning is preferred. Inserting into a partitioned table where Hive derives the partition values from the data, for example when loading from a non-partitioned table, is known as dynamic partitioning. If you don't know in advance how many partition values your data contains, dynamic partitioning is also suitable. To use dynamic partitioning in Hive, you need to set the following properties.
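The standard Hive settings that enable dynamic partitioning are:
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;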
8. Use vectorization: a standard query is executed one row at a time. Vectorized query execution improves the performance of operations like scans, aggregations, filters and joins by processing 1024 rows at a time. To use vectorization you can set the parameters shown below.
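The usual vectorization switches are:
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;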
|
|
| 5. |
In your MapReduce job you consistently see that MapReduce map tasks on your cluster are running slowly because of excessive garbage collection of JVM. How do you increase JVM heap size property to 3GB to optimize performance? |
|
Answer» Hive works on structured data and provides a SQL-like layer on top of HDFS; a MapReduce job is executed for each Hive query that computes over HDFS data. Impala is a massively parallel processing SQL query engine that is capable of handling a huge volume of data. Impala is faster than Hive because Impala does not store intermediate query results on disk; it processes the SQL query in memory without running any MapReduce. Below are the main Hive components.
1. Hive Clients: Hive clients help Hive to perform queries. There are three types of clients we can use to perform queries: Thrift clients, JDBC clients and ODBC clients.
2. Hive Services
The compiler will verify the syntax check with the help of schema present in the metastore then optimizer generates the optimized logical plan in the form of Directed Acyclic Graph of Map-reduce and HDFS tasks. The Executor executes the tasks after the compilation and optimization steps. The Executor directly interacts with the Hadoop Job Tracker for scheduling of tasks to be run.
Impala components are 1. Impala daemon(Impalad) 2. Impala State Store 3. Impala Catalog Service.
Whenever a query is submitted to any Impala daemon, that node is considered the "central coordinator node" for the query. After accepting the query, the Impalad logically divides it into smaller parallel queries and distributes them to different nodes in the Impala cluster. All the Impalads gather their intermediate results and send them to the central coordinator node, which then constructs the final query output.
Example: INVALIDATE METADATA [[db_name.]table_name]; REFRESH [db_name.]table_name; |
|
| 6. |
You have installed a cluster HDFS and Map Reduce version 2 on YARN. You have no dfs.hosts entries in your hdfs-site.xml configuration file. You configure a new worker node by setting fs.default.name in its configuration files to point to the Name Node on your cluster, and you start the Data Node daemon on that worker node. What do you have to do on the cluster to allow the worker node to join, and start sorting HDFS blocks? |
|
Answer» Java garbage collection is the process by which Java programs perform automatic memory management, that is, a technique that automatically manages the allocation and deallocation of memory. Java programs compile to bytecode (the compiled format of a Java program) that can be run on a Java Virtual Machine; once a Java program has been converted to bytecode it is executed by the JVM and can be transferred across a network. While Java programs run on the JVM, the JVM consumes memory, called heap memory, to do so. Heap memory is the part of memory dedicated to the program. A Hadoop mapper is a Java process and every Java process has its own heap memory. The maximum heap allocation is configured as mapred.map.child.java.opts (or mapreduce.map.java.opts in Hadoop 2). If the mapper process runs out of heap memory then the mapper throws a Java out-of-memory exception such as: ERROR: java.lang.RuntimeException: java.lang.OutOfMemoryError The Java heap size should be smaller than the Hadoop container memory limit because we need to reserve some memory for Java code; usually it is recommended to reserve 20% of the memory. If the settings are correct then Java-based Hadoop tasks will never be killed by Hadoop, so you will not see a "Killing container" error like the one above. To execute the actual map or reduce task, YARN runs a JVM within the container; the Hadoop property mapreduce.{map|reduce}.java.opts is passed to this JVM, and it can include -Xmx to set the max heap size of the JVM. Example: hadoop jar <jarName> -Dmapreduce.map.memory.mb=4096 -Dmapreduce.map.java.opts=-Xmx3276m |
|
| 7. |
You observed that the number of spilled records from Map tasks far exceeds the number of map output records. Your child heap size is 1gb and your io.sort.mb value is set to 1000mb. How would you tune your io.sort? Mb value to achieve maximum memory to I/O ratio. |
|
Answer» Basically the dfs.hosts file contains all the DataNode details, and the NameNode allows access only to the nodes mentioned in the dfs.hosts file; this is the default configuration used by the NameNode. dfs.hosts and dfs.hosts.exclude help to re-commission and decommission the DataNodes. Hadoop provides the decommission feature to exclude a set of existing DataNodes: the nodes to be taken out should be included in the exclude file, and the exclude file name should be specified as the configuration parameter dfs.hosts.exclude. You can find an example below. Example: modify conf/hdfs-site.xml and add: <property> <name>dfs.hosts</name> <value>/opt/hadoop/Bibhu/conf/datanode-allow.list</value> </property> <property> <name>dfs.hosts.exclude</name> <value>/opt/hadoop/Bibhu/conf/datanode-deny.list</value> </property> Decommissioning cannot happen immediately because it requires replication of a potentially large number of blocks, and we do not want the cluster to be overwhelmed with just this one job. Decommission progress can be monitored on the NameNode web UI or the Cloudera UI. Until all blocks are replicated, the status of the nodes will be "Decommission in progress"; when decommissioning is done the state changes to "Decommissioned", and the node can then be removed. We can use the command below: without creating a dfs.hosts file or making any entries, run the refresh command on the NameNode. # $HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes The -refreshNodes option updates the NameNode with the set of DataNodes that are allowed to connect to the NameNode. |
|
| 8. |
<property> <name>yarn.nodemanager.resource.memory-mb</name> <value>102400</value> </property> <property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>48</value> </property> <name>yarn.scheduler.minimum-allocation-mb</name> <value> what is the correct value here</value> |
|
Answer» Basically "mapreduce.task.io.sort.mb" is the total amount of buffer memory which is to use while sorting files. It is representing in megabytes. Tune or provide the io.sort.mb value in such a way that the NUMBER of spilled records equals or is as close to equal the number of map output records. Map-reduce job makes the assurance that the input to every reducer is sorted by key. The process by which the system performs the sort and then transfers the mapper output to the reducers as inputs are known as shuffle. In the Map-reduce job, shuffle is an area of the code where fine-tuning and improvements are continually being MADE. In many ways, the shuffle is the heart of the map-reduce job. When the map function starts producing output, the process takes an advantage of BUFFERING and writes in memory and doing some presorting for more efficiency as well. Each map task has a circular memory buffer that writes the output too. The buffer is 100mb by DEFAULT, a size which can be tuned by changing the io.sort.mb property when the contents of the buffer reach a certain threshold size. Usually the default threshold size of io.sort.spill is 0.8 or 80% when it reaches the threshold a background thread will start to spill the contents to disk. Mapper output will continue to be written to the buffer while the spill takes place, but if the buffer fills up during this time the map will block until the spill is complete. Spills are written in a round-robin fashion to the directories specified by the mapred.local.dir property in a subdirectory. Each time when the memory buffer reaches the spill threshold at that time a new spill file is created, so after the map task has written its last output record there could be several spill files before the task is finished. The spill files are merged into single partitioned and sorted the output file. The configuration property io.sort.factor controls the maximum number of streams to merge at once. the default value of io.sort.factor is 10. Just want to brief about how io.sort.factor is working, when the Mapper task is running it continuously writing data into Buffers, to maintain the buffer we have to set up a PARAMETER called io.sort.spill .percent. The value of io.sort.spill.percent will indicate, after which point the data will be written into disk instead of a buffer which is filling up. All of this spilling to disk is done in a separate thread so that the Map can continue running. There may be multiple spills on the task tracker after the map task finished. Those files have to be merged into one single sorted file per partition which is fetched by a reducer. The property io.sort.factor says how many of those spill files will be merged into one file at a time. |
|
| 9. |
Each node in your Hadoop cluster with running YARN and has 140GB memory and 40 cores .your yarn-site.xml has the configuration as shown below. you want YARN to launch a maximum of 100 containers per node. Enter the property value that would restrict YARN from launching more than 100 containers per node. |
|
Answer» Usually, YARN takes all of the available resources on each machine in the cluster into consideration. Based on the available resources, YARN negotiates the resources requested by the application or MapReduce job running in the cluster. YARN allocates containers based on how many resources the application requires. A container is the basic unit of processing capacity in YARN, and its resource elements include memory, CPU, etc. In a Hadoop cluster, it is important to balance the usage of memory (RAM), processors (CPU cores) and disks so that processing is not constrained by any one of these cluster resources. As a best practice, allowing two containers per disk and per core gives the best balance for cluster utilization. When considering the appropriate YARN and MapReduce memory configuration for a cluster node, it is ideal to consider the values below for each node.
Prior to calculating how much RAM, how many cores and how many disks are required, you have to be aware of the parameters below (an example configuration follows).
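As an illustration, the relevant properties live in yarn-site.xml; with the figures used in the question, a node offering 102400 MB to YARN and a 1024 MB minimum allocation can host at most roughly 102400 / 1024 = 100 containers:
<property> <name>yarn.nodemanager.resource.memory-mb</name> <value>102400</value> </property>
<property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>48</value> </property>
<property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>1024</value> </property>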
|
|
| 10. |
What is HDFS Snapshot how it helps you to recover |
|
Answer» Fundamentally, a snapshot is a point-in-time, read-only copy of a subtree of the file system or of an entire directory. Snapshots help handle data corruption caused by users or applications, as well as accidental deletes. It is always quicker to recover from a snapshot than to restore the whole FSImage, and it is easy to create a snapshot of an important directory before changing anything in it. A snapshot can be taken of any directory once it has been marked "snapshottable"; to do so, issue the command "hdfs dfsadmin -allowSnapshot <path>". Once a snapshottable directory has been created, a subdirectory named .snapshot is created under it; this is where snapshots are stored. There is no limit on the number of snapshottable directories, and a snapshottable directory can contain up to 65536 snapshots simultaneously. We can give a snapshot a name or use the default one (based on a timestamp: "s'yyyyMMdd-HHmmss.SSS"). If there are any snapshots in a snapshottable directory then you can neither delete nor rename that directory; before deleting the snapshottable directory you have to delete all the snapshots under it. When upgrading the HDFS version, any path named ".snapshot" needs to first be renamed or deleted to avoid conflicting with the reserved path. Snapshots are easily created with the hdfs commands below. a. # Create directory structure hdfs dfs -mkdir /my_dir_bibhu b. # Allow snapshot creation for /my_dir_bibhu hdfs dfsadmin -allowSnapshot /my_dir_bibhu Allowing snapshot on /my_dir_bibhu succeeded c. # Create the first snapshot hdfs dfs -createSnapshot /my_dir_bibhu snaptest1 Created snapshot /my_dir_bibhu/.snapshot/snaptest1 d. # .snapshot can be read directly using the command below hdfs dfs -ls /my_dir_bibhu/.snapshot Found 1 items drwxr-xr-x - bibhu supergroup 0 2016-12-03 09:52 /my_dir/.snapshot/snaptest1 e. # Create a new snapshot - this time for the directory containing a file hdfs dfs -createSnapshot /my_dir_bibhu snaptest2 Created snapshot /my_dir_bibhu/.snapshot/snaptest2 f. # This command serves to compare snapshots hdfs snapshotDiff /my_dir_bibhu .snapshot/snaptest1 .snapshot/snaptest2 g. # Restore the snapshot directory to a temporary place and check whether the file is there hdfs dfs -cp /my_dir_bibhu/.snapshot/snaptest2 /tmp/dir_from_snapshot hdfs dfs -ls /tmp/dir_from_snapshot |
|
| 11. |
What is YARN? What is the sequence of services when you are starting YARN? |
|
Answer» YARN stands for Yet Another Resource Negotiator. YARN takes care of the Job Tracker's work, such as resource management, and part of YARN also works as a scheduler. It supports a variety of processing engines and applications: when we say different data processing engines, it means it supports graph processing, interactive processing, stream processing and batch processing to run and process the data stored in HDFS. Basically, the Resource Manager receives the job request from the client and launches the Application Master JVM with a default allocation of 1 core and 2 GB. The Application Master contacts the NameNode to get the locations of the blocks; based on the availability of the blocks on the Node Managers, it checks whether sufficient resources are available and informs the Resource Manager accordingly, and the Resource Manager provides resources to the Node Managers to launch the JVMs for the job. YARN also works as a scheduler: the scheduler is responsible for allocating resources to the running applications. It does not monitor or track the applications, and it will not restart a failed task, whether the task failed due to application failure or hardware failure. YARN supports three types of schedulers: 1. FIFO Scheduler 2. Fair Scheduler 3. Capacity Scheduler. Based on the application requirements, the Hadoop admin will select either the FIFO, Fair or Capacity Scheduler. FIFO scheduling is first in, first out; in current environments this is rarely used. Fair scheduling is a method where resources are distributed in such a way that they are more or less equally divided among jobs. With the Capacity Scheduler you can make sure that a certain percentage of cluster resources is assigned to a queue based on your demand or computing need. When starting the YARN services, start the Resource Manager and Node Manager services and make sure the Resource Manager starts before the Node Manager. Please start your YARN services in the sequence mentioned below.
# service hadoop-yarn-resourcemanager start
# service hadoop-yarn-nodemanager start
# service hadoop-mapreduce-historyserver start |
|
| 12. |
What is an InputFormat and Record Reader in Hadoop? What are the various Input Formats in Hadoop? |
|
Answer» A file is read by a MapReduce job using an InputFormat, which defines how the file being read needs to be split up and read. The InputFormat, in turn, defines a RecordReader which is responsible for reading actual records from the input files. The splits computed by the InputFormat are operated on by map tasks; a map task uses the RecordReader corresponding to the InputFormat to read the data within each split and create key-value pairs. The common types of InputFormat in Hadoop are TextInputFormat (the default), KeyValueTextInputFormat, SequenceFileInputFormat and NLineInputFormat.
|
|
| 13. |
What is an Uber task in Map Reduce? |
|
Answer» In a map-reduce job, the application master decides how many tasks need to be created for the job to be executed. The number of mapper tasks created is equal to the number of splits. These tasks are usually launched in different JVMs than the application master (governed by data locality). However, if the job is small, the application master may decide to run the tasks in the same JVM as itself: in such a case the overhead of allocating containers for new tasks and monitoring them outweighs the gain that would be had from running the tasks in parallel, so it is cheaper to run them sequentially on the same node. Such a job is called an uber task. So how does the application master determine if the job is small enough to be run as an uber task? By default, if the job requires fewer than 10 mappers and only 1 reducer, and the input size is less than the size of one HDFS block, then the application master may consider launching the job as an uber task. If the job doesn't qualify to be run as an uber task, then the application master requests containers for all map and reduce tasks from the resource manager. The thresholds that govern this are configurable, as shown below. |
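The thresholds mentioned above correspond to the following mapred-site.xml properties (maxmaps and maxreduces default to 9 and 1; the related mapreduce.job.ubertask.maxbytes defaults to the HDFS block size):
<property> <name>mapreduce.job.ubertask.enable</name> <value>true</value> </property>
<property> <name>mapreduce.job.ubertask.maxmaps</name> <value>9</value> </property>
<property> <name>mapreduce.job.ubertask.maxreduces</name> <value>1</value> </property>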
|
| 14. |
In a map-reduce job, under what scenario does a combiner get triggered? What are the various options to reduce the shuffling of data in a map-reduce job? |
|
Answer» The map-reduce framework doesn't guarantee that the combiner will be executed for every job run. The combiner is executed at each buffer spill: during a spill, the thread writing data to disk first divides the data into partitions corresponding to the number of reducers; within each partition, the thread performs an in-memory sort on the data and applies the combiner function (if any) on the output of the sort. Various ways to reduce data shuffling in a map-reduce job are: using a combiner, compressing the intermediate map output, and filtering or projecting data in the mapper so that only the fields actually needed are emitted. Example settings for the first two are shown below.
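For illustration, a combiner is attached in the job driver with job.setCombinerClass(MyReducer.class) (the class name is hypothetical), and map-output compression is enabled with the standard MRv2 properties:
<property> <name>mapreduce.map.output.compress</name> <value>true</value> </property>
<property> <name>mapreduce.map.output.compress.codec</name> <value>org.apache.hadoop.io.compress.SnappyCodec</value> </property>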
|
|
| 15. |
What is the map side join? How is it different from a reduced side join? |
|
Answer» A join operation is used to combine two or more datasets. In MapReduce, joins are of two types: map side joins and reduce side joins. A map side join is one in which the join between two tables is performed in the map phase, without the involvement of the reduce phase. It can be used when one of the datasets is much smaller than the other and can easily be stored in the DistributedCache; one of the ways to store a dataset in the DistributedCache is to do it in the setup() method of the Mapper. Since the join is performed in the mapper phase itself, it avoids the cost incurred for sorting and merging data in the shuffle and reduce phase, thereby improving the performance of the task. A reduce side join, on the other hand, works well for large datasets. Here the reducer is responsible for performing the join operation. This type of join is much simpler to implement, as data undergoes sorting and shuffling before reaching the reducer and values having identical keys are sent to the same reducer, so by default the data is already organized for us. However, the I/O cost is much higher due to the data movement involved in the sorting and shuffling phase. |
|
| 16. |
Every hour Hadoop runs 100 jobs in parallel. Now currently, single job is running. How much of the resource capacity of the cluster will be used by this running single job? |
|
Answer» Under the Fair Scheduler, when a single application is running, that application may request the entire cluster (if needed). If additional applications are submitted, resources that are free are assigned "fairly" to the new applications so that each application gets roughly the same amount of resources. The Fair Scheduler organizes applications further into queues and shares resources fairly between these queues. The Fair Scheduler in YARN supports hierarchical queues, which means sub-queues can be created within a dedicated queue. All queues descend from a queue named "root". A queue's name starts with the names of its parents, with periods as separators; so a queue named "parent1" under the root queue would be referred to as "root.parent1", and a queue named "queue2" under "parent1" would be referred to as "root.parent1.queue2". An example allocation file is shown below.
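As a sketch, hierarchical queues of this kind are declared in the Fair Scheduler allocation file (fair-scheduler.xml); the queue names below are taken from the example above and the weights are illustrative:
<?xml version="1.0"?>
<allocations>
  <queue name="parent1">
    <weight>1.0</weight>
    <queue name="queue2">
      <weight>1.0</weight>
    </queue>
  </queue>
</allocations>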
|
| 17. |
MapReduce runs on top of yarn and utilizes YARN containers to schedule and execute its map and reduce tasks. When configuring mapreduce resource utilization on yarn, what are the aspects to be considered? |
|
Answer» When configuring MapReduce 2 resource utilization on YARN, there are three aspects to be considered:
Physical RAM limit for each Map and Reduce task ********************************************* You can define the maximum memory each Map and Reduce task will take. Since each Map and each Reduce task runs in a separate container, these maximum memory settings should be at least equal to or more than the YARN minimum container allocation (yarn.scheduler.minimum-allocation-mb). In mapred-site.xml: <name>mapreduce.map.memory.mb</name> <value>4096</value> <name>mapreduce.reduce.memory.mb</name> <value>8192</value> The JVM heap size limit for each task ********************************* The JVM heap size should be set lower than the Map and Reduce memory defined above, so that it stays within the bounds of the container memory allocated by YARN. In mapred-site.xml: <name>mapreduce.map.java.opts</name> <value>-Xmx3072m</value> <name>mapreduce.reduce.java.opts</name> <value>-Xmx6144m</value> The amount of virtual memory each task will get ******************************************** Virtual memory is determined as an upper multiple of the physical RAM that each Map and Reduce task will use; the default ratio is 2.1. For example, if the total physical RAM allocated is 4 GB then the virtual memory upper limit is 4*2.1 = 8.4 GB. |
|
| 18. |
In what scenario can the container be killed by the node manager? |
|
Answer» When a client launches an application, the corresponding application master container is launched with ID 000001. The default size is 1 GB for each application master container, but sometimes the data size will be more; in that case the application master reaches the limit of its memory, the application fails, and you will see a message similar to the one below. Application application_1424873694018_3023 failed 2 times due to AM Container for appattempt_1424873694018_3023_000002 exited with exitCode: 143 due to: Container [pid=25108,containerID=container_1424873694018_3023_02_000001] is running beyond physical memory limits. Current usage: 1.0 GB of 1 GB physical memory used; 1.5 GB of 2.1 GB virtual memory used. Killing container. A common remedy is to increase the application master memory settings, as shown below. |
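A sketch of such an increase in mapred-site.xml, using the standard MRv2 property names (the values are illustrative; the heap is kept at roughly 80% of the container size):
<property> <name>yarn.app.mapreduce.am.resource.mb</name> <value>2048</value> </property>
<property> <name>yarn.app.mapreduce.am.command-opts</name> <value>-Xmx1638m</value> </property>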
|
| 19. |
How Hadoop uses HDFS staging directory as well as local directory during a job run? |
|
Answer» YARN requires a staging directory for temporary files created by running jobs, and local directories for storing the various scripts that are generated to start up the job's containers (which run the map and reduce tasks). Staging directory:
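The staging directory location is controlled by a single property; the path shown is the usual default:
<property> <name>yarn.app.mapreduce.am.staging-dir</name> <value>/tmp/hadoop-yarn/staging</value> </property>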
Local directory:
[yarn.nodemanager.local-dirs]/usercache/$user/appcache/application_${app_id}
|
| 20. |
For each YARN job, the HADOOP framework generates a task log file, where are Hadoop task log files stored? |
|
Answer» Hadoop task log files are stored on the local disk of the slave node running the task. In general, the log-related configuration properties are yarn.nodemanager.log-dirs and yarn.log-aggregation-enable.
yarn.nodemanager.log-dirs: determines where the container logs are stored on the node while the containers are running; its default value is ${yarn.log.dir}/userlogs. An application's localized log directory is found in ${yarn.nodemanager.log-dirs}/application_${application_id}, and individual containers' log directories are in subdirectories named container_${container_id}. For a MapReduce application, each container directory contains the files stderr, stdout and syslog generated by that container.
yarn.log-aggregation-enable: specifies whether to enable or disable log aggregation. If this function is disabled, the node manager keeps the logs locally and does not aggregate them. The following properties are in force when log aggregation is enabled.
yarn.nodemanager.remote-app-log-dir: this location is on the default file system (usually HDFS) and indicates where the node manager should aggregate logs. It should not be the local file system, otherwise a serving daemon such as the history server will not be able to serve the aggregated logs; the default value is /tmp/logs.
yarn.nodemanager.remote-app-log-dir-suffix: the remote log directory is created at ${yarn.nodemanager.remote-app-log-dir}/${user}/${suffix}; the default suffix value is "logs".
yarn.log-aggregation.retain-seconds: defines how long to wait before deleting aggregated logs; -1 or any other negative value disables the deletion of aggregated logs.
yarn.log-aggregation.retain-check-interval-seconds: determines how long to wait between aggregated log retention checks; if its value is set to 0 or a negative value then the value is computed as one-tenth of the aggregated log retention time. The default value is -1.
yarn.log.server.url: once an application is done, node managers redirect web UI users to this URL, where aggregated logs are served; it points to the MapReduce-specific job history.
The following properties are used when log aggregation is disabled:
yarn.nodemanager.log.retain-seconds: the time in seconds to retain user logs on the individual nodes if log aggregation is disabled; the default is 10800.
yarn.nodemanager.log.deletion-threads-count: the number of threads used by the node managers to clean up logs once the log retention time is hit for local log files when aggregation is disabled. Once aggregation is enabled, logs can be fetched as shown below. |
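For reference, aggregation is switched on in yarn-site.xml and aggregated logs are then fetched with the yarn logs command (the application ID below is a placeholder):
<property> <name>yarn.log-aggregation-enable</name> <value>true</value> </property>
$ yarn logs -applicationId application_1410450250506_0002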
|
| 21. |
Mention the commands to find HDFS space consumed? |
|
Answer» The two popular utilities or commands to find HDFS space consumed are hdfs dfs -du and hdfs dfsadmin -report, illustrated below.
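Typical invocations (the path is an example):
$ hdfs dfs -du -h /user/bibhu
$ hdfs dfsadmin -report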
HDFS provides reliable storage by copying data to multiple nodes. The number of copies it creates is usually referred to as the replication factor, which is greater than one. |
|
| 22. |
How will you manually enter and leave Safe Mode in Hadoop? |
|
Answer» The command below is used to enter Safe Mode manually: $ hdfs dfsadmin -safemode enter Once Safe Mode is entered manually, it should also be left manually. The command below is used to leave Safe Mode manually: $ hdfs dfsadmin -safemode leave |
|
| 23. |
How can we copy a file into HDFS with a different block size to that of existing block size config? |
|
Answer» You can provide dfs.block.size on command line :
hadoop fs -D dfs.block.size=<blocksizeinbytes> -cp /source /destination
hadoop fs -D dfs.block.size=<blocksizeinbytes> -put /source /destination |
|
| 24. |
Explain the HDFS error - “File could only be replicated to 0 nodes, instead of 1” ? |
|
Answer» This exception means there is no communication between the client and the DataNodes, due to reasons such as: the DataNode daemons are not running or the DataNodes are marked dead, the DataNodes have no free disk space for new blocks, or network connectivity between the client and the DataNodes is broken. |
|
| 25. |
What is the use of getfacl Command in HDFS ? |
|
Answer» It displays the Access Control Lists (ACLs) of files and directories. If a directory has a default ACL, then getfacl also displays the default ACL. ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -getfacl /hadoop
|
|
| 26. |
What is the port number for NameNode, Task Tracker and Job Tracker? |
| Answer» In Hadoop 1.x the default web UI ports are 50070 for the NameNode, 50030 for the Job Tracker and 50060 for the Task Tracker. | |
| 27. |
What is default HDFS block size and default replication factor ? |
| Answer» The default HDFS block size is 128 MB in Hadoop 2.x/3.x (64 MB in Hadoop 1.x), and the default replication factor is 3. | |
| 28. |
How does CopyFromLocal command for Hadoop DFS work? |
Answer» copyFromLocal copies a file from the local file system to HDFS, e.g. hadoop fs -copyFromLocal <localsrc> <hdfs-destination>. It works like the put command, except that the source is restricted to a local file reference.
|
|
| 29. |
Mention the steps involved in commissioning (adding) the nodes in the Hadoop cluster? |
|
Answer» 1. Update the network addresses of the new node in the dfs.include and mapred.include files. 2. Run $ hadoop dfsadmin -refreshNodes and $ hadoop mradmin -refreshNodes. 3. Update the slaves file. 4. Start the DataNode and NodeManager on the added node. |
|
| 30. |
What is HDFS- Hadoop Distributed File System? |
|
Answer» The Hadoop Distributed File System (HDFS) is the primary storage system of Hadoop. HDFS stores very large files running on a cluster of commodity hardware. It works on the principle of storing a small number of large files rather than a huge number of small files. HDFS stores data reliably even in the case of hardware failure, and it provides high-throughput access to applications by accessing data in parallel. Components of HDFS: the NameNode (master, holding the namespace and metadata), the DataNodes (slaves, holding the actual blocks) and the Secondary NameNode (which performs periodic checkpoints of the namespace).
|
|
| 31. |
What is the main purpose of HDFS fsck command and its usage/command ? |
|
Answer» fsck is a utility to check the health of the file system and to find missing files, over-replicated, under-replicated and corrupted blocks. Command for finding the blocks for a file: $ hadoop fsck <path> -files -blocks -racks |
|
| 32. |
What is HDFS Command to create a file in HDFS with file size 0 bytes. |
|
Answer» ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -touchz /hadoop/sample
ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -ls /hadoop Found 2 items
|
| 33. |
How can you overwrite the replication factors in HDFS ? |
|
Answer» The replication factor in HDFS can be modified/overwritten in 2 ways: 1. For a single file, using the Hadoop FS shell: $ hadoop fs -setrep -w 2 /my/sample.xml Here sample.xml is the file whose replication factor will be set to 2. 2. For all files under a directory: $ hadoop fs -setrep -w 6 /my/sample_dir
sample_dir is the name of the directory and all the files in this directory will have a replication factor set to 6. |
|
| 34. |
How does a map task partition the output in the case of multiple reducers? |
|
Answer» In the case of large data, it's advised to use more than one reducer. In the case of multiple reducers, the thread spilling map output to disk first divides the data into partitions corresponding to the number of reducers. Within each partition, an in-memory sort on the data is performed; a combiner, if any, is applied to the output of the sort. Finally, the data is sent to the reducers based on the partitioning key. Partitioning ensures that all the values for each key are grouped together and that values having the same key go to the same reducer, thus allowing for an even distribution of the map output over the reducers. The default partitioner in a map-reduce job is the HashPartitioner, which computes a hash value for the key and assigns the partition based on its result. However, care must be taken to ensure that the partitioning logic is optimal and data gets sent evenly to the reducers. With a sub-optimal design, some reducers will have more work to do than others, and as a result the entire job will wait for that one reducer to finish its extra load share. A sketch of a custom partitioner is shown below. |
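For illustration, a custom partitioner might look like the sketch below (the class name is hypothetical); it routes records by a hash of the key, which is essentially what the default HashPartitioner does:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
// Distributes map output by key hash, mirroring the default HashPartitioner.
public class KeyHashPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // mask off the sign bit so the partition index is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
// In the driver: job.setPartitionerClass(KeyHashPartitioner.class);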
|
| 35. |
What is meant by High Availability in HDFS? What are failover and fencing and what role do they play in making the system highly available? |
|
Answer» High availability in HDFS implies that the system does not have any single point of failure, is available 24/7 so that there is no or limited impact on client applications, and is able to recover from failure without manual intervention. For implementing high availability in HDFS, a pair of NameNodes is set up in an active-standby configuration. The standby node is kept in sync with the active node; both active and standby nodes have access to shared storage space. When any namespace modification is performed by the active node, it logs a record of the modification to an edit log file stored in the shared directory. The standby node constantly watches this directory for edits, and as it sees the edits, it applies them to its own namespace, thereby keeping in sync with the active node. In case of a failure of the active NameNode, the standby node takes over and starts servicing client requests. The transition from active to standby node is managed by the failover controller, which uses ZooKeeper to ensure that only one NameNode is active at a given time. Each NameNode runs a failover controller process that monitors its NameNode for failures using a heartbeat mechanism and triggers a failover in case of failure. However, it needs to be ensured that only one NameNode is active at a given time; two active NameNodes at the same time would cause corruption of data. To avoid such a scenario, fencing is done, which ensures that only one NameNode is active at a given time. The JournalNodes perform fencing by allowing only one NameNode to be the writer at a time; during failover, the standby NameNode takes over the responsibility of writing to the JournalNodes and forbids any other NameNode from remaining active. |
|
| 36. |
What is the usual block size on an HDFS? Can we make it much larger say 1 GB? What are the advantages that a block provides over a file system? |
|
Answer» The usual block size on HDFS is 128 MB. The size of the HDFS block is kept large enough to minimize the seek cost: when the block size is large enough, the time to transfer the data will be significantly longer than the time to seek to the start of a block. As the data transfer rate is much higher than the disk seek rate, it is optimal to keep the block size large. The seek time is usually kept at around 1% of transfer time; e.g. if the seek time is around 10 ms and the data transfer rate is 100 MB/s, then the block size comes to around 128 MB. However, this doesn't mean that the block size can be made indefinitely large. Map tasks operate on one block (assuming split size is equal to block size) at a time; a huge block size results in fewer splits and hence fewer mappers, which reduces the advantage that can be gained by working on multiple blocks in parallel. Having a block abstraction for a distributed file system has many benefits: a file can be larger than any single disk in the cluster, fixed-size blocks simplify the storage subsystem, and blocks fit well with replication for fault tolerance.
|
|
| 37. |
Explain the anatomy of a map-reduce job run. |
|
Answer» A job consists of the following components: the client, which submits the map-reduce job; the Resource Manager, which coordinates the allocation of compute resources; the Node Managers, which launch and monitor the compute containers; the Hadoop Distributed File System (HDFS), which is used for sharing resources between the above components; and the Application Master, which coordinates the tasks running in the map-reduce job. The map-reduce job begins when the client/job submitter sends a request to the Resource Manager and asks for a new application id to be allocated. It also checks whether the specified output directory exists, and computes input splits for the job as well. The resources needed to run the job, including the application jar, are copied to HDFS. Finally, the job submitter submits the job to the Resource Manager. The Resource Manager now allocates a container and launches the Application Master. The Application Master determines the number of mapper and reducer tasks that need to be launched and requests the Resource Manager to launch containers for them. The Resource Manager, in turn, directs the Node Managers to launch the containers where the tasks get run. Once the tasks are initiated, the Application Master keeps track of them; in case any task fails or gets stuck, it relaunches it on another container. Requests for map tasks are made first and with a higher priority than those for reduce tasks, since all the map tasks must complete before the sort phase of the reduce can start. Once a mapper task completes, its output undergoes sorting, shuffling and partitioning (in the case of multiple reducers), is sent to the combiner (if any) and finally sent to the reducer(s). The output of the reducer is written to HDFS. |
|
| 38. |
What is the role of a combiner and partitioner in a Map-Reduce job? Is the combiner triggered first or the partitioner? |
|
Answer» Map-reduce jobs are limited by the bandwidth available on the cluster, hence it is beneficial if the data transferred between map and reduce tasks can be minimized. This can be achieved using a Hadoop combiner. A combiner runs on the map output and its output forms the input to the reducer; it decreases the amount of data that needs to be transferred between the mapper and reducer and improves the performance of a map-reduce job. A combiner can, however, only be used for functions that are commutative and associative. A partitioner controls which partition a given key-value pair will go to. Partitioning ensures that all the values for each key are grouped together and that values having the same key go to the same reducer. The total number of partitioners that run in a Hadoop job is equal to the number of reducers. The partition phase takes place after the map phase and before the reduce phase. A map-reduce job having both a partitioner and a combiner works like this: output from each mapper is written to a memory buffer and spilled to a local directory in case of overflow; the spilled data is partitioned according to the partitioner; data in each partition is sorted and combined based on the logic in the combiner; and the combined data is sent to the reducer based on the partition key. So, within the spill path, the data is partitioned first and the combiner is then applied within each partition. |
|
| 39. |
Brief about the Job or Application ID. how job history server is handling the Job details and brief about logging and log files. |
|
Answer» After job submission, job IDs are generated by the JobTracker in Hadoop 1; in Hadoop 2/3, application IDs are generated instead. An application ID or job ID is a globally unique identifier for an application or job. Example: job_1410450250506_0002 / application_1410450250506_0002. Task IDs are formed by replacing the job or application prefix with a task prefix within the job, for example task_1410450250506_0002_m_000002, where _000002 denotes the third map task of job "job_1410450250506_0002"; an individual attempt of that task is identified as, for example, attempt_1410450250506_0002_m_000002_0. In the Job History Server web UI you can see the job state (whether the job succeeded or failed), how many mappers and reducers were launched, and whether all the mappers and reducers completed; clicking a job ID in the Job History Server shows more or less the same information in detail. Overview: Hadoop Counters: this is the most useful option for examining job performance. Hadoop provides several built-in counters, and you can also define custom counters as per your requirements. Counters help you to get the following kinds of information.
Hadoop counters provide three types of built-in counters: file system counters, job counters and MapReduce framework counters, described below.
In addition to this, Hadoop provides another 3 counter groups by default: Shuffle Errors, File Input Format counters and File Output Format counters.
File system counters: under the file system counters you can get information about read and write operations in both the local file system and HDFS. The total number of bytes read and written depends on the compression algorithms in use. Here are a few key counters. FILE_BYTES_READ: the total number of bytes read from the local file system by the map-reduce tasks. FILE_BYTES_WRITTEN: the total number of bytes written to the local file system. During the map phase, the mapper tasks write their intermediate results to the local file system, and during the shuffle phase the reducer tasks also write to the local file system when they spill intermediate results while sorting.
JOB Counters: You will get Job information related to Mapper and reducer under JOB Counters. The following are the key job counters.
MapReduce Framework counters: You will get all the statistic of MapReduce job under MapReduce framework counter. It will help you to do the performance tuning of the job.
Other counters are as follows:
|
|
| 40. |
In what scenario sqoop can use and what are the features. How Sqoop works to Move Data into Hive and HDFS. |
|
Answer» Basically, Sqoop is used to get data from a relational database such as DB2, MySQL or Oracle and load it into Hadoop (HDFS, Hive, HBase, etc.), or vice versa; this process is called ETL, for Extract, Transform and Load. In other words, Sqoop can import and export data between a relational database and Hadoop. Below are some of the important features that Sqoop has:
Sqoop internally creates an SQL query for each mapper that ingests data from the source table into HDFS. By default 4 mappers are generated, but you can modify the number of mappers based on your logic and requirements. The number of mappers works together with the split-by column: the split-by column is used in the WHERE condition so that each mapper gets a logical partition of the source table or target directory. For example, if we use three mappers and a split-by column on a table with 1,000,000 records, Sqoop issues a min and max call to the database on the split-by column and divides the range: the first mapper pulls records 1 to 333,333, the second mapper 333,334 to 666,666 and the last mapper 666,667 to 1,000,000. Sqoop runs a map-only job; as we know, the reduce phase is required only for aggregations, and Apache Sqoop just imports and exports data without performing any aggregation. The map job launches multiple mappers depending on the number defined by the user (three in the example above). For a Sqoop import, each mapper task is assigned a part of the data to be imported, and Sqoop distributes the input data equally among the mappers to get high performance. Each mapper then creates a connection with the database using JDBC, fetches the part of the data assigned to it and writes it into HDFS, Hive or HBase based on the arguments provided on the CLI; the mappers therefore drop the data in the target directory in files named part-m-00000, part-m-00001, part-m-00002 and so on. How Sqoop works to move data into Hive/HDFS: here we discuss how Sqoop imports data from a relational database into Hive. Sqoop can only import the data as a text file or sequence file into a Hive database. If you want to use the ORC file format then you must follow a two-stage approach: in the first stage Sqoop gets the data into HDFS as a text file or sequence file, and in the second stage Hive converts the data into the ORC file format, as sketched below.
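A minimal sketch of this two-stage approach is shown here; it is not taken from the original answer, and the connection string, staging directory, table names and column list are assumptions made purely for illustration:
# Stage 1: land the source table as plain text files in an HDFS staging directory
sqoop import \
  --connect jdbc:mysql://db.bib.com/sales \
  --username <username> --password-file ${user.home}/.password \
  --table EMPLOYEES \
  --as-textfile \
  --target-dir /user/pbibhu/staging/employees
# Stage 2: let Hive convert the text data into an ORC-backed table
hive -e "
  CREATE EXTERNAL TABLE IF NOT EXISTS employees_text (id INT, name STRING, dept STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
    LOCATION '/user/pbibhu/staging/employees';
  CREATE TABLE IF NOT EXISTS employees_orc (id INT, name STRING, dept STRING) STORED AS ORC;
  INSERT OVERWRITE TABLE employees_orc SELECT * FROM employees_text;"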
Example: sqoop import --connect jdbc:mysql://db.bib.com/sales --table EMPLOYEES --username <username> --password-file ${user.home}/.password
Example: sqoop import --query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' --split-by a.id --target-dir /user/bib/sales
1. HDFS as target directory:
sqoop import --query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' --split-by a.id --target-dir /user/bib/sales
2. Hive as target table:
sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --hive-import |
|
| 41. |
What is Kafka and what components are available in Kafka? What is the role of ZooKeeper in Kafka, and what is the sequence for starting the Kafka services? |
|
Answer» Kafka is basically a messaging system that exchanges large volumes of streaming/log data between processes, applications and servers. It is a distributed messaging system based on a queue that can handle a high volume of data and allows you to pass messages from one end to another. Kafka is appropriate for both offline and online message consumption. Before talking about Kafka further, we need to know about its components; the details are below.
Kafka broker: a Kafka cluster consists of one or more servers, called Kafka brokers, on which Kafka is running. Producers are processes that publish data into Kafka topics within the brokers; consumers of those topics then pull the messages off the Kafka topics.
Kafka topics: a topic is a category or feed name to which messages are stored and published. All Kafka messages are organised into topics, so whenever you want to send a message you send it to a specific topic, and whenever you want to read messages you read them from a specific topic. Kafka topic partition: Kafka topics are divided into a number of partitions, and each partition contains messages in a sequence; the sequence is only meaningful within a partition. Each message in a partition is identified by its offset value, an incremental ID (in older Kafka versions consumer offsets were maintained in ZooKeeper). Offsets are meaningful only within a partition and have no value across partitions. A topic may contain any number of partitions. There is no fixed rule for which partition a message is written to; however, there is an option to add a key to a message, and if a producer publishes messages with a key then all messages with the same key will go to the same partition. Kafka producers: producers write data to a topic; while writing data, the producer needs to specify the topic name and at least one broker to connect to. Kafka has its own mechanism to send the data to the right partition of the right broker automatically. Producers can also request an acknowledgment for the data they write (for example acks=0 for no acknowledgment, acks=1 for a leader-only acknowledgment, or acks=all for an acknowledgment from the leader and all in-sync replicas).
Kafka consumer: a consumer reads data from topics. As topics are divided into multiple partitions, a consumer reads data from each partition of the topic, in sequence within each partition. Consumers need to specify the topic name as well as a broker; when a consumer connects to a broker, Kafka makes sure it is connected to the entire cluster. Kafka consumer group: a consumer group consists of multiple consumer processes and has one unique group ID. Each partition is read by exactly one consumer instance within a consumer group, so if the number of consumers exceeds the number of partitions the extra consumers stay inactive. For example, if there are 6 partitions in total and 8 consumers in a single consumer group, there will be 2 inactive consumers. Two types of messaging patterns are available in Kafka:
1. Point-to-point messaging system: in a point-to-point messaging system, messages are kept in a queue. One or more consumers can read from the queue, but a particular message can be consumed by only one consumer at a time. Point-to-point messaging is used when a single message must be received by exactly one consumer; there may be multiple consumers reading from the queue, but only one of them will receive any given message. There can also be multiple producers sending messages to the queue, but each message is received by only one receiver. 2. Publish-subscribe messaging system: in a publish-subscribe messaging system, message producers are called publishers and message consumers are called subscribers. Here a topic can have multiple receivers, and every receiver receives a copy of each message. The following points summarise the publish-subscribe messaging system: messages are shared through a channel called a topic; topics are placed in a centralized place where a producer can publish and a subscriber can read the messages; each message is delivered to one or more consumers, called subscribers; the publisher is not aware of which subscriber is receiving which message or topic; and a single message created by one publisher may be copied and distributed to hundreds or thousands of subscribers. Role of ZooKeeper in Kafka: ZooKeeper is a mandatory component of the Kafka ecosystem. It helps in managing Kafka brokers, in leader election of partitions and in maintaining cluster membership. For example, when a new broker is added or removed, when a topic is added or deleted, or when a broker goes down or comes up, ZooKeeper manages such situations and informs Kafka. It also handles topic configurations such as the number of partitions a topic has and the leader of the partitions for a topic. The sequence of starting the Kafka services is as follows:
First start ZooKeeper using the default ZooKeeper configuration file shipped with Kafka, which contains properties such as:
dataDir=/tmp/zookeeper
clientPort=2183
[root@xxxx]# /bin/zookeeper-server-start.sh /config/zookeeper.properties
Next, start the Kafka broker with the default configuration file, which contains properties such as:
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2183
Here there is one broker whose ID is 0, and it connects to ZooKeeper on port 2183.
[root@xxxx]# /bin/kafka-server-start.sh /config/server.properties
Below is an example of creating a topic with a single partition and a single replica; here we create a topic named examtopic.
[root@xxxx]# /bin/kafka-create-topic.sh -zookeeper localhost:2183 -replica 1 -partition 1 -topic examtopic
[root@xxxx]# /bin/kafka-console-producer.sh -broker-list localhost:9090 -topic examtopic
broker-list ==> the server and port information of the brokers; in the above example the server is localhost and the port is 9090. With this command line we create a producer client that accepts your messages and publishes them to the cluster, so that a consumer can then read them. For example, you could type a message such as:
Hi Bibhu, How are you?
[root@xxxx]# /bin/kafka-console-consumer.sh -zookeeper localhost:2183 -topic examtopic -from-beginning
The consumer runs with the default configuration properties, which are defined in the consumer.properties file.
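For reference, a minimal sketch of the same workflow with the arguments used by newer Kafka releases (which take --bootstrap-server instead of a ZooKeeper connection) might look as follows; the broker address and group name are assumptions:
# Create the topic with one partition and one replica
bin/kafka-topics.sh --create --topic examtopic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
# Publish a few messages from the console
bin/kafka-console-producer.sh --topic examtopic --bootstrap-server localhost:9092
# Read everything from the beginning as part of a named consumer group
bin/kafka-console-consumer.sh --topic examtopic --from-beginning --group exam-group --bootstrap-server localhost:9092
# Check the group's partition offsets and lag
bin/kafka-consumer-groups.sh --describe --group exam-group --bootstrap-server localhost:9092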
|
|
| 42. |
Can we use both the Fair Scheduler and the Capacity Scheduler in the same Hadoop cluster? Brief about the same. |
|
Answer» Both schedulers cannot be used in the same cluster. The two scheduling algorithms address different use-cases, and cluster-wide you have to set up the configuration for either the Fair Scheduler or the Capacity Scheduler; you cannot configure both schedulers for one cluster. You can choose the Fair Scheduler by setting the scheduler class in yarn-site.xml as follows: <property> <name>yarn.resourcemanager.scheduler.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value> </property> To use the Capacity Scheduler you have to configure the ResourceManager in conf/yarn-site.xml by setting yarn.resourcemanager.scheduler.class to org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler. While setting up the queues for the Capacity Scheduler you need to make changes in the etc/hadoop/capacity-scheduler.xml configuration file. The Capacity Scheduler has a predefined queue called root, and whatever queues we create are children of the root queue. Setting up further queues: configure the property yarn.scheduler.capacity.root.queues with a list of comma-separated child queues. Setting up sub-queues within a queue: configure the property yarn.scheduler.capacity.<queue-path>.queues, where queue-path is the full path of the queue's hierarchy, starting at root, with a dot (.) as the delimiter. Queue capacity is given as a percentage (%), and the sum of the capacities of all queues at each level must equal 100. If there are free resources in a queue then applications in that queue may consume the required resources. Capacity Scheduler queue configuration example: suppose there are two child queues under root, XYZ and ABC, and XYZ is further divided into the queues technology and marketing. XYZ is given 60% of the cluster capacity and ABC is given 40%. In this scenario the settings in capacity-scheduler.xml are as follows (a hedged Fair Scheduler equivalent is sketched after this answer): <property> <name>yarn.scheduler.capacity.root.queues</name> <value>XYZ,ABC</value> </property> <property> <name>yarn.scheduler.capacity.root.XYZ.queues</name> <value>technology,marketing</value> </property> <property> <name>yarn.scheduler.capacity.root.XYZ.capacity</name> <value>60</value> </property> <property> <name>yarn.scheduler.capacity.root.ABC.capacity</name> <value>40</value> </property> |
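Purely as an illustrative sketch that is not part of the original answer, a Fair Scheduler equivalent of the queue split above could be expressed in a fair-scheduler allocation file; the file path and the use of weights below are assumptions:
# Write a minimal fair-scheduler allocation file (the path must match
# yarn.scheduler.fair.allocation.file in yarn-site.xml)
cat > /etc/hadoop/conf/fair-scheduler.xml <<'EOF'
<?xml version="1.0"?>
<allocations>
  <queue name="XYZ">
    <weight>60</weight>
    <queue name="technology"/>
    <queue name="marketing"/>
  </queue>
  <queue name="ABC">
    <weight>40</weight>
  </queue>
</allocations>
EOF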
|
| 43. |
What are the file formats that Hadoop supports? Brief about the same. |
|
|
Answer» Below are the file formats that Hadoop supports: text/CSV, JSON, Avro, RC, ORC and Parquet.
Text/CSV: plain text format was very common before Hadoop and is still very common in a Hadoop environment. Data is presented as lines, each terminated by a newline character (\n), with fields separated by a delimiter such as a tab (\t). CSV stands for comma-separated values, so the data fields are delimited by commas. For example, suppose we have a small table of student records in a spreadsheet.
The same data would appear in a CSV-formatted file as sketched below.
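The original example table is not reproduced here, so the following is a hypothetical stand-in showing what such a CSV file could look like; the file name and column values are assumptions:
# Hypothetical contents of students.csv (header plus two records)
cat students.csv
name,roll_no,department
Bibhu,101,Finance
Sibb,102,Finance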
JSON stands for JavaScript Object Notation. It is a readable format for structuring data and is mostly used to transfer data from a server to a web application; it can be used as an alternative to XML. In JSON, data is presented as key/value pairs. The key is always a string enclosed in quotation marks, and the value can be a string, number, Boolean, array or object. The basic syntax is a key followed by a colon followed by a value, as in the small sample below.
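A tiny, hypothetical JSON record is shown here for illustration only; the file name and field names are assumptions:
# Hypothetical contents of student.json
cat student.json
{
  "name": "Bibhu",
  "roll_no": 101,
  "active": true,
  "subjects": ["Maths", "Physics"]
}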
Avro: Avro stores the schema in JSON format, which is easy to read and understand, while the data itself is stored in a binary format, making it compact and efficient; each value is stored without any per-value metadata other than a small schema identifier of 1 to 4 bytes. Avro files can be split into subsets, which makes them very suitable for MapReduce processing. In Hive the following kind of statement is used for Avro: CREATE TABLE avro_school (column_address string) STORED AS AVRO;
RC: RC stands for Record Columnar; it is a binary file format that provides high compression on top of rows, or on multiple rows at a time, on which we want to perform some operation. RC files consist of binary key/value pairs. The RC file format first partitions the rows horizontally into row splits (Step 1), and then each row split is stored vertically, in a columnar way (Step 2).
The RC file format combines multiple functions such as data storage formatting, data compression and data access optimization, and it is able to meet the common requirements of data storage: fast data loading, fast query processing, efficient storage space utilization and adaptability to dynamic workload patterns.
ORC: the ORC file format provides a more efficient way to store relational data than the RC file, reducing the size of the stored data by up to 75% of the original. Compared to the RC file, the ORC file takes less time to access the data and less space to store it; internally it divides the data into stripes with a default size of 250 MB. In Hive the following clause is used for the ORC file format: CREATE TABLE ... STORED AS ORC;
Parquet: Parquet is another column-oriented storage format like RC and ORC, but it is very good at handling nested data and at query scans that touch only particular columns of a table. In Parquet, new columns can be added at the end of the structure. Compression is handled using codecs such as Snappy and gzip, with Snappy currently being the default. Parquet is supported by Cloudera and optimized for Cloudera Impala. Hive Parquet file format example: CREATE TABLE parquet_school_table (...) STORED AS PARQUET; |
|
| 44. |
What is the problem with having lots of small files in HDFS? What is the remediation plan? |
|
Answer» Basically, we store files under folders in HDFS; most of the time the folder name is based on the application name. When we talk about small files we mean files smaller than the block size: for example, if the block size is 64 MB or 128 MB, then files smaller than that are considered small files. If the files are smaller than the block size then we face problems both at the HDFS level and at the MapReduce level. In HDFS, when we store files and directories the corresponding metadata is stored in the NameNode, and each file, directory and block occupies approximately 150 bytes of metadata. Suppose you have 1 million files, each using approximately one block or less: the metadata of the corresponding files and directories then occupies roughly 300 MB of NameNode memory. A lot of memory is therefore consumed on the NameNode; after some time the threshold is reached and it becomes a problem with the existing hardware, and performance certainly degrades. During the execution of a MapReduce job, one mapper is launched for each block (or equivalent split), so a very large number of mappers are launched for a large number of small files, and the processing time increases because each mapper handles only a small chunk of data. When reading and writing a large number of small files the seek time is also higher, which hurts performance, since seeks are generally expensive operations. Since Hadoop is designed to run over your entire dataset, it is best to minimize seeks by using large files. Remediation plan: consolidate the small files into larger ones, for example by merging them before or after ingestion, packing them into Hadoop Archives (HAR files) or SequenceFiles, or using CombineFileInputFormat so that a single mapper can process several small files together; a hedged example of building a Hadoop Archive is shown below. |
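As one concrete remediation, a Hadoop Archive can pack many small files into a single HAR file; the paths below are assumptions used only for illustration:
# Pack the directory smallfiles (relative to the parent /user/pbibhu) into one archive under /user/pbibhu/archives
hadoop archive -archiveName logs.har -p /user/pbibhu smallfiles /user/pbibhu/archives
# The archived files can still be listed and read through the har:// scheme
hadoop fs -ls har:///user/pbibhu/archives/logs.har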
|
| 45. |
You have a setup of YARN cluster where the total application memory available is 30GB there are two company queues such as Wipro and TCS. Wipro queue has 15gb allocated and TCS queue has 5gb allocated. Each map task requires 25GB allocation. How does the fair scheduler assign the available memory resources under the Dominant Resource Fairness(DRF) scheduler? |
|
Answer» Resource allocation within the queues is controlled separately. Within a queue, the Fair Scheduler can apply any of the FIFO policy, the fair policy or the DominantResourceFairness (DRF) policy, while the Capacity Scheduler can apply either the FIFO policy or the fair policy. The Fair Scheduler can therefore use different scheduling policies: the default policy is fair sharing using memory as the only resource; there is also a FIFO (first-in, first-out) policy, which is not of much use; and it is quite common to use the third type of policy, DRF, which allocates both memory and CPU resources to applications. DRF is similar to fair scheduling, but it is important to keep in mind that it applies primarily to the allocation of resources among queues, an activity that is already dominated by queue weights. Thus the most important thing about DRF is that it considers multiple resources, rather than that it attempts to provide equal resource allocation. In this scenario, TCS and Wipro each have some resources already allocated to jobs in their respective queues and only 10 GB remains free in the cluster. Each queue is requesting to run a map task, so the available memory and the CPU resource both have to be considered. Wipro currently holds 15 GB; its map task needs another 10 GB, so the Fair Scheduler awards Wipro a container with the requested 10 GB of memory. That leaves only 5 GB of memory available to TCS, whose map task needs more than that; in this case there is not enough memory for TCS, so under DRF it uses the remaining 5 GB of memory and the rest of its requirement is accounted for against its dominant share of CPU. |
|
| 46. |
What are the Table types available in Hive? |
|
Answer» There are two types of tables that Hive supports: managed (internal) tables and external tables.
Hive managed tables: a managed (internal) table is one whose data lifecycle is controlled by Hive; dropping the table also removes its data from the Hive warehouse directory. Example:
1. Create table
hive> create table univercity_db.school_table(name string, roll_no int) row format delimited fields terminated by ',';
OK
Time taken: 0.202 seconds
2. Describe table
hive> describe formatted univercity_db.school_table;
OK
You will get extra information such as whether the table is managed or external, when the table was created, the file format, the location of the data path in HDFS, and whether the object is a table or a view.
3. Load data into the table from a local path
hive> load data local inpath '/home/pbibhu/Desktop/blog/school' into table univercity_db.school_table;
After loading from the local path you can further use Hive commands such as select/count/describe.
Hive external tables: use external tables when the files are present in remote locations and the files should remain even if the external table is dropped. Example:
1. Create table (TEXTFILE is used here; ORC is also possible)
CREATE EXTERNAL TABLE IF NOT EXISTS univercity_db.school_table(
  student_ID INT, FirstName STRING, LastName STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs/pbibhu/school';
2. Create partitioned table (note that the partition column must not be repeated in the column list)
CREATE EXTERNAL TABLE IF NOT EXISTS univercity_db.school_table(
  FirstName STRING, LastName STRING)
PARTITIONED BY (student_ID INT)
STORED AS ORC
LOCATION 'hdfs/pbibhu/school';
3. Insert data into an internal table from an external table; the data structure should be the same for both tables.
hive> CREATE TABLE IF NOT EXISTS office(EmployeeID INT, FirstName STRING, Title STRING, State STRING, Laptop STRING) STORED AS ORC;
OK
hive> CREATE EXTERNAL TABLE IF NOT EXISTS Office_text(EmployeeID INT, FirstName STRING, Title STRING, State STRING, Laptop STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION '/user/pbibhu/office';
OK
hive> INSERT OVERWRITE TABLE office SELECT * FROM office_text; |
|
| 47. |
Explain in brief about Hadoop's rack topology. |
|
Answer» A rack is a collection of multiple servers, grouped based on your requirements. All these servers are connected to the same network switch, and if that switch goes down then all the machines in that rack are out of service; we then say the rack is down. To mitigate this, Rack Awareness was introduced in Hadoop by Apache. With Rack Awareness, the NameNode chooses DataNodes that are on, or close to, the same rack wherever possible. The NameNode maintains the rack ID of each DataNode to get the rack information, and based on the rack ID the NameNode communicates with the DataNodes. When maintaining racks in Hadoop we have to follow certain rules, most importantly the replica placement policy: no more than one replica of a block is stored on any one DataNode, and no more than two replicas of the same block are stored on the same rack.
Below are the main reasons why Rack Awareness is used in Hadoop: it improves data availability and fault tolerance, because replicas of a block survive the failure of an entire rack; it reduces cross-rack network traffic, because reads and writes can be served from a nearby rack wherever possible; and it improves overall cluster performance and bandwidth utilisation. A hedged sketch of a rack topology script is shown below.
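Rack Awareness is typically enabled by pointing net.topology.script.file.name (in core-site.xml) at a script that maps an IP address or hostname to a rack ID; the script path and the IP-to-rack mapping below are a minimal, hypothetical sketch:
#!/bin/bash
# /etc/hadoop/conf/rack-topology.sh (hypothetical path)
# Prints one rack ID per argument; unknown hosts fall back to /default-rack.
for host in "$@"; do
  case "$host" in
    192.168.1.*) echo "/dc1/rack1" ;;
    192.168.2.*) echo "/dc1/rack2" ;;
    *)           echo "/default-rack" ;;
  esac
done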
|
|
| 48. |
Why should we run the HDFS balancer periodically? Brief about the same? |
|
Answer» HDFS data might not always be distributed uniformly across DataNodes, for different reasons: some DataNodes may have less disk space available for use by HDFS, disk utilization on the DataNode machines may become uneven during normal or heavy usage, or new DataNodes may be added to an existing cluster, which also leaves utilization uneven. To mitigate this problem the balancer is required. The balancer is a tool that balances disk space usage on an HDFS cluster: it analyzes block placement and moves blocks across DataNodes until the cluster is deemed balanced, which means that the utilization of every DataNode is more or less equal. The balancer does not balance between individual volumes on a single DataNode. hdfs balancer [-policy <policy>]: the two supported policies are blockpool and datanode. Setting the policy to blockpool means the cluster is balanced if each block pool on each node is balanced, while datanode means the cluster is balanced if each DataNode is balanced; the default policy is datanode. hdfs balancer [-threshold <threshold>]: specifies a number in [1.0, 100.0] representing the acceptable threshold, as a percentage of storage capacity, so that storage utilization outside the average +/- the threshold is considered over- or under-utilized. The default threshold is 10.0. A hedged command-line example is shown below. |
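For illustration, a periodic balancer run might look like the following; the threshold and bandwidth values are assumptions and should be tuned per cluster:
# Limit the network bandwidth each DataNode may use for balancing (100 MB/s here)
hdfs dfsadmin -setBalancerBandwidth 104857600
# Run the balancer until every DataNode is within 5% of the average utilization
hdfs balancer -threshold 5 -policy datanode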
|
| 49. |
What are the steps when you run the YARN job by calling submitApplication() method? |
Answer» When a client submits an application by calling submitApplication() on the ResourceManager, the following steps take place (this summarises the standard YARN/MapReduce 2 flow): 1. The ResourceManager's scheduler allocates a container on a NodeManager and launches the ApplicationMaster (for MapReduce this is the MRAppMaster) in that container. 2. The ApplicationMaster initialises the job, registers itself with the ResourceManager and retrieves the input splits. 3. The ApplicationMaster requests containers from the ResourceManager for the map and reduce tasks. 4. Once containers are allocated, the ApplicationMaster contacts the corresponding NodeManagers to launch the task containers, which localise the job resources (JAR, configuration) and run the tasks. 5. The tasks report progress and status back to the ApplicationMaster, and the client polls the ApplicationMaster (or the ResourceManager) for job progress. 6. On completion, the ApplicationMaster and the task containers clean up their working state, the job history is persisted to the Job History Server, and the ApplicationMaster deregisters from the ResourceManager.
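A minimal, hedged way to watch this flow from the command line is to submit one of the bundled example jobs and query its application status; the example jar path and the application ID are placeholders and vary by distribution:
# Submit an example MapReduce job to YARN
yarn jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar pi 4 1000
# List running applications and then check one application's state and tracking URL
yarn application -list
yarn application -status application_1410450250506_0002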
|
|
| 50. |
Why can’t we use LVM in a Hadoop cluster? |
|
Answer» LVM stands for Logical Volume Management. It is a system for managing logical volumes or filesystems that is much more advanced and flexible than the traditional method of partitioning a disk into one or more segments and formatting each partition with a filesystem. Today disks are huge (> 1 TB) and LVM is the right tool for dynamically allocating and resizing partitions on such disks. However, if you are using Linux to deploy Hadoop nodes, master or worker, it is strongly recommended not to use LVM, mainly for the following reasons: Hadoop is designed for plain JBOD-style disks with one filesystem mounted per physical disk, so HDFS can handle individual disk failures itself; combining several disks into one logical volume means a single failed or slow disk can take down or slow the whole volume; and the extra LVM layer adds management complexity and some I/O overhead without providing any benefit that HDFS replication does not already give.
|
|