DefaultBigQueryService.java
001 /*
002  * SPDX-License-Identifier: Apache-2.0
003  *
004  * Copyright 2020-2022 Agorapulse.
005  *
006  * Licensed under the Apache License, Version 2.0 (the "License");
007  * you may not use this file except in compliance with the License.
008  * You may obtain a copy of the License at
009  *
010  *     https://www.apache.org/licenses/LICENSE-2.0
011  *
012  * Unless required by applicable law or agreed to in writing, software
013  * distributed under the License is distributed on an "AS IS" BASIS,
014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015  * See the License for the specific language governing permissions and
016  * limitations under the License.
017  */
018 package com.agorapulse.micronaut.bigquery.impl;
019 
020 import com.agorapulse.micronaut.bigquery.BigQueryService;
021 import com.agorapulse.micronaut.bigquery.RowResult;
022 import com.google.cloud.bigquery.BigQuery;
023 import com.google.cloud.bigquery.BigQueryException;
024 import com.google.cloud.bigquery.Job;
025 import com.google.cloud.bigquery.JobId;
026 import com.google.cloud.bigquery.JobInfo;
027 import com.google.cloud.bigquery.QueryJobConfiguration;
028 import com.google.cloud.bigquery.QueryParameterValue;
029 import com.google.cloud.bigquery.TableResult;
030 import io.reactivex.Flowable;
031 
032 import javax.inject.Singleton;
033 import java.time.Instant;
034 import java.util.Collections;
035 import java.util.LinkedHashMap;
036 import java.util.Map;
037 import java.util.UUID;
038 import java.util.function.Function;
039 
040 @Singleton
041 public class DefaultBigQueryService implements BigQueryService {
042 
043     private final BigQuery bigquery;
044 
045     public DefaultBigQueryService(BigQuery bigQuery) {
046         this.bigquery = bigQuery;
047     }
048 
049     @Override
050     public <T> Flowable<T> query(Map<String, ?> namedParameters, String sql, Function<RowResult, T> builder) {
051         QueryJobConfiguration queryConfig = QueryJobConfiguration
052             .newBuilder(checkForNulls(sql, namedParameters))
053             .setUseLegacySql(false)
054             .setNamedParameters(toNamedParameters(namedParameters))
055             .build();
056 
057         // Create a job ID
058         JobId jobId = JobId.of(UUID.randomUUID().toString());
059         Job job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
060 
061         try {
062             // Wait for the query to complete.
063             Job completedJob = job.waitFor();
064 
065             // Check for errors
066             if (completedJob == null) {
067                 throw new IllegalStateException("Job no longer exists");
068             else if (completedJob.getStatus().getError() != null) {
069                 throw new IllegalStateException("Failed to execute sql " + sql + ":" + completedJob.getStatus().getError());
070             }
071 
072 
073             TableResult result = completedJob.getQueryResults();
074             return Flowable.fromIterable(result.iterateAll()).filter(r -> !r.isEmpty()).map(FieldValueListRowResult::new).map(builder::apply);
075         catch (InterruptedException | BigQueryException e) {
076             throw new IllegalStateException("Could not execute query: " + sql, e);
077         }
078     }
079 
080     @Override
081     public void execute(Map<String, ?> namedParameters, String sql) {
082         QueryJobConfiguration queryConfig = QueryJobConfiguration
083             .newBuilder(checkForNulls(sql, namedParameters))
084             .setUseLegacySql(false)
085             .setNamedParameters(toNamedParameters(namedParameters))
086             .build();
087 
088         // Create a job ID
089         JobId jobId = JobId.of(UUID.randomUUID().toString());
090         Job job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
091 
092         try {
093             // Wait for the query to complete.
094             Job completedJob = job.waitFor();
095 
096             // Check for errors
097             if (completedJob == null) {
098                 throw new IllegalStateException("Job no longer exists");
099             else if (completedJob.getStatus().getError() != null) {
100                 throw new IllegalStateException(completedJob.getStatus().getError().toString());
101             }
102         catch (InterruptedException | BigQueryException e) {
103             throw new IllegalStateException("Could not execute sql: " + sql, e);
104         }
105     }
106 
107     @SuppressWarnings("unchecked")
108     private Map<String, QueryParameterValue> toNamedParameters(Map<String, ?> namedParameters) {
109         if (namedParameters.isEmpty()) {
110             return Collections.emptyMap();
111         }
112 
113         final Map<String, QueryParameterValue> result = new LinkedHashMap<>();
114 
115         namedParameters.forEach((key, value-> {
116             Object converted = convertIfNecessary(value);
117             if (converted instanceof Instant) {
118                 Instant instant = (Instantconverted;
119                 result.put(key, QueryParameterValue.timestamp(instant.getEpochSecond() 1_000_000 + instant.getNano() 1000));
120             else if (converted != null){
121                 result.put(key, QueryParameterValue.of(converted, (Class<Object>converted.getClass()));
122             }
123         });
124 
125         return result;
126     }
127 
128     private String checkForNulls(String sql, Map<String, ?> namedParameters) {
129         String result = sql;
130         for(Map.Entry<String, ?> entry : namedParameters.entrySet()) {
131             if (entry.getValue() == null) {
132                 result = result.replace("@" + entry.getKey()"null");
133             }
134         }
135         return result;
136     }
137 
138 }